Skip to content

Commit

Permalink
wip: Isolating name registration
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Jan 14, 2024
1 parent aba784d commit a9bf820
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 27 deletions.
38 changes: 25 additions & 13 deletions lib/consumer/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,25 @@ defmodule RabbitMQStream.Consumer do
"""
defmacro __using__(opts) do
quote bind_quoted: [opts: opts], location: :keep do
quote location: :keep do
@behaviour RabbitMQStream.Consumer

@opts opts
@opts unquote(opts)

def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)

opts =
Application.get_env(:rabbitmq_stream, :defaults, [])
|> Keyword.get(:consumer, [])
|> Keyword.drop([:stream_name, :offset_reference, :private])
|> Keyword.merge(Application.get_env(:rabbitmq_stream, :defaults, []) |> Keyword.take([:serializer]))
|> Keyword.merge(Application.get_env(:rabbitmq_stream, __MODULE__, []))
Application.get_env(:rabbitmq_stream, name, [])
|> Keyword.merge(@opts)
|> Keyword.merge(opts)
|> Keyword.put_new(:initial_credit, 50_000)
|> Keyword.put_new(:offset_tracking, count: [store_after: 50])
|> Keyword.put_new(:flow_control, count: [credit_after: {:count, 1}])
|> Keyword.put_new(:offset_reference, Atom.to_string(__MODULE__))
|> Keyword.put_new(:offset_reference, Atom.to_string(name))
|> Keyword.put_new(:serializer, __MODULE__)
|> Keyword.put_new(:properties, [])
# Undocumented option.
|> Keyword.put_new(:consumer_module, __MODULE__)
|> Keyword.put(:name, name)

GenServer.start_link(RabbitMQStream.Consumer.LifeCycle, opts, name: __MODULE__)
RabbitMQStream.Consumer.start_link(opts)
end

def child_spec(opts) do
Expand All @@ -153,6 +148,23 @@ defmodule RabbitMQStream.Consumer do
end
end

def start_link(opts \\ []) do
opts =
Application.get_env(:rabbitmq_stream, :defaults, [])
|> Keyword.get(:consumer, [])
|> Keyword.merge(Application.get_env(:rabbitmq_stream, :defaults, []) |> Keyword.take([:serializer]))
|> Keyword.drop([:stream_name, :offset_reference, :private])
|> Keyword.merge(opts)

dbg(opts)

GenServer.start_link(RabbitMQStream.Consumer.LifeCycle, opts, name: opts[:name])
end

def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end

@optional_callbacks handle_chunk: 1, handle_chunk: 2, decode!: 1, handle_update: 2

@doc """
Expand Down
7 changes: 7 additions & 0 deletions lib/consumer/lifecycle.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ defmodule RabbitMQStream.Consumer.LifeCycle do

opts =
opts
|> Keyword.put_new(:initial_credit, 50_000)
|> Keyword.put_new(:offset_tracking, count: [store_after: 50])
|> Keyword.put_new(:flow_control, count: [credit_after: {:count, 1}])
|> Keyword.put_new(:properties, [])
|> Keyword.put(:credits, opts[:initial_credit])

opts =
opts
|> Keyword.put(:offset_tracking, OffsetTracking.init(opts[:offset_tracking], opts))
|> Keyword.put(:flow_control, FlowControl.init(opts[:flow_control], opts))

Expand Down
22 changes: 8 additions & 14 deletions lib/super_consumer/manager.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
defmodule RabbitMQStream.SuperConsumer.Manager do
defmodule PartitionConsumer do
use RabbitMQStream.Consumer,
initial_offset: :last

def handle_update(_, true) do
{:ok, :last}
end
end

alias RabbitMQStream.SuperConsumer

use GenServer
Expand Down Expand Up @@ -36,20 +27,19 @@ defmodule RabbitMQStream.SuperConsumer.Manager do
:ok = state.connection.create_stream(name)
end

dbg(state.partitions)

for partition <- state.partitions do
dbg(partition)
# We want to start each child, but don't really care about its state
{:ok, _pid} =
DynamicSupervisor.start_child(
state.dynamic_supervisor,
{
PartitionConsumer,
RabbitMQStream.Consumer,
Keyword.merge(state.consumer_opts,
name: partition,
name: String.to_atom("#{state.super_stream}-#{partition}"),
initial_offset: :last,
connection: state.connection,
stream_name: "#{state.super_stream}-#{partition}",
consumer_module: __MODULE__,
properties: [
single_active_consumer: true,
super_stream: state.super_stream
Expand All @@ -61,4 +51,8 @@ defmodule RabbitMQStream.SuperConsumer.Manager do

{:noreply, state}
end

def handle_update(_, true) do
{:ok, :last}
end
end

0 comments on commit a9bf820

Please sign in to comment.