Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams - keep track of first offset and timestamp per segment #817

Merged
merged 14 commits into from
Nov 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -18,6 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Remove the 'reset vhost' feature [#822](https://github.com/cloudamqp/lavinmq/pull/822/files)

### Added

- Added some indexing for streams, greatly increasing performance when looking up by offset or timestamp [#817](https://github.com/cloudamqp/lavinmq/pull/817)

## [2.0.0] - 2024-10-31

With the release of 2.0.0 we introduce High Availablility for LavinMQ in the form of clustering. With clustering, LavinMQ replicates data between nodes with our own replication protocol, and uses etcd for leader election. See [this post](https://lavinmq.com/blog/lavinmq-high-availability) in the LavinMQ blog or the [readme](https://github.com/cloudamqp/lavinmq?tab=readme-ov-file#clustering) for more information about clustering.
37 changes: 36 additions & 1 deletion src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
@@ -9,10 +9,13 @@ module LavinMQ::AMQP
property max_age : Time::Span | Time::MonthSpan | Nil
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@offset_index = Hash(UInt32, Int64).new # segment_id => offset of first msg
@timestamp_index = Hash(UInt32, Int64).new # segment_id => ts of first msg

def initialize(*args, **kwargs)
super
@last_offset = get_last_offset
build_segment_indexes
drop_overflow
end

@@ -62,7 +65,7 @@ module LavinMQ::AMQP
end

private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = @segments.first_key
segment = offset_index_lookup(offset)
pos = 4u32
msg_offset = 0i64
loop do
@@ -88,6 +91,23 @@ module LavinMQ::AMQP
{msg_offset, segment, pos}
end

private def offset_index_lookup(offset) : UInt32
seg = @segments.first_key
case offset
when Int
@offset_index.each do |seg_id, first_seg_offset|
break if first_seg_offset > offset
seg = seg_id
end
when Time
@timestamp_index.each do |seg_id, first_seg_ts|
break if Time.unix_ms(first_seg_ts) > offset
seg = seg_id
end
end
seg
end

def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

@@ -147,6 +167,8 @@ module LavinMQ::AMQP
private def open_new_segment(next_msg_size = 0) : MFile
super.tap do
drop_overflow
@offset_index[@segments.last_key] = @last_offset + 1
@timestamp_index[@segments.last_key] = RoughTime.unix_ms
end
end

@@ -178,6 +200,8 @@ module LavinMQ::AMQP
msg_count = @segment_msg_count.delete(seg_id)
@size -= msg_count if msg_count
@segment_last_ts.delete(seg_id)
@offset_index.delete(seg_id)
@timestamp_index.delete(seg_id)
@bytesize -= mfile.size - 4
mfile.delete.close
@replicator.try &.delete_file(mfile.path)
@@ -211,6 +235,17 @@ module LavinMQ::AMQP
private def offset_from_headers(headers) : Int64
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def build_segment_indexes
@segments.each do |seg_id, mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg_id] = offset_from_headers(msg.properties.headers)
@timestamp_index[seg_id] = msg.timestamp
rescue IndexError
@offset_index[seg_id] = @last_offset
@timestamp_index[seg_id] = RoughTime.unix_ms
end
end
end
end
end
Loading
Oops, something went wrong.