Skip to content

Commit

Permalink
Merge pull request #44 from cloudamqp/support_3_13
Browse files Browse the repository at this point in the history
Make plugin compatible with 3.13
  • Loading branch information
michaelklishin authored Jan 16, 2025
2 parents 40a4a73 + 02b72f0 commit 535ed49
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 56 deletions.
4 changes: 3 additions & 1 deletion .bazelrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
build --enable_bzlmod
# build --registry=https://raw.githubusercontent.com/rabbitmq/bazel-central-registry/dev/

build --registry=https://bcr.bazel.build/
build --registry=https://raw.githubusercontent.com/rabbitmq/bazel-central-registry/erlang-packages/

build --incompatible_strict_action_env
build --local_test_jobs=1
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
PROJECT = rabbitmq_lvc_exchange
PROJECT_DESCRIPTION = RabbitMQ Last Value Cache exchange

RABBITMQ_VERSION ?= v3.12.x
RABBITMQ_VERSION ?= v3.13.x
current_rmq_ref = $(RABBITMQ_VERSION)

define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, ["3.12.0"]}
{broker_version_requirements, ["3.13.0"]}
endef

dep_amqp_client = git_rmq-subfolder rabbitmq-erlang-client $(RABBITMQ_VERSION)
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ binding key.

## Supported RabbitMQ Versions

The most recent release of this plugin targets RabbitMQ 3.12.x.
The most recent release of this plugin targets RabbitMQ 3.13.x.

This plugin requires Mnesia and does not work with Khepri!

## Supported Erlang/OTP Versions

Latest version of this plugin [requires Erlang 25.0 or later versions](https://www.rabbitmq.com/which-erlang.html), same as RabbitMQ 3.12.x.
Latest version of this plugin [requires Erlang 26.0 or later versions](https://www.rabbitmq.com/which-erlang.html), same as RabbitMQ 3.13.x.

## Installation

Expand Down Expand Up @@ -89,6 +91,6 @@ reverse routing match procedure.

1. Update `broker_version_requirements` in `helpers.bzl` & `Makefile` (Optional)
1. Update the plugin version in `MODULE.bazel`
1. Push a tag (i.e. `v3.12.0`) with the matching version
1. Push a tag (i.e. `v3.13.0`) with the matching version
1. Allow the Release workflow to run and create a draft release
1. Review and publish the release
104 changes: 54 additions & 50 deletions src/rabbit_exchange_type_lvc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
-module(rabbit_exchange_type_lvc).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit/include/amqqueue.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_lvc_plugin.hrl").

-behaviour(rabbit_exchange_type).

-export([description/0, serialise_events/0, route/2]).
-export([description/0, serialise_events/0, route/3]).
-export([validate/1, validate_binding/2,
create/2, recover/2, delete/2, policy_changed/2,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
Expand All @@ -25,21 +26,16 @@ description() ->

serialise_events() -> false.

route(#exchange{name = Name},
#delivery{message = Msg}) ->
#basic_message{routing_keys = RKs} = Msg,
Keys = case RKs of
CC when is_list(CC) -> CC;
To -> [To]
end,
route(#exchange{name = Name}, Msg, _Opts) ->
RKs = mc:routing_keys(Msg),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
[mnesia:write(?LVC_TABLE,
#cached{key = #cachekey{exchange=Name,
routing_key=K},
content = Msg},
write) ||
K <- Keys]
K <- RKs]
end),
rabbit_router:match_routing_key(Name, RKs).

Expand All @@ -64,49 +60,37 @@ delete(_Serial, _X) ->

policy_changed(_X1, _X2) -> ok.

add_binding(none, #exchange{ name = XName },
#binding{ key = RoutingKey,
destination = DestinationName }) ->
case rabbit_amqqueue:lookup(DestinationName) of
{error, not_found} ->

case rabbit_exchange:lookup(DestinationName) of
{error, not_found} ->
rabbit_misc:protocol_error(
internal_error,
"could not find destination '~s'",
[DestinationName]);

{ok, E = #exchange{}} ->

case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{ exchange=XName,
routing_key=RoutingKey }) of
[] ->
add_binding(none,
#exchange{name = XName },
#binding{key = RoutingKey,
destination = #resource{kind = queue} = QName}) ->
_ = case rabbit_amqqueue:lookup(QName) of
{error, not_found} ->
destination_not_found_error(QName);
{ok, Q} ->
case get_msg_from_cache(XName, RoutingKey) of
not_found ->
ok;
[#cached{content = Msg}] ->
Delivery = rabbit_basic:delivery(
false, false, Msg, undefined),
Qs = rabbit_amqqueue:lookup_many(
rabbit_exchange:route(E, Delivery)),
rabbit_amqqueue:deliver(Qs, Delivery)
Msg ->
rabbit_queue_type:deliver([Q], Msg, #{}, stateless)
end
end;


{ok, Q} when ?is_amqqueue(Q) ->
case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{ exchange=XName,
routing_key=RoutingKey }) of
[] ->
ok;
[#cached{content = Msg}] ->
rabbit_amqqueue:deliver(
[Q], rabbit_basic:delivery(false, false, Msg, undefined))
end
end,
end,
ok;
add_binding(none,
#exchange{name = XName},
#binding{key = RoutingKey,
destination = #resource{kind = exchange} = DestName}) ->
_ = case rabbit_exchange:lookup(DestName) of
{error, not_found} ->
destination_not_found_error(DestName);
{ok, X} ->
case get_msg_from_cache(XName, RoutingKey) of
not_found ->
ok;
Msg ->
rabbit_queue_type:publish_at_most_once(X, Msg)
end
end,
ok;
add_binding(_Serial, _X, _B) ->
ok.
Expand All @@ -115,3 +99,23 @@ remove_bindings(_Serial, _X, _Bs) -> ok.

assert_args_equivalence(X, Args) ->
rabbit_exchange_type_direct:assert_args_equivalence(X, Args).

-spec get_msg_from_cache(rabbit_types:exchange_name(),
rabbit_types:routing_key()) -> mc:state() | not_found.
get_msg_from_cache(XName, RoutingKey) ->
case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{exchange = XName,
routing_key = RoutingKey }) of
[] ->
not_found;
[#cached{content = Msg}] ->
mc:set_annotation(?ANN_ROUTING_KEYS, [RoutingKey], Msg)
end.

-spec destination_not_found_error(rabbit_types:r('exchange' | 'queue')) -> no_return().
destination_not_found_error(DestName) ->
rabbit_misc:protocol_error(
internal_error,
"could not find destination '~ts'",
[rabbit_misc:rs(DestName)]).

0 comments on commit 535ed49

Please sign in to comment.