Skip to content

Commit

Permalink
Refactor AgentGenerator class
Browse files Browse the repository at this point in the history
  • Loading branch information
afausti committed Oct 4, 2021
1 parent 56ab1fc commit 98a6874
Showing 1 changed file with 45 additions and 53 deletions.
98 changes: 45 additions & 53 deletions src/kafkaaggregator/generator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Generates Faust agents based on the agent.j2 template."""
"""Generates Faust agents (stream processors) using the agent.j2 template."""

__all__ = ["AgentGenerator"]

Expand All @@ -11,104 +11,96 @@
from jinja2 import Environment, PackageLoader, Template, TemplateError

from kafkaaggregator.aggregator_config import AggregatorConfig
from kafkaaggregator.app import config

logger = logging.getLogger("kafkaaggregator")


class AgentGenerator:
"""Generate Faust agents from a list of source topics.
"""Generate a Faust agent for an aggregated topic.
Creates the context and renders the agents code template.
Parameters
----------
source_topic_names : `list`
List of source topic names.
"""
config_file : `Path`
Aggregator configuration file.
aggegated_topic: str
Name of the aggregated topic.
template_file: str
Name of the agent Jinja2 template file
logger = logger
def __init__(self, config_file: Path, aggregated_topic: str) -> None:
"""

self._aggregated_topic_name = aggregated_topic
logger = logger

# TODO: fix to aggregate multiple topics
config = AggregatorConfig(config_file).get(aggregated_topic)
def __init__(
self,
config_file: Path,
aggregated_topic: str,
template_file: str,
output_dir: str,
) -> None:

self._aggregated_topic = aggregated_topic
aggregator_config = AggregatorConfig(config_file).get(
self._aggregated_topic
)

# Supports the 1 source topic -> 1 aggregated topic case for the moment
self._source_topic_name = config.source_topics[0]
self._source_topic = aggregator_config.source_topics[0]
self._template: Template = self._load_template()

@property
def template(self) -> Template:
"""Get the agent template."""
return self._template

@staticmethod
def _create_filepath(source_topic_name: str) -> str:
"""Return the file path for the agent.
The directory name comes from the agents_output_dir configuration
parameter and the file name is based on source topic name.
Parameters
----------
source_topic_name : `str`
Name of the source topic to aggregate.
"""
agents_output_dir = config.agents_output_dir

filepath = os.path.join(agents_output_dir, f"{source_topic_name}.py")

return filepath
self._window_size_secods = (
aggregator_config.window_aggregation.window_size_seconds
)
self._window_expiration_seconds = (
aggregator_config.window_aggregation.window_expiration_seconds
)
self._template_file = template_file
self._output_dir = output_dir

def _create_context(self) -> Mapping[str, Any]:
"""Create the template context.
The template context stores the values passed to the template.
Parameters
----------
source_topic_name : `str`
Name of the source topic to aggregate
Returns
-------
context : `dict`
A dictionary with values passed to the template.
"""
cls_name = self._source_topic_name.title().replace("-", "")
cls_name = self._aggregated_topic.title().replace("-", "")

context = dict(
cls_name=cls_name,
source_topic_name=self._source_topic_name,
aggregation_topic_name=self._aggregated_topic_name,
source_topic=self._source_topic,
aggregated_topic=self._aggregated_topic,
)

return context

@staticmethod
def _load_template() -> Template:
def _load_template(self) -> Template:
"""Load the agent template file."""
agent_template_file = config.agent_template_file

env = Environment(
loader=PackageLoader("kafkaaggregator"), keep_trailing_newline=True
)
try:
template = env.get_template(agent_template_file)
template = env.get_template(self._template_file)
except TemplateError as e:
logger.error("Error loading the agent template file.")
raise e

return template

async def run(self) -> None:
"""Run agents code generation."""
logger.info(f"Generating agent code for {self._source_topic_name}.")
filepath = self._create_filepath(self._source_topic_name)
logger.info(f"Generating agent code for {self._aggregated_topic}.")

template = self._load_template()
context = self._create_context()

async with aiofiles.open(filepath, "w") as file:
await file.write(self._template.render(**context))
output_file = os.path.join(
self._output_dir, f"{self._aggregated_topic}.py"
)

async with aiofiles.open(output_file, "w") as f:
await f.write(template.render(**context))

0 comments on commit 98a6874

Please sign in to comment.