Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add async support #3

Merged
merged 1 commit into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
# Change Log

## [0.10.0] - 2024-03-17

### Added

- Add async support in `Application`. New methods: `call_async`, `execute_async`, `publish_async`
- Add async support in `TransactionContext`. New methods: `call_async`, `execute_async`, `publish_async`

## [0.9.0] - 2024-02-28

### Added

- Add Sphinx docs

### Changed

- Rename `Task` to `Command`
Expand All @@ -12,4 +22,4 @@

## [0.8.0] - 2024-01-08

No history.
Early release.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
format:
pre-commit run --all-files

test:
pytest
mypy lato
pytest --doctest-modules lato

docs_autobuild:
sphinx-autobuild --watch lato -E docs docs/_build/html
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Based on dependency injection and Python 3.6+ type hints.

- **Minimalistic**: Intuitive and lean API for rapid development without the bloat.

- **Async Support**: Concurrency and async / await is supported.


## Installation

Expand Down
22 changes: 22 additions & 0 deletions examples/async_example/example1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio

from lato import Application, TransactionContext


async def add_async(a, b):
await asyncio.sleep(1)
return a + b


if __name__ == "__main__":
with TransactionContext() as ctx:
result = ctx.call(add_async, 1, 2)
print(result)

result = asyncio.run(result)

print("got result from asyncio.run", result)

app = Application("async")
result = app.call(add_async, 1, 2)
print(result)
21 changes: 21 additions & 0 deletions examples/async_example/example2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import asyncio

from lato import Application, Command


class MultiplyCommand(Command):
a: int
b: int


app = Application("async")


@app.handler(MultiplyCommand)
async def multiply_async(command: MultiplyCommand):
await asyncio.sleep(1)
return command.a * command.b


coroutine = app.execute(MultiplyCommand(a=10, b=20))
print("execution result", coroutine)
57 changes: 57 additions & 0 deletions examples/async_example/toy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import logging

from lato import Application, TransactionContext

logging.basicConfig(level=logging.DEBUG)
root_logger = logging.getLogger("toy")

app = Application()


class Counter:
def __init__(self):
self.value = 0

def next(self):
self.value += 1
return self.value


counter = Counter()


@app.on_enter_transaction_context
async def on_enter_transaction_context(ctx: TransactionContext):
correlation_id = str(counter.next())
logger = root_logger.getChild(correlation_id)
ctx.set_dependencies(logger=logger)
logger.info("Connecting to database")
await asyncio.sleep(0.001)
logger.info("Connected")


@app.on_exit_transaction_context
async def on_exit_transaction_context(ctx: TransactionContext, exception=None):
logger = ctx["logger"]
logger.info("Disconnecting from database")
await asyncio.sleep(0.001)
logger.info("Disconnected from database")


@app.handler("foo")
async def handle_foo(x, logger):
logger.info(f"Starting foo, x={x}")
await asyncio.sleep(0.001)
logger.info("Finished foo")


async def main() -> None:
await asyncio.gather(
app.call_async("foo", x=1),
app.call_async("foo", x=2),
app.call_async("foo", x=3),
)


asyncio.run(main())
150 changes: 131 additions & 19 deletions lato/application.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import logging
from collections.abc import Callable
from typing import Any, Optional, Union, List
from lato.types import DependencyIdentifier
from collections.abc import Awaitable, Callable
from typing import Any, Optional, Union

from lato.application_module import ApplicationModule
from lato.dependency_provider import BasicDependencyProvider, DependencyProvider
from lato.message import Command, Event, Message
from lato.transaction_context import TransactionContext
from lato.message import Event, Message
from lato.transaction_context import (
ComposerFunction,
MiddlewareFunction,
OnEnterTransactionContextCallback,
OnExitTransactionContextCallback,
TransactionContext,
)
from lato.types import DependencyIdentifier

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -39,11 +46,15 @@ def __init__(
self.dependency_provider = (
dependency_provider or self.dependency_provider_factory(**kwargs)
)
self._transaction_context_factory = None
self._on_enter_transaction_context = lambda ctx: None
self._on_exit_transaction_context = lambda ctx, exception=None: None
self._transaction_middlewares: List[Callable] = []
self._composers: dict[Union[Message, str], Callable] = {}
self._transaction_context_factory: Optional[Callable] = None
self._on_enter_transaction_context: Optional[
OnEnterTransactionContextCallback
] = None
self._on_exit_transaction_context: Optional[
OnExitTransactionContextCallback
] = None
self._transaction_middlewares: list[MiddlewareFunction] = []
self._composers: dict[Union[Message, str], ComposerFunction] = {}

def get_dependency(self, identifier: DependencyIdentifier) -> Any:
"""Gets a dependency from the dependency provider. Dependencies can be resolved either by name or by type.
Expand All @@ -57,7 +68,7 @@ def get_dependency(self, identifier: DependencyIdentifier) -> Any:
def __getitem__(self, identifier: DependencyIdentifier) -> Any:
return self.get_dependency(identifier)

def call(self, func: Union[Callable, str], *args, **kwargs):
def call(self, func: Union[Callable[..., Any], str], *args, **kwargs) -> Any:
"""Invokes a function with `args` and `kwargs` within the :class:`TransactionContext`.
If `func` is a string, then it is an alias, and the corresponding handler for the alias is retrieved.
Any missing arguments are provided by the dependency provider of a transaction context,
Expand All @@ -81,6 +92,32 @@ def call(self, func: Union[Callable, str], *args, **kwargs):
result = ctx.call(func, *args, **kwargs)
return result

async def call_async(
self, func: Union[Callable[..., Awaitable[Any]], str], *args, **kwargs
) -> Any:
"""Invokes an async function with `args` and `kwargs` within the :class:`TransactionContext`.
If `func` is a string, then it is an alias, and the corresponding handler for the alias is retrieved.
Any missing arguments are provided by the dependency provider of a transaction context,
and args and kwargs parameters.

:param func: The async function to invoke, or an alias.
:param args: Arguments to pass to the function.
:param kwargs: Keyword arguments to pass to the function.

:return: The result of the invoked function.

:raises ValueError: If an alias is provided, but no corresponding handler is found.
"""
if isinstance(func, str):
try:
func = next(self.iterate_handlers_for(alias=func))
except StopIteration:
raise ValueError(f"Handler not found", func)

async with self.transaction_context() as ctx:
result = await ctx.call_async(func, *args, **kwargs)
return result

def execute(self, message: Message) -> Any:
"""Executes a command within the :class:`TransactionContext`.
Use :func:`handler` decorator to register a handler for the command.
Expand All @@ -96,28 +133,88 @@ def execute(self, message: Message) -> Any:
result = ctx.execute(message)
return result

async def execute_async(self, message: Message) -> Any:
"""Asynchronously executes a command within the :class:`TransactionContext`.
Use :func:`handler` decorator to register a handler for the command.
If a command is handled by multiple handlers, then the final result is
composed to a single return value using :func:`compose` decorator.

:param message: The message to be executed (usually, a :class:`Command` or :class:`Query` subclass).
:return: The result of the invoked message handler.

:raises: ValueError: If no handlers are found for the message.
"""
async with self.transaction_context() as ctx:
result = await ctx.execute(message)
return result

def emit(self, event: Event) -> dict[Callable, Any]:
"""Deprecated. Use `publish()` instead."""
return self.publish(event)

def publish(self, event: Event) -> dict[Callable, Any]:
"""
Publish an event by calling all handlers for that event.

:param event: The event to publish, or an alias of an event handler to call.
:return: A dictionary mapping handlers to their results.
"""
with self.transaction_context() as ctx:
result = ctx.emit(event)
result = ctx.publish(event)
return result

async def publish_async(self, event: Event) -> dict[Callable, Any]:
"""
Asynchronously publish an event by calling all handlers for that event.

:param event: The event to publish, or an alias of an event handler to call.
:return: A dictionary mapping handlers to their results.
"""
async with self.transaction_context() as ctx:
result = await ctx.publish_async(event)
return result

def on_enter_transaction_context(self, func):
"""
Decorator for registering a function to be called when entering a transaction context

:param func:
:return:
:param func: callback to be called when entering a transaction context
:return: the decorated function

**Example:**

>>> from lato import Application, TransactionContext
>>> app = Application()
>>> @app.on_enter_transaction_context
... def on_enter_transaction_context(ctx: TransactionContext):
... print('entering transaction context')
... ctx.set_dependencies(foo="foo")
>>> app.call(lambda foo: print(foo))
entering transaction context
foo
"""

self._on_enter_transaction_context = func
return func

def on_exit_transaction_context(self, func):
"""
Decorator for registering a function to be called when exiting a transaction context

:param func:
:return:
:param func: callback to be called when exiting a transaction context
:return: the decorated function

**Example:**

>>> from lato import Application, TransactionContext
>>> app = Application()
>>>
>>> @app.on_exit_transaction_context
... def on_exit_transaction_context(ctx: TransactionContext, exception=None):
... print("exiting context")
>>> app.call(lambda: print("calling"))
calling
exiting context
"""
self._on_exit_transaction_context = func
return func
Expand All @@ -126,8 +223,23 @@ def on_create_transaction_context(self, func):
"""
Decorator for overriding default transaction context creation

:param func:
:return:
:param func: callback to be called when creating a transaction context
:return: the decorated function

**Example:**

>>> from lato import Application, TransactionContext
>>> app = Application()
>>>
>>> class CustomTransactionContext(TransactionContext):
... pass
>>>
>>> @app.on_create_transaction_context
... def create_transaction_context(**kwargs):
... return CustomTransactionContext(**kwargs)
>>>
>>> print(app.transaction_context(foo="bar").__class__.__name__)
CustomTransactionContext
"""
self._transaction_context_factory = func
return func
Expand All @@ -136,7 +248,7 @@ def transaction_middleware(self, middleware_func):
"""
Decorator for registering a middleware function to be called when executing a function in a transaction context
:param middleware_func:
:return:
:return: the decorated function
"""
self._transaction_middlewares.insert(0, middleware_func)
return middleware_func
Expand Down
Loading