From 986195e6c797d6375b701784e559858e39f58580 Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Fri, 25 Oct 2024 17:39:29 -0400 Subject: [PATCH] docs (#7) --- .github/workflows/docs.yml | 50 ++++++++++ pyproject.toml | 3 + src/aact/__init__.py | 198 ++++++++++++++++++++++++++++++++++++- src/aact/messages/base.py | 73 ++++++++++++++ src/aact/nodes/__init__.py | 8 ++ src/aact/nodes/base.py | 138 ++++++++++++++++++++++++++ src/aact/nodes/registry.py | 39 ++++++++ uv.lock | 20 +++- 8 files changed, 527 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/docs.yml diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..7eda337 --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,50 @@ +name: website + +# build the documentation whenever there are new commits on main +on: + push: + branches: + - main + - docs/* + # Alternative: only build for tags. + # tags: + # - '*' + +# security: restrict permissions for CI jobs. +permissions: + contents: read + +jobs: + # Build the documentation and upload the static HTML files as an artifact. + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.13' + + # ADJUST THIS: install all dependencies (including pdoc) + - run: pip install uv + # ADJUST THIS: build your documentation into docs/. + # We use a custom build script for pdoc itself, ideally you just run `pdoc -o docs/ ...` here. + - run: uv run --extra doc pdoc --output docs --mermaid aact + + - uses: actions/upload-pages-artifact@v3 + with: + path: docs/ + + # Deploy the artifact to GitHub pages. + # This is a separate job so that only actions/deploy-pages has the necessary permissions. 
+ deploy: + needs: build + runs-on: ubuntu-latest + permissions: + pages: write + id-token: write + environment: + name: github-pages + url: ${{ steps.deployment.outputs.page_url }} + steps: + - id: deployment + uses: actions/deploy-pages@v4 diff --git a/pyproject.toml b/pyproject.toml index 77b0c1d..a0acdfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,9 @@ gui = [ ai = [ "openai" ] +doc = [ + "pdoc>=15.0.0, <16.0.0", +] [build-system] requires = ["hatchling"] diff --git a/src/aact/__init__.py b/src/aact/__init__.py index b094fd0..f6191ec 100644 --- a/src/aact/__init__.py +++ b/src/aact/__init__.py @@ -1,4 +1,200 @@ +r""" +# What is AAct? + +AAct is designed for communicating sensors, neural networks, agents, users, and environments. + + +
+Can you expand on that? + +AAct is a Python library for building asynchronous, actor-based, concurrent systems. +Specifically, it is designed to be used in the context of building systems with +components that communicate with each other but don't block each other. +
+ +## How does AAct work? + +AAct is built around the concept of nodes and dataflow, where nodes are self-contained units +which receive messages from input channels, process the messages, and send messages to output channels. +Nodes are connected to each other to form a dataflow graph, where messages flow from one node to another. +Each node runs in its own event loop, and the nodes communicate with each other using Redis Pub/Sub. + +## Why should I use AAct? + +1. Non-blocking: the nodes are relatively independent of each other, so if you are waiting for users' input, + you can still process sensor data in the background. +2. Scalable: you can run a large number of nodes on one machine or distribute them across multiple machines. +3. Hackable: you can easily design your own nodes and connect them to the existing nodes. +4. Zero-code configuration: the `dataflow.toml` allows you to design the dataflow graph without writing any + Python code. + +# Quickstart + +## Installation + +System requirements: + +1. Python 3.10 or higher +2. Redis server + +
+ +Redis installation + +The easiest way to install Redis is to use Docker: +```bash +docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest +``` +Depending on your system, you can also install Redis from the official website: https://redis.io/download + +Note: this library only requires a standard Redis server (without RedisJSON / RedisSearch). + +
+ +```bash +pip install aact +``` + +
+ from source + +```bash +git clone https://github.com/ProKil/aact.git +cd aact +pip install . +``` + +For power users, please use `uv` for package management. +
+ + +## Quick Start Example + +Assuming your Redis is hosted on `localhost:6379` using docker. +You can create a `dataflow.toml` file: + +```toml +redis_url = "redis://localhost:6379/0" # required + +[[nodes]] +node_name = "print" +node_class = "print" + +[nodes.node_args.print_channel_types] +"tick/secs/1" = "tick" + +[[nodes]] +node_name = "tick" +node_class = "tick" +``` + +To run the dataflow: +```bash +aact run-dataflow dataflow.toml +``` + +This will start the `tick` node and the `print` node. The `tick` node sends a message every second to the `print` node, which prints the message to the console. + +## Usage + +### CLI + +You can start from the CLI and progress to more advanced usages. + +1. `aact --help` to see all commands +2. `aact run-dataflow <dataflow.toml>` to run a dataflow. Check [Dataflow.toml syntax](#dataflowtoml-syntax) +3. `aact run-node` to run one node in a dataflow. +4. `aact draw-dataflow <dataflow.toml> --svg-path <output.svg>` to draw dataflow. + + +### Customized Node + +Here is the minimal knowledge you would need to implement a customized node. + +```python +from aact import Node, NodeFactory, Message + +@NodeFactory.register("node_name") +class YourNode(Node[your_input_type, your_output_type]): + + # event_handler is the only function you **have** to implement + async def event_handler(self, input_channel: str, input_message: Message[your_input_type]) -> AsyncIterator[tuple[str, Message[your_output_type]]]: + match input_channel: + case input_channel_1: + + yield output_channel_1, Message[your_output_type](data=your_output_message) + case input_channel_2: + ... + + # implement other functions: __init__, _wait_for_input, event_loop, __aenter__, __aexit__ + +# To run a node without CLI +async with NodeFactory.make("node_name", arg_1=..., arg_2=...) as node: + await node.event_loop() +``` + +## Concepts + +There are three important concepts to understand aact. 
+ +```mermaid +graph TD + n1[Node 1] -->|channel_1| n2[Node 2] +``` + +### Nodes + +Nodes (`aact.Node`) are designed to run in parallel asynchronously. This design is especially useful for deploying the nodes onto different machines. +A node should inherit from the `aact.Node` class, which extends `pydantic.BaseModel`. + +### Channels + +Channel is an inherited concept from Redis Pub/Sub. You can think of it as a radio channel. +Multiple publishers (nodes) can publish messages to the same channel, and multiple subscribers (nodes) can subscribe to the same channel. + +### Messages + +Messages are the data sent through the channels. Each message type is a class in the format of `Message[T]`, where `T` is a subclass or a union of subclasses of `DataModel`. + +#### Customized Message Type + +If you want to create a new message type, you can create a new class that inherits from `DataModel`. +```python +@DataModelFactory.register("new_type") +class NewType(DataModel): + new_type_data: ... = ... + + +# For example +@DataModelFactory.register("integer") +class Integer(DataModel): + integer_data: int = Field(default=0) +``` + +## Dataflow.toml syntax + +```toml +redis_url = "redis://..." # required +extra_modules = ["package1.module1", "package2.module2"] # optional + +[[nodes]] +node_name = "node_name_1" # A unique name in the dataflow +node_class = "node_class_1" # node_class should match the class name passed into NodeFactory.register + +[nodes.node_args] +node_arg_1 = "value_1" + +[[nodes]] +node_name = "node_name_2" +node_class = "node_class_2" + +# ... 
+``` + + +""" + from .nodes import Node, NodeFactory from .messages import Message -__all__ = ["Node", "NodeFactory", "Message"] +__all__ = ["Node", "NodeFactory", "Message", "nodes", "messages", "cli"] diff --git a/src/aact/messages/base.py b/src/aact/messages/base.py index 4c6076f..d5a8925 100644 --- a/src/aact/messages/base.py +++ b/src/aact/messages/base.py @@ -4,11 +4,84 @@ class DataModel(BaseModel): + """ + # DataModel + + A datamodel in `aact` is a pydantic BaseModel with an additional field `data_type` to differentiate between different message types. + + Here are the built-in data models: + + - `aact.messages.Tick`: A data model with a single field `tick` of type `int`. This is useful for sending clock ticks. + - `aact.messages.Float`: A data model with a single field `value` of type `float`. This is useful for sending floating-point numbers. + - `aact.messages.Image`: A data model with a single field `image` of type `bytes`. This is useful for sending images. + - `aact.messages.Text`: A data model with a single field `text` of type `str`. This is useful for sending text messages. + - `aact.messages.Audio`: A data model with a single field `audio` of type `bytes`. This is useful for sending audio files. + - `aact.messages.Zero`: A dummy data model with no fields. This is useful when the nodes do not receive or send any data. + + ## Customize DataModels + + To customize your own data models, follow this example: + + ```python + + from aact.messages import DataModel, DataModelFactory + + + @DataModelFactory.register("my_data_model") + class MyDataModel(DataModel): + my_field: str + ``` + + You can see that you don't need to define the `data_type` field in your custom data models. The `DataModelFactory` will take care of it for you. 
+ """ + data_type: Literal[""] = Field("") + """ + @private + """ T = TypeVar("T", bound=DataModel) class Message(BaseModel, Generic[T]): + """ + # Messages + The `Message` class is the base class for all of the messages passing through the channels. + It is a pydantic BaseModel with a single field `data` containing the actual data. + The `data` field holds an instance of a subclass of `aact.messages.DataModel`. + + ## Usage + + To create a message type with DataModel `T`, you can use `Message[T]`. + To initialize a message, you can use `Message[T](data=your_data_model_instance)`. + +
+ + Why have an additional wrapper over DataModel? + + The reason for having a separate class for messages is to leverage [pydantic's tagged union feature](https://docs.pydantic.dev/latest/concepts/performance/#use-tagged-union-not-union). + This allows us to differentiate between different message types at runtime. + + For example, the following code snippet shows how to decide the message type at runtime: + + ```python + from aact import Message + from aact.messages import Image, Tick + + tick = 123 + tick_message = Message[Tick](data=Tick(tick=tick)) + tick_message_json = tick_message.model_dump_json() + + possible_image_or_tick_message = Message[Tick | Image].model_validate_json( + tick_message_json + ) + assert isinstance(possible_image_or_tick_message.data, Tick) + ``` +
+ """ + data: T = Field(discriminator="data_type") + """ + @private + """ diff --git a/src/aact/nodes/__init__.py b/src/aact/nodes/__init__.py index 7e940eb..bad3bc3 100644 --- a/src/aact/nodes/__init__.py +++ b/src/aact/nodes/__init__.py @@ -1,3 +1,11 @@ +""" +@public + +Nodes are the basic computation units in the AAct library. +A node specifies the input and output channels and types. +All nodes must inherit from the `aact.Node` class. +""" + from .base import Node from .tick import TickNode from .random import RandomNode diff --git a/src/aact/nodes/base.py b/src/aact/nodes/base.py index 81eb12c..96972b4 100644 --- a/src/aact/nodes/base.py +++ b/src/aact/nodes/base.py @@ -28,10 +28,131 @@ class NodeConfigurationError(Exception): class Node(BaseModel, Generic[InputType, OutputType]): + """ + Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters: + `InputType` and `OutputType`. `InputType` and `OutputType` are used not only for static type checking but also for + runtime message type validation, so it is important that you pass the correct types. + + Each of `InputType` and `OutputType` can be one of: + 1. a subclass of `aact.messages.DataModel`, or + 2. a union of multiple `aact.messages.DataModel` subclasses, or + 3. `aact.messages.DataModel` itself to allow any type of message (not recommended).[^1] + + Any subclass of `aact.Node` must implement the `event_handler` method, which is the main computation logic of the + node. The `event_handler` method takes two arguments: `input_channel` and `input_message`, and returns an async + iterator of tuples of output channel and output message. + + For example, the following code snippet shows a simple node that takes an `aact.messages.Text` message from the `a` + channel and echoes it to the `b` channel. 
+ + ```python + from aact import Node, Message + from aact.messages import Text + + from typing import AsyncIterator + + class EchoNode(Node[Text, Text]): + async def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[tuple[str, Message[Text]]]: + yield "b", Message[Text](data=input_message.data) + ``` + + ## Built-in Nodes + + aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes: + + - `aact.nodes.listener.ListenerNode`: A node that listens to the audio input from the microphone. + - `aact.nodes.speaker.SpeakerNode`: A node that plays the audio output to the speaker. + - `aact.nodes.record.RecordNode`: A node that records the messages to a file. + - `aact.nodes.print.PrintNode`: A node that prints the messages to the console. + - `aact.nodes.tick.TickNode`: A node that sends a tick message at a fixed interval. + - `aact.nodes.random.RandomNode`: A node that sends a random number message. + - `aact.nodes.transcriber.TranscriberNode`: A node that transcribes the audio messages to text. + - `aact.nodes.tts.TTSNode`: A node that converts the text messages to audio. + + ## Common usage + + Basic usage of nodes is covered in the [quick start guide](aact.html#usage). + + ## Advanced usage + + ### Send messages on your own + + The default behavior of sending messages in the base Node class is handled in the `event_loop` method. If you want to + send messages on your own, you can directly use the Redis instance `r` to publish messages to the output channels. + + ```python + + class YourNode(Node[InputType, OutputType]): + + async def func_where_you_send_messages(self): + await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json()) + + ``` + + ### Customize set up and tear down + + You can customize the set up and tear down of the node by overriding the `__aenter__` and `__aexit__` methods. 
For + example, you can open a file in the `__aenter__` method and close it in the `__aexit__` method. + + ```python + + class YourNode(Node[InputType, OutputType]): + + async def __aenter__(self) -> Self: + self.file = open("your_file.txt", "w") + return await super().__aenter__() + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + self.file.close() + return await super().__aexit__(exc_type, exc_value, traceback) + ``` + + This will ensure the file is closed properly even if an exception is raised. + + ### Background tasks + + You can run background tasks in the node by creating a task in the `__aenter__` method and cancelling it in the + `__aexit__` method. + + ```python + + class YourNode(Node[InputType, OutputType]): + + async def __aenter__(self) -> Self: + + self.task = asyncio.create_task(self.background_task()) + return await super().__aenter__() + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + self.task.cancel() + + try: + await self.task + except asyncio.CancelledError: + pass + ``` + + [^1]: Only if you know what you are doing. For example, in the `aact.nodes.record.RecordNode`, the `InputType` is + `aact.messages.DataModel` because it can accept any type of message. But in most cases, you should specify the + `InputType` and `OutputType` to be a specific subclass of `aact.messages.DataModel`. + """ + input_channel_types: dict[str, Type[InputType]] + """ + A dictionary that maps the input channel names to the corresponding input message types. + """ output_channel_types: dict[str, Type[OutputType]] + """ + A dictionary that maps the output channel names to the corresponding output message types. + """ redis_url: str + """ + The URL of the Redis server. It should be in the format of `redis://<host>:<port>/<db>`. 
+ """ + model_config = ConfigDict(extra="allow") + """ + @private + """ def __init__( self, @@ -55,8 +176,17 @@ def __init__( ) self.r: Redis = Redis.from_url(redis_url) + """ + @private + """ self.pubsub = self.r.pubsub() + """ + @private + """ self.logger = logging.getLogger("aact.nodes.base.Node") + """ + @private + """ async def __aenter__(self) -> Self: try: @@ -87,6 +217,11 @@ async def _wait_for_input( async def event_loop( self, ) -> None: + """ + The main event loop of the node. + The default implementation waits for input messages from the input channels, calls the + `event_handler` method for each input message, and sends each output message to the corresponding output channel. + """ try: async for input_channel, input_message in self._wait_for_input(): async for output_channel, output_message in self.event_handler( @@ -104,5 +239,8 @@ async def event_loop( async def event_handler( self, _: str, __: Message[InputType] ) -> AsyncIterator[tuple[str, Message[OutputType]]]: + """ + @private + """ raise NotImplementedError("event_handler must be implemented in a subclass.") yield "", self.output_type() # unreachable: dummy return value diff --git a/src/aact/nodes/registry.py b/src/aact/nodes/registry.py index f1879af..7cce2d6 100644 --- a/src/aact/nodes/registry.py +++ b/src/aact/nodes/registry.py @@ -10,7 +10,39 @@ class NodeFactory: + """ + To use nodes in the dataflow, you need to register them in the NodeFactory before using them. + The reason for this is to allow users to write string names in toml files which can be converted + to actual classes at runtime. + + To register a node, you need to use the `@NodeFactory.register` decorator. + + Example: + ```python + from aact import Node, NodeFactory + + @NodeFactory.register("node_name") + class YourNode(Node[your_input_type, your_output_type]): + # Your implementation of the node + ``` + +
+ + For power users + + You can initialize a node using the `NodeFactory.make` method. + + ```python + from aact import NodeFactory + + node = NodeFactory.make("node_name", ...)# your arguments + ``` + """ + registry: dict[str, type[Node[DataModel, DataModel]]] = {} + """ + @private + """ @classmethod def register( @@ -18,6 +50,10 @@ def register( ) -> Callable[ [type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]] ]: + """ + @private + """ + def inner_wrapper( wrapped_class: type[Node[InputType, OutputType]], ) -> type[Node[InputType, OutputType]]: @@ -30,6 +66,9 @@ def inner_wrapper( @classmethod def make(cls, name: str, **kwargs: Any) -> Node[DataModel, DataModel]: + """ + @private + """ if name not in cls.registry: raise ValueError(f"Executor {name} not found in registry") return cls.registry[name](**kwargs) diff --git a/uv.lock b/uv.lock index d068d64..6f32a73 100644 --- a/uv.lock +++ b/uv.lock @@ -20,7 +20,7 @@ resolution-markers = [ [[package]] name = "aact" -version = "0.0.4" +version = "0.0.9" source = { editable = "." 
} dependencies = [ { name = "aiofiles" }, @@ -42,6 +42,9 @@ audio = [ { name = "pyaudio" }, { name = "types-pyaudio" }, ] +doc = [ + { name = "pdoc" }, +] google = [ { name = "google-cloud-speech" }, { name = "google-cloud-texttospeech" }, @@ -78,6 +81,7 @@ requires-dist = [ { name = "numpy", specifier = ">=1.24.0" }, { name = "openai", marker = "extra == 'ai'" }, { name = "opencv-python", marker = "extra == 'vision'" }, + { name = "pdoc", marker = "extra == 'doc'", specifier = ">=15.0.0,<16.0.0" }, { name = "pyaudio", marker = "extra == 'audio'", specifier = ">=0.2.14" }, { name = "pydantic", specifier = ">=2.8.2" }, { name = "pywebview", marker = "extra == 'gui'", specifier = ">=3.4.0" }, @@ -1131,6 +1135,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/08/aa/cc0199a5f0ad350994d660967a8efb233fe0416e4639146c089643407ce6/packaging-24.1-py3-none-any.whl", hash = "sha256:5b8f2217dbdbd2f7f384c41c628544e6d52f2d0f53c6d0c3ea61aa5d1d7ff124", size = 53985 }, ] +[[package]] +name = "pdoc" +version = "15.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jinja2" }, + { name = "markupsafe" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/d1/4a/b645514246e487a02f211023d2fc1f019a25cdd2bf2f4266ff5e92c54f41/pdoc-15.0.0.tar.gz", hash = "sha256:b761220d3ba129cd87e6da1bb7b62c8e799973ab9c595de7ba1a514850d86da5", size = 154109 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/4a/e4a5490bb6d39214cd7778dfd8cf6b353b94f17aafa703bd1d11673ea025/pdoc-15.0.0-py3-none-any.whl", hash = "sha256:151b0187a25eaf827099e981d6dbe3a4f68aeb18d0d637c24edcab788d5540f1", size = 144184 }, +] + [[package]] name = "proto-plus" version = "1.24.0"