diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe7bdb491f..e16d8651cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Remove the 'reset vhost' feature [#822](https://github.com/cloudamqp/lavinmq/pull/822/files)
 
+### Added
+
+- Added some indexing for streams, greatly increasing performance when looking up by offset or timestamp [#817](https://github.com/cloudamqp/lavinmq/pull/817)
+
 ## [2.0.0] - 2024-10-31
 
 With the release of 2.0.0 we introduce High Availablility for LavinMQ in the form of clustering. With clustering, LavinMQ replicates data between nodes with our own replication protocol, and uses etcd for leader election. See [this post](https://lavinmq.com/blog/lavinmq-high-availability) in the LavinMQ blog or the [readme](https://github.com/cloudamqp/lavinmq?tab=readme-ov-file#clustering) for more information about clustering.
diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr
index 12a3a85ed6..44a1adc8c7 100644
--- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr
+++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr
@@ -9,10 +9,13 @@ module LavinMQ::AMQP
       property max_age : Time::Span | Time::MonthSpan | Nil
       getter last_offset : Int64
       @segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
+      @offset_index = Hash(UInt32, Int64).new          # segment_id => offset of first msg
+      @timestamp_index = Hash(UInt32, Int64).new       # segment_id => ts of first msg
 
       def initialize(*args, **kwargs)
         super
         @last_offset = get_last_offset
+        build_segment_indexes
         drop_overflow
       end
 
@@ -62,7 +65,7 @@ module LavinMQ::AMQP
       end
 
       private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
-        segment = @segments.first_key
+        segment = offset_index_lookup(offset)
         pos = 4u32
         msg_offset = 0i64
         loop do
@@ -88,6 +91,28 @@ module LavinMQ::AMQP
         {msg_offset, segment, pos}
       end
 
+      # Picks the segment where a scan for `offset` should begin.
+      # Walks the per-segment index (first-message offset for Int, first-message
+      # timestamp for Time) and keeps the last segment whose first message is not
+      # past the requested position; falls back to the first segment.
+      # NOTE(review): assumes index entries are inserted in ascending segment order.
+      private def offset_index_lookup(offset) : UInt32
+        seg = @segments.first_key
+        case offset
+        when Int
+          @offset_index.each do |seg_id, first_seg_offset|
+            break if first_seg_offset > offset
+            seg = seg_id
+          end
+        when Time
+          @timestamp_index.each do |seg_id, first_seg_ts|
+            break if Time.unix_ms(first_seg_ts) > offset
+            seg = seg_id
+          end
+        end
+        seg
+      end
+
       def shift?(consumer : AMQP::StreamConsumer) : Envelope?
         raise ClosedError.new if @closed
 
@@ -147,6 +172,8 @@ module LavinMQ::AMQP
       private def open_new_segment(next_msg_size = 0) : MFile
         super.tap do
           drop_overflow
+          @offset_index[@segments.last_key] = @last_offset + 1
+          @timestamp_index[@segments.last_key] = RoughTime.unix_ms
         end
       end
 
@@ -178,6 +205,8 @@ module LavinMQ::AMQP
           msg_count = @segment_msg_count.delete(seg_id)
           @size -= msg_count if msg_count
           @segment_last_ts.delete(seg_id)
+          @offset_index.delete(seg_id)
+          @timestamp_index.delete(seg_id)
           @bytesize -= mfile.size - 4
           mfile.delete.close
           @replicator.try &.delete_file(mfile.path)
@@ -211,6 +240,21 @@ module LavinMQ::AMQP
       private def offset_from_headers(headers) : Int64
         headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
       end
+
+      # Populates @offset_index and @timestamp_index with the offset and
+      # timestamp of the first message of every segment.
+      private def build_segment_indexes
+        @segments.each do |seg_id, mfile|
+          msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
+          @offset_index[seg_id] = offset_from_headers(msg.properties.headers)
+          @timestamp_index[seg_id] = msg.timestamp
+        rescue IndexError
+          # No message could be read from this segment; index it the same way
+          # open_new_segment does, i.e. as starting at the next offset.
+          @offset_index[seg_id] = @last_offset + 1
+          @timestamp_index[seg_id] = RoughTime.unix_ms
+        end
+      end
     end
   end
 end