From 7f8e3ad649ede3f6cc30e07d535456ece6ac9656 Mon Sep 17 00:00:00 2001 From: Magnus Landerblom Date: Mon, 27 Nov 2023 13:56:35 +0100 Subject: [PATCH 1/6] Bugfix: DLX parameters aren't updated when the messages is expired The effect of this bug is that `count` on a DLX message is counted up to 2 but never higher. It's set to 1 when the headers are created, we get the frist count and increase that but instead of updating in place the new count is added as a new field in the table. AMP::Table doesn't support `Symbol` for it's `#delete` method which results in when using `#merge!` the new pairs are added onto the backing IO for each call. The result is that after three expires the table looks like this: ``` AMQ::Protocol::Table(@queue="alarms.notifications", @reason="rejected", @exchange="amq.topic", @count=1, @time=2023-11-27 12:46:51.0 UTC, @routing-keys=["alarms.notify"], @count=2, @time=2023-11-27 12:47:01.0 UTC, @routing-keys=["alarms.notify"], @count=2, @time=2023-11-27 12:47:11.0 UTC, @routing-keys=["alarms.notify"]) ``` If instead each key is set it overrides the previous value as expected. --- src/lavinmq/queue/queue.cr | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/queue/queue.cr b/src/lavinmq/queue/queue.cr index 2c7f935aa9..979928f1c9 100644 --- a/src/lavinmq/queue/queue.cr +++ b/src/lavinmq/queue/queue.cr @@ -600,11 +600,9 @@ module LavinMQ next if xd["reason"]? != reason.to_s next if xd["exchange"]? != exchange_name count = xd["count"].as?(Int) || 0 - xd.merge!({ - count: count + 1, - time: RoughTime.utc, - "routing-keys": routing_keys, - }) + xd["count"] = count + 1 + xd["time"] = RoughTime.utc + xd["routing-keys"] = routing_keys xd["original-expiration"] = expiration if expiration found_at = idx break From ed334bf4eb9c3f24c4dcc84e935df638e354b67e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 28 Nov 2023 09:26:15 +0100 Subject: [PATCH 2/6] Add a spec to verify that count is updated --- spec/dlx_spec.cr | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spec/dlx_spec.cr b/spec/dlx_spec.cr index 6589876cd3..d1ffd0848c 100644 --- a/spec/dlx_spec.cr +++ b/spec/dlx_spec.cr @@ -58,4 +58,37 @@ describe "Dead lettering" do msg.timestamp.should be > ts end + + it "should update count in x-death" do + with_channel do |ch| + q_name = "q_dlx" + q = ch.queue(q_name, args: AMQP::Client::Arguments.new( + {"x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => q_name} + )) + + done = Channel(AMQP::Client::DeliverMessage).new + i = 0 + q.subscribe(no_ack: false) do |env| + if i == 10 + env.ack + done.send env + else + env.reject + end + i += 1 + end + ch.default_exchange.publish_confirm("msg", q.name) + + msg = done.receive + headers = msg.properties.headers.should_not be_nil + x_death = headers["x-death"].as?(Array(AMQ::Protocol::Field)).should_not be_nil + x_death_q_dlx_rejected = x_death.find do |xd| + xd = xd.as(AMQ::Protocol::Table) + xd["queue"] == q_name && + xd["reason"] == "rejected" + end + x_death_q_dlx_rejected = x_death_q_dlx_rejected.as?(AMQ::Protocol::Table).should_not be_nil + x_death_q_dlx_rejected["count"].should eq 10 + end + end end From 0ceab85cfa99deaf9e668b3b91ce26674947e4d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 28 Nov 2023 20:43:48 +0100 Subject: [PATCH 3/6] Use two queues two get two x-deaths --- spec/dlx_spec.cr | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/spec/dlx_spec.cr b/spec/dlx_spec.cr index d1ffd0848c..bbb4a276d6 100644 --- a/spec/dlx_spec.cr +++ b/spec/dlx_spec.cr @@ -63,17 +63,19 @@ describe "Dead lettering" do with_channel do |ch| q_name = "q_dlx" q = ch.queue(q_name, args: AMQP::Client::Arguments.new( - {"x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => q_name} + {"x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => "#{q_name}2"} + )) + _q2 = ch.queue("#{q_name}2", args: AMQP::Client::Arguments.new( + {"x-dead-letter-exchange" => "", "x-dead-letter-routing-key" => q_name, "x-message-ttl" => 1} )) done = Channel(AMQP::Client::DeliverMessage).new i = 0 q.subscribe(no_ack: false) do |env| + env.reject if i == 10 env.ack done.send env - else - env.reject end i += 1 end @@ -89,6 +91,14 @@ describe "Dead lettering" do end x_death_q_dlx_rejected = x_death_q_dlx_rejected.as?(AMQ::Protocol::Table).should_not be_nil x_death_q_dlx_rejected["count"].should eq 10 + + x_death_q_dlx2_expired = x_death.find do |xd| + xd = xd.as(AMQ::Protocol::Table) + xd["queue"] == "#{q_name}2" && + xd["reason"] == "expired" + end + x_death_q_dlx2_expired = x_death_q_dlx2_expired.as?(AMQ::Protocol::Table).should_not be_nil + x_death_q_dlx2_expired["count"].should eq 10 end end end From b08dfebb0c7f6bd6e6b0b877923e1f132fa7e6a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Wed, 29 Nov 2023 13:16:17 +0100 Subject: [PATCH 4/6] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d783618e8..7e5223d48e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Don't update user's password hash if given password is the same as current [#586](https://github.com/cloudamqp/lavinmq/pull/586) - Remove old segments in the background for stream queues [#608](https://github.com/cloudamqp/lavinmq/pull/608) +- A bug prevented `count` from being updated correctly in `x-death` headers [#601](https://github.com/cloudamqp/lavinmq/pull/601) ## [1.2.5] - 2023-11-06 From 3f0bcfb347c0f57f2087ddce86ce02b10ec455fc Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 12 Dec 2023 12:10:12 +0100 Subject: [PATCH 5/6] Revert "Bugfix: DLX parameters aren't updated when the messages is expired" This reverts commit 7f8e3ad649ede3f6cc30e07d535456ece6ac9656. --- src/lavinmq/queue/queue.cr | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/queue/queue.cr b/src/lavinmq/queue/queue.cr index 979928f1c9..2c7f935aa9 100644 --- a/src/lavinmq/queue/queue.cr +++ b/src/lavinmq/queue/queue.cr @@ -600,9 +600,11 @@ module LavinMQ next if xd["reason"]? != reason.to_s next if xd["exchange"]? != exchange_name count = xd["count"].as?(Int) || 0 - xd["count"] = count + 1 - xd["time"] = RoughTime.utc - xd["routing-keys"] = routing_keys + xd.merge!({ + count: count + 1, + time: RoughTime.utc, + "routing-keys": routing_keys, + }) xd["original-expiration"] = expiration if expiration found_at = idx break From e3e5fae52edef36494e8b41b6c01fd46191fa23d Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 12 Dec 2023 12:18:42 +0100 Subject: [PATCH 6/6] remove reverted bugfix from changelog --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e5223d48e..6d783618e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Don't update user's password hash if given password is the same as current [#586](https://github.com/cloudamqp/lavinmq/pull/586) - Remove old segments in the background for stream queues [#608](https://github.com/cloudamqp/lavinmq/pull/608) -- A bug prevented `count` from being updated correctly in `x-death` headers [#601](https://github.com/cloudamqp/lavinmq/pull/601) ## [1.2.5] - 2023-11-06