Skip to content

Commit

Permalink
Queue should expire TTL milliseconds after last consumer left (#924)
Browse files Browse the repository at this point in the history
This fixes a bug where if a queue was declared with x-expires X and a
consumer disconnected after X+1 the queue was deleted immediately.
  • Loading branch information
carlhoerberg authored Jan 28, 2025
1 parent 09876b6 commit 5a37697
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 29 deletions.
13 changes: 6 additions & 7 deletions spec/policies_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,17 @@ describe LavinMQ::VHost do
end
end

it "should refresh queue last_get_time when expire policy applied" do
it "should update queue expiration" do
with_amqp_server do |s|
defs = {"expires" => JSON::Any.new(50_i64)} of String => JSON::Any
with_channel(s) do |ch|
ch.queue("qttl")
ch.queue("qttl", args: AMQP::Client::Arguments.new({"x-expires" => 100}))
queue = s.vhosts["/"].queues["qttl"]
first = queue.last_get_time
sleep 0.1.seconds
s.vhosts["/"].add_policy("qttl", "^.*$", "all", defs, 12_i8)
s.vhosts["/"].add_policy("qttl", "^.*$", "all", {"expires" => JSON::Any.new(200)}, 2_i8)
sleep 0.1.seconds
last = queue.last_get_time
last.should be > first
queue.closed?.should be_false
sleep 0.2.seconds
queue.closed?.should be_true
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@ require "./spec_helper"
require "./../src/lavinmq/amqp/queue"

describe LavinMQ::AMQP::Queue do
it "should expire it self after last consumer disconnects" do
with_amqp_server do |s|
with_channel(s) do |ch|
q = ch.queue("qexpires", args: AMQP::Client::Arguments.new({"x-expires" => 100}))
queue = s.vhosts["/"].queues["qexpires"]
tag = q.subscribe { }
sleep 100.milliseconds
queue.closed?.should be_false
ch.basic_cancel(tag)
sleep 100.milliseconds
queue.closed?.should be_false
sleep 100.milliseconds
queue.closed?.should be_true
end
end
end

it "Should dead letter expired messages" do
with_amqp_server do |s|
with_channel(s) do |ch|
Expand Down
27 changes: 5 additions & 22 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,17 @@ module LavinMQ::AMQP
getter consumer_timeout : UInt64? = Config.instance.consumer_timeout

@consumers_empty_change = ::Channel(Bool).new
@queue_expiration_ttl_change = ::Channel(Nil).new

private def queue_expire_loop
loop do
break unless @expires
if @consumers.empty? && (ttl = queue_expiration_ttl)
break unless ttl = @expires
if @consumers.empty?
@log.debug { "Queue expires in #{ttl}" }
select
when @queue_expiration_ttl_change.receive
when @consumers_empty_change.receive
when timeout ttl
when timeout ttl.milliseconds
expire_queue
close
break
Expand Down Expand Up @@ -115,7 +116,7 @@ module LavinMQ::AMQP
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
getter name, arguments, vhost, consumers
getter? auto_delete, exclusive
getter policy : Policy?
getter operator_policy : OperatorPolicy?
Expand All @@ -133,7 +134,6 @@ module LavinMQ::AMQP
def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
@arguments = AMQP::Table.new)
@last_get_time = RoughTime.monotonic
@data_dir = make_data_dir
@metadata = ::Log::Metadata.new(nil, {queue: @name, vhost: @vhost.name})
@log = Logger.new(Log, @metadata)
Expand Down Expand Up @@ -186,7 +186,6 @@ module LavinMQ::AMQP
end

def redeclare
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
end

Expand Down Expand Up @@ -217,7 +216,6 @@ module LavinMQ::AMQP
when "expires"
unless @expires.try &.< v.as_i64
@expires = v.as_i64
@last_get_time = RoughTime.monotonic
spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}"
@queue_expiration_ttl_change.try_send? nil
end
Expand Down Expand Up @@ -339,19 +337,6 @@ module LavinMQ::AMQP
File.delete(File.join(@data_dir, ".paused"))
end

@queue_expiration_ttl_change = ::Channel(Nil).new

private def queue_expiration_ttl : Time::Span?
if e = @expires
expires_in = @last_get_time + e.milliseconds - RoughTime.monotonic
if expires_in > Time::Span.zero
expires_in
else
Time::Span.zero
end
end
end

def close : Bool
return false if @closed
@closed = true
Expand Down Expand Up @@ -688,7 +673,6 @@ module LavinMQ::AMQP

def basic_get(no_ack, force = false, & : Envelope -> Nil) : Bool
return false if !@state.running? && (@state.paused? && !force)
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
@get_count += 1
@deliver_get_count += 1
Expand Down Expand Up @@ -833,7 +817,6 @@ module LavinMQ::AMQP

def add_consumer(consumer : Client::Channel::Consumer)
return if @closed
@last_get_time = RoughTime.monotonic
@consumers_lock.synchronize do
was_empty = @consumers.empty?
@consumers << consumer
Expand Down

0 comments on commit 5a37697

Please sign in to comment.