diff --git a/guides/concepts/producing.md b/guides/concepts/producing.md new file mode 100644 index 0000000..d7c47e3 --- /dev/null +++ b/guides/concepts/producing.md @@ -0,0 +1,5 @@ +# Producing + +## Message De-duplication + +## Filter Value diff --git a/guides/concepts/single-active-consumer.md b/guides/concepts/single-active-consumer.md deleted file mode 100644 index e69de29..0000000 diff --git a/guides/concepts/streams.md b/guides/concepts/streams.md deleted file mode 100644 index e69de29..0000000 diff --git a/guides/concepts/super-streams.md b/guides/concepts/super-streams.md index e69de29..84a63dc 100644 --- a/guides/concepts/super-streams.md +++ b/guides/concepts/super-streams.md @@ -0,0 +1,9 @@ +# Overview + +## Streams + +## Single Active Consumer + +### Upgrade + +## Super Streams diff --git a/guides/setup/configuration.md b/guides/setup/configuration.md index e394fec..2136c2a 100644 --- a/guides/setup/configuration.md +++ b/guides/setup/configuration.md @@ -92,3 +92,9 @@ end ``` For more information, you can check the documentation at each actor's module; + +## Global Options + +### Serializer + +You can define a Serializer module to be used by the Publisher and Consumer modules. It is expected to implement `encode!/1` and `decode!/1` callbacks, and must be defined in the compile-time configuration. 
diff --git a/lib/connection/connection.ex b/lib/connection/connection.ex index 99633fb..9a18244 100644 --- a/lib/connection/connection.ex +++ b/lib/connection/connection.ex @@ -125,7 +125,7 @@ defmodule RabbitMQStream.Connection do end def create_stream(name, arguments \\ []) when is_binary(name) do - GenServer.call(__MODULE__, {:create_stream, arguments ++ [name: name]}) + GenServer.call(__MODULE__, {:create_stream, [name: name, arguments: arguments]}) end def delete_stream(name) when is_binary(name) do diff --git a/lib/connection/lifecycle.ex b/lib/connection/lifecycle.ex index 3cdc4f5..45019e4 100644 --- a/lib/connection/lifecycle.ex +++ b/lib/connection/lifecycle.ex @@ -80,12 +80,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do end def handle_call({:subscribe, opts}, from, %Connection{} = conn) do - subscription_id = conn.subscriber_sequence - conn = conn - |> Helpers.push_request_tracker(:subscribe, from, {subscription_id, opts[:pid]}) - |> send_request(:subscribe, opts ++ [subscriber_sum: 1, subscription_id: subscription_id]) + |> Helpers.push_request_tracker(:subscribe, from, {conn.subscriber_sequence, opts[:pid]}) + |> send_request(:subscribe, opts ++ [subscriber_sum: 1, subscription_id: conn.subscriber_sequence]) {:noreply, conn} end diff --git a/lib/consumer/consumer.ex b/lib/consumer/consumer.ex index 4be7beb..f158a09 100644 --- a/lib/consumer/consumer.ex +++ b/lib/consumer/consumer.ex @@ -26,6 +26,7 @@ defmodule RabbitMQStream.Consumer do * `:initial_credit` - The initial credit to request from the server. Defaults to `50_000`. * `:offset_tracking` - Offset tracking strategies to use. Defaults to `[count: [store_after: 50]]`. * `:flow_control` - Flow control strategy to use. Defaults to `[count: [credit_after: {:count, 1}]]`. + * `:offset_reference` - The reference name used to store and retrieve the offset on the server. Defaults to `Atom.to_string(__MODULE__)`. * `:private` - Private data that can hold any value, and is passed to the `handle_chunk/2` callback. * `:serializer` - The module to use to decode the message. 
Defaults to `__MODULE__`, which means that the consumer will use the `decode!/1` callback to decode the message, which is implemented by default to return the message as is. @@ -122,18 +123,22 @@ defmodule RabbitMQStream.Consumer do |> Keyword.merge(Application.get_env(:rabbitmq_stream, __MODULE__, [])) |> Keyword.merge(@opts) |> Keyword.merge(opts) - # |> Keyword.validate!([:connection, :stream_name, :initial_offset]) |> Keyword.put_new(:initial_credit, 50_000) |> Keyword.put_new(:offset_tracking, count: [store_after: 50]) |> Keyword.put_new(:flow_control, count: [credit_after: {:count, 1}]) |> Keyword.put_new(:offset_reference, Atom.to_string(__MODULE__)) |> Keyword.put_new(:serializer, __MODULE__) |> Keyword.put_new(:properties, []) - |> Keyword.put(:consumer_module, __MODULE__) + # Undocumented option. + |> Keyword.put_new(:consumer_module, __MODULE__) GenServer.start_link(RabbitMQStream.Consumer.LifeCycle, opts, name: __MODULE__) end + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + def credit(amount) do GenServer.cast(__MODULE__, {:credit, amount}) end diff --git a/lib/message/data/data.ex b/lib/message/data/data.ex index a2a4d84..20f3fec 100644 --- a/lib/message/data/data.ex +++ b/lib/message/data/data.ex @@ -107,7 +107,7 @@ defmodule RabbitMQStream.Message.Data do end) data = %Types.QueryMetadataResponseData.StreamData{ - code: code, + code: decode_code(code), name: name, leader: leader, replicas: replicas diff --git a/lib/message/data/types.ex b/lib/message/data/types.ex index 50b5fc1..7748fbc 100644 --- a/lib/message/data/types.ex +++ b/lib/message/data/types.ex @@ -211,7 +211,7 @@ defmodule RabbitMQStream.Message.Types do @moduledoc false @enforce_keys [:code, :name, :leader, :replicas] @type t :: %{ - code: String.t(), + code: RabbitMQStream.Message.Helpers.code(), name: String.t(), leader: non_neg_integer(), replicas: [non_neg_integer()] diff --git a/lib/publisher/lifecycle.ex b/lib/publisher/lifecycle.ex 
index e496bc2..637108e 100644 --- a/lib/publisher/lifecycle.ex +++ b/lib/publisher/lifecycle.ex @@ -9,16 +9,13 @@ defmodule RabbitMQStream.Publisher.Lifecycle do connection = Keyword.get(opts, :connection) || raise(":connection is required") stream_name = Keyword.get(opts, :stream_name) || raise(":stream_name is required") - # An implemented `encode!/1` callback takes precedence over the serializer option - serializer = Keyword.get(opts, :serializer, opts[:publisher_module]) - state = %RabbitMQStream.Publisher{ id: nil, sequence: nil, stream_name: stream_name, connection: connection, reference_name: reference_name, - serializer: serializer, + serializer: opts[:serializer], publisher_module: opts[:publisher_module] } diff --git a/lib/publisher/publisher.ex b/lib/publisher/publisher.ex index f2cd0d1..c1ffa28 100644 --- a/lib/publisher/publisher.ex +++ b/lib/publisher/publisher.ex @@ -88,12 +88,21 @@ defmodule RabbitMQStream.Publisher do """ defmacro __using__(opts) do - quote bind_quoted: [opts: opts], location: :keep do - @opts opts + defaults = Application.get_env(:rabbitmq_stream, :defaults, []) + # defaults = Application.compile_env(:rabbitmq_stream, :defaults, []) + + serializer = Keyword.get(opts, :serializer, Keyword.get(defaults, :serializer)) + + quote location: :keep do + @opts unquote(opts) @behaviour RabbitMQStream.Publisher def start_link(opts \\ []) do + unless !Keyword.has_key?(opts, :serializer) do + raise "You can only pass `:serializer` option to compile-time options." 
+ end + opts = Application.get_env(:rabbitmq_stream, :defaults, []) |> Keyword.get(:publisher, []) @@ -122,12 +131,19 @@ defmodule RabbitMQStream.Publisher do def before_start(_opts, state), do: state def filter_value(_), do: nil - # If there is a global serializer defined, we use it - if serializer = Application.compile_env(:rabbitmq_stream, [:defaults, :serializer]) do - def encode!(message), do: unquote(serializer).encode!(message) - else - def encode!(message), do: message - end + unquote( + # We need this piece of logic so we can guarantee that the 'encode!/1' call is executed + # by the caller process, not the Publisher process itself. + if serializer != nil do + quote do + def encode!(message), do: unquote(serializer).encode!(message) + end + else + quote do + def encode!(message), do: message + end + end + ) defoverridable RabbitMQStream.Publisher end diff --git a/lib/super_consumer/manager.ex b/lib/super_consumer/manager.ex new file mode 100644 index 0000000..01f5d18 --- /dev/null +++ b/lib/super_consumer/manager.ex @@ -0,0 +1,64 @@ +defmodule RabbitMQStream.SuperConsumer.Manager do + defmodule PartitionConsumer do + use RabbitMQStream.Consumer, + initial_offset: :last + + def handle_update(_, true) do + {:ok, :last} + end + end + + alias RabbitMQStream.SuperConsumer + + use GenServer + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl true + def init(opts \\ []) do + state = struct(SuperConsumer, opts) + + {:ok, state, {:continue, :start}} + end + + @impl true + def handle_continue(:start, %SuperConsumer{} = state) do + # If the stream exists, fetch its partitions information + {:ok, data} = + state.partitions + |> Enum.map(&"#{state.super_stream}-#{&1}") + |> state.connection.query_metadata() + + # We create all the missing streams + for %{code: :stream_does_not_exist, name: name} <- data.streams do + :ok = state.connection.create_stream(name) + end + + dbg(state.partitions) + + for partition <- state.partitions do + 
dbg(partition) + # We want to start each child, but don't really care about its state + {:ok, _pid} = + DynamicSupervisor.start_child( + state.dynamic_supervisor, + { + PartitionConsumer, + Keyword.merge(state.consumer_opts, + name: partition, + connection: state.connection, + stream_name: "#{state.super_stream}-#{partition}", + properties: [ + single_active_consumer: true, + super_stream: state.super_stream + ] + ) + } + ) + end + + {:noreply, state} + end +end diff --git a/lib/super_consumer/super_consumer.ex b/lib/super_consumer/super_consumer.ex new file mode 100644 index 0000000..59fba9e --- /dev/null +++ b/lib/super_consumer/super_consumer.ex @@ -0,0 +1,53 @@ +defmodule RabbitMQStream.SuperConsumer do + defmacro __using__(opts) do + quote do + @opts unquote(opts) + @behaviour RabbitMQStream.SuperConsumer + + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(opts) do + children = [ + {DynamicSupervisor, strategy: :one_for_one, name: __MODULE__.DynamicSupervisor}, + {RabbitMQStream.SuperConsumer.Manager, + opts ++ [name: __MODULE__.Manager, dynamic_supervisor: __MODULE__.DynamicSupervisor]} + ] + + # We use `one_for_all` because if the DynamicSupervisor shuts down for some reason, we must be able to + # re-build all the children from the Manager + Supervisor.init(children, strategy: :one_for_all) + end + end + end + + @optional_callbacks handle_chunk: 1, handle_chunk: 2 + @callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t()) :: term() + @callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t(), state :: RabbitMQStream.Consumer.t()) :: term() + + defstruct [ + :super_stream, + :partitions, + :connection, + :consumer_opts, + :dynamic_supervisor + ] + + @type t :: %__MODULE__{ + super_stream: String.t(), + partitions: [String.t()], + connection: module(), + dynamic_supervisor: module(), + consumer_opts: [RabbitMQStream.Consumer.consumer_option()] | nil + } + + @type 
super_consumer_option :: + {:super_stream, String.t()} + | {:partitions, [String.t()]} + | {:connection, module()} + | {:consumer_opts, [RabbitMQStream.Consumer.consumer_option()]} +end diff --git a/test/connection_test.exs b/test/connection_test.exs index 987d758..b12303e 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -161,7 +161,8 @@ defmodule RabbitMQStreamTest.Connection do :ok = SupervisedConnection.connect() SupervisedConnection.delete_stream(@stream) :ok = SupervisedConnection.create_stream(@stream) - {:ok, _data} = SupervisedConnection.stream_stats(@stream) + assert {:ok, _data} = SupervisedConnection.stream_stats(@stream) + assert {:error, :stream_does_not_exist} = SupervisedConnection.stream_stats("#{@stream}-NON-EXISTENT") end @tag min_version: "3.13" diff --git a/test/consumer/consumer_test.exs b/test/consumer_test.exs similarity index 88% rename from test/consumer/consumer_test.exs rename to test/consumer_test.exs index 8ff1a9e..9cee4e0 100644 --- a/test/consumer/consumer_test.exs +++ b/test/consumer_test.exs @@ -19,6 +19,19 @@ defmodule RabbitMQStreamTest.Consumer do end end + defmodule SupervisorPublisher2 do + use RabbitMQStream.Publisher, + connection: SupervisedConnection, + serializer: Jason + + @impl true + def before_start(_opts, state) do + state.connection.create_stream(state.stream_name) + + state + end + end + defmodule Consumer do use RabbitMQStream.Consumer, connection: SupervisedConnection, @@ -93,7 +106,7 @@ defmodule RabbitMQStreamTest.Consumer do SupervisedConnection.delete_stream(@stream) {:ok, _publisher} = - SupervisorPublisher.start_link(reference_name: @reference_name, stream_name: @stream, serializer: Jason) + SupervisorPublisher2.start_link(reference_name: @reference_name, stream_name: @stream) {:ok, _subscriber} = Consumer.start_link( @@ -106,10 +119,10 @@ defmodule RabbitMQStreamTest.Consumer do message1 = %{"message" => "Consumer Test: 1"} message2 = %{"message" => "Consumer Test: 2"} - 
SupervisorPublisher.publish(message1) + SupervisorPublisher2.publish(message1) assert_receive {:handle_chunk, [^message1]}, 500 - SupervisorPublisher.publish(message2) + SupervisorPublisher2.publish(message2) assert_receive {:handle_chunk, [^message2]}, 500 :ok = GenServer.stop(Consumer, :normal) diff --git a/test/consumer/filter_value_consumer.exs b/test/filter_value_consumer.exs similarity index 96% rename from test/consumer/filter_value_consumer.exs rename to test/filter_value_consumer.exs index 2963a09..4c10166 100644 --- a/test/consumer/filter_value_consumer.exs +++ b/test/filter_value_consumer.exs @@ -68,7 +68,8 @@ defmodule RabbitMQStreamTest.Consumer.FilterValue do end end - test "should always have exactly 1 active consumer" do + @tag min_version: "3.13" + test "should receive only the filtered messages" do {:ok, _} = Conn1.start_link() :ok = Conn1.connect() Conn1.delete_stream("filter-value-01") diff --git a/test/consumer/single_active_consumer.exs b/test/single_active_consumer.exs similarity index 98% rename from test/consumer/single_active_consumer.exs rename to test/single_active_consumer.exs index 156dc60..3ce3ac5 100644 --- a/test/consumer/single_active_consumer.exs +++ b/test/single_active_consumer.exs @@ -103,7 +103,7 @@ defmodule RabbitMQStreamTest.Consumer.SingleActiveConsumer do :ok = Conn3.connect() :ok = Conn4.connect() - :ok = Conn4.delete_stream("super-stream-test-01") + Conn4.delete_stream("super-stream-test-01") {:ok, _} = Publisher.start_link(stream_name: "super-stream-test-01") diff --git a/test/super_stream_test.exs b/test/super_stream_test.exs new file mode 100644 index 0000000..990cd19 --- /dev/null +++ b/test/super_stream_test.exs @@ -0,0 +1,41 @@ +defmodule RabbitMQStreamTest.SuperStream do + use ExUnit.Case, async: false + alias RabbitMQStream.OsirisChunk + require Logger + + defmodule SupervisedConnection do + use RabbitMQStream.Connection + end + + defmodule SuperConsumer do + use RabbitMQStream.SuperConsumer, + connection: 
SupervisedConnection + + @impl true + def handle_chunk(%OsirisChunk{data_entries: entries}, %{private: parent}) do + send(parent, {:handle_chunk, entries}) + + :ok + end + end + + setup do + {:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/") + :ok = SupervisedConnection.connect() + + :ok + end + + @stream "super-streams-01" + test "should create super streams" do + {:ok, _} = + SuperConsumer.start_link( + super_stream: @stream, + partitions: ["01", "02", "03"], + connection: SupervisedConnection, + consumer_opts: [] + ) + + Process.sleep(1_000) + end +end