Skip to content

Commit

Permalink
handle false for x-stream-automatic-offset-tracking and add spec for …
Browse files Browse the repository at this point in the history
…it. Lint: Refactor validate_preconditions, move validate_stream_offset and validate_stream_filter to separate methods
  • Loading branch information
viktorerlingsson committed Jan 27, 2025
1 parent 7e94d70 commit 9666a4f
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
29 changes: 24 additions & 5 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -504,19 +504,38 @@ describe LavinMQ::AMQP::StreamQueue do
it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true})
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": "true"})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
# tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
# should continue from tracked offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 2
end
end

it "should not use saved offset if x-stream-automatic-offset-tracking is false" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": "false"})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# does not track offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# should consume the same message again, no tracked offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
end
end

Expand Down
36 changes: 26 additions & 10 deletions src/lavinmq/amqp/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,43 @@ module LavinMQ
if frame.arguments.has_key? "x-priority"
raise LavinMQ::Error::PreconditionFailed.new("x-priority not supported on stream queues")
end
validate_stream_offset(frame)
validate_stream_filter(frame.arguments["x-stream-filter"]?)
case match_unfiltered = frame.arguments["x-stream-match-unfiltered"]?
when Bool
@match_unfiltered = match_unfiltered
when Nil
# noop
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean")
end
end

private def validate_stream_offset(frame)
case frame.arguments["x-stream-offset"]?
when Nil
@track_offset = true unless @tag.starts_with?("amq.ctag-")
when Int, Time, "first", "next", "last"
@track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]?
case frame.arguments["x-stream-automatic-offset-tracking"]?
when Bool
@track_offset = frame.arguments["x-stream-automatic-offset-tracking"]?.as(Bool)
when String
@track_offset = frame.arguments["x-stream-automatic-offset-tracking"]? == "true"
end
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
end
case filter = frame.arguments["x-stream-filter"]?
end

private def validate_stream_filter(arg)
case arg
when String
@filter = filter.split(',').sort!
@filter = arg.split(',').sort!
when Nil
# noop
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string")
end
case match_unfiltered = frame.arguments["x-stream-match-unfiltered"]?
when Bool
@match_unfiltered = match_unfiltered
when Nil
# noop
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean")
end
end

private def validate_offset_tracking(arg)
end

private def deliver_loop
Expand Down

0 comments on commit 9666a4f

Please sign in to comment.