Skip to content

Commit

Permalink
wip: SuperStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Jan 14, 2024
1 parent 691dc96 commit aba784d
Show file tree
Hide file tree
Showing 19 changed files with 236 additions and 27 deletions.
5 changes: 5 additions & 0 deletions guides/concepts/producing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Producing

## Message De-duplication

## Filter Value
Empty file.
Empty file removed guides/concepts/streams.md
Empty file.
9 changes: 9 additions & 0 deletions guides/concepts/super-streams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Overview

## Streams

## Single Active Consumer

### Upgrade

## Super Streams
6 changes: 6 additions & 0 deletions guides/setup/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,9 @@ end
```

For more information, check the documentation of each actor's module.

## Global Options

### Serializer

You can define a Serializer module to be used by the Publisher and Consumer modules. It is expected to implement the `encode!/1` and `decode!/1` callbacks, and it must be defined in the compile-time configuration.
2 changes: 1 addition & 1 deletion lib/connection/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ defmodule RabbitMQStream.Connection do
end

def create_stream(name, arguments \\ []) when is_binary(name) do
GenServer.call(__MODULE__, {:create_stream, arguments ++ [name: name]})
GenServer.call(__MODULE__, {:create_stream, [name: name, arguments: arguments]})
end

def delete_stream(name) when is_binary(name) do
Expand Down
6 changes: 2 additions & 4 deletions lib/connection/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do
end

def handle_call({:subscribe, opts}, from, %Connection{} = conn) do
subscription_id = conn.subscriber_sequence

conn =
conn
|> Helpers.push_request_tracker(:subscribe, from, {subscription_id, opts[:pid]})
|> send_request(:subscribe, opts ++ [subscriber_sum: 1, subscription_id: subscription_id])
|> Helpers.push_request_tracker(:subscribe, from, {conn.subscriber_sequence, opts[:pid]})
|> send_request(:subscribe, opts ++ [subscriber_sum: 1, subscription_id: conn.subscriber_sequence])

{:noreply, conn}
end
Expand Down
9 changes: 7 additions & 2 deletions lib/consumer/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule RabbitMQStream.Consumer do
* `:initial_credit` - The initial credit to request from the server. Defaults to `50_000`.
* `:offset_tracking` - Offset tracking strategies to use. Defaults to `[count: [store_after: 50]]`.
* `:flow_control` - Flow control strategy to use. Defaults to `[count: [credit_after: {:count, 1}]]`.
* `:offset_reference` - The reference name used to store and fetch offsets. Defaults to `Atom.to_string(__MODULE__)`.
* `:private` - Private data that can hold any value, and is passed to the `handle_chunk/2` callback.
* `:serializer` - The module to use to decode the message. Defaults to `__MODULE__`,
which means that the consumer will use the `decode!/1` callback to decode the message, which is implemented by default to return the message as is.
Expand Down Expand Up @@ -122,18 +123,22 @@ defmodule RabbitMQStream.Consumer do
|> Keyword.merge(Application.get_env(:rabbitmq_stream, __MODULE__, []))
|> Keyword.merge(@opts)
|> Keyword.merge(opts)
# |> Keyword.validate!([:connection, :stream_name, :initial_offset])
|> Keyword.put_new(:initial_credit, 50_000)
|> Keyword.put_new(:offset_tracking, count: [store_after: 50])
|> Keyword.put_new(:flow_control, count: [credit_after: {:count, 1}])
|> Keyword.put_new(:offset_reference, Atom.to_string(__MODULE__))
|> Keyword.put_new(:serializer, __MODULE__)
|> Keyword.put_new(:properties, [])
|> Keyword.put(:consumer_module, __MODULE__)
# Undocumented option.
|> Keyword.put_new(:consumer_module, __MODULE__)

GenServer.start_link(RabbitMQStream.Consumer.LifeCycle, opts, name: __MODULE__)
end

def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end

def credit(amount) do
GenServer.cast(__MODULE__, {:credit, amount})
end
Expand Down
2 changes: 1 addition & 1 deletion lib/message/data/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule RabbitMQStream.Message.Data do
end)

data = %Types.QueryMetadataResponseData.StreamData{
code: code,
code: decode_code(code),
name: name,
leader: leader,
replicas: replicas
Expand Down
2 changes: 1 addition & 1 deletion lib/message/data/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ defmodule RabbitMQStream.Message.Types do
@moduledoc false
@enforce_keys [:code, :name, :leader, :replicas]
@type t :: %{
code: String.t(),
code: RabbitMQStream.Message.Helpers.code(),
name: String.t(),
leader: non_neg_integer(),
replicas: [non_neg_integer()]
Expand Down
5 changes: 1 addition & 4 deletions lib/publisher/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ defmodule RabbitMQStream.Publisher.Lifecycle do
connection = Keyword.get(opts, :connection) || raise(":connection is required")
stream_name = Keyword.get(opts, :stream_name) || raise(":stream_name is required")

# An implemented `encode!/1` callback takes precedence over the serializer option
serializer = Keyword.get(opts, :serializer, opts[:publisher_module])

state = %RabbitMQStream.Publisher{
id: nil,
sequence: nil,
stream_name: stream_name,
connection: connection,
reference_name: reference_name,
serializer: serializer,
serializer: opts[:serializer],
publisher_module: opts[:publisher_module]
}

Expand Down
32 changes: 24 additions & 8 deletions lib/publisher/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,21 @@ defmodule RabbitMQStream.Publisher do
"""

defmacro __using__(opts) do
quote bind_quoted: [opts: opts], location: :keep do
@opts opts
defaults = Application.get_env(:rabbitmq_stream, :defaults, [])
# defaults = Application.compile_env(:rabbitmq_stream, :defaults, [])

serializer = Keyword.get(opts, :serializer, Keyword.get(defaults, :serializer))

quote location: :keep do
@opts unquote(opts)

@behaviour RabbitMQStream.Publisher

def start_link(opts \\ []) do
unless !Keyword.has_key?(opts, :serializer) do
raise "You can only pass `:serializer` option to compile-time options."
end

opts =
Application.get_env(:rabbitmq_stream, :defaults, [])
|> Keyword.get(:publisher, [])
Expand Down Expand Up @@ -122,12 +131,19 @@ defmodule RabbitMQStream.Publisher do
def before_start(_opts, state), do: state
def filter_value(_), do: nil

# If there is a global serializer defined, we use it
if serializer = Application.compile_env(:rabbitmq_stream, [:defaults, :serializer]) do
def encode!(message), do: unquote(serializer).encode!(message)
else
def encode!(message), do: message
end
unquote(
# We need this piece of logic so we can guarantee that the 'encode!/1' call is executed
# by the caller process, not the Publisher process itself.
if serializer != nil do
quote do
def encode!(message), do: unquote(serializer).encode!(message)
end
else
quote do
def encode!(message), do: message
end
end
)

defoverridable RabbitMQStream.Publisher
end
Expand Down
64 changes: 64 additions & 0 deletions lib/super_consumer/manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule RabbitMQStream.SuperConsumer.Manager do
  @moduledoc false
  # Boots one consumer per partition of a Super Stream, after first creating
  # any partition streams that don't exist yet on the broker.

  defmodule PartitionConsumer do
    @moduledoc false
    use RabbitMQStream.Consumer,
      initial_offset: :last

    # Called when this consumer becomes the single active consumer (`true`):
    # resume consuming from the last stored offset.
    def handle_update(_, true) do
      {:ok, :last}
    end
  end

  alias RabbitMQStream.SuperConsumer

  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl true
  def init(opts \\ []) do
    state = struct(SuperConsumer, opts)

    # Defer the (potentially slow) broker calls out of init/1 so the
    # supervisor is not blocked while streams are created.
    {:ok, state, {:continue, :start}}
  end

  @impl true
  def handle_continue(:start, %SuperConsumer{} = state) do
    # Query the metadata of every partition stream of this super stream.
    {:ok, data} =
      state.partitions
      |> Enum.map(&"#{state.super_stream}-#{&1}")
      |> state.connection.query_metadata()

    # Create all the partition streams that are still missing.
    for %{code: :stream_does_not_exist, name: name} <- data.streams do
      :ok = state.connection.create_stream(name)
    end

    for partition <- state.partitions do
      # We want to start each child, but don't really care about its state
      {:ok, _pid} =
        DynamicSupervisor.start_child(
          state.dynamic_supervisor,
          {
            PartitionConsumer,
            Keyword.merge(state.consumer_opts,
              name: partition,
              connection: state.connection,
              stream_name: "#{state.super_stream}-#{partition}",
              properties: [
                single_active_consumer: true,
                super_stream: state.super_stream
              ]
            )
          }
        )
    end

    {:noreply, state}
  end
end
53 changes: 53 additions & 0 deletions lib/super_consumer/super_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule RabbitMQStream.SuperConsumer do
  @moduledoc """
  Injects a Supervisor that runs one consumer per partition of a Super Stream.

  The generated supervision tree holds a `DynamicSupervisor` for the partition
  consumers and a `RabbitMQStream.SuperConsumer.Manager` that populates it.
  """

  defmacro __using__(using_opts) do
    quote do
      @opts unquote(using_opts)
      @behaviour RabbitMQStream.SuperConsumer

      use Supervisor

      def start_link(opts) do
        Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
      end

      @impl true
      def init(opts) do
        manager_opts =
          opts ++ [name: __MODULE__.Manager, dynamic_supervisor: __MODULE__.DynamicSupervisor]

        children = [
          {DynamicSupervisor, strategy: :one_for_one, name: __MODULE__.DynamicSupervisor},
          {RabbitMQStream.SuperConsumer.Manager, manager_opts}
        ]

        # `one_for_all`: should the DynamicSupervisor go down for any reason, the
        # Manager must restart as well so it can rebuild all partition consumers.
        Supervisor.init(children, strategy: :one_for_all)
      end
    end
  end

  @optional_callbacks handle_chunk: 1, handle_chunk: 2
  @callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t()) :: term()
  @callback handle_chunk(chunk :: RabbitMQStream.OsirisChunk.t(), state :: RabbitMQStream.Consumer.t()) :: term()

  defstruct [
    :super_stream,
    :partitions,
    :connection,
    :consumer_opts,
    :dynamic_supervisor
  ]

  @type t :: %__MODULE__{
          super_stream: String.t(),
          partitions: [String.t()],
          connection: module(),
          dynamic_supervisor: module(),
          consumer_opts: [RabbitMQStream.Consumer.consumer_option()] | nil
        }

  @type super_consumer_option ::
          {:super_stream, String.t()}
          | {:partitions, [String.t()]}
          | {:connection, module()}
          | {:consumer_opts, [RabbitMQStream.Consumer.consumer_option()]}
end
3 changes: 2 additions & 1 deletion test/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ defmodule RabbitMQStreamTest.Connection do
:ok = SupervisedConnection.connect()
SupervisedConnection.delete_stream(@stream)
:ok = SupervisedConnection.create_stream(@stream)
{:ok, _data} = SupervisedConnection.stream_stats(@stream)
assert {:ok, _data} = SupervisedConnection.stream_stats(@stream)
assert {:error, :stream_does_not_exist} = SupervisedConnection.stream_stats("#{@stream}-NON-EXISTENT")
end

@tag min_version: "3.13"
Expand Down
19 changes: 16 additions & 3 deletions test/consumer/consumer_test.exs → test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ defmodule RabbitMQStreamTest.Consumer do
end
end

defmodule SupervisorPublisher2 do
use RabbitMQStream.Publisher,
connection: SupervisedConnection,
serializer: Jason

@impl true
def before_start(_opts, state) do
state.connection.create_stream(state.stream_name)

state
end
end

defmodule Consumer do
use RabbitMQStream.Consumer,
connection: SupervisedConnection,
Expand Down Expand Up @@ -93,7 +106,7 @@ defmodule RabbitMQStreamTest.Consumer do
SupervisedConnection.delete_stream(@stream)

{:ok, _publisher} =
SupervisorPublisher.start_link(reference_name: @reference_name, stream_name: @stream, serializer: Jason)
SupervisorPublisher2.start_link(reference_name: @reference_name, stream_name: @stream)

{:ok, _subscriber} =
Consumer.start_link(
Expand All @@ -106,10 +119,10 @@ defmodule RabbitMQStreamTest.Consumer do
message1 = %{"message" => "Consumer Test: 1"}
message2 = %{"message" => "Consumer Test: 2"}

SupervisorPublisher.publish(message1)
SupervisorPublisher2.publish(message1)
assert_receive {:handle_chunk, [^message1]}, 500

SupervisorPublisher.publish(message2)
SupervisorPublisher2.publish(message2)
assert_receive {:handle_chunk, [^message2]}, 500

:ok = GenServer.stop(Consumer, :normal)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ defmodule RabbitMQStreamTest.Consumer.FilterValue do
end
end

test "should always have exactly 1 active consumer" do
@tag min_version: "3.13"
test "should receive only the filtered messages" do
{:ok, _} = Conn1.start_link()
:ok = Conn1.connect()
Conn1.delete_stream("filter-value-01")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule RabbitMQStreamTest.Consumer.SingleActiveConsumer do
:ok = Conn3.connect()
:ok = Conn4.connect()

:ok = Conn4.delete_stream("super-stream-test-01")
Conn4.delete_stream("super-stream-test-01")

{:ok, _} = Publisher.start_link(stream_name: "super-stream-test-01")

Expand Down
Loading

0 comments on commit aba784d

Please sign in to comment.