Skip to content

Commit

Permalink
Drop overflow in loop for stream_queue to make sure max-age is respec…
Browse files Browse the repository at this point in the history
…ted (#608)

Runs drop_overflow every 60s in unmap_and_remove_segments_loop to make sure segments with old messages are removed

Co-authored-by: Carl Hörberg <carl@84codes.com>
  • Loading branch information
viktorerlingsson and carlhoerberg authored Dec 11, 2023
1 parent 9b4ccbc commit d8553d4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Don't update user's password hash if given password is the same as current [#586](https://github.com/cloudamqp/lavinmq/pull/586)
- Remove old segments in the background for stream queues [#608](https://github.com/cloudamqp/lavinmq/pull/608)

## [1.2.5] - 2023-11-06

Expand Down
9 changes: 5 additions & 4 deletions src/lavinmq/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module LavinMQ
@exclusive = false, @auto_delete = false,
@arguments = AMQP::Table.new)
super
spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop"
spawn unmap_and_remove_segments_loop, name: "StreamQueue#unmap_and_remove_segments_loop"
end

def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?)
Expand Down Expand Up @@ -146,21 +146,22 @@ module LavinMQ
end
end

private def unmap_unused_segments_loop
private def unmap_and_remove_segments_loop
until closed?
sleep 60
unmap_unused_segments
unmap_and_remove_segments
end
end

private def unmap_unused_segments
private def unmap_and_remove_segments
used_segments = Set(UInt32).new
@consumers_lock.synchronize do
@consumers.each do |consumer|
used_segments << consumer.as(Client::Channel::StreamConsumer).segment
end
end
@msg_store_lock.synchronize do
stream_queue_msg_store.drop_overflow
stream_queue_msg_store.unmap_segments(except: used_segments)
end
end
Expand Down

0 comments on commit d8553d4

Please sign in to comment.