Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop overflow in loop for stream_queue to make sure max-age is respected #608

Merged
merged 3 commits into from
Dec 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Don't update user's password hash if given password is the same as current [#586](https://github.com/cloudamqp/lavinmq/pull/586)
- Remove old segments in the background for stream queues [#608](https://github.com/cloudamqp/lavinmq/pull/608)

## [1.2.5] - 2023-11-06

9 changes: 5 additions & 4 deletions src/lavinmq/queue/stream_queue.cr
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ module LavinMQ
@exclusive = false, @auto_delete = false,
@arguments = AMQP::Table.new)
super
spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop"
spawn unmap_and_remove_segments_loop, name: "StreamQueue#unmap_and_remove_segments_loop"
end

def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?)
@@ -146,21 +146,22 @@ module LavinMQ
end
end

private def unmap_unused_segments_loop
private def unmap_and_remove_segments_loop
until closed?
sleep 60
unmap_unused_segments
unmap_and_remove_segments
end
end

private def unmap_unused_segments
private def unmap_and_remove_segments
used_segments = Set(UInt32).new
@consumers_lock.synchronize do
@consumers.each do |consumer|
used_segments << consumer.as(Client::Channel::StreamConsumer).segment
end
end
@msg_store_lock.synchronize do
stream_queue_msg_store.drop_overflow
stream_queue_msg_store.unmap_segments(except: used_segments)
end
end