diff --git a/latest/404.html b/latest/404.html index ced3776..9e2999b 100644 --- a/latest/404.html +++ b/latest/404.html @@ -282,6 +282,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -662,6 +689,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/api/cli/index.html b/latest/api/cli/index.html index f0ed22e..f77ce7a 100644 --- a/latest/api/cli/index.html +++ b/latest/api/cli/index.html @@ -16,6 +16,8 @@ + + @@ -316,6 +318,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -717,6 +746,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/api/messages/index.html b/latest/api/messages/index.html index f341e8e..ba8fc09 100644 --- a/latest/api/messages/index.html +++ b/latest/api/messages/index.html @@ -323,6 +323,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -836,6 +863,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/api/nodes/index.html b/latest/api/nodes/index.html index 62872f6..9686566 100644 --- a/latest/api/nodes/index.html +++ b/latest/api/nodes/index.html @@ -323,6 +323,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -950,6 +977,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/applications/robot-teleoperation/index.html b/latest/applications/robot-teleoperation/index.html index 9d1e22d..2174550 100644 --- a/latest/applications/robot-teleoperation/index.html +++ b/latest/applications/robot-teleoperation/index.html @@ -13,6 +13,8 @@ + + @@ -20,7 +22,7 @@ - Teleoperating Robots with AAct, Quest, and Stretch - AAct + Robot Teleoperation - AAct @@ -59,7 +61,7 @@ - + @@ -75,7 +77,7 @@ - + @@ -139,7 +141,7 @@
    - Teleoperating Robots with AAct, Quest, and Stretch + Robot Teleoperation
    @@ -315,6 +317,37 @@ + + + + + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -694,6 +727,265 @@ + + + + + + + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + @@ -738,11 +1030,80 @@ + + +
  • + + + Software + + +
  • + + +
  • + + + Steps + + + + + +
  • + +
  • + + + Demo + + +
  • @@ -771,8 +1132,41 @@

    Hardware

    +

    Software

    + +

    Steps

    +

    The overall steps are:

    + +

    Launch Stretch control loop

    +

    Before running nodes on stretch, please do these:

    +
      +
    1. Homing: python -m teleop.stretch_home
    2. +
    3. Running deamon control loop in a tmux or nohup or screen: python -m teleop.stretch_control_loop
    4. +
    +

    Launch AAct nodes on Stretch

    +

    You can easily launch the AAct nodes on Stretch by running the following command:

    +
    aact run-dataflow dataflows/examples/stretch_zmq_streaming.toml
    +
    +

    Launch AAct nodes on a local machine

    +

    Before this step, please get the IP of your Oculus Quest. And change line 40 in dataflows/examples/quest_local_redis.toml to your IP.

    +

    Then, you can launch the AAct nodes on a local machine by running the following command:

    +
    aact run-dataflow dataflows/examples/quest_local_redis.toml
    +
    +

    Build and launch app on Meta Quest

    +

    We provide the APK file for the app. You can install it on your Meta Quest by running the following command:

    +
    adb install -r app.apk
    +
    +

    But you can also build the app manually, by building the Unity Project.

    +

    Demo

    + diff --git a/latest/assets/images/social/applications/robot-teleoperation.png b/latest/assets/images/social/applications/robot-teleoperation.png index 2e9d2e5..ef3bac3 100644 Binary files a/latest/assets/images/social/applications/robot-teleoperation.png and b/latest/assets/images/social/applications/robot-teleoperation.png differ diff --git a/latest/index.html b/latest/index.html index d1f7740..5eb01c0 100644 --- a/latest/index.html +++ b/latest/index.html @@ -319,6 +319,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -753,6 +780,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/install/index.html b/latest/install/index.html index baff3d1..9c23541 100644 --- a/latest/install/index.html +++ b/latest/install/index.html @@ -321,6 +321,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -764,6 +791,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/search/search_index.json b/latest/search/search_index.json index 259a9aa..5c92e6e 100644 --- a/latest/search/search_index.json +++ b/latest/search/search_index.json @@ -1 +1 @@ -{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"What is AAct?","text":"

    AAct is designed for communicating sensors, neural networks, agents, users, and environments.

    Can you expand on that? AAct is a Python library for building asynchronous, actor-based, concurrent systems. Specifically, it is designed to be used in the context of building systems with components that communicate with each other but don't block each other."},{"location":"#how-does-aact-work","title":"How does AAct work?","text":"

    AAct is built around the concept of nodes and dataflow, where nodes are self-contained units which receive messages from input channels, process the messages, and send messages to output channels. Nodes are connected to each other to form a dataflow graph, where messages flow from one node to another. Each node runs in its own event loop, and the nodes communicate with each other using Redis Pub/Sub.

    "},{"location":"install/","title":"Quickstart","text":""},{"location":"install/#installation","title":"Installation","text":"

    System requirement:

    1. Python 3.10 or higher
    2. Redis server
    Redis installation The easiest way to install Redis is to use Docker:
    docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest\n
    According to your system, you can also install Redis from the official website: https://redis.io/download Note: we will only require a standard Redis server (without RedisJSON / RedisSearch) in this library.
    pip install aact\n
    from source
    git clone https://github.com/ProKil/aact.git\ncd aact\npip install .\n
    For power users, please use `uv` for package management."},{"location":"install/#quick-start-example","title":"Quick Start Example","text":"

    Assuming your Redis is hosted on localhost:6379 using docker. You can create a dataflow.toml file:

    redis_url = \"redis://localhost:6379/0\" # required\n\n[[nodes]]\nnode_name = \"print\"\nnode_class = \"print\"\n\n[nodes.node_args.print_channel_types]\n\"tick/secs/1\" = \"tick\"\n\n[[nodes]]\nnode_name = \"tick\"\nnode_class = \"tick\"\n

    To run the dataflow:

    aact run-dataflow dataflow.toml\n

    This will start the tick node and the print node. The tick node sends a message every second to the print node, which prints the message to the console.

    "},{"location":"usage/","title":"Usage","text":""},{"location":"usage/#usage","title":"Usage","text":""},{"location":"usage/#cli","title":"CLI","text":"

    You can start from CLI and progress to more advanced usages.

    1. aact --help to see all commands
    2. aact run-dataflow <dataflow_name.toml> to run a dataflow. Check Dataflow.toml syntax
    3. aact run-node to run one node in a dataflow.
    4. aact draw-dataflow <dataflow_name_1.toml> <dataflow_name_2.toml> --svg-path <output.svg> to draw dataflow.
    "},{"location":"usage/#customized-node","title":"Customized Node","text":"

    Here is the minimal knowledge you would need to implement a customized node.

    from aact import Node, NodeFactory, Message\n\n@NodeFactory.register(\"node_name\")\nclass YourNode(Node[your_input_type, your_output_type]):\n\n    # event_handler is the only function your **have** to implement\n    def event_handler(self, input_channel: str, input_message: Message[your_input_type]) -> AsyncIterator[str, Message[your_output_type]]:\n        match input_channel:\n            case input_channel_1:\n                <do_your_stuff>\n                yield output_channel_1, Message[your_output_type](data=your_output_message)\n            case input_channel_2:\n                ...\n\n   # implement other functions: __init__, _wait_for_input, event_loop, __aenter__, __aexit__\n\n# To run a node without CLI\nasync with NodeFactory.make(\"node_name\", arg_1, arg_2) as node:\n    await node.event_loop()\n
    "},{"location":"usage/#concepts","title":"Concepts","text":"

    There are three important concepts to understand aact.

    graph TD\n    n1[Node 1] -->|channel_1| n2[Node 2]\n
    "},{"location":"usage/#nodes","title":"Nodes","text":"

    Nodes (aact.Nodes) are designed to run in parallel asynchronously. This design is especially useful for deploying the nodes onto different machines. A node should inherit aact.Node class, which extends pydantic.BaseModel.

    "},{"location":"usage/#channels","title":"Channels","text":"

    Channel is an inherited concept from Redis Pub/Sub. You can think of it as a radio channel. Multiple publishers (nodes) can publish messages to the same channel, and multiple subscribers (nodes) can subscribe to the same channel.

    "},{"location":"usage/#messages","title":"Messages","text":"

    Messages are the data sent through the channels. Each message type is a class in the format of Message[T] , where T is a subclass or a union of subclasses of DataModel.

    "},{"location":"usage/#customized-message-type","title":"Customized Message Type","text":"

    If you want to create a new message type, you can create a new class that inherits from DataModel.

    @DataModelFactory.register(\"new_type\")\nclass NewType(DataModel):\n    new_type_data: ... = ...\n\n\n# For example\n@DataModelFactory.register(\"integer\")\nclass Integer(DataModel):\n    integer_data: int = Field(default=0)\n
    "},{"location":"usage/#dataflowtoml-syntax","title":"Dataflow.toml syntax","text":"
    redis_url = \"redis://...\" # required\nextra_modules = [\"package1.module1\", \"package2.module2\"] # optional\n\n[[nodes]]\nnode_name = \"node_name_1\" # A unique name in the dataflow\nnode_class = \"node_class_1\" # node_class should match the class name passed into NodeFactory.register\n\n[node.node_args]\nnode_arg_1 = \"value_1\"\n\n[[nodes]]\nnode_name = \"node_name_2\"\nnode_class = \"node_class_2\"\n\n# ...\n
    "},{"location":"why/","title":"Why use AAct","text":""},{"location":"why/#why-should-i-use-aact","title":"Why should I use AAct?","text":"
    1. Non-blocking: the nodes are relatively independent of each other, so if you are waiting for users' input, you can still process sensor data in the background.
    2. Scalable: you can a large number of nodes on one machine or distribute them across multiple machines.
    3. Hackable: you can easily design your own nodes and connect them to the existing nodes.
    4. Zero-code configuration: the dataflow.toml allows you to design the dataflow graph without writing any Python code.
    "},{"location":"api/messages/","title":"Messages","text":"

    Bases: BaseModel, Generic[T]

    Bases: BaseModel

    "},{"location":"api/messages/#aact.Message--messages","title":"Messages","text":"

    Message class is the base class for all of the messages passing through the channels. It is a pydantic BaseModel with a single field data containing the actual data. The data field is a subclass of aact.messages.DataModel.

    "},{"location":"api/messages/#aact.Message--usage","title":"Usage","text":"

    To create a message type with DataModel T, you can use Message[T]. To initialize a message, you can use Message[T](data=your_data_model_instance).

    Why have an additional wrapper over DataModel? The reason for having a separate class for messages is to leverage the [pydantic's tagged union feature](https://docs.pydantic.dev/latest/concepts/performance/#use-tagged-union-not-union). This allows us to differentiate between different message types at runtime. For example, the following code snippet shows how to decide the message type at runtime:
    from aact import Message, DataModel\nfrom aact.messages import Image, Tick\n\ntick = 123\ntick_message = Message[Tick](data=Tick(tick=tick))\ntick_message_json = tick_message.model_dump_json()\n\npossible_image_or_tick_message = Message[Tick | Image].model_validate_json(\n    tick_message_json\n)\nassert isinstance(possible_image_or_tick_message.data, Tick)\n
    "},{"location":"api/messages/#aact.Message.data","title":"data class-attribute instance-attribute","text":"
    data: T = Field(discriminator='data_type')\n

    @private

    "},{"location":"api/messages/#aact.messages.DataModel--datamodel","title":"DataModel","text":"

    A datamodel in aact is a pydantic BaseModel with an additional field data_type to differentiate between different message types.

    Here are the built-in data models:

    "},{"location":"api/messages/#aact.messages.DataModel--customize-datamodels","title":"Customize DataModels","text":"

    For custimizing your own data models, here is an example:

    \nfrom aact.messages import DataModel, DataModelFactory\n\n\n@DataModelFactory.register(\"my_data_model\")\nclass MyDataModel(DataModel):\n    my_field: str\n

    You can see that you don't need to define the data_type field in your custom data models. The DataModelFactory will take care of it for you.

    "},{"location":"api/messages/#aact.messages.DataModel.data_type","title":"data_type class-attribute instance-attribute","text":"
    data_type: Literal[''] = Field('')\n

    @private

    "},{"location":"api/nodes/","title":"Nodes","text":"

    AAct nodes are simply classes which inherit from Node and implements different ways of handling and sending messages.

    "},{"location":"api/nodes/#aact.Node","title":"aact.Node","text":"

    Bases: BaseModel, Generic[InputType, OutputType]

    Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters: InputType and OutputType. The InputType and OutputType is used not only for static type checking but also for runtime message type validation, so it is important that you pass the correct types.

    Each of InputType and OutputType can be either: 1. a subclass of aact.messages.DataModel, or 2. a union of multiple aact.DataModel subclasses, or 3. aact.DataModel itself to allow any type of message (not recommended).[^1]

    Any subclass of aact.Node must implement the event_handler method, which is the main computation logic of the node. The event_handler method takes two arguments: input_channel and input_message, and returns an async iterator of tuples of output channel and output message.

    For example, the following code snippet shows a simple node that takes a aact.messages.Text message from the a channel and echo it to the b channel.

    from aact import Node, Message\nfrom aact.messages import Text\n\nfrom typing import AsyncIterator\n\nclass EchoNode(Node[Text, Text]):\n    def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:\n        yield \"b\", Message[Text](data=input_message.data)\n
    "},{"location":"api/nodes/#aact.Node--built-in-nodes","title":"Built-in Nodes","text":"

    aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:

    "},{"location":"api/nodes/#aact.Node--common-usage","title":"Common usage","text":"

    The usage of nodes is in the quick start guide.

    "},{"location":"api/nodes/#aact.Node--advanced-usage","title":"Advanced usage","text":""},{"location":"api/nodes/#aact.Node--send-messages-on-your-own","title":"Send messages on your own","text":"

    The default behavior of sending messages in the base Node class is handled in the event_loop method. If you want to send messages on your own, you can directly use the Redis instance r to publish messages to the output channels.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def func_where_you_send_messages(self):\n        await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())\n\n
    "},{"location":"api/nodes/#aact.Node--customize-set-up-and-tear-down","title":"Customize set up and tear down","text":"

    You can customize the set up and tear down of the node by overriding the __aenter__ and __aexit__ methods. For example, you can open a file in the __aenter__ method and close it in the __aexit__ method.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def __aenter__(self) -> Self:\n        self.file = open(\"your_file.txt\", \"w\")\n        return await super().__aenter__()\n\n    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n        self.file.close()\n        return await super().__aexit__(exc_type, exc_value, traceback)\n

    This will ensure the file is closed properly even if an exception is raised.

    "},{"location":"api/nodes/#aact.Node--background-tasks","title":"Background tasks","text":"

    You can run background tasks in the node by creating a task in the __aenter__ method and cancelling it in the __aexit__ method.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def __aenter__(self) -> Self:\n\n        self.task = asyncio.create_task(self.background_task())\n        return await super().__aenter__()\n\n    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n        self.task.cancel()\n\n        try:\n            await self.task\n        except asyncio.CancelledError:\n            pass\n

    [^1]: Only if you know what you are doing. For example, in the aact.nodes.record.RecordNode, the InputType is aact.messages.DataModel because it can accept any type of message. But in most cases, you should specify the InputType and OutputType to be a specific subclass of aact.messages.DataModel.

    Source code in src/aact/nodes/base.py
    class Node(BaseModel, Generic[InputType, OutputType]):\n    \"\"\"\n    Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters:\n    `InputType` and `OutputType`. The InputType and OutputType is used not only for static type checking but also for\n    runtime message type validation, so it is important that you pass the correct types.\n\n    Each of `InputType` and `OutputType` can be either:\n    1. a subclass of `aact.messages.DataModel`, or\n    2. a union of multiple `aact.DataModel` subclasses, or\n    3. `aact.DataModel` itself to allow any type of message (not recommended).[^1]\n\n    Any subclass of `aact.Node` must implement the `event_handler` method, which is the main computation logic of the\n    node. The `event_handler` method takes two arguments: `input_channel` and `input_message`, and returns an async\n    iterator of tuples of output channel and output message.\n\n    For example, the following code snippet shows a simple node that takes a `aact.messages.Text` message from the `a`\n    channel and echo it to the `b` channel.\n\n    ```python\n    from aact import Node, Message\n    from aact.messages import Text\n\n    from typing import AsyncIterator\n\n    class EchoNode(Node[Text, Text]):\n        def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:\n            yield \"b\", Message[Text](data=input_message.data)\n    ```\n\n    ## Built-in Nodes\n\n    aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:\n\n    - `aact.nodes.listener.ListenerNode`: A node that listens to the audio input from the microphone.\n    - `aact.nodes.speaker.SpeakerNode`: A node that plays the audio output to the speaker.\n    - `aact.nodes.record.RecordNode`: A node that records the messages to a file.\n    - `aact.nodes.print.PrintNode`: A node that prints the messages to the console.\n    - `aact.nodes.tick.TickNode`: A node that sends a tick message at a fixed interval.\n    - `aact.nodes.random.RandomNode`: A node that sends a random number message.\n    - `aact.nodes.transcriber.TranscriberNode`: A node that transcribes the audio messages to text.\n    - `aact.nodes.tts.TTSNode`: A node that converts the text messages to audio.\n\n    ## Common usage\n\n    The usage of nodes is in the [quick start guide](aact.html/#usage).\n\n    ## Advanced usage\n\n    ### Send messages on your own\n\n    The default behavior of sending messages in the base Node class is handled in the `event_loop` method. If you want to\n    send messages on your own, you can directly use the Redis instance `r` to publish messages to the output channels.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def func_where_you_send_messages(self):\n            await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())\n\n    ```\n\n    ### Customize set up and tear down\n\n    You can customize the set up and tear down of the node by overriding the `__aenter__` and `__aexit__` methods. For\n    example, you can open a file in the `__aenter__` method and close it in the `__aexit__` method.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def __aenter__(self) -> Self:\n            self.file = open(\"your_file.txt\", \"w\")\n            return await super().__aenter__()\n\n        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n            self.file.close()\n            return await super().__aexit__(exc_type, exc_value, traceback)\n    ```\n\n    This will ensure the file is closed properly even if an exception is raised.\n\n    ### Background tasks\n\n    You can run background tasks in the node by creating a task in the `__aenter__` method and cancelling it in the\n    `__aexit__` method.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def __aenter__(self) -> Self:\n\n            self.task = asyncio.create_task(self.background_task())\n            return await super().__aenter__()\n\n        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n            self.task.cancel()\n\n            try:\n                await self.task\n            except asyncio.CancelledError:\n                pass\n    ```\n\n    [^1]: Only if you know what you are doing. For example, in the `aact.nodes.record.RecordNode`, the `InputType` is\n    `aact.messages.DataModel` because it can accept any type of message. But in most cases, you should specify the\n    `InputType` and `OutputType` to be a specific subclass of `aact.messages.DataModel`.\n    \"\"\"\n\n    input_channel_types: dict[str, Type[InputType]]\n    \"\"\"\n    A dictionary that maps the input channel names to the corresponding input message types.\n    \"\"\"\n    output_channel_types: dict[str, Type[OutputType]]\n    \"\"\"\n    A dictionary that maps the output channel names to the corresponding output message types.\n    \"\"\"\n    redis_url: str\n    \"\"\"\n    The URL of the Redis server. It should be in the format of `redis://<host>:<port>/<db>`.\n    \"\"\"\n    model_config = ConfigDict(extra=\"allow\")\n    \"\"\"\n    @private\n    \"\"\"\n\n    def __init__(\n        self,\n        input_channel_types: list[tuple[str, Type[InputType]]],\n        output_channel_types: list[tuple[str, Type[OutputType]]],\n        redis_url: str = \"redis://localhost:6379/0\",\n    ):\n        try:\n            super().__init__(\n                input_channel_types=dict(input_channel_types),\n                output_channel_types=dict(output_channel_types),\n                redis_url=redis_url,\n            )\n        except ValidationError as _:\n            raise NodeConfigurationError(\n                \"You passed an invalid configuration to the Node.\\n\"\n                f\"The required input channel types are: {self.model_fields['input_channel_types'].annotation}\\n\"\n                f\"The input channel types are: {input_channel_types}\\n\"\n                f\"The required output channel types are: {self.model_fields['output_channel_types'].annotation}\\n\"\n                f\"The output channel types are: {output_channel_types}\\n\"\n            )\n\n        self.r: Redis = Redis.from_url(redis_url)\n        \"\"\"\n        @private\n        \"\"\"\n        self.pubsub = self.r.pubsub()\n        \"\"\"\n        @private\n        \"\"\"\n        self.logger = logging.getLogger(\"aact.nodes.base.Node\")\n        \"\"\"\n        @private\n        \"\"\"\n\n    async def __aenter__(self) -> Self:\n        try:\n            await self.r.ping()\n        except ConnectionError:\n            raise ValueError(\n                f\"Could not connect to Redis with the provided url. {self.redis_url}\"\n            )\n        await self.pubsub.subscribe(*self.input_channel_types.keys())\n        return self\n\n    async def __aexit__(self, _: Any, __: Any, ___: Any) -> None:\n        await self.pubsub.unsubscribe()\n        await self.r.aclose()\n\n    async def _wait_for_input(\n        self,\n    ) -> AsyncIterator[tuple[str, Message[InputType]]]:\n        async for message in self.pubsub.listen():\n            channel = message[\"channel\"].decode(\"utf-8\")\n            if message[\"type\"] == \"message\" and channel in self.input_channel_types:\n                try:\n                    data = Message[\n                        self.input_channel_types[channel]  # type: ignore[name-defined]\n                    ].model_validate_json(message[\"data\"])\n                except ValidationError as e:\n                    self.logger.error(\n                        f\"Failed to validate message from {channel}: {message['data']}. Error: {e}\"\n                    )\n                    raise e\n                yield channel, data\n        raise Exception(\"Input channel closed unexpectedly\")\n\n    async def event_loop(\n        self,\n    ) -> None:\n        \"\"\"\n        The main event loop of the node.\n        The default implementation of the event loop is to wait for input messages from the input channels and call the\n        `event_handler` method for each input message, and send each output message to the corresponding output channel.\n        \"\"\"\n        try:\n            async for input_channel, input_message in self._wait_for_input():\n                async for output_channel, output_message in self.event_handler(\n                    input_channel, input_message\n                ):\n                    await self.r.publish(\n                        output_channel, output_message.model_dump_json()\n                    )\n        except NodeExitSignal as e:\n            self.logger.info(f\"Event loop cancelled: {e}. Exiting gracefully.\")\n        except Exception as e:\n            raise e\n\n    @abstractmethod\n    async def event_handler(\n        self, _: str, __: Message[InputType]\n    ) -> AsyncIterator[tuple[str, Message[OutputType]]]:\n        \"\"\"\n        @private\n        \"\"\"\n        raise NotImplementedError(\"event_handler must be implemented in a subclass.\")\n        yield \"\", self.output_type()  # unreachable: dummy return value\n
    "},{"location":"api/nodes/#aact.Node.input_channel_types","title":"input_channel_types instance-attribute","text":"
    input_channel_types: dict[str, Type[InputType]]\n

    A dictionary that maps the input channel names to the corresponding input message types.

    "},{"location":"api/nodes/#aact.Node.output_channel_types","title":"output_channel_types instance-attribute","text":"
    output_channel_types: dict[str, Type[OutputType]]\n

    A dictionary that maps the output channel names to the corresponding output message types.

    "},{"location":"api/nodes/#aact.Node.redis_url","title":"redis_url instance-attribute","text":"
    redis_url: str\n

    The URL of the Redis server. It should be in the format of redis://<host>:<port>/<db>.

    "},{"location":"api/nodes/#aact.Node.model_config","title":"model_config class-attribute instance-attribute","text":"
    model_config = ConfigDict(extra='allow')\n

    @private

    "},{"location":"api/nodes/#aact.Node.r","title":"r instance-attribute","text":"
    r: Redis = from_url(redis_url)\n

    @private

    "},{"location":"api/nodes/#aact.Node.pubsub","title":"pubsub instance-attribute","text":"
    pubsub = pubsub()\n

    @private

    "},{"location":"api/nodes/#aact.Node.logger","title":"logger instance-attribute","text":"
    logger = getLogger('aact.nodes.base.Node')\n

    @private

    "},{"location":"api/nodes/#aact.Node.event_loop","title":"event_loop async","text":"
    event_loop() -> None\n

    The main event loop of the node. The default implementation of the event loop is to wait for input messages from the input channels and call the event_handler method for each input message, and send each output message to the corresponding output channel.

    Source code in src/aact/nodes/base.py
    async def event_loop(\n    self,\n) -> None:\n    \"\"\"\n    The main event loop of the node.\n    The default implementation of the event loop is to wait for input messages from the input channels and call the\n    `event_handler` method for each input message, and send each output message to the corresponding output channel.\n    \"\"\"\n    try:\n        async for input_channel, input_message in self._wait_for_input():\n            async for output_channel, output_message in self.event_handler(\n                input_channel, input_message\n            ):\n                await self.r.publish(\n                    output_channel, output_message.model_dump_json()\n                )\n    except NodeExitSignal as e:\n        self.logger.info(f\"Event loop cancelled: {e}. Exiting gracefully.\")\n    except Exception as e:\n        raise e\n
    "},{"location":"api/nodes/#aact.Node.event_handler","title":"event_handler abstractmethod async","text":"
    event_handler(_: str, __: Message[InputType]) -> AsyncIterator[tuple[str, Message[OutputType]]]\n

    @private

    Source code in src/aact/nodes/base.py
    @abstractmethod\nasync def event_handler(\n    self, _: str, __: Message[InputType]\n) -> AsyncIterator[tuple[str, Message[OutputType]]]:\n    \"\"\"\n    @private\n    \"\"\"\n    raise NotImplementedError(\"event_handler must be implemented in a subclass.\")\n    yield \"\", self.output_type()  # unreachable: dummy return value\n
    "},{"location":"api/nodes/#aact.NodeFactory","title":"aact.NodeFactory","text":"

    To use nodes in the dataflow, you need to register them in the NodeFactory before using them. The reason for this is to allow users write string names in toml files which can be converted to actual classes at runtime.

    To register a node, you need to use the @NodeFactory.register decorator.

    Example:

    from aact import Node, NodeFactory\n\n@NodeFactory.register(\"node_name\")\nclass YourNode(Node[your_input_type, your_output_type]):\n    # Your implementation of the node\n
    For power users You can initialize a node using the `NodeFactory.make` method.
    from aact import NodeFactory\n\nnode = NodeFactory.make(\"node_name\", ...)# your arguments\n
    "},{"location":"api/nodes/#aact.NodeFactory.registry","title":"registry class-attribute instance-attribute","text":"
    registry: dict[str, type[Node[DataModel, DataModel]]] = {}\n

    @private

    "},{"location":"api/nodes/#aact.NodeFactory.register","title":"register classmethod","text":"
    register(name: str) -> Callable[[type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]]\n

    @private

    Source code in src/aact/nodes/registry.py
    @classmethod\ndef register(\n    cls, name: str\n) -> Callable[\n    [type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]\n]:\n    \"\"\"\n    @private\n    \"\"\"\n\n    def inner_wrapper(\n        wrapped_class: type[Node[InputType, OutputType]],\n    ) -> type[Node[InputType, OutputType]]:\n        if name in cls.registry:\n            logger.warning(\"Executor %s already exists. Will replace it\", name)\n        cls.registry[name] = wrapped_class\n        return wrapped_class\n\n    return inner_wrapper\n
    "},{"location":"api/nodes/#aact.NodeFactory.make","title":"make classmethod","text":"
    make(name: str, **kwargs: Any) -> Node[DataModel, DataModel]\n

    @private

    Source code in src/aact/nodes/registry.py
    @classmethod\ndef make(cls, name: str, **kwargs: Any) -> Node[DataModel, DataModel]:\n    \"\"\"\n    @private\n    \"\"\"\n    if name not in cls.registry:\n        raise ValueError(f\"Executor {name} not found in registry\")\n    return cls.registry[name](**kwargs)\n
    "},{"location":"applications/robot-teleoperation/","title":"Teleoperating Robots with AAct, Quest, and Stretch","text":"

    The latency of the AAct system is low enough to allow for teleoperating robots in real-time. This is a powerful capability that can be used for a variety of applications, such as teleoperating a robot to perform a task in a remote location, collecting ego-centric (or together with exocentric) data for training robotics models, or deploying and evaluating models in the real world.

    In this demo (live demoed at CoRL 2024), we are going to use Meta Oculus Quest 3 / Pro and Stretch 3 mobile manipulator.

    "},{"location":"applications/robot-teleoperation/#prerequisites","title":"Prerequisites","text":""},{"location":"applications/robot-teleoperation/#hardware","title":"Hardware","text":""}]} \ No newline at end of file +{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"What is AAct?","text":"

    AAct is designed for communicating sensors, neural networks, agents, users, and environments.

    Can you expand on that? AAct is a Python library for building asynchronous, actor-based, concurrent systems. Specifically, it is designed to be used in the context of building systems with components that communicate with each other but don't block each other."},{"location":"#how-does-aact-work","title":"How does AAct work?","text":"

    AAct is built around the concept of nodes and dataflow, where nodes are self-contained units which receive messages from input channels, process the messages, and send messages to output channels. Nodes are connected to each other to form a dataflow graph, where messages flow from one node to another. Each node runs in its own event loop, and the nodes communicate with each other using Redis Pub/Sub.

    "},{"location":"install/","title":"Quickstart","text":""},{"location":"install/#installation","title":"Installation","text":"

    System requirement:

    1. Python 3.10 or higher
    2. Redis server
    Redis installation The easiest way to install Redis is to use Docker:
    docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest\n
    According to your system, you can also install Redis from the official website: https://redis.io/download Note: we will only require a standard Redis server (without RedisJSON / RedisSearch) in this library.
    pip install aact\n
    from source
    git clone https://github.com/ProKil/aact.git\ncd aact\npip install .\n
    For power users, please use `uv` for package management."},{"location":"install/#quick-start-example","title":"Quick Start Example","text":"

    Assuming your Redis is hosted on localhost:6379 using docker. You can create a dataflow.toml file:

    redis_url = \"redis://localhost:6379/0\" # required\n\n[[nodes]]\nnode_name = \"print\"\nnode_class = \"print\"\n\n[nodes.node_args.print_channel_types]\n\"tick/secs/1\" = \"tick\"\n\n[[nodes]]\nnode_name = \"tick\"\nnode_class = \"tick\"\n

    To run the dataflow:

    aact run-dataflow dataflow.toml\n

    This will start the tick node and the print node. The tick node sends a message every second to the print node, which prints the message to the console.

    "},{"location":"usage/","title":"Usage","text":""},{"location":"usage/#usage","title":"Usage","text":""},{"location":"usage/#cli","title":"CLI","text":"

    You can start from CLI and progress to more advanced usages.

    1. aact --help to see all commands
    2. aact run-dataflow <dataflow_name.toml> to run a dataflow. Check Dataflow.toml syntax
    3. aact run-node to run one node in a dataflow.
    4. aact draw-dataflow <dataflow_name_1.toml> <dataflow_name_2.toml> --svg-path <output.svg> to draw dataflow.
    "},{"location":"usage/#customized-node","title":"Customized Node","text":"

    Here is the minimal knowledge you would need to implement a customized node.

    from aact import Node, NodeFactory, Message\n\n@NodeFactory.register(\"node_name\")\nclass YourNode(Node[your_input_type, your_output_type]):\n\n    # event_handler is the only function your **have** to implement\n    def event_handler(self, input_channel: str, input_message: Message[your_input_type]) -> AsyncIterator[str, Message[your_output_type]]:\n        match input_channel:\n            case input_channel_1:\n                <do_your_stuff>\n                yield output_channel_1, Message[your_output_type](data=your_output_message)\n            case input_channel_2:\n                ...\n\n   # implement other functions: __init__, _wait_for_input, event_loop, __aenter__, __aexit__\n\n# To run a node without CLI\nasync with NodeFactory.make(\"node_name\", arg_1, arg_2) as node:\n    await node.event_loop()\n
    "},{"location":"usage/#concepts","title":"Concepts","text":"

    There are three important concepts to understand aact.

    graph TD\n    n1[Node 1] -->|channel_1| n2[Node 2]\n
    "},{"location":"usage/#nodes","title":"Nodes","text":"

    Nodes (aact.Nodes) are designed to run in parallel asynchronously. This design is especially useful for deploying the nodes onto different machines. A node should inherit aact.Node class, which extends pydantic.BaseModel.

    "},{"location":"usage/#channels","title":"Channels","text":"

    Channel is an inherited concept from Redis Pub/Sub. You can think of it as a radio channel. Multiple publishers (nodes) can publish messages to the same channel, and multiple subscribers (nodes) can subscribe to the same channel.

    "},{"location":"usage/#messages","title":"Messages","text":"

    Messages are the data sent through the channels. Each message type is a class in the format of Message[T] , where T is a subclass or a union of subclasses of DataModel.

    "},{"location":"usage/#customized-message-type","title":"Customized Message Type","text":"

    If you want to create a new message type, you can create a new class that inherits from DataModel.

    @DataModelFactory.register(\"new_type\")\nclass NewType(DataModel):\n    new_type_data: ... = ...\n\n\n# For example\n@DataModelFactory.register(\"integer\")\nclass Integer(DataModel):\n    integer_data: int = Field(default=0)\n
    "},{"location":"usage/#dataflowtoml-syntax","title":"Dataflow.toml syntax","text":"
    redis_url = \"redis://...\" # required\nextra_modules = [\"package1.module1\", \"package2.module2\"] # optional\n\n[[nodes]]\nnode_name = \"node_name_1\" # A unique name in the dataflow\nnode_class = \"node_class_1\" # node_class should match the class name passed into NodeFactory.register\n\n[node.node_args]\nnode_arg_1 = \"value_1\"\n\n[[nodes]]\nnode_name = \"node_name_2\"\nnode_class = \"node_class_2\"\n\n# ...\n
    "},{"location":"why/","title":"Why use AAct","text":""},{"location":"why/#why-should-i-use-aact","title":"Why should I use AAct?","text":"
    1. Non-blocking: the nodes are relatively independent of each other, so if you are waiting for users' input, you can still process sensor data in the background.
    2. Scalable: you can a large number of nodes on one machine or distribute them across multiple machines.
    3. Hackable: you can easily design your own nodes and connect them to the existing nodes.
    4. Zero-code configuration: the dataflow.toml allows you to design the dataflow graph without writing any Python code.
    "},{"location":"api/messages/","title":"Messages","text":"

    Bases: BaseModel, Generic[T]

    Bases: BaseModel

    "},{"location":"api/messages/#aact.Message--messages","title":"Messages","text":"

    Message class is the base class for all of the messages passing through the channels. It is a pydantic BaseModel with a single field data containing the actual data. The data field is a subclass of aact.messages.DataModel.

    "},{"location":"api/messages/#aact.Message--usage","title":"Usage","text":"

    To create a message type with DataModel T, you can use Message[T]. To initialize a message, you can use Message[T](data=your_data_model_instance).

    Why have an additional wrapper over DataModel? The reason for having a separate class for messages is to leverage the [pydantic's tagged union feature](https://docs.pydantic.dev/latest/concepts/performance/#use-tagged-union-not-union). This allows us to differentiate between different message types at runtime. For example, the following code snippet shows how to decide the message type at runtime:
    from aact import Message, DataModel\nfrom aact.messages import Image, Tick\n\ntick = 123\ntick_message = Message[Tick](data=Tick(tick=tick))\ntick_message_json = tick_message.model_dump_json()\n\npossible_image_or_tick_message = Message[Tick | Image].model_validate_json(\n    tick_message_json\n)\nassert isinstance(possible_image_or_tick_message.data, Tick)\n
    "},{"location":"api/messages/#aact.Message.data","title":"data class-attribute instance-attribute","text":"
    data: T = Field(discriminator='data_type')\n

    @private

    "},{"location":"api/messages/#aact.messages.DataModel--datamodel","title":"DataModel","text":"

    A datamodel in aact is a pydantic BaseModel with an additional field data_type to differentiate between different message types.

    Here are the built-in data models:

    "},{"location":"api/messages/#aact.messages.DataModel--customize-datamodels","title":"Customize DataModels","text":"

    For custimizing your own data models, here is an example:

    \nfrom aact.messages import DataModel, DataModelFactory\n\n\n@DataModelFactory.register(\"my_data_model\")\nclass MyDataModel(DataModel):\n    my_field: str\n

    You can see that you don't need to define the data_type field in your custom data models. The DataModelFactory will take care of it for you.

    "},{"location":"api/messages/#aact.messages.DataModel.data_type","title":"data_type class-attribute instance-attribute","text":"
    data_type: Literal[''] = Field('')\n

    @private

    "},{"location":"api/nodes/","title":"Nodes","text":"

    AAct nodes are simply classes which inherit from Node and implements different ways of handling and sending messages.

    "},{"location":"api/nodes/#aact.Node","title":"aact.Node","text":"

    Bases: BaseModel, Generic[InputType, OutputType]

    Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters: InputType and OutputType. The InputType and OutputType is used not only for static type checking but also for runtime message type validation, so it is important that you pass the correct types.

    Each of InputType and OutputType can be either: 1. a subclass of aact.messages.DataModel, or 2. a union of multiple aact.DataModel subclasses, or 3. aact.DataModel itself to allow any type of message (not recommended).[^1]

    Any subclass of aact.Node must implement the event_handler method, which is the main computation logic of the node. The event_handler method takes two arguments: input_channel and input_message, and returns an async iterator of tuples of output channel and output message.

    For example, the following code snippet shows a simple node that takes a aact.messages.Text message from the a channel and echo it to the b channel.

    from aact import Node, Message\nfrom aact.messages import Text\n\nfrom typing import AsyncIterator\n\nclass EchoNode(Node[Text, Text]):\n    def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:\n        yield \"b\", Message[Text](data=input_message.data)\n
    "},{"location":"api/nodes/#aact.Node--built-in-nodes","title":"Built-in Nodes","text":"

    aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:

    "},{"location":"api/nodes/#aact.Node--common-usage","title":"Common usage","text":"

    The usage of nodes is in the quick start guide.

    "},{"location":"api/nodes/#aact.Node--advanced-usage","title":"Advanced usage","text":""},{"location":"api/nodes/#aact.Node--send-messages-on-your-own","title":"Send messages on your own","text":"

    The default behavior of sending messages in the base Node class is handled in the event_loop method. If you want to send messages on your own, you can directly use the Redis instance r to publish messages to the output channels.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def func_where_you_send_messages(self):\n        await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())\n\n
    "},{"location":"api/nodes/#aact.Node--customize-set-up-and-tear-down","title":"Customize set up and tear down","text":"

    You can customize the set up and tear down of the node by overriding the __aenter__ and __aexit__ methods. For example, you can open a file in the __aenter__ method and close it in the __aexit__ method.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def __aenter__(self) -> Self:\n        self.file = open(\"your_file.txt\", \"w\")\n        return await super().__aenter__()\n\n    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n        self.file.close()\n        return await super().__aexit__(exc_type, exc_value, traceback)\n

    This will ensure the file is closed properly even if an exception is raised.

    "},{"location":"api/nodes/#aact.Node--background-tasks","title":"Background tasks","text":"

    You can run background tasks in the node by creating a task in the __aenter__ method and cancelling it in the __aexit__ method.

    \nclass YourNode(Node[InputType, OutputType]):\n\n    async def __aenter__(self) -> Self:\n\n        self.task = asyncio.create_task(self.background_task())\n        return await super().__aenter__()\n\n    async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n        self.task.cancel()\n\n        try:\n            await self.task\n        except asyncio.CancelledError:\n            pass\n

    [^1]: Only if you know what you are doing. For example, in the aact.nodes.record.RecordNode, the InputType is aact.messages.DataModel because it can accept any type of message. But in most cases, you should specify the InputType and OutputType to be a specific subclass of aact.messages.DataModel.

    Source code in src/aact/nodes/base.py
    class Node(BaseModel, Generic[InputType, OutputType]):\n    \"\"\"\n    Node is the base class for all nodes in the aact framework. It is a generic class that takes two type parameters:\n    `InputType` and `OutputType`. The InputType and OutputType is used not only for static type checking but also for\n    runtime message type validation, so it is important that you pass the correct types.\n\n    Each of `InputType` and `OutputType` can be either:\n    1. a subclass of `aact.messages.DataModel`, or\n    2. a union of multiple `aact.DataModel` subclasses, or\n    3. `aact.DataModel` itself to allow any type of message (not recommended).[^1]\n\n    Any subclass of `aact.Node` must implement the `event_handler` method, which is the main computation logic of the\n    node. The `event_handler` method takes two arguments: `input_channel` and `input_message`, and returns an async\n    iterator of tuples of output channel and output message.\n\n    For example, the following code snippet shows a simple node that takes a `aact.messages.Text` message from the `a`\n    channel and echo it to the `b` channel.\n\n    ```python\n    from aact import Node, Message\n    from aact.messages import Text\n\n    from typing import AsyncIterator\n\n    class EchoNode(Node[Text, Text]):\n        def event_handler(self, input_channel: str, input_message: Message[Text]) -> AsyncIterator[str, Message[Text]]:\n            yield \"b\", Message[Text](data=input_message.data)\n    ```\n\n    ## Built-in Nodes\n\n    aact provides several built-in nodes that you can use out of the box. Here are some of the built-in nodes:\n\n    - `aact.nodes.listener.ListenerNode`: A node that listens to the audio input from the microphone.\n    - `aact.nodes.speaker.SpeakerNode`: A node that plays the audio output to the speaker.\n    - `aact.nodes.record.RecordNode`: A node that records the messages to a file.\n    - `aact.nodes.print.PrintNode`: A node that prints the messages to the console.\n    - `aact.nodes.tick.TickNode`: A node that sends a tick message at a fixed interval.\n    - `aact.nodes.random.RandomNode`: A node that sends a random number message.\n    - `aact.nodes.transcriber.TranscriberNode`: A node that transcribes the audio messages to text.\n    - `aact.nodes.tts.TTSNode`: A node that converts the text messages to audio.\n\n    ## Common usage\n\n    The usage of nodes is in the [quick start guide](aact.html/#usage).\n\n    ## Advanced usage\n\n    ### Send messages on your own\n\n    The default behavior of sending messages in the base Node class is handled in the `event_loop` method. If you want to\n    send messages on your own, you can directly use the Redis instance `r` to publish messages to the output channels.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def func_where_you_send_messages(self):\n            await self.r.publish(your_output_channel, Message[OutputType](data=your_output_message).model_dump_json())\n\n    ```\n\n    ### Customize set up and tear down\n\n    You can customize the set up and tear down of the node by overriding the `__aenter__` and `__aexit__` methods. For\n    example, you can open a file in the `__aenter__` method and close it in the `__aexit__` method.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def __aenter__(self) -> Self:\n            self.file = open(\"your_file.txt\", \"w\")\n            return await super().__aenter__()\n\n        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n            self.file.close()\n            return await super().__aexit__(exc_type, exc_value, traceback)\n    ```\n\n    This will ensure the file is closed properly even if an exception is raised.\n\n    ### Background tasks\n\n    You can run background tasks in the node by creating a task in the `__aenter__` method and cancelling it in the\n    `__aexit__` method.\n\n    ```python\n\n    class YourNode(Node[InputType, OutputType]):\n\n        async def __aenter__(self) -> Self:\n\n            self.task = asyncio.create_task(self.background_task())\n            return await super().__aenter__()\n\n        async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:\n            self.task.cancel()\n\n            try:\n                await self.task\n            except asyncio.CancelledError:\n                pass\n    ```\n\n    [^1]: Only if you know what you are doing. For example, in the `aact.nodes.record.RecordNode`, the `InputType` is\n    `aact.messages.DataModel` because it can accept any type of message. But in most cases, you should specify the\n    `InputType` and `OutputType` to be a specific subclass of `aact.messages.DataModel`.\n    \"\"\"\n\n    input_channel_types: dict[str, Type[InputType]]\n    \"\"\"\n    A dictionary that maps the input channel names to the corresponding input message types.\n    \"\"\"\n    output_channel_types: dict[str, Type[OutputType]]\n    \"\"\"\n    A dictionary that maps the output channel names to the corresponding output message types.\n    \"\"\"\n    redis_url: str\n    \"\"\"\n    The URL of the Redis server. It should be in the format of `redis://<host>:<port>/<db>`.\n    \"\"\"\n    model_config = ConfigDict(extra=\"allow\")\n    \"\"\"\n    @private\n    \"\"\"\n\n    def __init__(\n        self,\n        input_channel_types: list[tuple[str, Type[InputType]]],\n        output_channel_types: list[tuple[str, Type[OutputType]]],\n        redis_url: str = \"redis://localhost:6379/0\",\n    ):\n        try:\n            super().__init__(\n                input_channel_types=dict(input_channel_types),\n                output_channel_types=dict(output_channel_types),\n                redis_url=redis_url,\n            )\n        except ValidationError as _:\n            raise NodeConfigurationError(\n                \"You passed an invalid configuration to the Node.\\n\"\n                f\"The required input channel types are: {self.model_fields['input_channel_types'].annotation}\\n\"\n                f\"The input channel types are: {input_channel_types}\\n\"\n                f\"The required output channel types are: {self.model_fields['output_channel_types'].annotation}\\n\"\n                f\"The output channel types are: {output_channel_types}\\n\"\n            )\n\n        self.r: Redis = Redis.from_url(redis_url)\n        \"\"\"\n        @private\n        \"\"\"\n        self.pubsub = self.r.pubsub()\n        \"\"\"\n        @private\n        \"\"\"\n        self.logger = logging.getLogger(\"aact.nodes.base.Node\")\n        \"\"\"\n        @private\n        \"\"\"\n\n    async def __aenter__(self) -> Self:\n        try:\n            await self.r.ping()\n        except ConnectionError:\n            raise ValueError(\n                f\"Could not connect to Redis with the provided url. {self.redis_url}\"\n            )\n        await self.pubsub.subscribe(*self.input_channel_types.keys())\n        return self\n\n    async def __aexit__(self, _: Any, __: Any, ___: Any) -> None:\n        await self.pubsub.unsubscribe()\n        await self.r.aclose()\n\n    async def _wait_for_input(\n        self,\n    ) -> AsyncIterator[tuple[str, Message[InputType]]]:\n        async for message in self.pubsub.listen():\n            channel = message[\"channel\"].decode(\"utf-8\")\n            if message[\"type\"] == \"message\" and channel in self.input_channel_types:\n                try:\n                    data = Message[\n                        self.input_channel_types[channel]  # type: ignore[name-defined]\n                    ].model_validate_json(message[\"data\"])\n                except ValidationError as e:\n                    self.logger.error(\n                        f\"Failed to validate message from {channel}: {message['data']}. Error: {e}\"\n                    )\n                    raise e\n                yield channel, data\n        raise Exception(\"Input channel closed unexpectedly\")\n\n    async def event_loop(\n        self,\n    ) -> None:\n        \"\"\"\n        The main event loop of the node.\n        The default implementation of the event loop is to wait for input messages from the input channels and call the\n        `event_handler` method for each input message, and send each output message to the corresponding output channel.\n        \"\"\"\n        try:\n            async for input_channel, input_message in self._wait_for_input():\n                async for output_channel, output_message in self.event_handler(\n                    input_channel, input_message\n                ):\n                    await self.r.publish(\n                        output_channel, output_message.model_dump_json()\n                    )\n        except NodeExitSignal as e:\n            self.logger.info(f\"Event loop cancelled: {e}. Exiting gracefully.\")\n        except Exception as e:\n            raise e\n\n    @abstractmethod\n    async def event_handler(\n        self, _: str, __: Message[InputType]\n    ) -> AsyncIterator[tuple[str, Message[OutputType]]]:\n        \"\"\"\n        @private\n        \"\"\"\n        raise NotImplementedError(\"event_handler must be implemented in a subclass.\")\n        yield \"\", self.output_type()  # unreachable: dummy return value\n
    "},{"location":"api/nodes/#aact.Node.input_channel_types","title":"input_channel_types instance-attribute","text":"
    input_channel_types: dict[str, Type[InputType]]\n

    A dictionary that maps the input channel names to the corresponding input message types.

    "},{"location":"api/nodes/#aact.Node.output_channel_types","title":"output_channel_types instance-attribute","text":"
    output_channel_types: dict[str, Type[OutputType]]\n

    A dictionary that maps the output channel names to the corresponding output message types.

    "},{"location":"api/nodes/#aact.Node.redis_url","title":"redis_url instance-attribute","text":"
    redis_url: str\n

    The URL of the Redis server. It should be in the format of redis://<host>:<port>/<db>.

    "},{"location":"api/nodes/#aact.Node.model_config","title":"model_config class-attribute instance-attribute","text":"
    model_config = ConfigDict(extra='allow')\n

    @private

    "},{"location":"api/nodes/#aact.Node.r","title":"r instance-attribute","text":"
    r: Redis = from_url(redis_url)\n

    @private

    "},{"location":"api/nodes/#aact.Node.pubsub","title":"pubsub instance-attribute","text":"
    pubsub = pubsub()\n

    @private

    "},{"location":"api/nodes/#aact.Node.logger","title":"logger instance-attribute","text":"
    logger = getLogger('aact.nodes.base.Node')\n

    @private

    "},{"location":"api/nodes/#aact.Node.event_loop","title":"event_loop async","text":"
    event_loop() -> None\n

    The main event loop of the node. The default implementation of the event loop is to wait for input messages from the input channels and call the event_handler method for each input message, and send each output message to the corresponding output channel.

    Source code in src/aact/nodes/base.py
    async def event_loop(\n    self,\n) -> None:\n    \"\"\"\n    The main event loop of the node.\n    The default implementation of the event loop is to wait for input messages from the input channels and call the\n    `event_handler` method for each input message, and send each output message to the corresponding output channel.\n    \"\"\"\n    try:\n        async for input_channel, input_message in self._wait_for_input():\n            async for output_channel, output_message in self.event_handler(\n                input_channel, input_message\n            ):\n                await self.r.publish(\n                    output_channel, output_message.model_dump_json()\n                )\n    except NodeExitSignal as e:\n        self.logger.info(f\"Event loop cancelled: {e}. Exiting gracefully.\")\n    except Exception as e:\n        raise e\n
    "},{"location":"api/nodes/#aact.Node.event_handler","title":"event_handler abstractmethod async","text":"
    event_handler(_: str, __: Message[InputType]) -> AsyncIterator[tuple[str, Message[OutputType]]]\n

    @private

    Source code in src/aact/nodes/base.py
    @abstractmethod\nasync def event_handler(\n    self, _: str, __: Message[InputType]\n) -> AsyncIterator[tuple[str, Message[OutputType]]]:\n    \"\"\"\n    @private\n    \"\"\"\n    raise NotImplementedError(\"event_handler must be implemented in a subclass.\")\n    yield \"\", self.output_type()  # unreachable: dummy return value\n
    "},{"location":"api/nodes/#aact.NodeFactory","title":"aact.NodeFactory","text":"

    To use nodes in the dataflow, you need to register them in the NodeFactory before using them. The reason for this is to allow users write string names in toml files which can be converted to actual classes at runtime.

    To register a node, you need to use the @NodeFactory.register decorator.

    Example:

    from aact import Node, NodeFactory\n\n@NodeFactory.register(\"node_name\")\nclass YourNode(Node[your_input_type, your_output_type]):\n    # Your implementation of the node\n
    For power users You can initialize a node using the `NodeFactory.make` method.
    from aact import NodeFactory\n\nnode = NodeFactory.make(\"node_name\", ...)# your arguments\n
    "},{"location":"api/nodes/#aact.NodeFactory.registry","title":"registry class-attribute instance-attribute","text":"
    registry: dict[str, type[Node[DataModel, DataModel]]] = {}\n

    @private

    "},{"location":"api/nodes/#aact.NodeFactory.register","title":"register classmethod","text":"
    register(name: str) -> Callable[[type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]]\n

    @private

    Source code in src/aact/nodes/registry.py
    @classmethod\ndef register(\n    cls, name: str\n) -> Callable[\n    [type[Node[InputType, OutputType]]], type[Node[InputType, OutputType]]\n]:\n    \"\"\"\n    @private\n    \"\"\"\n\n    def inner_wrapper(\n        wrapped_class: type[Node[InputType, OutputType]],\n    ) -> type[Node[InputType, OutputType]]:\n        if name in cls.registry:\n            logger.warning(\"Executor %s already exists. Will replace it\", name)\n        cls.registry[name] = wrapped_class\n        return wrapped_class\n\n    return inner_wrapper\n
    "},{"location":"api/nodes/#aact.NodeFactory.make","title":"make classmethod","text":"
    make(name: str, **kwargs: Any) -> Node[DataModel, DataModel]\n

    @private

    Source code in src/aact/nodes/registry.py
    @classmethod\ndef make(cls, name: str, **kwargs: Any) -> Node[DataModel, DataModel]:\n    \"\"\"\n    @private\n    \"\"\"\n    if name not in cls.registry:\n        raise ValueError(f\"Executor {name} not found in registry\")\n    return cls.registry[name](**kwargs)\n
    "},{"location":"applications/robot-teleoperation/","title":"Teleoperating Robots with AAct, Quest, and Stretch","text":"

    The latency of the AAct system is low enough to allow for teleoperating robots in real-time. This is a powerful capability that can be used for a variety of applications, such as teleoperating a robot to perform a task in a remote location, collecting ego-centric (or together with exocentric) data for training robotics models, or deploying and evaluating models in the real world.

    In this demo (live demoed at CoRL 2024), we are going to use Meta Oculus Quest 3 / Pro and Stretch 3 mobile manipulator.

    "},{"location":"applications/robot-teleoperation/#prerequisites","title":"Prerequisites","text":""},{"location":"applications/robot-teleoperation/#hardware","title":"Hardware","text":""},{"location":"applications/robot-teleoperation/#software","title":"Software","text":""},{"location":"applications/robot-teleoperation/#steps","title":"Steps","text":"

    The overall steps are:

    "},{"location":"applications/robot-teleoperation/#launch-stretch-control-loop","title":"Launch Stretch control loop","text":"

    Before running nodes on stretch, please do these:

    1. Homing: python -m teleop.stretch_home
    2. Running deamon control loop in a tmux or nohup or screen: python -m teleop.stretch_control_loop
    "},{"location":"applications/robot-teleoperation/#launch-aact-nodes-on-stretch","title":"Launch AAct nodes on Stretch","text":"

    You can easily launch the AAct nodes on Stretch by running the following command:

    aact run-dataflow dataflows/examples/stretch_zmq_streaming.toml\n
    "},{"location":"applications/robot-teleoperation/#launch-aact-nodes-on-a-local-machine","title":"Launch AAct nodes on a local machine","text":"

    Before this step, please get the IP of your Oculus Quest. And change line 40 in dataflows/examples/quest_local_redis.toml to your IP.

    Then, you can launch the AAct nodes on a local machine by running the following command:

    aact run-dataflow dataflows/examples/quest_local_redis.toml\n
    "},{"location":"applications/robot-teleoperation/#build-and-launch-app-on-meta-quest","title":"Build and launch app on Meta Quest","text":"

    We provide the APK file for the app. You can install it on your Meta Quest by running the following command:

    adb install -r app.apk\n

    But you can also build the app manually, by building the Unity Project.

    "},{"location":"applications/robot-teleoperation/#demo","title":"Demo","text":""}]} \ No newline at end of file diff --git a/latest/usage/index.html b/latest/usage/index.html index b2a8579..f82eaec 100644 --- a/latest/usage/index.html +++ b/latest/usage/index.html @@ -321,6 +321,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -836,6 +863,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + + diff --git a/latest/why/index.html b/latest/why/index.html index 2bde3ee..2855305 100644 --- a/latest/why/index.html +++ b/latest/why/index.html @@ -321,6 +321,33 @@ + + + + + + + + + + + + +
  • + + + + Applications + + +
  • + + + + + + + @@ -751,6 +778,122 @@ + + + + + + + + + + + + + + + +
  • + + + + + + + + + + +
  • + + +