Skip to content

Commit

Permalink
feat: Create and Delete superstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Jan 14, 2024
1 parent 1c83f36 commit 691dc96
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 38 deletions.
6 changes: 3 additions & 3 deletions docs/support-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
| heartbeat | ✔️ | 3.9 |
| consumerupdate | ✔️ | 3.11 |
| streamstats | ✔️ | 3.11 |
| createsuperstream | ✔️ | 3.11 |
| deletesuperstream | ✔️ | 3.11 |
| exchangecommandversions | ✔️ | 3.13 |
| createsuperstream | ✔️ | 3.13 |
| deletesuperstream | ✔️ | 3.13 |
| route || 3.13 |
| partitions | | 3.13 |
| partitions | ✔️ | 3.13 |
4 changes: 2 additions & 2 deletions lib/connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ defmodule RabbitMQStream.Connection do
GenServer.call(__MODULE__, {:stream_stats, stream_name: stream_name})
end

def partitions(stream_name) when is_binary(stream_name) do
GenServer.call(__MODULE__, {:partitions, stream_name: stream_name})
def partitions(super_stream) when is_binary(super_stream) do
GenServer.call(__MODULE__, {:partitions, super_stream: super_stream})
end

def create_super_stream(name, partitions, binding_keys, arguments \\ [])
Expand Down
9 changes: 6 additions & 3 deletions lib/connection/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ defmodule RabbitMQStream.Connection.Handler do
:unsubscribe,
:stream_stats,
:create_super_stream,
:delete_super_stream
:delete_super_stream,
:route,
:partitions
] and
code not in [:ok, nil] do
{{pid, _data}, conn} = Helpers.pop_request_tracker(conn, command, response.correlation_id)
Expand Down Expand Up @@ -276,8 +278,9 @@ defmodule RabbitMQStream.Connection.Handler do
%{conn | state: :open, connect_requests: [], server_commands_versions: server_commands_versions}
end

def handle_message(%Connection{} = conn, %Response{command: :stream_stats, data: data} = response) do
{{pid, _data}, conn} = Helpers.pop_request_tracker(conn, :stream_stats, response.correlation_id)
def handle_message(%Connection{} = conn, %Response{command: command, data: data} = response)
when command in [:route, :partitions, :stream_stats] do
{{pid, _data}, conn} = Helpers.pop_request_tracker(conn, command, response.correlation_id)

if pid != nil do
GenServer.reply(pid, {:ok, data})
Expand Down
16 changes: 12 additions & 4 deletions lib/message/data/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,23 @@ defmodule RabbitMQStream.Message.Data do
end

def decode(%Response{command: :route}, buffer) do
{"", stream} = decode_string(buffer)
{"", streams} =
decode_array(buffer, fn buffer, acc ->
{buffer, value} = decode_string(buffer)
{buffer, [value | acc]}
end)

%Types.RouteResponseData{stream: stream}
%Types.RouteResponseData{streams: streams}
end

def decode(%Response{command: :partitions}, buffer) do
{"", stream} = decode_string(buffer)
{"", streams} =
decode_array(buffer, fn buffer, acc ->
{buffer, value} = decode_string(buffer)
{buffer, [value | acc]}
end)

%Types.RouteResponseData{stream: stream}
%Types.PartitionsQueryResponseData{streams: streams}
end

def decode(%Response{command: :exchange_command_versions}, buffer) do
Expand Down
12 changes: 6 additions & 6 deletions lib/message/data/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,9 @@ defmodule RabbitMQStream.Message.Types do

defmodule RouteResponseData do
@moduledoc false
@enforce_keys [:stream]
@type t :: %{stream: String.t()}
defstruct [:stream]
@enforce_keys [:streams]
@type t :: %{streams: [String.t()]}
defstruct [:streams]
end

defmodule PartitionsQueryRequestData do
Expand All @@ -460,9 +460,9 @@ defmodule RabbitMQStream.Message.Types do

defmodule PartitionsQueryResponseData do
@moduledoc false
@enforce_keys [:stream]
@type t :: %{stream: String.t()}
defstruct [:stream]
@enforce_keys [:streams]
@type t :: %{streams: [String.t()]}
defstruct [:streams]
end

defmodule DeliverData do
Expand Down
25 changes: 5 additions & 20 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -171,29 +171,14 @@ defmodule RabbitMQStreamTest.Connection do

SupervisedConnection.delete_super_stream("invoices")

:ok =
SupervisedConnection.create_super_stream(
"invoices",
["invoices-0", "invoices-1", "invoices-2"],
["0", "1", "2"]
)

:ok = SupervisedConnection.delete_super_stream("invoices")
end
partitions = ["invoices-0", "invoices-1", "invoices-2"]

@tag min_version: "3.13"
test "should create and delete a super_stream" do
{:ok, _} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
:ok = SupervisedConnection.connect()
:ok =
SupervisedConnection.create_super_stream("invoices", partitions, ["0", "1", "2"])

SupervisedConnection.delete_super_stream("invoices")
{:ok, %{streams: streams}} = SupervisedConnection.partitions("invoices")

:ok =
SupervisedConnection.create_super_stream(
"invoices",
["invoices-0", "invoices-1", "invoices-2"],
["0", "1", "2"]
)
assert Enum.all?(partitions, &(&1 in streams))

:ok = SupervisedConnection.delete_super_stream("invoices")
end
Expand Down

0 comments on commit 691dc96

Please sign in to comment.