Skip to content

Commit

Permalink
faststream: Fix broker logging (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
vrslev authored Feb 10, 2025
1 parent be5e973 commit 1acadd9
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 16 deletions.
19 changes: 8 additions & 11 deletions packages/faststream-stomp/faststream_stomp/broker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import types
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import Any
from typing import Any, Unpack

import anyio
import stompman
Expand All @@ -11,11 +11,11 @@
from faststream.broker.types import BrokerMiddleware, CustomCallable
from faststream.log.logging import get_broker_logger
from faststream.security import BaseSecurity
from faststream.types import AnyDict, Decorator, LoggerProto, SendableMessage
from faststream.types import EMPTY, AnyDict, Decorator, LoggerProto, SendableMessage

from faststream_stomp.publisher import StompProducer, StompPublisher
from faststream_stomp.registrator import StompRegistrator
from faststream_stomp.subscriber import StompSubscriber
from faststream_stomp.subscriber import StompLogContext, StompSubscriber


class StompSecurity(BaseSecurity):
Expand Down Expand Up @@ -46,7 +46,7 @@ def __init__(
middlewares: Sequence[BrokerMiddleware[stompman.MessageFrame]] = (),
graceful_timeout: float | None = 15.0,
# Logging args
logger: LoggerProto | None = None,
logger: LoggerProto | None = EMPTY,
log_level: int = logging.INFO,
log_fmt: str | None = None,
# FastDepends args
Expand Down Expand Up @@ -87,10 +87,7 @@ async def start(self) -> None:
await super().start()

for handler in self._subscribers.values():
self._log(
f"`{handler.call_name}` waiting for messages",
extra=handler.get_log_context(None),
)
self._log(f"`{handler.call_name}` waiting for messages", extra=handler.get_log_context(None))
await handler.start()

async def _connect(self, client: stompman.Client) -> stompman.Client: # type: ignore[override]
Expand Down Expand Up @@ -125,15 +122,15 @@ async def ping(self, timeout: float | None = None) -> bool:
return False # pragma: no cover

def get_fmt(self) -> str:
# `StompLogContext`
return (
"%(asctime)s %(levelname)-8s - "
f"%(channel)-{self._max_channel_name}s | "
f"%(destination)-{self._max_channel_name}s | "
f"%(message_id)-{self.__max_msg_id_ln}s "
"- %(message)s"
)

def _setup_log_context(self, *, channel: str | None = None) -> None:
self._max_channel_name = max((self._max_channel_name, len(channel or "")))
def _setup_log_context(self, **log_context: Unpack[StompLogContext]) -> None: ... # type: ignore[override]

@property
def _subscriber_setup_extra(self) -> "AnyDict":
Expand Down
5 changes: 2 additions & 3 deletions packages/faststream-stomp/faststream_stomp/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import typing
from typing import cast
from typing import Self, cast

import stompman
from faststream.broker.message import StreamMessage, gen_cor_id
Expand All @@ -22,7 +21,7 @@ async def reject(self) -> None:
return await super().reject()

@classmethod
async def from_frame(cls, message: stompman.AckableMessageFrame) -> typing.Self:
async def from_frame(cls, message: stompman.AckableMessageFrame) -> Self:
return cls(
raw_message=message,
body=message.body,
Expand Down
14 changes: 13 additions & 1 deletion packages/faststream-stomp/faststream_stomp/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Callable, Iterable, Sequence
from typing import Any
from typing import Any, TypedDict, cast

import stompman
from fast_depends.dependencies import Depends
Expand All @@ -16,6 +16,11 @@
from faststream_stomp.message import StompStreamMessage


class StompLogContext(TypedDict):
destination: str
message_id: str


class StompSubscriber(SubscriberUsecase[stompman.MessageFrame]):
def __init__(
self,
Expand Down Expand Up @@ -128,3 +133,10 @@ def get_schema(self) -> dict[str, Channel]:
),
)
}

def get_log_context(self, message: StreamMessage[stompman.MessageFrame] | None) -> dict[str, str]:
log_context: StompLogContext = {
"destination": message.raw_message.headers["destination"] if message else self.destination,
"message_id": message.message_id if message else "",
}
return cast("dict[str, str]", log_context)
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import asyncio
from typing import Annotated
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Annotated
from unittest import mock

import faker
import faststream_stomp
import pytest
import stompman
from faststream import BaseMiddleware, Context, FastStream
from faststream.broker.message import gen_cor_id
from faststream.broker.middlewares.logging import CriticalLogMiddleware
from faststream.exceptions import AckMessage, NackMessage, RejectMessage
from faststream_stomp.message import StompStreamMessage

if TYPE_CHECKING:
from faststream_stomp.broker import StompBroker

pytestmark = pytest.mark.anyio


Expand Down Expand Up @@ -151,3 +158,62 @@ async def _(message: Annotated[StompStreamMessage, Context()]) -> None:
await broker.start()
await broker.publish(faker.pystr(), destination)
await event.wait()


class TestLogging:
async def test_ok(
self, monkeypatch: pytest.MonkeyPatch, request: pytest.FixtureRequest, faker: faker.Faker
) -> None:
monkeypatch.delenv("PYTEST_CURRENT_TEST")
broker: StompBroker = request.getfixturevalue("broker")
assert broker.logger
broker.logger = mock.Mock(log=(log_mock := mock.Mock()))

@broker.subscriber(destination := faker.pystr())
def some_handler() -> None: ...

async with broker:
await broker.start()

assert log_mock.mock_calls == [
mock.call(
logging.INFO,
"`SomeHandler` waiting for messages",
extra={"destination": destination, "message_id": ""},
exc_info=None,
)
]

async def test_raises(
self, monkeypatch: pytest.MonkeyPatch, request: pytest.FixtureRequest, faker: faker.Faker
) -> None:
monkeypatch.delenv("PYTEST_CURRENT_TEST")
broker: StompBroker = request.getfixturevalue("broker")
assert isinstance(broker._middlewares[0], CriticalLogMiddleware)
assert broker._middlewares[0].logger
broker._middlewares[0].logger = mock.Mock(log=(log_mock := mock.Mock()))
event = asyncio.Event()
message_id: str | None = None

@dataclass
class MyError(Exception): ...

@broker.subscriber(destination := faker.pystr())
def some_handler(message_frame: Annotated[stompman.MessageFrame, Context("message.raw_message")]) -> None:
nonlocal message_id
message_id = message_frame.headers["message-id"]
event.set()
raise MyError

async with broker:
await broker.start()
await broker.publish(faker.pystr(), destination)
await event.wait()

assert message_id
extra = {"destination": destination, "message_id": message_id}
assert log_mock.mock_calls == [
mock.call(logging.INFO, "Received", extra=extra),
mock.call(logging.ERROR, "MyError: ", extra=extra, exc_info=MyError()),
mock.call(logging.INFO, "Processed", extra=extra),
]

0 comments on commit 1acadd9

Please sign in to comment.