diff --git a/pydantic_ai_slim/pydantic_ai/result.py b/pydantic_ai_slim/pydantic_ai/result.py
index c8953bfcb..5fda2a0cf 100644
--- a/pydantic_ai_slim/pydantic_ai/result.py
+++ b/pydantic_ai_slim/pydantic_ai/result.py
@@ -11,6 +11,7 @@
 from typing_extensions import TypeVar
 
 from . import _result, _utils, exceptions, messages as _messages, models
+from .messages import ModelResponseStreamEvent, PartStartEvent
 from .tools import AgentDeps, RunContext
 from .usage import Usage, UsageLimits
 
@@ -271,6 +272,23 @@ async def _stream_text_deltas() -> AsyncIterator[str]:
                 lf_span.set_attribute('combined_text', combined_validated_text)
                 await self._marked_completed(_messages.ModelResponse.from_text(combined_validated_text))
 
+    async def stream_events(self) -> AsyncIterator[ModelResponseStreamEvent]:
+        usage_checking_stream = _get_usage_checking_stream_response(
+            self._stream_response, self._usage_limits, self.usage
+        )
+        with _logfire.span('response stream events') as lf_span:
+            # Yield an "event" for each existing part before continuing to stream new events
+            msg = self._stream_response.get()
+            for idx, part in enumerate(msg.parts):
+                yield PartStartEvent(idx, part)
+
+            async for event in usage_checking_stream:
+                yield event
+
+            msg = self._stream_response.get()
+            lf_span.set_attribute('structured_response', msg)
+            await self._marked_completed(msg)
+
     async def stream_structured(
         self, *, debounce_by: float | None = 0.1
     ) -> AsyncIterator[tuple[_messages.ModelResponse, bool]]:
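
A rough usage sketch for the new stream_events() method, assuming it is exposed on the streamed result object returned by the existing Agent.run_stream() context manager; the agent setup, model string, and prompt below are illustrative and not part of this patch:

    import asyncio

    from pydantic_ai import Agent
    from pydantic_ai.messages import PartStartEvent

    agent = Agent('openai:gpt-4o')  # illustrative model choice, not part of the patch


    async def main() -> None:
        async with agent.run_stream('Tell me a short joke.') as result:
            # stream_events() first replays a PartStartEvent for each part already
            # received, then forwards raw stream events as the model produces them.
            async for event in result.stream_events():
                if isinstance(event, PartStartEvent):
                    print('part started:', event.part)
                else:
                    print('event:', event)


    asyncio.run(main())

Because existing parts are replayed as PartStartEvents before new events are forwarded, a consumer that starts iterating after the first chunk has already arrived still sees the start of every part rather than only the deltas that follow.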