From 19a9fb8df77541311a2175c741af96b8acf0239d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 7 Feb 2022 10:28:20 +0100 Subject: [PATCH] Handle stream coordinator error response for topology request A stream may not be available or in an inconsistent state and the stream coordinator reports this with an error that the stream manager handles like an appropriate response. The stream protocol adapter then tries to extract the topology from the error. This commit makes the stream manager handles the error correctly so that the stream protocol adapter reports the unavailability of the stream to the client. (cherry picked from commit 043989f4fbc80754cf6aa02ea6e9b653efb31bca) --- .../src/rabbit_stream_manager.erl | 58 +++++++++---------- .../src/rabbit_stream_reader.erl | 2 +- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index b8f4fdc923c1..2bb65d581d57 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. -%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(rabbit_stream_manager). @@ -361,35 +361,33 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> true -> QState = amqqueue:get_type_state(Q), #{name := StreamName} = QState, - StreamMembers = - case rabbit_stream_coordinator:members(StreamName) - of - {ok, Members} -> - maps:fold(fun (_Node, {undefined, _Role}, - Acc) -> - Acc; - (LeaderNode, {_Pid, writer}, - Acc) -> - Acc#{leader_node => - LeaderNode}; - (ReplicaNode, - {_Pid, replica}, Acc) -> - #{replica_nodes := - ReplicaNodes} = - Acc, - Acc#{replica_nodes => - ReplicaNodes - ++ [ReplicaNode]}; - (_Node, _, Acc) -> - Acc - end, - #{leader_node => undefined, - replica_nodes => []}, - Members); - _ -> - {error, stream_not_found} - end, - {ok, StreamMembers}; + case rabbit_stream_coordinator:members(StreamName) of + {ok, Members} -> + {ok, + maps:fold(fun (_Node, {undefined, _Role}, + Acc) -> + Acc; + (LeaderNode, {_Pid, writer}, + Acc) -> + Acc#{leader_node => + LeaderNode}; + (ReplicaNode, {_Pid, replica}, + Acc) -> + #{replica_nodes := + ReplicaNodes} = + Acc, + Acc#{replica_nodes => + ReplicaNodes + ++ [ReplicaNode]}; + (_Node, _, Acc) -> + Acc + end, + #{leader_node => undefined, + replica_nodes => []}, + Members)}; + _ -> + {error, stream_not_available} + end; _ -> {error, stream_not_found} end; diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 30ae4f5a4f12..831d342cb7dd 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -9,7 +9,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. -%% Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved. %% -module(rabbit_stream_reader).