diff --git a/lib/message/encoder.ex b/lib/message/encoder.ex index 5258478..8fbe129 100644 --- a/lib/message/encoder.ex +++ b/lib/message/encoder.ex @@ -3,227 +3,117 @@ defmodule RabbitMQStream.Message.Encoder do alias RabbitMQStream.Message.{Response, Request, Frame} - alias RabbitMQStream.Message.Data.{ - TuneData, - CloseData, - CreateStreamData, - DeleteStreamData, - StoreOffsetData, - QueryOffsetData, - DeclarePublisherData, - DeletePublisherData, - QueryMetadataData, - QueryPublisherSequenceData, - PublishData, - SubscribeRequestData, - UnsubscribeRequestData, - CreditRequestData - } - - def encode!(%Request{command: :peer_properties} = request) do - properties = encode_map(request.data.peer_properties) - - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - properties::binary - >> + def encode!(command) do + payload = encode_payload!(command) - wrap(data) + wrap(command, payload) end - def encode!(%Request{command: :sasl_handshake} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32) - >> + defp encode_payload!(%Request{command: :peer_properties, data: data}) do + properties = encode_map(data.peer_properties) - wrap(data) + <> end - def encode!(%Request{command: :sasl_authenticate} = request) do - mechanism = encode_string(request.data.mechanism) - - credentials = - encode_bytes("\u0000#{request.data.sasl_opaque_data[:username]}\u0000#{request.data.sasl_opaque_data[:password]}") - - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - mechanism::binary, - credentials::binary - >> - - wrap(data) + defp encode_payload!(%Request{command: :sasl_handshake}) do + <<>> end - def encode!(%Request{command: :open} = request) do - vhost = encode_string(request.data.vhost) + defp encode_payload!(%Request{command: :sasl_authenticate, data: data}) do + mechanism = encode_string(data.mechanism) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - vhost::binary - >> + credentials = + encode_bytes("\u0000#{data.sasl_opaque_data[:username]}\u0000#{data.sasl_opaque_data[:password]}") - wrap(data) + <> end - def encode!(%Request{command: :heartbeat} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16) - >> + defp encode_payload!(%Request{command: :open, data: data}) do + vhost = encode_string(data.vhost) - wrap(data) + <> end - def encode!(%Request{command: :tune, data: %TuneData{} = data} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - data.frame_max::unsigned-integer-size(32), - data.heartbeat::unsigned-integer-size(32) - >> + defp encode_payload!(%Request{command: :heartbeat}) do + <<>> + end - wrap(data) + defp encode_payload!(%Request{command: :tune, data: data}) do + <> end - def encode!(%Request{command: :close, data: %CloseData{} = data} = request) do + defp encode_payload!(%Request{command: :close, data: data}) do reason = encode_string(data.reason) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - data.code::unsigned-integer-size(16), - reason::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :create_stream, data: %CreateStreamData{} = data} = request) do + defp encode_payload!(%Request{command: :create_stream, data: data}) do stream_name = encode_string(data.stream_name) arguments = encode_map(data.arguments) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - stream_name::binary, - arguments::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :delete_stream, data: %DeleteStreamData{} = data} = request) do + defp encode_payload!(%Request{command: :delete_stream, data: data}) do stream_name = encode_string(data.stream_name) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - stream_name::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :store_offset, data: %StoreOffsetData{} = data} = request) do + defp encode_payload!(%Request{command: :store_offset, data: data}) do offset_reference = encode_string(data.offset_reference) stream_name = encode_string(data.stream_name) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), + << offset_reference::binary, stream_name::binary, data.offset::unsigned-integer-size(64) >> - - wrap(data) end - def encode!(%Request{command: :query_offset, data: %QueryOffsetData{} = data} = request) do + defp encode_payload!(%Request{command: :query_offset, data: data}) do offset_reference = encode_string(data.offset_reference) stream_name = encode_string(data.stream_name) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), + << offset_reference::binary, stream_name::binary >> - - wrap(data) end - def encode!(%Request{command: :declare_publisher, data: %DeclarePublisherData{} = data} = request) do + defp encode_payload!(%Request{command: :declare_publisher, data: data}) do publisher_reference = encode_string(data.publisher_reference) stream_name = encode_string(data.stream_name) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), + << data.id::unsigned-integer-size(8), publisher_reference::binary, stream_name::binary >> - - wrap(data) end - def encode!(%Request{command: :delete_publisher, data: %DeletePublisherData{} = data} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - data.publisher_id::unsigned-integer-size(8) - >> - - wrap(data) + defp encode_payload!(%Request{command: :delete_publisher, data: data}) do + <> end - def encode!(%Request{command: :query_metadata, data: %QueryMetadataData{} = data} = request) do + defp encode_payload!(%Request{command: :query_metadata, data: data}) do streams = data.streams |> Enum.map(&encode_string/1) |> encode_array() - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - streams::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :query_publisher_sequence, data: %QueryPublisherSequenceData{} = data} = request) do + defp encode_payload!(%Request{command: :query_publisher_sequence, data: data}) do publisher_reference = encode_string(data.publisher_reference) stream_name = encode_string(data.stream_name) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - publisher_reference::binary, - stream_name::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :publish, data: %PublishData{} = data} = request) do + defp encode_payload!(%Request{command: :publish, data: data}) do messages = encode_array( for {publishing_id, message} <- data.published_messages do @@ -231,17 +121,10 @@ defmodule RabbitMQStream.Message.Encoder do end ) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - data.publisher_id::unsigned-integer-size(8), - messages::binary - >> - - wrap(data) + <> end - def encode!(%Request{command: :subscribe, data: %SubscribeRequestData{} = data} = request) do + defp encode_payload!(%Request{command: :subscribe, data: data}) do stream_name = encode_string(data.stream_name) offset = @@ -255,68 +138,90 @@ defmodule RabbitMQStream.Message.Encoder do properties = encode_map(data.properties) - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), + << data.subscription_id::unsigned-integer-size(8), stream_name::binary, offset::binary, data.credit::unsigned-integer-size(16), properties::binary >> - - wrap(data) end - def encode!(%Request{command: :unsubscribe, data: %UnsubscribeRequestData{} = data} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), - request.correlation_id::unsigned-integer-size(32), - data.subscription_id::unsigned-integer-size(8) - >> - - wrap(data) + defp encode_payload!(%Request{command: :unsubscribe, data: data}) do + <> end - def encode!(%Request{command: :credit, data: %CreditRequestData{} = data} = request) do - data = << - Frame.command_to_code(request.command)::unsigned-integer-size(16), - request.version::unsigned-integer-size(16), + defp encode_payload!(%Request{command: :credit, data: data}) do + << data.subscription_id::unsigned-integer-size(8), data.credit::unsigned-integer-size(16) >> - - wrap(data) end - def encode!(%Response{command: :tune, data: %TuneData{} = data} = response) do - data = << - 0b1::1, - Frame.command_to_code(response.command)::unsigned-integer-size(15), - response.version::unsigned-integer-size(16), + defp encode_payload!(%Response{command: :tune, data: data}) do + << data.frame_max::unsigned-integer-size(32), data.heartbeat::unsigned-integer-size(32) >> + end - <> + defp encode_payload!(%Response{command: :close, code: code}) do + << + Frame.atom_to_response_code(code)::unsigned-integer-size(16) + >> end - def encode!(%Response{command: :close} = response) do - data = << - 0b1::1, - Frame.command_to_code(response.command)::unsigned-integer-size(15), - response.version::unsigned-integer-size(16), - response.correlation_id::unsigned-integer-size(32), - Frame.atom_to_response_code(response.code)::unsigned-integer-size(16) + defp wrap(request, payload) do + header = bake_header(request) + + buffer = <> + + <> + end + + defp bake_header(%Request{command: command, version: version, correlation_id: correlation_id}) + when command in [ + :peer_properties, + :sasl_handshake, + :sasl_authenticate, + :open, + :tune, + :close, + :create_stream, + :delete_stream, + :query_offset, + :declare_publisher, + :delete_publisher, + :query_metadata, + :query_publisher_sequence, + :subscribe, + :unsubscribe + ] do + << + Frame.command_to_code(command)::unsigned-integer-size(16), + version::unsigned-integer-size(16), + correlation_id::unsigned-integer-size(32) >> + end + + defp bake_header(%Request{command: command, version: version}) + when command in [:heartbeat, :store_offset, :publish, :credit] do + <> + end - wrap(data) + defp bake_header(%Response{command: command, version: version, correlation_id: correlation_id}) + when command in [:close] do + << + 0b1::1, + Frame.command_to_code(command)::unsigned-integer-size(15), + version::unsigned-integer-size(16), + correlation_id::unsigned-integer-size(32) + >> end - defp wrap(payload) do - <> + defp bake_header(%Response{command: command, version: version}) + when command in [:tune] do + <<0b1::1, Frame.command_to_code(command)::unsigned-integer-size(15), version::unsigned-integer-size(16)>> end defp encode_string(value) when is_atom(value) do