Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue should expire TTL milliseconds after last consumer left #924

Merged
merged 2 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions spec/policies_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,17 @@ describe LavinMQ::VHost do
end
end

it "should refresh queue last_get_time when expire policy applied" do
it "should update queue expiration" do
with_amqp_server do |s|
defs = {"expires" => JSON::Any.new(50_i64)} of String => JSON::Any
with_channel(s) do |ch|
ch.queue("qttl")
ch.queue("qttl", args: AMQP::Client::Arguments.new({"x-expires" => 100}))
queue = s.vhosts["/"].queues["qttl"]
first = queue.last_get_time
sleep 0.1.seconds
s.vhosts["/"].add_policy("qttl", "^.*$", "all", defs, 12_i8)
s.vhosts["/"].add_policy("qttl", "^.*$", "all", {"expires" => JSON::Any.new(200)}, 2_i8)
sleep 0.1.seconds
last = queue.last_get_time
last.should be > first
queue.closed?.should be_false
sleep 0.2.seconds
queue.closed?.should be_true
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions spec/queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@ require "./spec_helper"
require "./../src/lavinmq/amqp/queue"

describe LavinMQ::AMQP::Queue do
it "should expire it self after last consumer disconnects" do
with_amqp_server do |s|
with_channel(s) do |ch|
q = ch.queue("qexpires", args: AMQP::Client::Arguments.new({"x-expires" => 100}))
queue = s.vhosts["/"].queues["qexpires"]
tag = q.subscribe { }
sleep 100.milliseconds
queue.closed?.should be_false
ch.basic_cancel(tag)
sleep 100.milliseconds
queue.closed?.should be_false
sleep 100.milliseconds
queue.closed?.should be_true
end
end
end

it "Should dead letter expired messages" do
with_amqp_server do |s|
with_channel(s) do |ch|
Expand Down
27 changes: 5 additions & 22 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,17 @@ module LavinMQ::AMQP
getter consumer_timeout : UInt64? = Config.instance.consumer_timeout

@consumers_empty_change = ::Channel(Bool).new
@queue_expiration_ttl_change = ::Channel(Nil).new

private def queue_expire_loop
loop do
break unless @expires
if @consumers.empty? && (ttl = queue_expiration_ttl)
break unless ttl = @expires
if @consumers.empty?
@log.debug { "Queue expires in #{ttl}" }
select
when @queue_expiration_ttl_change.receive
when @consumers_empty_change.receive
when timeout ttl
when timeout ttl.milliseconds
expire_queue
close
break
Expand Down Expand Up @@ -115,7 +116,7 @@ module LavinMQ::AMQP
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
getter name, arguments, vhost, consumers
getter? auto_delete, exclusive
getter policy : Policy?
getter operator_policy : OperatorPolicy?
Expand All @@ -133,7 +134,6 @@ module LavinMQ::AMQP
def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
@arguments = AMQP::Table.new)
@last_get_time = RoughTime.monotonic
@data_dir = make_data_dir
@metadata = ::Log::Metadata.new(nil, {queue: @name, vhost: @vhost.name})
@log = Logger.new(Log, @metadata)
Expand Down Expand Up @@ -186,7 +186,6 @@ module LavinMQ::AMQP
end

def redeclare
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
end

Expand Down Expand Up @@ -217,7 +216,6 @@ module LavinMQ::AMQP
when "expires"
unless @expires.try &.< v.as_i64
@expires = v.as_i64
@last_get_time = RoughTime.monotonic
spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}"
@queue_expiration_ttl_change.try_send? nil
end
Expand Down Expand Up @@ -339,19 +337,6 @@ module LavinMQ::AMQP
File.delete(File.join(@data_dir, ".paused"))
end

@queue_expiration_ttl_change = ::Channel(Nil).new

private def queue_expiration_ttl : Time::Span?
if e = @expires
expires_in = @last_get_time + e.milliseconds - RoughTime.monotonic
if expires_in > Time::Span.zero
expires_in
else
Time::Span.zero
end
end
end

def close : Bool
return false if @closed
@closed = true
Expand Down Expand Up @@ -688,7 +673,6 @@ module LavinMQ::AMQP

def basic_get(no_ack, force = false, & : Envelope -> Nil) : Bool
return false if !@state.running? && (@state.paused? && !force)
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
@get_count += 1
@deliver_get_count += 1
Expand Down Expand Up @@ -833,7 +817,6 @@ module LavinMQ::AMQP

def add_consumer(consumer : Client::Channel::Consumer)
return if @closed
@last_get_time = RoughTime.monotonic
@consumers_lock.synchronize do
was_empty = @consumers.empty?
@consumers << consumer
Expand Down
Loading