Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Split user and internal commands buffers #13

Merged
merged 1 commit into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@
| {:frame_max, non_neg_integer()}
| {:heartbeat, non_neg_integer()}
| {:lazy, boolean()}
@type t() :: %RabbitMQStream.Connection{

Check warning on line 514 in lib/connection/connection.ex

View workflow job for this annotation

GitHub Actions / docs

documentation references type "RabbitMQStream.Message.Helpers.command()" but it is hidden or private

Check warning on line 514 in lib/connection/connection.ex

View workflow job for this annotation

GitHub Actions / docs

documentation references type "RabbitMQStream.Message.Buffer.t()" but it is hidden or private

Check warning on line 514 in lib/connection/connection.ex

View workflow job for this annotation

GitHub Actions / docs

documentation references type "RabbitMQStream.Connection.Transport.t()" but it is hidden or private
options: connection_options,
socket: :gen_tcp.socket(),
state: :closed | :connecting | :open | :closing,
Expand All @@ -529,7 +529,8 @@
},
frames_buffer: RabbitMQStream.Message.Buffer.t(),
request_buffer: :queue.queue({term(), pid()}),
commands_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
internal_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
user_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}),
# this should not be here. Should find a better way to return the close reason from the 'handler' module
close_reason: String.t() | atom() | nil,
transport: RabbitMQStream.Connection.Transport.t()
Expand All @@ -552,7 +553,8 @@
commands: %{},
request_buffer: :queue.new(),
frames_buffer: RabbitMQStream.Message.Buffer.init(),
commands_buffer: :queue.new(),
internal_buffer: :queue.new(),
user_buffer: :queue.new(),
close_reason: nil
]
end
23 changes: 13 additions & 10 deletions lib/connection/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule RabbitMQStream.Connection.Handler do
Logger.debug("Connection closed")

%{conn | state: :closing, close_reason: request.data.reason}
|> Helpers.push(:response, :close, correlation_id: request.correlation_id, code: :ok)
|> Helpers.push_internal(:response, :close, correlation_id: request.correlation_id, code: :ok)
end

def handle_message(%Connection{} = conn, %Request{command: :tune} = request) do
Expand All @@ -22,8 +22,8 @@ defmodule RabbitMQStream.Connection.Handler do
options = Keyword.merge(conn.options, frame_max: request.data.frame_max, heartbeat: request.data.heartbeat)

%{conn | options: options, state: :opening}
|> Helpers.push(:response, :tune, correlation_id: 0)
|> Helpers.push(:request, :open)
|> Helpers.push_internal(:response, :tune, correlation_id: 0)
|> Helpers.push_internal(:request, :open)
end

def handle_message(%Connection{} = conn, %Request{command: :heartbeat}) do
Expand All @@ -32,7 +32,7 @@ defmodule RabbitMQStream.Connection.Handler do

def handle_message(%Connection{} = conn, %Request{command: :metadata_update} = request) do
conn
|> Helpers.push(:request, :query_metadata, streams: [request.data.stream_name])
|> Helpers.push_internal(:request, :query_metadata, streams: [request.data.stream_name])
end

def handle_message(%Connection{} = conn, %Request{command: :deliver} = response) do
Expand Down Expand Up @@ -129,14 +129,14 @@ defmodule RabbitMQStream.Connection.Handler do
peer_properties = Map.put(response.data.peer_properties, "base-version", version)

%{conn | peer_properties: peer_properties}
|> Helpers.push(:request, :sasl_handshake)
|> Helpers.push_internal(:request, :sasl_handshake)
end

def handle_message(%Connection{} = conn, %Response{command: :sasl_handshake} = response) do
Logger.debug("SASL handshake successful. Initiating authentication.")

%{conn | mechanisms: response.data.mechanisms}
|> Helpers.push(:request, :sasl_authenticate)
|> Helpers.push_internal(:request, :sasl_authenticate)
end

def handle_message(%Connection{} = conn, %Response{command: :sasl_authenticate, data: %{sasl_opaque_data: ""}}) do
Expand All @@ -150,7 +150,7 @@ defmodule RabbitMQStream.Connection.Handler do
Logger.debug("Opening connection to vhost: \"#{conn.options[:vhost]}\"")

conn
|> Helpers.push(:request, :open)
|> Helpers.push_internal(:request, :open)
|> Map.put(:state, :opening)
end

Expand All @@ -162,7 +162,7 @@ defmodule RabbitMQStream.Connection.Handler do

%{conn | options: options}
|> Map.put(:state, :opening)
|> Helpers.push(:request, :open)
|> Helpers.push_internal(:request, :open)
end

# If the server has a version lower than 3.13, this is the 'terminating' response.
Expand Down Expand Up @@ -192,7 +192,7 @@ defmodule RabbitMQStream.Connection.Handler do
)

%{conn | connection_properties: response.data.connection_properties}
|> Helpers.push(:request, :exchange_command_versions)
|> Helpers.push_internal(:request, :exchange_command_versions)
end

def handle_message(%Connection{} = conn, %Response{command: :query_metadata} = response) do
Expand Down Expand Up @@ -310,7 +310,10 @@ defmodule RabbitMQStream.Connection.Handler do
conn
else
conn
|> Helpers.push(:response, :consumer_update, correlation_id: request.correlation_id, code: :internal_error)
|> Helpers.push_internal(:response, :consumer_update,
correlation_id: request.correlation_id,
code: :internal_error
)
end
end
end
12 changes: 9 additions & 3 deletions lib/connection/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ defmodule RabbitMQStream.Connection.Helpers do
{entry, %{conn | request_tracker: request_tracker}}
end

def push(conn, action, command, opts \\ []) do
commands_buffer = :queue.in({action, command, opts}, conn.commands_buffer)
def push_user(conn, action, command, opts \\ []) do
user_buffer = :queue.in({action, command, opts}, conn.user_buffer)

%{conn | commands_buffer: commands_buffer}
%{conn | user_buffer: user_buffer}
end

def push_internal(conn, action, command, opts \\ []) do
internal_buffer = :queue.in({action, command, opts}, conn.internal_buffer)

%{conn | internal_buffer: internal_buffer}
end

defguard is_offset(offset)
Expand Down
48 changes: 36 additions & 12 deletions lib/connection/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do

conn =
%{conn | connect_requests: [from | conn.connect_requests]}
|> send_request(:peer_properties)
|> Helpers.push_internal(:request, :peer_properties)
|> flush_buffer(:internal)

{:noreply, conn}
else
Expand Down Expand Up @@ -229,7 +230,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do
# command to the socket. This also would allow us to better test the 'handler' logic.
commands
|> Enum.reduce(conn, &Handler.handle_message(&2, &1))
|> flush_commands()
|> flush_buffer(:internal)
|> flush_buffer(:user)
|> handle_closing()
end

Expand All @@ -252,7 +254,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do
def handle_info({:heartbeat}, conn) do
Process.send_after(self(), {:heartbeat}, conn.options[:heartbeat] * 1000)

conn = send_request(conn, :heartbeat, correlation_sum: 0)
conn =
conn
|> Helpers.push_internal(:request, :heartbeat, correlation_sum: 0)
|> flush_buffer(:internal)

{:noreply, conn}
end
Expand Down Expand Up @@ -292,7 +297,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do

conn =
conn
|> send_request(:peer_properties)
|> Helpers.push_internal(:request, :peer_properties)
|> flush_buffer(:internal)

{:noreply, conn}
else
Expand Down Expand Up @@ -328,30 +334,48 @@ defmodule RabbitMQStream.Connection.Lifecycle do

defp handle_closing(conn), do: {:noreply, conn}

defp send_request(%Connection{} = conn, command, opts \\ []) do
defp send_request(%Connection{} = conn, command, opts) do
conn
|> Helpers.push(:request, command, opts)
|> flush_commands()
|> Helpers.push_user(:request, command, opts)
|> flush_buffer(:user)
end

defp send_response(%Connection{} = conn, command, opts) do
conn
|> Helpers.push(:response, command, opts)
|> flush_commands()
|> Helpers.push_user(:response, command, opts)
|> flush_buffer(:user)
end

defp flush_buffer(%Connection{} = conn, :internal) do
conn =
:queue.fold(
fn
command, conn ->
send_command(conn, command)
end,
conn,
conn.internal_buffer
)

%{conn | internal_buffer: :queue.new()}
end

defp flush_commands(%Connection{} = conn) do
defp flush_buffer(%Connection{state: :open} = conn, :user) do
conn =
:queue.fold(
fn
command, conn ->
send_command(conn, command)
end,
conn,
conn.commands_buffer
conn.user_buffer
)

%{conn | commands_buffer: :queue.new()}
%{conn | user_buffer: :queue.new()}
end

defp flush_buffer(%Connection{} = conn, :user) do
conn
end

defp send_command(%Connection{} = conn, {:request, command, opts}) do
Expand Down
8 changes: 8 additions & 0 deletions test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,12 @@ defmodule RabbitMQStreamTest.Connection do
assert {:ok, _data} = SupervisedConnection.stream_stats(@stream)
assert {:error, :stream_does_not_exist} = SupervisedConnection.stream_stats("#{@stream}-NON-EXISTENT")
end

# I'm not really sure how to test this.
# @stream "consumer-test-stream-11"
# test "should buffer user commands before the connection is open" do
# {:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/")
# :ok = SupervisedConnection.connect()

# end
end
Loading