Skip to content

Commit

Permalink
fixes to match changes from main
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Nov 12, 2024
1 parent cf1cca7 commit f7aad09
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 121 deletions.
26 changes: 13 additions & 13 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module StreamQueueSpecHelpers
end
end

describe LavinMQ::StreamQueue do
describe LavinMQ::AMQP::StreamQueue do
stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"})

describe "Consume" do
Expand Down Expand Up @@ -263,14 +263,14 @@ describe LavinMQ::StreamQueue do
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
end
Expand All @@ -286,7 +286,7 @@ describe LavinMQ::StreamQueue do
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
Expand All @@ -305,13 +305,13 @@ describe LavinMQ::StreamQueue do
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
msg_store.close

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
bytesize = consumer_tag.bytesize + 1 + 8
msg_store.@consumer_offsets.size.should eq bytesize
Expand All @@ -326,7 +326,7 @@ describe LavinMQ::StreamQueue do
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
bytesize = consumer_tag.bytesize + 1 + 8

offsets = (LavinMQ::Config.instance.segment_size / bytesize).to_i32 + 1
Expand Down Expand Up @@ -403,7 +403,7 @@ describe LavinMQ::StreamQueue do
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
Expand All @@ -412,7 +412,7 @@ describe LavinMQ::StreamQueue do
msg_store.close
sleep 0.1

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
Expand Down Expand Up @@ -444,15 +444,15 @@ describe LavinMQ::StreamQueue do
msgs.receive
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end

msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
end
end
Expand All @@ -477,7 +477,7 @@ describe LavinMQ::StreamQueue do

sleep 0.1
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end
Expand All @@ -488,7 +488,7 @@ describe LavinMQ::StreamQueue do
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
one_offset_bytesize = "#{consumer_tag_prefix}#{1000}".bytesize + 1 + 8
offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1
bytesize = 0
Expand Down
3 changes: 2 additions & 1 deletion src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./stream_queue"
require "../stream_consumer"

module LavinMQ::AMQP
class StreamQueue < DurableQueue
Expand Down Expand Up @@ -170,7 +171,7 @@ module LavinMQ::AMQP
@replicator.try &.replace_file @consumer_offsets.path
end

def shift?(consumer : Client::Channel::StreamConsumer) : Envelope?
def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

if env = shift_requeued(consumer.requeued)
Expand Down
14 changes: 12 additions & 2 deletions src/lavinmq/amqp/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ module LavinMQ
property segment : UInt32
property pos : UInt32
getter requeued = Deque(SegmentPosition).new
@track_offset = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
@tag = frame.consumer_tag
validate_preconditions(frame)
offset = frame.arguments["x-stream-offset"]?
@offset, @segment, @pos = stream_queue.find_offset(offset)
@offset, @segment, @pos = stream_queue.find_offset(offset, @tag, @track_offset)
super
end

Expand All @@ -34,7 +36,10 @@ module LavinMQ
raise LavinMQ::Error::PreconditionFailed.new("x-priority not supported on stream queues")
end
case frame.arguments["x-stream-offset"]?
when Nil, Int, Time, "first", "next", "last"
when Nil
@track_offset = true unless @tag.starts_with?("amq.ctag-")
when Int, Time, "first", "next", "last"
@track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]?
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
end
end
Expand Down Expand Up @@ -82,6 +87,11 @@ module LavinMQ
@queue.as(StreamQueue)
end

def ack(sp)
stream_queue.store_consumer_offset(@tag, @offset) if @track_offset
super
end

def reject(sp, requeue : Bool)
super
if requeue
Expand Down
105 changes: 0 additions & 105 deletions src/lavinmq/client/channel/stream_consumer.cr

This file was deleted.

0 comments on commit f7aad09

Please sign in to comment.