Skip to content

Commit

Permalink
feat: Create and Delete superstream commands
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Jan 14, 2024
1 parent 1d7832d commit 1c83f36
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docs/support-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
| heartbeat | ✔️ | 3.9 |
| consumerupdate | ✔️ | 3.11 |
| streamstats | ✔️ | 3.11 |
| createsuperstream | | 3.11 |
| deletesuperstream | | 3.11 |
| createsuperstream | ✔️ | 3.11 |
| deletesuperstream | ✔️ | 3.11 |
| exchangecommandversions | ✔️ | 3.13 |
| route || 3.13 |
| partitions || 3.13 |
30 changes: 23 additions & 7 deletions lib/connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,22 @@ defmodule RabbitMQStream.Connection do
GenServer.call(__MODULE__, {:partitions, stream_name: stream_name})
end

def create_super_stream(name, partitions, binding_keys, arguments \\ [])
when is_binary(name) and
is_list(partitions) and
length(partitions) > 0 and
is_list(binding_keys) and
length(binding_keys) > 0 do
GenServer.call(
__MODULE__,
{:create_super_stream, name: name, partitions: partitions, binding_keys: binding_keys, arguments: arguments}
)
end

def delete_super_stream(name) when is_binary(name) do
GenServer.call(__MODULE__, {:delete_super_stream, name: name})
end

def respond(request, opts) when is_list(opts) do
GenServer.cast(__MODULE__, {:respond, request, opts})
end
Expand Down Expand Up @@ -285,19 +301,19 @@ defmodule RabbitMQStream.Connection do
:ok | {:error, reason :: atom()}

@doc """
The server will sometimes send a request to the client, which we must send a response to.
The server will sometimes send a request to the client, which we must send a response to.
And example request is the 'ConsumerUpdate', where the server expects a response with the
offset. So the connection sends the request to the subscription handler, which then calls
this function to send the response back to the server.
And example request is the 'ConsumerUpdate', where the server expects a response with the
offset. So the connection sends the request to the subscription handler, which then calls
this function to send the response back to the server.
"""
@callback respond(request :: RabbitMQStream.Message.Request.t(), opts :: Keyword.t()) :: :ok

@doc """
Adds the specified amount of credits to the subscription under the given `subscription_id`.
Adds the specified amount of credits to the subscription under the given `subscription_id`.
This function instantly returns `:ok` as the RabbitMQ Server only sends a response if the command fails,
which only happens if the subscription is not found. In that case the error is logged.
This function instantly returns `:ok` as the RabbitMQ Server only sends a response if the command fails,
which only happens if the subscription is not found. In that case the error is logged.
"""
@callback credit(subscription_id :: non_neg_integer(), credit :: non_neg_integer()) :: :ok
Expand Down
6 changes: 4 additions & 2 deletions lib/connection/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ defmodule RabbitMQStream.Connection.Handler do
:delete_publisher,
:subscribe,
:unsubscribe,
:stream_stats
:stream_stats,
:create_super_stream,
:delete_super_stream
] and
code not in [:ok, nil] do
{{pid, _data}, conn} = Helpers.pop_request_tracker(conn, command, response.correlation_id)
Expand Down Expand Up @@ -285,7 +287,7 @@ defmodule RabbitMQStream.Connection.Handler do
end

def handle_message(%Connection{} = conn, %Response{command: command} = response)
when command in [:create_stream, :delete_stream, :delete_publisher] do
when command in [:create_stream, :delete_stream, :delete_publisher, :create_super_stream, :delete_super_stream] do
{{pid, _data}, conn} = Helpers.pop_request_tracker(conn, command, response.correlation_id)

if pid != nil do
Expand Down
5 changes: 3 additions & 2 deletions lib/connection/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do
end

def handle_call({command, opts}, from, %Connection{} = conn)
when command in [:route, :partitions] and is_map_key(conn.server_commands_versions, command) do
when command in [:route, :partitions, :create_super_stream, :delete_super_stream] and
is_map_key(conn.server_commands_versions, command) do
conn =
conn
|> Helpers.push_request_tracker(command, from)
Expand All @@ -128,7 +129,7 @@ defmodule RabbitMQStream.Connection.Lifecycle do
end

def handle_call({command, _opts}, _from, %Connection{peer_properties: %{"version" => version}} = conn)
when command in [:route, :partitions] do
when command in [:route, :partitions, :create_super_stream, :delete_super_stream] do
Logger.error("Command #{command} is not supported by the server. Its current informed version is '#{version}'.")

{:reply, {:error, :unsupported}, conn}
Expand Down
18 changes: 18 additions & 0 deletions lib/message/data/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule RabbitMQStream.Message.Data do
def decode(%Response{command: :unsubscribe}, ""), do: %Types.UnsubscribeResponseData{}
def decode(%Response{command: :credit}, ""), do: %Types.CreditResponseData{}
def decode(%Response{command: :store_offset}, ""), do: %Types.StoreOffsetResponseData{}
def decode(%Response{command: :create_super_stream}, ""), do: %Types.CreateSuperStreamResponseData{}
def decode(%Response{command: :delete_super_stream}, ""), do: %Types.DeleteSuperStreamResponseData{}

def decode(%Request{command: :publish_confirm}, buffer) do
<<publisher_id::unsigned-integer-size(8), buffer::binary>> = buffer
Expand Down Expand Up @@ -399,6 +401,9 @@ defmodule RabbitMQStream.Message.Data do
{:single_active_consumer, name}, acc ->
[{"single-active-consumer", true}, {"name", name} | acc]

{:super_stream, name}, acc ->
[{"super-stream", name} | acc]

{key, value}, acc ->
[{String.replace("#{key}", "_", "-"), value} | acc]
end)
Expand Down Expand Up @@ -469,4 +474,17 @@ defmodule RabbitMQStream.Message.Data do
def encode(%Request{command: :stream_stats, data: data}) do
encode_string(data.stream_name)
end

def encode(%Request{command: :create_super_stream, data: data}) do
name = encode_string(data.name)
partitions = encode_array(data.partitions, &encode_string/1)
binding_keys = encode_array(data.binding_keys, &encode_string/1)
arguments = encode_map(data.arguments)

<<name::binary, partitions::binary, binding_keys::binary, arguments::binary>>
end

def encode(%Request{command: :delete_super_stream, data: data}) do
encode_string(data.name)
end
end
30 changes: 30 additions & 0 deletions lib/message/data/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,34 @@ defmodule RabbitMQStream.Message.Types do
@type t :: %{stats: %{String.t() => integer()}}
defstruct [:stats]
end

defmodule CreateSuperStreamRequestData do
@enforce_keys [:name, :partitions, :binding_keys, :arguments]
@type t :: %{
name: String.t(),
partitions: [String.t()],
binding_keys: [String.t()],
arguments: Keyword.t(String.t())
}
defstruct [:name, :partitions, :binding_keys, :arguments]
end

defmodule CreateSuperStreamResponseData do
@moduledoc false
@type t :: %{}
defstruct []
end

defmodule DeleteSuperStreamRequestData do
@moduledoc false
@enforce_keys [:name]
@type t :: %{name: String.t()}
defstruct [:name]
end

defmodule DeleteSuperStreamResponseData do
@moduledoc false
@type t :: %{}
defstruct []
end
end
4 changes: 3 additions & 1 deletion lib/message/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ defmodule RabbitMQStream.Message.Decoder do
:partitions,
:exchange_command_versions,
:consumer_update,
:stream_stats
:stream_stats,
:create_super_stream,
:delete_super_stream
] do
<<correlation_id::unsigned-integer-size(32), code::unsigned-integer-size(16), buffer::binary>> = buffer

Expand Down
4 changes: 3 additions & 1 deletion lib/message/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ defmodule RabbitMQStream.Message.Encoder do
:route,
:partitions,
:exchange_command_versions,
:stream_stats
:stream_stats,
:create_super_stream,
:delete_super_stream
] do
<<
encode_command(command)::unsigned-integer-size(16),
Expand Down
7 changes: 7 additions & 0 deletions lib/message/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ defmodule RabbitMQStream.Message.Helpers do
<<size::integer-size(32), arr::binary>>
end

def encode_array(arr, foo) when is_function(foo, 1) do
size = Enum.count(arr)
arr = arr |> Enum.map(foo) |> Enum.reduce(&<>/2)

<<size::integer-size(32), arr::binary>>
end

def encode_map(nil) do
encode_array([])
end
Expand Down
18 changes: 18 additions & 0 deletions lib/message/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,24 @@ defmodule RabbitMQStream.Message do
}
end

def new_request(%Connection{} = conn, :create_super_stream, opts) do
%Request{
version: 1,
command: :create_super_stream,
correlation_id: conn.correlation_sequence,
data: struct(Types.CreateSuperStreamRequestData, opts)
}
end

def new_request(%Connection{} = conn, :delete_super_stream, opts) do
%Request{
version: 1,
command: :delete_super_stream,
correlation_id: conn.correlation_sequence,
data: struct(Types.DeleteSuperStreamRequestData, opts)
}
end

def new_response(%Connection{options: options}, :tune, correlation_id: correlation_id) do
%Response{
version: 1,
Expand Down
34 changes: 34 additions & 0 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,38 @@ defmodule RabbitMQStreamTest.Connection do
:ok = SupervisedConnection.create_stream(@stream)
{:ok, _data} = SupervisedConnection.stream_stats(@stream)
end

@tag min_version: "3.13"
test "should create and delete a super_stream" do
{:ok, _} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
:ok = SupervisedConnection.connect()

SupervisedConnection.delete_super_stream("invoices")

:ok =
SupervisedConnection.create_super_stream(
"invoices",
["invoices-0", "invoices-1", "invoices-2"],
["0", "1", "2"]
)

:ok = SupervisedConnection.delete_super_stream("invoices")
end

@tag min_version: "3.13"
test "should create and delete a super_stream" do
{:ok, _} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
:ok = SupervisedConnection.connect()

SupervisedConnection.delete_super_stream("invoices")

:ok =
SupervisedConnection.create_super_stream(
"invoices",
["invoices-0", "invoices-1", "invoices-2"],
["0", "1", "2"]
)

:ok = SupervisedConnection.delete_super_stream("invoices")
end
end

0 comments on commit 1c83f36

Please sign in to comment.