Skip to content

Commit

Permalink
replicate consumer offsets file. remove unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Jun 27, 2024
1 parent 834bb33 commit 221f248
Showing 1 changed file with 3 additions and 7 deletions.
10 changes: 3 additions & 7 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module LavinMQ
super
@last_offset = get_last_offset
@consumer_offsets = MFile.new(File.join(@queue_data_dir, "consumer_offsets"), Config.instance.segment_size)
@replicator.try &.register_file @consumer_offsets
@consumer_offset_positions = restore_consumer_offset_positions
drop_overflow
end
Expand Down Expand Up @@ -133,19 +134,13 @@ module LavinMQ
@consumer_offsets.write_bytes AMQ::Protocol::ShortString.new(consumer_tag)
@consumer_offset_positions[consumer_tag] = @consumer_offsets.size
@consumer_offsets.write_bytes new_offset
# replicate
@replicator.try &.append(@consumer_offsets.path, (@consumer_offsets.size - consumer_tag.bytesize - 1 - 8).to_i32)
end

def consumer_offset_file_full?(consumer_tag)
(@consumer_offsets.size + 1 + consumer_tag.bytesize + 8) >= @consumer_offsets.capacity
end

def expand_consumer_offset_file
pos = @consumer_offsets.size
@consumer_offsets = MFile.new(@consumer_offsets.path, @consumer_offsets.capacity + Config.instance.segment_size)
@consumer_offsets.resize(pos)
end

def cleanup_consumer_offsets
return if @consumer_offsets.size.zero?

Expand All @@ -172,6 +167,7 @@ module LavinMQ
yield # fill the new file with correct data in this block
@consumer_offsets.rename(old_consumer_offsets.path)
old_consumer_offsets.close(truncate_to_size: false)
@replicator.try &.replace_file @consumer_offsets.path
end

def shift?(consumer : Client::Channel::StreamConsumer) : Envelope?
Expand Down

0 comments on commit 221f248

Please sign in to comment.