From fa3093351a92b05e654c79a815d99e0e850c44df Mon Sep 17 00:00:00 2001 From: Victor Gaiva <13839490+VictorGaiva@users.noreply.github.com> Date: Mon, 4 Mar 2024 09:35:55 -0300 Subject: [PATCH] docs: Improving documentation on mudes themselves --- README.md | 28 +++++++++--------- guides/setup/getting-started.md | 2 +- lib/consumer/consumer.ex | 52 +++++++++++++++++++++++++++++++++ lib/producer/producer.ex | 28 +++++++++++++++++- 4 files changed, 94 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 8c6785c..9412eeb 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,19 @@ Elixir Client for [RabbitMQ Streams Protocol](https://www.rabbitmq.com/streams.h ## Usage +## Installation + +The package can be installed by adding `rabbitmq_stream` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:rabbitmq_stream, "~> 0.4.0"}, + # ... + ] +end +``` + ### Consuming from stream First you define a connection @@ -62,7 +75,7 @@ You can define a `Producer` module like this: ```elixir defmodule MyApp.MyProducer do use RabbitMQStream.Producer, - stream: "stream-01", + stream_name: "stream-01", connection: MyApp.MyConnection end ``` @@ -73,19 +86,6 @@ Then you can publish messages to the stream: MyApp.MyProducer.publish("Hello World") ``` -## Installation - -The package can be installed by adding `rabbitmq_stream` to your list of dependencies in `mix.exs`: - -```elixir -def deps do - [ - {:rabbitmq_stream, "~> 0.4.0"}, - # ... - ] -end -``` - ## Configuration The configuration for the connection can be set in your `config.exs` file: diff --git a/guides/setup/getting-started.md b/guides/setup/getting-started.md index e1ee05f..a378ed3 100644 --- a/guides/setup/getting-started.md +++ b/guides/setup/getting-started.md @@ -84,7 +84,7 @@ To prevent message duplication, RabbitMQ requires us to declare a named Producer ```elixir defmodule MyApp.MyProducer use RabbitMQStream.Producer, - stream: "my_stream", + stream_name: "my_stream", connection: MyApp.MyConnection end ``` diff --git a/lib/consumer/consumer.ex b/lib/consumer/consumer.ex index 34d50f0..9defb17 100644 --- a/lib/consumer/consumer.ex +++ b/lib/consumer/consumer.ex @@ -116,6 +116,58 @@ defmodule RabbitMQStream.Consumer do * `:offset_reference` * `:private` + # Decoding + + You can declare a function for decoding each message by declaring a `decode!/1` callback as follows: + defmodule MyApp.MyConsumer do + use RabbitMQStream.Consumer, + connection: MyApp.MyConnection, + stream_name: "my_stream", + initial_offset: :first + + @impl true + def decode!(message) do + Jason.decode!(message) + end + end + + Or by passing a `:serializer` option to the `use` macro: + defmodule MyApp.MyConsumer do + use RabbitMQStream.Consumer, + connection: MyApp.MyConnection, + stream_name: "my_stream", + initial_offset: :first, + serializer: Jason + end + + The default value for the `:serializer` is the module itself, unless a default is defined at a higher level of the + configuration. If there is a `decode!/1` callback defined, it is always used + + + # Properties + You can provide additional properties to the consumer to change its behavior, by passing `:properties`. + + ## Single active consumer + To use it, you must provide a "group_name". The server manages each consumer so the only one will of each group + will be receiving chunks at a time. + + Although there is only one Consumer active, we must provide the server the offset a consumer starts on when being upgraded + to the being the active one. To do so you must implement the `handle_update/2` callback, which must return a `{:ok, offset}` tuple. + + @impl true + def handle_update(_, :upgrade) do + {:ok, :last} + end + + @impl true + def handle_update(_, :downgrade) do + # Must return something when being downgraded, but it is not used by the server. + # Could be useful to use some external logic to persist the offset somewhere, + # so that it can be queried by the other consumer that is being upgraded + {:ok, :last} + end + + """ defmacro __using__(opts) do defaults = Application.get_env(:rabbitmq_stream, :defaults, []) diff --git a/lib/producer/producer.ex b/lib/producer/producer.ex index 36bad27..0e17827 100644 --- a/lib/producer/producer.ex +++ b/lib/producer/producer.ex @@ -85,6 +85,32 @@ defmodule RabbitMQStream.Producer do * `:stream_name` * `:reference_name` + + # Encoding + + You can declare a function for encoding each message by declaring a `encode!/1` callback as follows:alarm_handler + defmodule MyApp.MyProducer do + use RabbitMQStream.Producer, + stream_name: "my-stream", + connection: MyApp.MyConnection + + @impl true + def encode!(message) do + Jason.encode!(message) + end + end + + Or by passing a `:serializer` option to the `use` macro: + defmodule MyApp.MyProducer do + use RabbitMQStream.Producer, + stream_name: "my-stream", + connection: MyApp.MyConnection, + serializer: Jason + end + + The default value for the `:serializer` is the module itself, unless a default is defined at a higher level of the + configuration. If there is a `encode!/1` callback defined, it is always used + """ defmacro __using__(opts) do @@ -205,7 +231,7 @@ defmodule RabbitMQStream.Producer do message["key"] end """ - @callback filter_value(message :: term()) :: String.t() + @callback filter_value(message :: term()) :: String.t() | nil @optional_callbacks [before_start: 2, filter_value: 1] defstruct [