From 1b9678ed12354b950dff42b2b87c72be340d9574 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 13 Nov 2024 09:22:55 +0100 Subject: [PATCH] keep track of first offset and timestamp per segment (#817) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Saves a hash with the first offset/timestamp for each segment, greatly reducing the time (and memory used) to find a message by offset and timestamps. --------- Co-authored-by: Carl Hörberg --- CHANGELOG.md | 4 ++ .../amqp/queue/stream_queue_message_store.cr | 37 ++++++++++++++++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe7bdb491f..e16d8651cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Remove the 'reset vhost' feature [#822](https://github.com/cloudamqp/lavinmq/pull/822/files) +### Added + +- Added some indexing for streams, greatly increasing performance when looking up by offset or timestamp [#817](https://github.com/cloudamqp/lavinmq/pull/817) + ## [2.0.0] - 2024-10-31 With the release of 2.0.0 we introduce High Availablility for LavinMQ in the form of clustering. With clustering, LavinMQ replicates data between nodes with our own replication protocol, and uses etcd for leader election. See [this post](https://lavinmq.com/blog/lavinmq-high-availability) in the LavinMQ blog or the [readme](https://github.com/cloudamqp/lavinmq?tab=readme-ov-file#clustering) for more information about clustering. 
diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr
index 12a3a85ed6..44a1adc8c7 100644
--- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr
+++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr
@@ -9,10 +9,13 @@ module LavinMQ::AMQP
       property max_age : Time::Span | Time::MonthSpan | Nil
       getter last_offset : Int64
       @segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
+      @offset_index = Hash(UInt32, Int64).new # segment_id => offset of first msg
+      @timestamp_index = Hash(UInt32, Int64).new # segment_id => ts of first msg (unix ms)
 
       def initialize(*args, **kwargs)
         super
         @last_offset = get_last_offset
+        build_segment_indexes # populate both indexes from the segments in @segments
         drop_overflow
       end
 
@@ -62,7 +65,7 @@ module LavinMQ::AMQP
       end
 
       private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
-        segment = @segments.first_key
+        segment = offset_index_lookup(offset) # start the scan at the indexed segment instead of the first one
         pos = 4u32
         msg_offset = 0i64
         loop do
@@ -88,6 +91,23 @@ module LavinMQ::AMQP
         {msg_offset, segment, pos}
       end
 
+      private def offset_index_lookup(offset) : UInt32 # id of the segment to start searching in for `offset`
+        seg = @segments.first_key # fallback when no index entry is <= offset
+        case offset
+        when Int # target given as a message offset
+          @offset_index.each do |seg_id, first_seg_offset|
+            break if first_seg_offset > offset # assumes entries iterate in ascending order -- TODO confirm
+            seg = seg_id
+          end
+        when Time # target given as a point in time
+          @timestamp_index.each do |seg_id, first_seg_ts|
+            break if Time.unix_ms(first_seg_ts) > offset # stored ts is unix milliseconds
+            seg = seg_id
+          end
+        end
+        seg
+      end
+
       def shift?(consumer : AMQP::StreamConsumer) : Envelope?
         raise ClosedError.new if @closed
 
@@ -147,6 +167,8 @@ module LavinMQ::AMQP
       private def open_new_segment(next_msg_size = 0) : MFile
         super.tap do
           drop_overflow
+          @offset_index[@segments.last_key] = @last_offset + 1 # new segment holds no msg yet; index the offset after @last_offset
+          @timestamp_index[@segments.last_key] = RoughTime.unix_ms # current time stands in for the first msg's ts
         end
       end
 
@@ -178,6 +200,8 @@ module LavinMQ::AMQP
           msg_count = @segment_msg_count.delete(seg_id)
           @size -= msg_count if msg_count
           @segment_last_ts.delete(seg_id)
+          @offset_index.delete(seg_id) # drop the index entries together with the segment
+          @timestamp_index.delete(seg_id)
           @bytesize -= mfile.size - 4
           mfile.delete.close
           @replicator.try &.delete_file(mfile.path)
@@ -211,6 +235,17 @@ module LavinMQ::AMQP
       private def offset_from_headers(headers) : Int64
         headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
       end
+
+      private def build_segment_indexes # fill both indexes from the first message of every segment
+        @segments.each do |seg_id, mfile|
+          msg = BytesMessage.from_bytes(mfile.to_slice + 4u32) # skip the first 4 bytes, like pos = 4u32 in find_offset_in_segments
+          @offset_index[seg_id] = offset_from_headers(msg.properties.headers)
+          @timestamp_index[seg_id] = msg.timestamp
+        rescue IndexError # no message could be read from this segment
+          @offset_index[seg_id] = @last_offset + 1 # same value open_new_segment records for a segment without messages
+          @timestamp_index[seg_id] = RoughTime.unix_ms
+        end
+      end
     end
   end
 end