Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatic offset tracking for stream queues #661

Merged
merged 73 commits into from
Feb 21, 2025
Merged
Changes from 1 commit
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
b6c4b91
WIP adds automatic offset tracking for streams
viktorerlingsson Apr 16, 2024
64408ae
specs
viktorerlingsson Apr 16, 2024
be71216
add specs for edge cases. refactor specs so they are more readable
viktorerlingsson Apr 17, 2024
c8f9275
format
viktorerlingsson Apr 17, 2024
9716175
refactor, dont init consumer tracking stuff unless needed. resize fil…
viktorerlingsson Apr 17, 2024
c5eab18
add spec that checks that only one entry is saved per consumer tag
viktorerlingsson Apr 17, 2024
8e8290d
format
viktorerlingsson Apr 17, 2024
8a5745d
lint
viktorerlingsson Apr 17, 2024
65c4fc1
add function to remove consumer tags from file
viktorerlingsson Apr 22, 2024
4055f8f
spec for removing consumer tags
viktorerlingsson Apr 22, 2024
76d0d97
save length of ctag in file, dont use space as deliminator
viktorerlingsson Apr 22, 2024
c8fbc9a
format
viktorerlingsson Apr 22, 2024
b6bb89a
lint
viktorerlingsson Apr 22, 2024
52919a3
remove methods, use instance variables directly. add cleanup function
viktorerlingsson Apr 23, 2024
589f868
add spec for cleanup
viktorerlingsson Apr 23, 2024
77d9a38
raise before updating instance variable
viktorerlingsson Apr 23, 2024
8b29074
refactor
viktorerlingsson Apr 23, 2024
affa800
remove comment
viktorerlingsson Apr 23, 2024
7307e77
refactor
viktorerlingsson Apr 23, 2024
451b8f9
refactor
viktorerlingsson Apr 23, 2024
ff12ef7
format
viktorerlingsson Apr 23, 2024
e66145b
set size to 32768 for offsets file, change path name
viktorerlingsson Apr 24, 2024
1a8763f
remove comment
viktorerlingsson Apr 24, 2024
d0f343a
refactor restore_consumer_offset_positions
viktorerlingsson Apr 24, 2024
e7d9592
remove unused var
viktorerlingsson Apr 24, 2024
9cb22bb
save_offset_by_consumer_tag -> update_consumer_offset
viktorerlingsson Apr 24, 2024
b42be91
write_new_ctag_to_file -> store_consumer_offset
viktorerlingsson Apr 24, 2024
f7cf45e
refactor update_consumer_offset
viktorerlingsson Apr 24, 2024
74dc942
add option to get a writeable slice from mfile
viktorerlingsson Apr 24, 2024
c0f1021
encode directly into mmap. fix bug with pos/size
viktorerlingsson Apr 24, 2024
148fe88
update spec
viktorerlingsson Apr 24, 2024
af6172f
more strict with types
viktorerlingsson Apr 24, 2024
e16682d
format
viktorerlingsson Apr 24, 2024
2567e4c
lint
viktorerlingsson Apr 24, 2024
83d5536
lint
viktorerlingsson Apr 24, 2024
e7ace22
String#bytesize
viktorerlingsson May 16, 2024
85d9d39
refactor store_consumer_offset and restore_consumer_offset_positions
viktorerlingsson May 16, 2024
9019d1e
refactor cleanup_consumer_offsets
viktorerlingsson May 16, 2024
9c97f9b
replace offset file instead of delete
viktorerlingsson May 16, 2024
a22ec07
lint
viktorerlingsson May 16, 2024
1a443f3
lint
viktorerlingsson May 16, 2024
778d299
dont track offsets when consumer tag is generated
viktorerlingsson May 23, 2024
d7bc434
remove unused code
viktorerlingsson May 23, 2024
15564b6
cleanup consumer offsets when dropping overflow
viktorerlingsson May 28, 2024
dcb8556
format
viktorerlingsson May 28, 2024
9face38
lint
viktorerlingsson May 28, 2024
f465889
handle large messages causing first segment to be empty
viktorerlingsson May 30, 2024
724ded7
cleanup spec
viktorerlingsson May 30, 2024
b4fc6aa
add option to use broker tracking when x-stream-offset is set by usin…
viktorerlingsson Jun 4, 2024
7018f31
add spec
viktorerlingsson Jun 4, 2024
5796d2b
no need to truncate mfile, it's being deleted
viktorerlingsson Jun 17, 2024
4246e91
x-stream-use-automatic-offset -> x-stream-automatic-offset-tracking
viktorerlingsson Jun 17, 2024
a85a6ac
implement rename in mfile
viktorerlingsson Jun 17, 2024
7fc3f85
expand consumer offsets file if full
viktorerlingsson Jun 17, 2024
508762c
remove unused code
viktorerlingsson Jun 17, 2024
c1c0c5f
use LittleEndian
viktorerlingsson Jun 17, 2024
455fe9d
remove instance variables
viktorerlingsson Jun 19, 2024
fcc0498
use old_consumer_offsets.path
viktorerlingsson Jun 19, 2024
cc8c1b7
start reading at pos=4 after IndexError in offset_at
viktorerlingsson Jun 19, 2024
e29a4c8
update specs to start amqp servers where needed
viktorerlingsson Jun 19, 2024
c3a7266
ameba:disable Metrics/CyclomaticComplexity for find_offset
viktorerlingsson Jun 19, 2024
9b91824
include tag size prefix byte in consumer_offset_file_full?
viktorerlingsson Jun 20, 2024
17ac96a
only append consumer offsets file, compact when full, expand if still…
viktorerlingsson Jun 26, 2024
f22d5cb
set capacity of consumer offsets file to 1000*current size when compa…
viktorerlingsson Jun 27, 2024
04c7650
replicate consumer offsets file. remove unused code
viktorerlingsson Jun 27, 2024
5222c9e
fixes to match changes from main
viktorerlingsson Nov 12, 2024
397222e
lint
viktorerlingsson Nov 12, 2024
18055c7
add .seconds to timespans
viktorerlingsson Nov 15, 2024
d3d69ab
read_only is no longer used
viktorerlingsson Jan 27, 2025
7e94d70
fix specs
viktorerlingsson Jan 27, 2025
9666a4f
handle false for x-stream-automatic-offset-tracking and add spec for …
viktorerlingsson Jan 27, 2025
e6fd1ee
fixup! handle false for x-stream-automatic-offset-tracking and add sp…
viktorerlingsson Jan 27, 2025
78b5caf
dont remove read_only from Bytes.new()
viktorerlingsson Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
  • Loading branch information
viktorerlingsson committed Jan 27, 2025
commit 83d55369eadcdeaf62d720b4491a99cfab65b92b
9 changes: 4 additions & 5 deletions src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
@@ -169,11 +169,10 @@ module LavinMQ::AMQP

def cleanup_consumer_offsets
offsets_to_save = Hash(String, Int64).new
lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32) # handle
@consumer_offset_positions.each do |ctag, pos|
offset = last_offset_by_consumer_tag(ctag).not_nil!
next if offset < lowest_offset_in_stream
# Other scenarios to remove?
lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32)
@consumer_offset_positions.each do |ctag, _pos|
offset = last_offset_by_consumer_tag(ctag)
next if !offset || offset < lowest_offset_in_stream
offsets_to_save[ctag] = offset
end