Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ 3.11 Support #7

Merged
merged 70 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
a1b0e6b
refactor: Code organization
VictorGaiva Dec 30, 2023
8fcffc0
refactor: Lifecycle is now its own Genserver
VictorGaiva Dec 30, 2023
47076a1
refactor: Lifecycle manager renamed to Server
VictorGaiva Dec 30, 2023
67310e5
refactor: Separation of concerns on each module.
VictorGaiva Dec 30, 2023
2957ed8
Merge remote-tracking branch 'origin/main'
VictorGaiva Jan 2, 2024
ec5416a
refactor: Data decoding functions in the same module
VictorGaiva Jan 3, 2024
5d92fc8
refactor: Removed get_state, and added route and partitions commands
VictorGaiva Jan 3, 2024
4c941f9
hotfix: A version is related to the command, no the connection
VictorGaiva Jan 3, 2024
e2196a0
wip: Publish version 2
VictorGaiva Jan 3, 2024
3643e3b
refactor: Command outbox buffering
VictorGaiva Jan 6, 2024
a96b4b4
refactor: Moved osiris to its folder
VictorGaiva Jan 6, 2024
5598d60
refactor: Code organization for encoding and decoding modules
VictorGaiva Jan 6, 2024
a7d1f5c
test: Initial support for handler testing
VictorGaiva Jan 6, 2024
b213fae
feat: Adds serialization options for messages
VictorGaiva Jan 6, 2024
c28e8cc
feat: Added support for `exchange_command_versions` command
VictorGaiva Jan 6, 2024
bde9588
config: Set connection defaults at configuration
VictorGaiva Jan 6, 2024
e8c15d0
feat: TLS Support
VictorGaiva Jan 6, 2024
75b1afe
feat: Send all supported commands to server
VictorGaiva Jan 6, 2024
443e26f
ci: Added docker-compose service
VictorGaiva Jan 7, 2024
8765761
ci: Update CI workflow to include ci.yaml in the watchlist
VictorGaiva Jan 7, 2024
bb01f19
hotfix: Fix CI flag
VictorGaiva Jan 7, 2024
08c75c5
refactor: Remove unnecessary code
VictorGaiva Jan 7, 2024
43bb6db
hotfix: Update docker-compose file name
VictorGaiva Jan 7, 2024
8a62629
ci: Removed services specification
VictorGaiva Jan 7, 2024
27596f4
ci: Run test in multiple RabbitMQ versions
VictorGaiva Jan 7, 2024
226714e
feat: Able to correctly use the 'single-active-consumer' subscriber p…
VictorGaiva Jan 8, 2024
7e53133
refactor: Add typing information to commands
VictorGaiva Jan 9, 2024
23fc71b
refactor: Remove no longder supported versions
VictorGaiva Jan 10, 2024
e7e3d3f
feat: Support for single-active-consumer and filter-value
VictorGaiva Jan 10, 2024
dfb7404
refactor: Renamed Subscriber to Consumer
VictorGaiva Jan 10, 2024
785bc68
refactor: Consumer GenServer module consolidated
VictorGaiva Jan 10, 2024
12a0214
feat: Support for `streamstats` command
VictorGaiva Jan 10, 2024
734422c
refactor: Renaming modules and configuration options
VictorGaiva Jan 14, 2024
5095ae7
doc: Writing some documentation
VictorGaiva Jan 14, 2024
1d7832d
feat: Attempt to store the offset before shutting down
VictorGaiva Jan 14, 2024
1c83f36
feat: Create and Delete superstream commands
VictorGaiva Jan 14, 2024
691dc96
feat: Create and Delete superstreams
VictorGaiva Jan 14, 2024
aba784d
wip: SuperStreams
VictorGaiva Jan 14, 2024
a9bf820
wip: Isolating name registration
VictorGaiva Jan 14, 2024
765b664
feat: Using custom registry for registring partitions
VictorGaiva Jan 14, 2024
b8eff14
refactor: Publisher and Subscriber no longer expects a Module Connection
VictorGaiva Jan 15, 2024
f6ab758
refactor: Simplified sequence tracking
VictorGaiva Jan 15, 2024
5c5d080
hotfix: Fix cyclic depencency
VictorGaiva Jan 16, 2024
5dd166c
refactor: Remove unnecessary state persistence
VictorGaiva Jan 16, 2024
b74d395
refactor: Rename commands map and format
VictorGaiva Jan 16, 2024
6932595
ci: Only include necessary tests
VictorGaiva Jan 16, 2024
e067f1a
hotfix: Command version check on 3.11
VictorGaiva Jan 16, 2024
e63520b
feat: Consumer 'before_start/2' callback
VictorGaiva Jan 16, 2024
ffe3dd5
refactor: Don't raise, just warn
VictorGaiva Jan 16, 2024
eccc703
refactor: Setup must be done by user
VictorGaiva Jan 16, 2024
cbd9905
hotfix: Don't override options when not necessary
VictorGaiva Jan 17, 2024
f4f9fac
refactor: Serializer and options consolidation
VictorGaiva Jan 17, 2024
4c9924f
feat: Functional SuperPublisher and SuperConsumers
VictorGaiva Jan 17, 2024
c25559b
feat: SuperStreams tested functionallities
VictorGaiva Jan 18, 2024
f3f374e
hotfix: Encoding accepts empty arrays and integers
VictorGaiva Jan 18, 2024
4387dc8
doc: Added documentation to Connection
VictorGaiva Jan 18, 2024
241cac0
refactor: Improved api for creating super-streams
VictorGaiva Jan 18, 2024
0202166
test: Creating and Querying SuperStream metadata
VictorGaiva Jan 18, 2024
a54df49
test: Check if test pre-condition
VictorGaiva Jan 18, 2024
2948df4
doc: Updated support table
VictorGaiva Jan 18, 2024
d16a686
ci: Disable failfast
VictorGaiva Jan 18, 2024
c54d4f6
refactor: Renamed Publisher to Producer
VictorGaiva Jan 18, 2024
819b200
ci: Create SuperStream for testing
VictorGaiva Jan 18, 2024
5fb0b0d
ci: Exec by name
VictorGaiva Jan 18, 2024
7851b92
ci: wait for startup
VictorGaiva Jan 19, 2024
97312d7
ci: Don't fail on 137
VictorGaiva Jan 19, 2024
14d91c2
ci: Don't specify the pid
VictorGaiva Jan 19, 2024
f87e116
ci: Adding the PID
VictorGaiva Jan 19, 2024
e345883
ci: Manually wait a bit
VictorGaiva Jan 19, 2024
d5fcae2
Update guides/concepts/offset.md
VictorGaiva Jan 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@ on:
- "mix.exs"
- "mix.lock"
- "test/**"
- ".github/workflows/ci.yaml"
push:
branches:
- main
paths:
- "lib/**"
- "mix.exs"
- "mix.lock"
- ".github/workflows/ci.yaml"

jobs:
test:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.11
ports:
- 5552:5552
strategy:
fail-fast: false
matrix:
version: ["3_13", "3_12", "3_11"]
steps:
- uses: erlef/setup-beam@v1
with:
Expand All @@ -33,10 +34,17 @@ jobs:

- uses: actions/checkout@v3

- name: Enable rabbitmq management plugin
run: |
DOCKER_NAME=$(docker ps --filter ancestor=rabbitmq:3.11 --format "{{.Names}}")
docker exec $DOCKER_NAME rabbitmq-plugins enable rabbitmq_stream
- uses: isbang/compose-action@v1.5.1
with:
compose-file: "./services/docker-compose.yaml"
services: "rabbitmq_stream_${{ matrix.version }}"

- name: Wait RabbitMQ is Up
run: sleep 10s
shell: bash

- name: Create 'invoices' SuperStream
run: docker exec rabbitmq_stream rabbitmq-streams add_super_stream invoices --partitions 3

- name: Install Dependencies
run: |
Expand All @@ -50,4 +58,4 @@ jobs:
run: mix compile

- name: Run tests
run: mix test
run: mix test --exclude test --include v${{ matrix.version }}
62 changes: 44 additions & 18 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,74 @@
# Changelog

## 0.1.0
## 0.4.0

Initial release with the following features:
Added support for RabbitMQ 3.13, with Route, Partitions and SuperStreams support.

- Opening connection to RabbitMQ server
- Declaring a Stream
- Creating a Stream Publisher
- Subscribing to Stream Messages
- Initial Hex Release
### 0.4.0 Features

## 0.2.0
- Support for `:consumerupdate`, `:exchangecommandversions`, `:streamstats`, commands.
- Serialization options for encoding and decoding messages.
- TLS Support
- Functional `single-active-consumer`.
- Initial support for `filter_value` consumer parameter, and `:createsuperstream`, `:deletesuperstream`, `:route`, `:partitions` commands.
- Initial support for SuperStreams, with RabbitMQStream.SuperConsumer and RabbitMQStream.SuperPublisher.

The main objective of this release is to remove the manually added code from `rabbitmq_stream_common`'s Erlang implementation of Encoding and Decoding logic, with frame buffering.
### 0.4.0 Changes

## 0.2.1
The 'Message' module tree was refactored to make all the Encoding and Decoding logic stay close to each other.

Documentation and Configuration refactoring
- Improved the cleanup logic for closing the connection.
- Publishers and Consumers now expects any name of a GenServer process, instead of a Module.
- Added checks on supported commands based on Server version, and exchanged commands versions.

- It is now possible to define the connection and subscriber parameters throught the `config.exs` file
- Documentation improvements, and examples
### 0.4.0 Breaking Changes

- Renamed `RabbitMQStream.Subscriber` to `RabbitMQStream.Consumer`
- Renamed `RabbitMQStream.Publisher` to `RabbitMQStream.Producer`

## 0.3.0

Added an implementation for a stream Subscriber, fixed bugs and improved the documentation.
Added an implementation for a stream Consumer, fixed bugs and improved the documentation.

### Features
### 0.3.0 Features

- Added the `:credit` command.
- Added `RabbitMQStream.Subscriber`, which subscribes to a stream, while tracking its offset and credit based on customizeable strategies.
- Added the possibility of globally configuring the default Connection for Publishers and Subscribers

### Bug Fixes
### 0.3.0 Bug Fixes

- Fixed an issue where tcp packages with multiple commands where not being correctly parsed, and in reversed order

### Changes
### 0.3.0 Changes

- `RabbitMQStream.Publisher` no longer calls `connect` on the Connection during its setup.
- Moved `RabbitMQStream.Publisher`'s setup logic into `handle_continue`, to prevent locking up the application startup.
- `RabbitMQStream.Publisher` no longer declares the stream if it doesn't exists.
- `RabbitMQStream.Publisher` module now can optionally declare a `before_start/2` callback, which is called before it calls `declare_publisher/2`, and can be used to create the stream if it doesn't exists.
- `RabbitMQStream.Connection` now buffers the requests while the connection is not yet `:open`.

### Breaking Changes
### 0.3.0 Breaking Changes

- Subscription deliver messages are now in the format `{:chunk, %RabbitMQ.OsirisChunk{}}`.

## 0.2.1

Documentation and Configuration refactoring

- It is now possible to define the connection and subscriber parameters throught the `config.exs` file
- Documentation improvements, and examples

## 0.2.0

The main objective of this release is to remove the manually added code from `rabbitmq_stream_common`'s Erlang implementation of Encoding and Decoding logic, with frame buffering.

## 0.1.0

Initial release with the following features:

- Opening connection to RabbitMQ server
- Declaring a Stream
- Creating a Stream Publisher
- Subscribing to Stream Messages
- Initial Hex Release
48 changes: 36 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Zero dependencies Elixir Client for [RabbitMQ Streams Protocol](https://www.rabb

## Usage

### Subscribing to stream
### Consuming from stream

First you define a connection

Expand All @@ -20,23 +20,23 @@ defmodule MyApp.MyConnection do
end
```

You then can declare a subscriber module with the `RabbitMQStream.Subscriber`:
You then can declare a consumer module with the `RabbitMQStream.Consumer`:

```elixir
defmodule MyApp.MySubscriber do
use RabbitMQStream.Subscriber,
defmodule MyApp.MyConsumer do
use RabbitMQStream.Consumer,
connection: MyApp.MyConnection,
stream_name: "my_stream",
initial_offset: :first

@impl true
def handle_chunk(%RabbitMQStream.OsirisChunk{}=_chunk, _subscriber) do
def handle_chunk(%RabbitMQStream.OsirisChunk{}=_chunk, _consumer) do
:ok
end
end
```

Or you could manually subscribe to the stream with
Or you could manually consume from the stream with

```elixir
{:ok, _subscription_id} = MyApp.MyConnection.subscribe("stream-01", self(), :next, 999)
Expand All @@ -51,17 +51,17 @@ def handle_info({:chunk, %RabbitMQStream.OsirisChunk{} = chunk}, state) do
end
```

You can take a look at an example Subscriber GenServer at the [Subscribing Documentation](guides/tutorial/subscribing.md).
You can take a look at an example Consumer GenServer at the [Consuming Documentation](guides/tutorial/consuming.md).

### Publishing to stream

RabbitMQ Streams protocol needs a static `:reference_name` per publisher. This is used to prevent message duplication. For this reason, each stream needs, for now, a static module to publish messages, which keeps track of its own `publishing_id`.
RabbitMQ Streams protocol needs a static `:reference_name` per producer. This is used to prevent message duplication. For this reason, each stream needs, for now, a static module to publish messages, which keeps track of its own `publishing_id`.

You can define a `Publisher` module like this:
You can define a `Producer` module like this:

```elixir
defmodule MyApp.MyPublisher do
use RabbitMQStream.Publisher,
defmodule MyApp.MyProducer do
use RabbitMQStream.Producer,
stream: "stream-01",
connection: MyApp.MyConnection
end
Expand All @@ -70,7 +70,7 @@ end
Then you can publish messages to the stream:

```elixir
MyApp.MyPublisher.publish("Hello World")
MyApp.MyProducer.publish("Hello World")
```

## Installation
Expand Down Expand Up @@ -99,4 +99,28 @@ end

```

You can configure a default Serializer module by passing it to the defaults configuration option

```elixir
config :rabbitmq_stream, :defaults,
serializer: Jason
end
```

## TLS Support

You can configure the RabbitmqStream to use TLS connections:

```elixir
coonfig :rabbitmq_stream, :defaults,
connection: [
transport: :ssl,
ssl_opts: [
keyfile: "services/cert/client_box_key.pem",
certfile: "services/cert/client_box_certificate.pem",
cacertfile: "services/cert/ca_certificate.pem"
]
]
```

For more information, check the [documentation](https://hexdocs.pm/rabbitmq_stream/).
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@ import Config

# Prevents the CI from being spammed with logs
config :logger, :level, :info

config :rabbitmq_stream, :defaults,
connection: [
port: 5553
]
64 changes: 32 additions & 32 deletions docs/support-table.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
# Support Table

| Command | Supported? |
|-------------------------|------------|
| declarepublisher | ✔️ |
| publish | ✔️ |
| publishconfirm | ✔️ |
| publisherror | ✔️ |
| querypublishersequence | ✔️ |
| deletepublisher | ✔️ |
| subscribe | ✔️ |
| deliver | ✔️ |
| credit | ✔️ |
| storeoffset | ✔️ |
| queryoffset | ✔️ |
| unsubscribe | ✔️ |
| create | ✔️ |
| delete | ✔️ |
| metadata | ✔️ |
| metadataupdate | ✔️ |
| peerproperties | ✔️ |
| saslhandshake | ✔️ |
| saslauthenticate | ✔️ |
| tune | ✔️ |
| open | ✔️ |
| close | ✔️ |
| heartbeat | ✔️ |
| route | ❌ |
| partitions | ❌ |
| consumerupdate | ❌ |
| exchangecommandversions | ❌ |
| streamstats | ❌ |
| createsuperstream | ❌ |
| deletesuperstream | ❌ |
| Command | Supported? | Minimal version |
|-------------------------|------------|-----------------|
| declarepublisher | ✔️ | 3.9 |
| publish | ✔️ | 3.9 |
| publishconfirm | ✔️ | 3.9 |
| publisherror | ✔️ | 3.9 |
| querypublishersequence | ✔️ | 3.9 |
| deletepublisher | ✔️ | 3.9 |
| subscribe | ✔️ | 3.9 |
| deliver | ✔️ | 3.9 |
| credit | ✔️ | 3.9 |
| storeoffset | ✔️ | 3.9 |
| queryoffset | ✔️ | 3.9 |
| unsubscribe | ✔️ | 3.9 |
| create | ✔️ | 3.9 |
| delete | ✔️ | 3.9 |
| metadata | ✔️ | 3.9 |
| metadataupdate | ✔️ | 3.9 |
| peerproperties | ✔️ | 3.9 |
| saslhandshake | ✔️ | 3.9 |
| saslauthenticate | ✔️ | 3.9 |
| tune | ✔️ | 3.9 |
| open | ✔️ | 3.9 |
| close | ✔️ | 3.9 |
| heartbeat | ✔️ | 3.9 |
| consumerupdate | ✔️ | 3.11 |
| streamstats | ✔️ | 3.11 |
| exchangecommandversions | ✔️ | 3.13 |
| createsuperstream | ✔️ | 3.13 |
| deletesuperstream | ✔️ | 3.13 |
| route | ✔️ | 3.13 |
| partitions | ✔️ | 3.13 |
Loading