Skip to content

Commit

Permalink
Refactor the example code
Browse files Browse the repository at this point in the history
- Refactor initialize() and produce() methods to get source topics from the aggregator configuration
- Remove unnecessary configuration parameters
- Add --config-file option for ini_example and produce commands
  • Loading branch information
afausti committed Oct 5, 2021
1 parent e245654 commit 44c8ce9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 29 deletions.
25 changes: 20 additions & 5 deletions src/kafkaaggregator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,19 @@ def main() -> None:
help="The maximum number of messages to produce.",
show_default=True,
),
option(
"--config-file",
type=str,
default=config.aggregator_config_file,
help="Aggregator configuration file.",
show_default=True,
),
)
async def produce(
self: AppCommand, frequency: float, max_messages: int
self: AppCommand, frequency: float, max_messages: int, config_file: str
) -> None:
"""Produce messages for the aggregation example."""
example = AggregationExample()
example = AggregationExample(Path(__file__).parent.joinpath(config_file))

try:
await example.produce(
Expand All @@ -53,10 +60,18 @@ async def produce(
logger.error(e)


@app.command()
async def init_example(self: AppCommand) -> None:
@app.command(
option(
"--config-file",
type=str,
default=config.aggregator_config_file,
help="Aggregator configuration file.",
show_default=True,
),
)
async def init_example(self: AppCommand, config_file: str) -> None:
"""Initialize the source topic used in the aggregation example."""
example = AggregationExample()
example = AggregationExample(Path(__file__).parent.joinpath(config_file))
await example.initialize(app=app)


Expand Down
9 changes: 0 additions & 9 deletions src/kafkaaggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ def _strtolist(self, s: str) -> List[str]:
class ExampleConfiguration:
"""Configuration for the Kafkaaggregator example."""

ntopics: int = int(os.getenv("NTOPICS", "10"))
"""Number of source topics used in the aggregation example."""

nfields: int = int(os.getenv("NFIELDS", "10"))
"""Number of fields for source topics used in the aggregation example."""

Expand All @@ -142,9 +139,3 @@ class ExampleConfiguration:
"""The maximum number of messages to produce. Set max_messages to a number
smaller than 1 to produce an indefinite number of messages.
"""

source_topic_name_prefix: str = os.getenv(
"SOURCE_TOPIC_NAME_PRFIX", "example"
)
"""The prefix for source topic names to use with the aggregator example.
"""
30 changes: 15 additions & 15 deletions src/kafkaaggregator/example/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import json
import logging
import random
from pathlib import Path
from time import time
from typing import List

import faust_avro
from faust_avro import Record

from kafkaaggregator.aggregator_config import AggregatorConfig
from kafkaaggregator.config import ExampleConfiguration
from kafkaaggregator.fields import Field
from kafkaaggregator.models import create_record
Expand All @@ -22,7 +24,7 @@

logger = logging.getLogger("kafkaaggregator")

config = ExampleConfiguration()
example_config = ExampleConfiguration()


class AggregationExample:
Expand All @@ -35,11 +37,13 @@ class AggregationExample:
MAX_NTOPICS = 999
MAX_NFIELDS = 999

def __init__(self) -> None:
self._ntopics = min(config.ntopics, AggregationExample.MAX_NTOPICS)
self._nfields = min(config.nfields, AggregationExample.MAX_NFIELDS)
self._source_topic_names: List = []
def __init__(self, config_file: Path) -> None:
self._nfields = min(
example_config.nfields, AggregationExample.MAX_NFIELDS
)
self._create_record = create_record
aggregator_config = AggregatorConfig(config_file)
self._source_topics = aggregator_config.source_topics

def make_fields(self) -> List[Field]:
"""Make fields for the example topics.
Expand Down Expand Up @@ -89,18 +93,14 @@ async def initialize(self, app: faust_avro.App) -> None:
app : `faust_avro.App`
Faust application
"""
for n in range(self._ntopics):
source_topic_name = f"{config.source_topic_name_prefix}-{n:03d}"
source_topic = SourceTopicSchema(name=source_topic_name)
record = self.create_record(name=source_topic_name)
for topic in self._source_topics:
source_topic = SourceTopicSchema(name=topic)
record = self.create_record(name=topic)
schema = record.to_avro(registry=source_topic._registry)
await source_topic.register(schema=json.dumps(schema))
# Declare the source topic as an internal topic in Faust.
internal_topic = app.topic(
source_topic_name, value_type=record, internal=True
)
internal_topic = app.topic(topic, value_type=record, internal=True)
await internal_topic.declare()
self._source_topic_names.append(source_topic_name)

async def produce(
self, app: faust_avro.App, frequency: float, max_messages: int
Expand Down Expand Up @@ -132,8 +132,8 @@ async def produce(
value = random.random()
message.update({f"value{n}": value})
# The same message is sent for all source topics
for source_topic_name in self._source_topic_names:
source_topic = app.topic(source_topic_name)
for topic in self._source_topics:
source_topic = app.topic(topic)
await source_topic.send(value=message)
send_count += 1

Expand Down

0 comments on commit 44c8ce9

Please sign in to comment.