diff --git a/lib/client/lifecycle.ex b/lib/client/lifecycle.ex index 94be1cc..f598d81 100644 --- a/lib/client/lifecycle.ex +++ b/lib/client/lifecycle.ex @@ -2,6 +2,7 @@ defmodule RabbitMQStream.Client.Lifecycle do use GenServer require Logger alias RabbitMQStream.Client + alias RabbitMQStream.Message @moduledoc """ This module defines the lifecycle of the RabbitMQStream.Client. @@ -185,27 +186,33 @@ defmodule RabbitMQStream.Client.Lifecycle do {:noreply, conn} end - # An issue with only forwarding the messages to the broker is that it adds an extra message pass. - # To workaround this issue we could buffer messages so that it offsets the possible performance hit. - # Or we could attempt to use ':ets' in some way to prevent work around this, but there doesn't - # seem to be a way to do this while keeping the exact same interface as a Connection. - def handle_cast({:publish, opts}, %Client{} = conn) do + def handle_cast({:respond, opts}, %Client{} = conn) do + # We only accept one type of 'respond' command + %Message.Request{ + command: :consumer_update, + data: %Message.Types.ConsumerUpdateRequestData{subscription_id: subscription_id} + } = Keyword.fetch!(opts, :request) + broker_pid = conn.clients - |> Map.get(opts[:producer_id]) + |> Map.get(subscription_id) |> then(&elem(&1, 1)) - GenServer.cast(broker_pid, {:publish, opts}) + GenServer.cast(broker_pid, {:respond, opts}) {:noreply, conn} end - def handle_cast({:respond, opts}, %Client{} = conn) do + # An issue with only forwarding the messages to the broker is that it adds an extra message pass. + # To workaround this issue we could buffer messages so that it offsets the possible performance hit. + # Or we could attempt to use ':ets' in some way to prevent work around this, but there doesn't + # seem to be a way to do this while keeping the exact same interface as a Connection. + def handle_cast({:publish, opts}, %Client{} = conn) do broker_pid = conn.clients - |> Map.get(opts[:subscription_id]) + |> Map.get(opts[:producer_id]) |> then(&elem(&1, 1)) - GenServer.cast(broker_pid, {:respond, opts}) + GenServer.cast(broker_pid, {:publish, opts}) {:noreply, conn} end