diff --git a/CHANGELOG.md b/CHANGELOG.md index 34a4dca21a..6d783618e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Don't update user's password hash if given password is the same as current [#586](https://github.com/cloudamqp/lavinmq/pull/586) +- Remove old segments in the background for stream queues [#608](https://github.com/cloudamqp/lavinmq/pull/608) ## [1.2.5] - 2023-11-06 diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 5d784d7eeb..766087aa21 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -8,7 +8,7 @@ module LavinMQ @exclusive = false, @auto_delete = false, @arguments = AMQP::Table.new) super - spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop" + spawn unmap_and_remove_segments_loop, name: "StreamQueue#unmap_and_remove_segments_loop" end def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?) @@ -146,14 +146,14 @@ module LavinMQ end end - private def unmap_unused_segments_loop + private def unmap_and_remove_segments_loop until closed? sleep 60 - unmap_unused_segments + unmap_and_remove_segments end end - private def unmap_unused_segments + private def unmap_and_remove_segments used_segments = Set(UInt32).new @consumers_lock.synchronize do @consumers.each do |consumer| @@ -161,6 +161,7 @@ module LavinMQ end end @msg_store_lock.synchronize do + stream_queue_msg_store.drop_overflow stream_queue_msg_store.unmap_segments(except: used_segments) end end