Automatic offset tracking for stream queues #661

Open — wants to merge 72 commits into base: main

Changes from all commits (72 commits)
b6c4b91
WIP adds automatic offset tracking for streams
viktorerlingsson Apr 16, 2024
64408ae
specs
viktorerlingsson Apr 16, 2024
be71216
add specs for edge cases. refactor specs so they are more readable
viktorerlingsson Apr 17, 2024
c8f9275
format
viktorerlingsson Apr 17, 2024
9716175
refactor, dont init consumer tracking stuff unless needed. resize fil…
viktorerlingsson Apr 17, 2024
c5eab18
add spec that checks that only one entry is saved per consumer tag
viktorerlingsson Apr 17, 2024
8e8290d
format
viktorerlingsson Apr 17, 2024
8a5745d
lint
viktorerlingsson Apr 17, 2024
65c4fc1
add function to remove consumer tags from file
viktorerlingsson Apr 22, 2024
4055f8f
spec for removing consumer tags
viktorerlingsson Apr 22, 2024
76d0d97
save length of ctag in file, dont use space as deliminator
viktorerlingsson Apr 22, 2024
c8fbc9a
format
viktorerlingsson Apr 22, 2024
b6bb89a
lint
viktorerlingsson Apr 22, 2024
52919a3
remove methods, use instance variables directly. add cleanup function
viktorerlingsson Apr 23, 2024
589f868
add spec for cleanup
viktorerlingsson Apr 23, 2024
77d9a38
raise before updating instance variable
viktorerlingsson Apr 23, 2024
8b29074
refactor
viktorerlingsson Apr 23, 2024
affa800
remove comment
viktorerlingsson Apr 23, 2024
7307e77
refactor
viktorerlingsson Apr 23, 2024
451b8f9
refactor
viktorerlingsson Apr 23, 2024
ff12ef7
format
viktorerlingsson Apr 23, 2024
e66145b
set size to 32768 for offsets file, change path name
viktorerlingsson Apr 24, 2024
1a8763f
remove comment
viktorerlingsson Apr 24, 2024
d0f343a
refactor restore_consumer_offset_positions
viktorerlingsson Apr 24, 2024
e7d9592
remove unused var
viktorerlingsson Apr 24, 2024
9cb22bb
save_offset_by_consumer_tag -> update_consumer_offset
viktorerlingsson Apr 24, 2024
b42be91
write_new_ctag_to_file -> store_consumer_offset
viktorerlingsson Apr 24, 2024
f7cf45e
refactor update_consumer_offset
viktorerlingsson Apr 24, 2024
74dc942
add option to get a writeable slice from mfile
viktorerlingsson Apr 24, 2024
c0f1021
encode directly into mmap. fix bug with pos/size
viktorerlingsson Apr 24, 2024
148fe88
update spec
viktorerlingsson Apr 24, 2024
af6172f
more strict with types
viktorerlingsson Apr 24, 2024
e16682d
format
viktorerlingsson Apr 24, 2024
2567e4c
lint
viktorerlingsson Apr 24, 2024
83d5536
lint
viktorerlingsson Apr 24, 2024
e7ace22
String#bytesize
viktorerlingsson May 16, 2024
85d9d39
refactor store_consumer_offset and restore_consumer_offset_positions
viktorerlingsson May 16, 2024
9019d1e
refactor cleanup_consumer_offsets
viktorerlingsson May 16, 2024
9c97f9b
replace offset file instead of delete
viktorerlingsson May 16, 2024
a22ec07
lint
viktorerlingsson May 16, 2024
1a443f3
lint
viktorerlingsson May 16, 2024
778d299
dont track offsets when consumer tag is generated
viktorerlingsson May 23, 2024
d7bc434
remove unused code
viktorerlingsson May 23, 2024
15564b6
cleanup consumer offsets when dropping overflow
viktorerlingsson May 28, 2024
dcb8556
format
viktorerlingsson May 28, 2024
9face38
lint
viktorerlingsson May 28, 2024
f465889
handle large messages causing first segment to be empty
viktorerlingsson May 30, 2024
724ded7
cleanup spec
viktorerlingsson May 30, 2024
b4fc6aa
add option to use broker tracking when x-stream-offset is set by usin…
viktorerlingsson Jun 4, 2024
7018f31
add spec
viktorerlingsson Jun 4, 2024
5796d2b
no need to truncate mfile, it's being deleted
viktorerlingsson Jun 17, 2024
4246e91
x-stream-use-automatic-offset -> x-stream-automatic-offset-tracking
viktorerlingsson Jun 17, 2024
a85a6ac
implement rename in mfile
viktorerlingsson Jun 17, 2024
7fc3f85
expand consumer offsets file if full
viktorerlingsson Jun 17, 2024
508762c
remove unused code
viktorerlingsson Jun 17, 2024
c1c0c5f
use LittleEndian
viktorerlingsson Jun 17, 2024
455fe9d
remove instance variables
viktorerlingsson Jun 19, 2024
fcc0498
use old_consumer_offsets.path
viktorerlingsson Jun 19, 2024
cc8c1b7
start reading at pos=4 after IndexError in offset_at
viktorerlingsson Jun 19, 2024
e29a4c8
update specs to start amqp servers where needed
viktorerlingsson Jun 19, 2024
c3a7266
ameba:disable Metrics/CyclomaticComplexity for find_offset
viktorerlingsson Jun 19, 2024
9b91824
include tag size prefix byte in consumer_offset_file_full?
viktorerlingsson Jun 20, 2024
17ac96a
only append consumer offsets file, compact when full, expand if still…
viktorerlingsson Jun 26, 2024
f22d5cb
set capacity of consumer offsets file to 1000*current size when compa…
viktorerlingsson Jun 27, 2024
04c7650
replicate consumer offsets file. remove unused code
viktorerlingsson Jun 27, 2024
5222c9e
fixes to match changes from main
viktorerlingsson Nov 12, 2024
397222e
lint
viktorerlingsson Nov 12, 2024
18055c7
add .seconds to timespans
viktorerlingsson Nov 15, 2024
d3d69ab
read_only is no longer used
viktorerlingsson Jan 27, 2025
7e94d70
fix specs
viktorerlingsson Jan 27, 2025
9666a4f
handle false for x-stream-automatic-offset-tracking and add spec for …
viktorerlingsson Jan 27, 2025
e6fd1ee
fixup! handle false for x-stream-automatic-offset-tracking and add sp…
viktorerlingsson Jan 27, 2025
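The commit messages above ("save length of ctag in file", "include tag size prefix byte", "use LittleEndian") together with the spec's `consumer_tag.bytesize + 1 + 8` arithmetic suggest each consumer-offset record is a 1-byte tag-length prefix, the tag bytes, then the offset as a little-endian signed 64-bit integer. A hypothetical Ruby sketch of that layout (the function names are illustrative, not the actual Crystal implementation):

```ruby
# Encode one consumer-offset record: 1-byte tag length, tag bytes,
# then the offset as a little-endian Int64 ("q<").
def encode_consumer_offset(tag, offset)
  raise ArgumentError, "tag longer than 255 bytes" if tag.bytesize > 255
  [tag.bytesize].pack("C") + tag.b + [offset].pack("q<")
end

# Decode a record produced by encode_consumer_offset.
def decode_consumer_offset(bytes)
  len = bytes.unpack1("C")
  tag = bytes.byteslice(1, len)
  offset = bytes.byteslice(1 + len, 8).unpack1("q<")
  [tag, offset]
end

record = encode_consumer_offset("ctag-1", 42)
# record.bytesize matches the spec's expected size: tag bytes + 1 + 8
```

Appending fixed-layout records like this matches the PR's append-then-compact strategy: the latest record for a tag wins, and compaction rewrites the file with one record per tag.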
320 changes: 320 additions & 0 deletions spec/stream_queue_spec.cr
@@ -1,6 +1,38 @@
require "./spec_helper"
require "./../src/lavinmq/amqp/queue"

module StreamQueueSpecHelpers
def self.publish(s, queue_name, nr_of_messages)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
nr_of_messages.times { |i| q.publish "m#{i}" }
end
end

def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
args = {"x-queue-type": "stream"}
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: c_tag, args: c_args) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end
end

def self.offset_from_headers(headers)
if headers
headers["x-stream-offset"].as(Int64)
else
fail("No headers found")
end
end
end

describe LavinMQ::AMQP::StreamQueue do
stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"})

@@ -330,4 +362,292 @@ describe LavinMQ::AMQP::StreamQueue do
end
end
end
describe "Automatic consumer offset tracking" do
it "resumes from last offset on reconnect" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
offset = 3

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
sleep 0.1.seconds

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
end
end

it "reads offsets from file on init" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
vhost = s.vhosts["/"]
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
wait_for { msg_store.@closed }

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
end
msg_store.close
end
end

it "appends consumer tag file" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
bytesize = consumer_tag.bytesize + 1 + 8
msg_store.@consumer_offsets.size.should eq bytesize*5
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
msg_store.close
end
end

it "compacts consumer tag file on restart" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = "ctag-1"
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each do |offset|
msg_store.store_consumer_offset(consumer_tag, offset)
end
msg_store.close

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
bytesize = consumer_tag.bytesize + 1 + 8
msg_store.@consumer_offsets.size.should eq bytesize
msg_store.close
end
end

it "compacts consumer tag file when full" do
queue_name = Random::Secure.hex
offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
consumer_tag = Random::Secure.hex(32)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
bytesize = consumer_tag.bytesize + 1 + 8

offsets = (LavinMQ::Config.instance.segment_size / bytesize).to_i32 + 1
offsets.times do |i|
msg_store.store_consumer_offset(consumer_tag, i)
end
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets - 1
msg_store.@consumer_offsets.size.should eq bytesize*2
msg_store.close
end
end

it "does not track offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should not use saved offset if x-stream-offset is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end
end

it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": "true"})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# should continue from tracked offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 2
end
end

it "should not use saved offset if x-stream-automatic-offset-tracking is false" do
queue_name = Random::Secure.hex
consumer_tag = Random::Secure.hex
c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": "false"})

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 2)

# does not track offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1.seconds

# should consume the same message again, no tracked offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
end
end

it "cleanup_consumer_offsets removes outdated offset" do
queue_name = Random::Secure.hex
offsets = [84_i64, -10_i64]
tag_prefix = "ctag-"

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)

data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
sleep 0.1.seconds
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1.seconds

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
msg_store.close
end
end

it "runs cleanup when removing segment" do
consumer_tag = "ctag-1"
queue_name = Random::Secure.hex
args = {"x-queue-type": "stream", "x-max-length": 1}
msg_body = Bytes.new(LavinMQ::Config.instance.segment_size)

with_amqp_server do |s|
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
q.publish_confirm msg_body
end

with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2

with_channel(s) do |ch|
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
2.times { q.publish_confirm msg_body }
end

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
end
end

it "does not track offset if c-tag is auto-generated" do
queue_name = Random::Secure.hex

with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
args = {"x-queue-type": "stream"}
c_tag = ""
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
msgs = Channel(AMQP::Client::DeliverMessage).new
c_tag = q.subscribe(no_ack: false) do |msg|
msgs.send msg
msg.ack
end
msgs.receive
end

sleep 0.1.seconds
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
end
end

it "expands consumer offset file when needed" do
queue_name = Random::Secure.hex
consumer_tag_prefix = Random::Secure.hex(32)
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, 1)
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
one_offset_bytesize = "#{consumer_tag_prefix}1000".bytesize + 1 + 8
offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1
bytesize = 0
offsets.times do |i|
consumer_tag = "#{consumer_tag_prefix}#{i + 1000}"
msg_store.store_consumer_offset(consumer_tag, i + 1000)
bytesize += consumer_tag.bytesize + 1 + 8
end
msg_store.@consumer_offsets.size.should eq bytesize
msg_store.@consumer_offsets.size.should be > LavinMQ::Config.instance.segment_size
offsets.times do |i|
msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i + 1000}").should eq i + 1000
end
end
end
end
end
4 changes: 4 additions & 0 deletions src/lavinmq/amqp/queue/stream_queue.cr
@@ -79,6 +79,10 @@ module LavinMQ::AMQP
end
end

def store_consumer_offset(consumer_tag : String, offset : Int64) : Nil
stream_queue_msg_store.store_consumer_offset(consumer_tag, offset)
end

# yield the next message in the ready queue
# returns true if a message was delivered, false otherwise
# if we encounter an unrecoverable ReadError, close queue