Skip to content

Commit

Permalink
add indexing for timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Nov 4, 2024
1 parent 5791b10 commit eb8bcb4
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ module LavinMQ::AMQP
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@offset_index : Hash(UInt32, Int64) # segment_id => offset of first msg
@offset_index_ts : Hash(UInt32, Int64) # segment_id => ts of first msg

def initialize(*args, **kwargs)
super
@last_offset = get_last_offset
@offset_index = build_segment_offset_index
@offset_index_ts = build_segment_offset_index_ts
drop_overflow
end

Expand Down Expand Up @@ -92,10 +94,17 @@ module LavinMQ::AMQP

private def offset_index_lookup(offset) : UInt32
seg = @segments.first_key
return seg unless offset.is_a?(Int)
@offset_index.each do |seg_id, first_seg_offset|
break if first_seg_offset >= offset
seg = seg_id
case offset
when Int
@offset_index.each do |seg_id, first_seg_offset|
break if first_seg_offset >= offset
seg = seg_id
end
when Time
@offset_index_ts.each do |seg_id, first_seg_ts|
break if Time.unix_ms(first_seg_ts) >= offset
seg = seg_id
end
end
seg
end
Expand Down Expand Up @@ -234,6 +243,15 @@ module LavinMQ::AMQP
@last_offset
end
end

private def build_segment_offset_index_ts : Hash(UInt32, Int64)
@segments.transform_values do |mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
msg.timestamp
rescue IndexError
RoughTime.unix_ms
end
end
end
end
end

0 comments on commit eb8bcb4

Please sign in to comment.