Implement the filter transformation #121

Open · wants to merge 21 commits into base: master
1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
# Project
agents/
.DS_Store

# Byte-compiled / optimized / DLL files
__pycache__/
2 changes: 1 addition & 1 deletion docs/api.rst
@@ -7,7 +7,7 @@ API Reference

.. automodapi:: kafkaaggregator.fields

.. automodapi:: kafkaaggregator.topics
.. automodapi:: kafkaaggregator.topic_schema

.. automodapi:: kafkaaggregator.models
:no-inheritance-diagram:
2 changes: 1 addition & 1 deletion docs/installation.rst
@@ -21,4 +21,4 @@ There is a Helm chart for kafka-aggregator available from the `Rubin Observatory
Argo CD
=======

kafka-aggregator aaa is deployed using Argo CD. An example of Argo CD app using the Helm chart can be found `here <https://github.com/lsst-sqre/argocd-efd/tree/master/apps/kafka-aggregator>`_.
kafka-aggregator is deployed using Argo CD. An example of Argo CD app using the Helm chart can be found `here <https://github.com/lsst-sqre/argocd-efd/tree/master/apps/kafka-aggregator>`_.
74 changes: 40 additions & 34 deletions docs/userguide.rst
@@ -8,7 +8,9 @@ Running locally with docker-compose

In this guide, we use ``docker-compose`` to illustrate how to run kafka-aggregator. To run kafka-aggregator on a Kubernetes environment see the :ref:`installation` section instead.

kafka-aggregator `docker-compose configuration`_ includes services to run Confluent Kafka (zookeeper, broker, schema-registry and control-center) and was based on `this example`_.
kafka-aggregator's `docker-compose configuration`_ runs the Confluent Kafka services.

.. Make a footnote ref to `this example`_.

.. _docker-compose configuration: https://github.com/lsst-sqre/kafka-aggregator/blob/master/docker-compose.yaml
.. _this example: https://github.com/confluentinc/examples/blob/5.3.2-post/cp-all-in-one/docker-compose.yml
@@ -17,67 +19,65 @@ Clone the kafka-aggregator repository:

.. code-block:: bash

$ git clone https://github.com/lsst-sqre/kafka-aggregator.git
git clone https://github.com/lsst-sqre/kafka-aggregator.git

Start the `zookeeper`, `broker`, and `schema-registry` services:
Start the Zookeeper, broker, and Confluent Schema Registry services:

.. code-block:: bash

cd kafka-aggregator
docker-compose up -d zookeeper broker schema-registry

On another terminal session, create a new Python virtual environment and install kafka-aggregator locally:
Create a new Python virtual environment and install kafka-aggregator locally (kafka-aggregator has been tested with Python 3.9):

.. code-block:: bash

$ cd kafka-aggregator
$ virtualenv -p Python3 venv
$ source venv/bin/activate
$ make update
python -m venv venv
source venv/bin/activate
make update


Initializing source topics
==========================

.. note::
In a production environment we expect that the source topics already exist in Kafka and that their Avro schemas are available from the Schema Registry.

In practice, kafka-aggregator expects that the source topics already exist in Kafka and that their Avro schemas are available from the Confluent Schema Registry. The instructions below are only necessary to run the kafka-aggregator example module.

Using the kafka-aggregator example module, you can initialize source topics in Kafka, control the number of fields in each topic, and produce messages for those topics at a given frequency.
Using the kafka-aggregator example module you can initialize source topics in Kafka, and produce messages for those topics.

With the default :ref:`configuration`, this command will initialize 10 source topics with 10 fields each and register their Avro schemas with the Schema Registry.
The following command initializes the source topics in ``example/aggregator-config.yaml`` using the default values in the ``ExampleConfiguration`` class.

.. code-block:: bash

kafkaaggregator -l info init-example
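
The authoritative contents of ``example/aggregator-config.yaml`` live in the repository; the snippet below is only a hypothetical sketch of a single aggregated-topic entry, with key names taken from the ``AggregatedTopic`` and ``WindowAggregation`` models touched in this PR and purely illustrative topic and field names:

.. code-block:: yaml

# Hypothetical sketch; exact nesting and names may differ from the
# example/aggregator-config.yaml shipped with the repository.
aggregated_topics:
  - name: aggregated-example-000
    source_topics:
      - example-000
    # Field filter for the source topic (names are illustrative)
    fields:
      - value0
      - value1
    window_aggregation:
      window_size_seconds: 1
      min_sample_size: 10
      operations:
        - mean
        - min
        - max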

You can check that the source topics were created in Kafka:
You can check whether the source topics were created in Kafka:

.. code-block:: bash

docker-compose exec broker kafka-topics --bootstrap-server broker:29092 --list


The Avro schemas were registered with the Schema Registry:
The Avro schemas for the source topics are also registered at this point. You can use this command to retrieve the schema for one of the source topics, for example:

.. code-block:: bash

curl http://localhost:8081/subjects
curl -s -X GET http://localhost:8081/schemas/ids/1 | jq


Generating Faust agents
=======================
Generating the Faust agents
===========================

Use this command to generate the Faust agents to process the source topics.
The following command generates the Faust agents to process the source topics:

.. code-block:: bash

kafkaaggregator -l info generate-agents

.. note::

By default agents are generated under the ``./agents`` folder where kafka-aggregator runs.
By default, agents are generated under the ``agents`` folder in the directory where kafka-aggregator runs.

For the source topics initialized with the kafka-aggregator example module you should have this output:
For the source topics initialized above, you should have an output similar to this one:

.. code-block:: bash

@@ -103,39 +103,45 @@ For the source topics initialized with the kafka-aggregator example module you s
[2020-07-06 18:30:59,156] [54727] [INFO] [^Worker]: Closing event loop


Starting a worker
=================
Running the Faust agents
========================

Use this command to start a kafka-aggregator worker:
Start a kafka-aggregator worker:

.. code-block:: bash

kafkaaggregator -l info worker


Producing messages
==================

On another terminal use this command to produce messages for the source topics. This command produces 6000 messages at 10Hz.
On another terminal, produce messages for the source topics. For example, the following produces 6000 messages at 10Hz.

.. code-block:: bash

kafkaaggregator -l info produce --frequency 10 --max-messages 6000

You can use `Confluent Control Center <http://localhost:9021>`_ to inspect the messages for the source and aggregation topics or use the following from the command line:
As soon as new messages are produced, you should see the worker processing the source topics.


Inspecting the results
======================

You can inspect the messages produced for the source and aggregated topics with the following:

.. code-block:: bash

docker-compose exec broker /bin/bash
root@broker:/# kafka-console-consumer --bootstrap-server broker:9092 --topic example-000
...
root@broker:/# kafka-console-consumer --bootstrap-server broker:9092 --topic example-000-aggregated
root@broker:/# kafka-console-consumer --bootstrap-server broker:9092 --topic aggregated-example-000
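
Because the record values are Avro-encoded, ``kafka-console-consumer`` prints them as raw bytes. For decoded output you can use ``kafka-avro-console-consumer``, which ships with the Schema Registry container; the service name and listener below assume the docker-compose file used earlier in this guide:

.. code-block:: bash

docker-compose exec schema-registry kafka-avro-console-consumer --bootstrap-server broker:29092 --topic aggregated-example-000 --property schema.registry.url=http://localhost:8081 --from-beginning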


Inspecting the consumer lag
===========================
Consumer lag
============

An important aspect to look at is the lag for the ``kafkaaggregator`` consumers.

....

An important aspect to look at is the consumer lag for the ``kafkaaggregator`` consumers. An advantage of Faust is that you can easily add more workers to distribute the workload of the application. If the source topics are created with multiple partitions, individual partitions are assigned to different workers.
An advantage of Faust is that you can easily add more workers to distribute the workload of the application. If the source topics are created with multiple partitions, individual partitions are assigned to different workers.
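
One way to inspect the lag from the command line is with the stock ``kafka-consumer-groups`` tool available in the broker container. The group name below is an assumption (Faust derives the consumer group from the app id), so list the groups first and adjust accordingly:

.. code-block:: bash

docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:29092 --list
docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:29092 --describe --group kafkaaggregator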


Internal vs. external managed topics
103 changes: 49 additions & 54 deletions src/kafkaaggregator/aggregator.py
@@ -14,13 +14,12 @@
import asyncio
import json
import logging
from pathlib import Path
from statistics import StatisticsError
from typing import Any, List

from faust_avro import Record

from kafkaaggregator.aggregator_config import AggregatorConfig
from kafkaaggregator.aggregator_config import AggregatedTopic
from kafkaaggregator.fields import Field
from kafkaaggregator.models import create_record
from kafkaaggregator.operations import ( # noqa: F401
@@ -30,7 +29,10 @@
q3,
stdev,
)
from kafkaaggregator.topics import AggregatedTopic, SourceTopic
from kafkaaggregator.topic_schema import (
AggregatedTopicSchema,
SourceTopicSchema,
)

logger = logging.getLogger("kafkaaggregator")

@@ -55,24 +57,29 @@ class Aggregator:

logger = logger

def __init__(self, configfile: Path, aggregated_topic: str) -> None:
def __init__(self, aggregated_topic: AggregatedTopic) -> None:

self._name = aggregated_topic.name

self._aggregated_topic = AggregatedTopic(name=aggregated_topic)
self._aggregated_topic_schema = AggregatedTopicSchema(name=self._name)

config = AggregatorConfig(configfile).get(aggregated_topic)
self._operations = config.window_aggregation.operations
self._window_size_secods = (
config.window_aggregation.window_size_seconds
aggregated_topic.window_aggregation.window_size_seconds
)
self._operations = aggregated_topic.window_aggregation.operations
self._min_sample_size = (
aggregated_topic.window_aggregation.min_sample_size
)
self._min_sample_size = config.window_aggregation.min_sample_size

# Supports the 1 source topic -> 1 aggregated topic case for the moment
source_topic = config.source_topics[0]
source_topic_name = aggregated_topic.source_topics[0]

self._source_topic = SourceTopic(name=source_topic)
self._fields = config.get(source_topic).fields
self._create_record = create_record
self._source_topic_schema = SourceTopicSchema(name=source_topic_name)

# TODO: return field names prefixed by source topic
self._fields = aggregated_topic.get(source_topic_name).fields

self._create_record = create_record
self._aggregated_fields: List[Field] = []
self._record: Record = None

@@ -100,10 +107,10 @@ def _create_aggregated_fields(
List of aggregation fields.
"""
time = Field(name="time", type=float)
window_size = Field(name="window_size", type=float)
window_size_seconds = Field(name="window_size_seconds", type=float)
count = Field(name="count", type=int)

aggregated_fields = [time, window_size, count]
aggregated_fields = [time, window_size_seconds, count]

for field in fields:
# Only numeric fields are aggregated
@@ -119,64 +126,52 @@

return aggregated_fields

async def create_record(self) -> Record:
"""Create a Faust-avro Record class for the aggregation topic.
async def create_record_and_register(self) -> Record:
"""Create a Faust Record and scehma for the aggregation topic."""
logger.info(f"Create Faust record for topic {self._name}.")

Returns
-------
record : `Record`
Faust-avro Record class for the aggreated topic.
"""
aggregated_topic_name = self._aggregated_topic.name
logger.info(f"Create Faust record for topic {aggregated_topic_name}.")
cls_name = self._name.title().replace("-", "")

cls_name = aggregated_topic_name.title().replace("-", "")
# TODO: add ability to filter fields (use self._fields)
# The source fields are already validated when the
# AggregatedTopic object is created

# TODO: add ability to filter fields
source_fields = await self._source_topic.get_fields()
fields = await self._source_topic_schema.get_fields(self._fields)

self._aggregated_fields = self._create_aggregated_fields(
source_fields, self._operations
fields, self._operations
)

# Create Faust Record
self._record = self._create_record(
cls_name=cls_name,
fields=self._aggregated_fields,
doc=f"Faust record for topic {aggregated_topic_name}.",
doc=f"Faust record for topic {self._name}.",
)

logger.info(f"Register Avro schema for topic {self._name}.")

# Convert Faust Record to Avro Schema
avro_schema = self._record.to_avro(
registry=self._aggregated_topic_schema._registry
)

await self._register(self._record)
# Register Avro Schema with Schema Registry
await self._aggregated_topic_schema.register(
schema=json.dumps(avro_schema)
)

return self._record

def async_create_record(self) -> Record:
def async_create_record_and_register(self) -> None:
"""Sync call to ``async create_record()``.

Get the current event loop and call the async ``create_record()``
Get the current event loop and call the async
``create_record_and_register()``
method.

Returns
-------
record : `Record`
Faust-avro Record class for the aggreation topic.
"""
loop = asyncio.get_event_loop()
record = loop.run_until_complete(self.create_record())
return record

async def _register(self, record: Record) -> None:
"""Register the Avro schema for the aggregation topic.

Parameters
----------
record: `Record`
Faust-avro Record for the aggregation model.
"""
topic_name = self._aggregated_topic.name
logger.info(f"Register Avro schema for topic {topic_name}.")
schema = record.to_avro(registry=self._aggregated_topic._registry)

await self._aggregated_topic.register(schema=json.dumps(schema))
loop.run_until_complete(self.create_record_and_register())

def compute(
self,
@@ -210,7 +205,7 @@ def compute(
aggregated_values = {
"count": count,
"time": time,
"window_size": self._window_size_secods,
"window_size_seconds": self._window_size_secods,
}

for aggregated_field in self._aggregated_fields:
10 changes: 9 additions & 1 deletion src/kafkaaggregator/aggregator_config.py
@@ -1,7 +1,7 @@
"""Pydantic models for the aggregator configuration."""

from pathlib import Path
from typing import Any, List, Mapping, Optional
from typing import Any, List, Mapping, Optional, Set

import yaml
from pydantic import BaseModel, validator
@@ -159,6 +159,14 @@ def aggregated_topics(self) -> List:
aggregated_topic.append(topic.name)
return aggregated_topic

@property
def source_topics(self) -> Set:
"""Return all source topics in the aggregator config."""
source_topics: Set = set()
for topic in self._config.aggregated_topics:
source_topics.update(topic.source_topics)
return source_topics

def get(self, aggregated_topic: str) -> AggregatedTopic:
"""Get aggregated topic object by its name."""
for topic in self._config.aggregated_topics: