Skip to content

Commit

Permalink
keep track of first offset and timestamp per segment (#817)
Browse files Browse the repository at this point in the history
Saves a hash with the first offset/timestamp for each segment, greatly reducing the time (and memory used) to find a message by offset and timestamps.

---------

Co-authored-by: Carl Hörberg <carl@84codes.com>
  • Loading branch information
viktorerlingsson and carlhoerberg authored Nov 13, 2024
1 parent 719c3e6 commit 1b9678e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Remove the 'reset vhost' feature [#822](https://github.com/cloudamqp/lavinmq/pull/822/files)

### Added

- Added some indexing for streams, greatly increasing performance when looking up by offset or timestamp [#817](https://github.com/cloudamqp/lavinmq/pull/817)

## [2.0.0] - 2024-10-31

With the release of 2.0.0 we introduce High Availablility for LavinMQ in the form of clustering. With clustering, LavinMQ replicates data between nodes with our own replication protocol, and uses etcd for leader election. See [this post](https://lavinmq.com/blog/lavinmq-high-availability) in the LavinMQ blog or the [readme](https://github.com/cloudamqp/lavinmq?tab=readme-ov-file#clustering) for more information about clustering.
Expand Down
37 changes: 36 additions & 1 deletion src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ module LavinMQ::AMQP
property max_age : Time::Span | Time::MonthSpan | Nil
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@offset_index = Hash(UInt32, Int64).new # segment_id => offset of first msg
@timestamp_index = Hash(UInt32, Int64).new # segment_id => ts of first msg

def initialize(*args, **kwargs)
super
@last_offset = get_last_offset
build_segment_indexes
drop_overflow
end

Expand Down Expand Up @@ -62,7 +65,7 @@ module LavinMQ::AMQP
end

private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = @segments.first_key
segment = offset_index_lookup(offset)
pos = 4u32
msg_offset = 0i64
loop do
Expand All @@ -88,6 +91,23 @@ module LavinMQ::AMQP
{msg_offset, segment, pos}
end

private def offset_index_lookup(offset) : UInt32
seg = @segments.first_key
case offset
when Int
@offset_index.each do |seg_id, first_seg_offset|
break if first_seg_offset > offset
seg = seg_id
end
when Time
@timestamp_index.each do |seg_id, first_seg_ts|
break if Time.unix_ms(first_seg_ts) > offset
seg = seg_id
end
end
seg
end

def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

Expand Down Expand Up @@ -147,6 +167,8 @@ module LavinMQ::AMQP
private def open_new_segment(next_msg_size = 0) : MFile
super.tap do
drop_overflow
@offset_index[@segments.last_key] = @last_offset + 1
@timestamp_index[@segments.last_key] = RoughTime.unix_ms
end
end

Expand Down Expand Up @@ -178,6 +200,8 @@ module LavinMQ::AMQP
msg_count = @segment_msg_count.delete(seg_id)
@size -= msg_count if msg_count
@segment_last_ts.delete(seg_id)
@offset_index.delete(seg_id)
@timestamp_index.delete(seg_id)
@bytesize -= mfile.size - 4
mfile.delete.close
@replicator.try &.delete_file(mfile.path)
Expand Down Expand Up @@ -211,6 +235,17 @@ module LavinMQ::AMQP
private def offset_from_headers(headers) : Int64
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def build_segment_indexes
@segments.each do |seg_id, mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg_id] = offset_from_headers(msg.properties.headers)
@timestamp_index[seg_id] = msg.timestamp
rescue IndexError
@offset_index[seg_id] = @last_offset
@timestamp_index[seg_id] = RoughTime.unix_ms
end
end
end
end
end

0 comments on commit 1b9678e

Please sign in to comment.