Skip to content

Commit

Permalink
Refactor AgentGenerator class
Browse files Browse the repository at this point in the history
- Add  --template-file and --output-dir options to generate-agents command, so we don't need to access the config in the AgentGenerator class
  • Loading branch information
afausti committed Oct 5, 2021
1 parent 56ab1fc commit e340599
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 58 deletions.
29 changes: 24 additions & 5 deletions src/kafkaaggregator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@

from faust.cli import AppCommand, option

from kafkaaggregator.app import app
from kafkaaggregator.config import Configuration, ExampleConfiguration
from kafkaaggregator.app import app, config
from kafkaaggregator.config import ExampleConfiguration
from kafkaaggregator.example.example import AggregationExample
from kafkaaggregator.generator import AgentGenerator

logger = logging.getLogger("kafkaaggregator")

config = Configuration()
example_config = ExampleConfiguration()


Expand Down Expand Up @@ -61,6 +60,20 @@ async def init_example(self: AppCommand) -> None:


@app.command(
option(
"--template-file",
type=str,
default=config.agent_template_file,
help="Name of the agent Jinja2 template file.",
show_default=True,
),
option(
"--output-dir",
type=str,
default=config.agents_output_dir,
help="Name of output directory for the agents' code.",
show_default=True,
),
option(
"--config-file",
type=str,
Expand All @@ -78,8 +91,14 @@ async def init_example(self: AppCommand) -> None:
),
)
async def generate_agents(
self: AppCommand, config_file: str, aggregated_topic: str
self: AppCommand,
config_file: str,
aggregated_topic: str,
template_file: str,
output_dir: str,
) -> None:
"""Generate Faust agents' code."""
agent_generator = AgentGenerator(Path(config_file), aggregated_topic)
agent_generator = AgentGenerator(
Path(config_file), aggregated_topic, template_file, output_dir
)
await agent_generator.run()
98 changes: 45 additions & 53 deletions src/kafkaaggregator/generator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Generates Faust agents based on the agent.j2 template."""
"""Generates Faust agents (stream processors) using the agent.j2 template."""

__all__ = ["AgentGenerator"]

Expand All @@ -11,104 +11,96 @@
from jinja2 import Environment, PackageLoader, Template, TemplateError

from kafkaaggregator.aggregator_config import AggregatorConfig
from kafkaaggregator.app import config

logger = logging.getLogger("kafkaaggregator")


class AgentGenerator:
"""Generate Faust agents from a list of source topics.
"""Generate a Faust agent for an aggregated topic.
Creates the context and renders the agents code template.
Parameters
----------
source_topic_names : `list`
List of source topic names.
"""
config_file : `Path`
Aggregator configuration file.
aggegated_topic: str
Name of the aggregated topic.
template_file: str
Name of the agent Jinja2 template file
logger = logger
def __init__(self, config_file: Path, aggregated_topic: str) -> None:
"""

self._aggregated_topic_name = aggregated_topic
logger = logger

# TODO: fix to aggregate multiple topics
config = AggregatorConfig(config_file).get(aggregated_topic)
def __init__(
self,
config_file: Path,
aggregated_topic: str,
template_file: str,
output_dir: str,
) -> None:

self._aggregated_topic = aggregated_topic
aggregator_config = AggregatorConfig(config_file).get(
self._aggregated_topic
)

# Supports the 1 source topic -> 1 aggregated topic case for the moment
self._source_topic_name = config.source_topics[0]
self._source_topic = aggregator_config.source_topics[0]
self._template: Template = self._load_template()

@property
def template(self) -> Template:
"""Get the agent template."""
return self._template

@staticmethod
def _create_filepath(source_topic_name: str) -> str:
"""Return the file path for the agent.
The directory name comes from the agents_output_dir configuration
parameter and the file name is based on source topic name.
Parameters
----------
source_topic_name : `str`
Name of the source topic to aggregate.
"""
agents_output_dir = config.agents_output_dir

filepath = os.path.join(agents_output_dir, f"{source_topic_name}.py")

return filepath
self._window_size_secods = (
aggregator_config.window_aggregation.window_size_seconds
)
self._window_expiration_seconds = (
aggregator_config.window_aggregation.window_expiration_seconds
)
self._template_file = template_file
self._output_dir = output_dir

def _create_context(self) -> Mapping[str, Any]:
"""Create the template context.
The template context stores the values passed to the template.
Parameters
----------
source_topic_name : `str`
Name of the source topic to aggregate
Returns
-------
context : `dict`
A dictionary with values passed to the template.
"""
cls_name = self._source_topic_name.title().replace("-", "")
cls_name = self._aggregated_topic.title().replace("-", "")

context = dict(
cls_name=cls_name,
source_topic_name=self._source_topic_name,
aggregation_topic_name=self._aggregated_topic_name,
source_topic=self._source_topic,
aggregated_topic=self._aggregated_topic,
)

return context

@staticmethod
def _load_template() -> Template:
def _load_template(self) -> Template:
"""Load the agent template file."""
agent_template_file = config.agent_template_file

env = Environment(
loader=PackageLoader("kafkaaggregator"), keep_trailing_newline=True
)
try:
template = env.get_template(agent_template_file)
template = env.get_template(self._template_file)
except TemplateError as e:
logger.error("Error loading the agent template file.")
raise e

return template

async def run(self) -> None:
"""Run agents code generation."""
logger.info(f"Generating agent code for {self._source_topic_name}.")
filepath = self._create_filepath(self._source_topic_name)
logger.info(f"Generating agent code for {self._aggregated_topic}.")

template = self._load_template()
context = self._create_context()

async with aiofiles.open(filepath, "w") as file:
await file.write(self._template.render(**context))
output_file = os.path.join(
self._output_dir, f"{self._aggregated_topic}.py"
)

async with aiofiles.open(output_file, "w") as f:
await f.write(template.render(**context))

0 comments on commit e340599

Please sign in to comment.