Add GraphRun object to make use of next more ergonomic (#833)
dmontagu authored Feb 20, 2025
1 parent 1887280 commit dfc919c
Showing 31 changed files with 1,717 additions and 908 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -64,7 +64,7 @@ testcov: test ## Run tests and generate a coverage report

.PHONY: update-examples
update-examples: ## Update documentation examples
uv run -m pytest --update-examples
uv run -m pytest --update-examples tests/test_examples.py

# `--no-strict` so you can build the docs without insiders packages
.PHONY: docs
140 changes: 133 additions & 7 deletions docs/agents.md
@@ -62,13 +62,14 @@ print(result.data)

## Running Agents

There are three ways to run an agent:
There are four ways to run an agent:

1. [`agent.run()`][pydantic_ai.Agent.run] — a coroutine which returns a [`RunResult`][pydantic_ai.result.RunResult] containing a completed response
2. [`agent.run_sync()`][pydantic_ai.Agent.run_sync] — a plain, synchronous function which returns a [`RunResult`][pydantic_ai.result.RunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`)
3. [`agent.run_stream()`][pydantic_ai.Agent.run_stream] — a coroutine which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream a response as an async iterable
1. [`agent.run()`][pydantic_ai.Agent.run] — a coroutine which returns an [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response.
2. [`agent.run_sync()`][pydantic_ai.Agent.run_sync] — a plain, synchronous function which returns an [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] containing a completed response (internally, this just calls `loop.run_until_complete(self.run())`).
3. [`agent.run_stream()`][pydantic_ai.Agent.run_stream] — a coroutine which returns a [`StreamedRunResult`][pydantic_ai.result.StreamedRunResult], which contains methods to stream a response as an async iterable.
4. [`agent.iter()`][pydantic_ai.Agent.iter] — a context manager which returns an [`AgentRun`][pydantic_ai.agent.AgentRun], an async-iterable over the nodes of the agent's underlying [`Graph`][pydantic_graph.graph.Graph].

Here's a simple example demonstrating all three:
Here's a simple example demonstrating the first three:

```python {title="run_agent.py"}
from pydantic_ai import Agent
@@ -94,6 +95,131 @@ _(This example is complete, it can be run "as is" — you'll need to add `asynci
You can also pass messages from previous runs to continue a conversation or provide context, as described in [Messages and Chat History](message-history.md).
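
For instance, here is a minimal sketch of continuing a conversation across two runs by passing the first run's messages back in via the `message_history` parameter (the model choice, prompts, and file title are illustrative; see the Messages and Chat History docs for the full picture):

```python {title="run_agent_history.py"}
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# First run establishes the conversation
result1 = agent.run_sync('Who wrote "The Hobbit"?')
print(result1.data)

# Passing the earlier messages lets the model resolve "he" from context
result2 = agent.run_sync(
    'When was he born?',
    message_history=result1.new_messages(),
)
print(result2.data)
```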


### Iterating Over an Agent's Graph

Under the hood, each `Agent` in PydanticAI uses **pydantic-graph** to manage its execution flow. **pydantic-graph** is a generic, type-centric library for building and running finite state machines in Python. It doesn't actually depend on PydanticAI — you can use it standalone for workflows that have nothing to do with GenAI — but PydanticAI makes use of it to orchestrate the handling of model requests and model responses in an agent's run.

In many scenarios, you don't need to worry about pydantic-graph at all; calling `agent.run(...)` simply traverses the underlying graph from start to finish. However, if you need deeper insight or control — for example to capture each tool invocation, or to inject your own logic at specific stages — PydanticAI exposes the lower-level iteration process via [`Agent.iter`][pydantic_ai.Agent.iter]. This method returns an [`AgentRun`][pydantic_ai.agent.AgentRun], which you can async-iterate over, or manually drive node-by-node via the [`next`][pydantic_ai.agent.AgentRun.next] method. Once the agent's graph returns an [`End`][pydantic_graph.nodes.End], you have the final result along with a detailed history of all steps.

#### `async for` iteration

Here's an example of using `async for` with `iter` to record each node the agent executes:

```python {title="agent_iter_async_for.py"}
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    nodes = []
    # Begin an AgentRun, which is an async-iterable over the nodes of the agent's graph
    with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            # Each node represents a step in the agent's execution
            nodes.append(node)
    print(nodes)
    """
    [
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                        part_kind='user-prompt',
                    )
                ],
                kind='request',
            )
        ),
        HandleResponseNode(
            model_response=ModelResponse(
                parts=[TextPart(content='Paris', part_kind='text')],
                model_name='function:model_logic',
                timestamp=datetime.datetime(...),
                kind='response',
            )
        ),
        End(data=FinalResult(data='Paris', tool_name=None)),
    ]
    """
    print(agent_run.result.data)
    #> Paris
```

- The `AgentRun` is an async iterator that yields each node (`BaseNode` or `End`) in the flow.
- The run ends when an `End` node is returned.

#### Using `.next(...)` manually

You can also drive the iteration manually by passing the node you want to run next to the `AgentRun.next(...)` method. This lets you inspect or modify a node before it executes, skip nodes based on your own logic, and catch errors raised by `next()` more easily:

```python {title="agent_iter_next.py"}
from pydantic_ai import Agent
from pydantic_graph import End

agent = Agent('openai:gpt-4o')


async def main():
    with agent.iter('What is the capital of France?') as agent_run:
        node = agent_run.next_node  # (1)!

        all_nodes = [node]

        # Drive the iteration manually:
        while not isinstance(node, End):  # (2)!
            node = await agent_run.next(node)  # (3)!
            all_nodes.append(node)  # (4)!

        print(all_nodes)
        """
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                            part_kind='user-prompt',
                        )
                    ],
                    kind='request',
                )
            ),
            HandleResponseNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='Paris', part_kind='text')],
                    model_name='function:model_logic',
                    timestamp=datetime.datetime(...),
                    kind='response',
                )
            ),
            End(data=FinalResult(data='Paris', tool_name=None)),
        ]
        """
```

1. We start by grabbing the first node that will be run in the agent's graph.
2. The agent run is finished once an `End` node has been produced; instances of `End` cannot be passed to `next`.
3. When you call `await agent_run.next(node)`, it executes that node in the agent's graph, updates the run's history, and returns the *next* node to run.
4. You could also inspect or mutate the new `node` here as needed.

#### Accessing usage and the final result

You can retrieve usage statistics (tokens, requests, etc.) at any time from the [`AgentRun`][pydantic_ai.agent.AgentRun] object via `agent_run.usage()`. This method returns a [`Usage`][pydantic_ai.usage.Usage] object containing the usage data.

Once the run finishes, `agent_run.result` becomes an [`AgentRunResult`][pydantic_ai.agent.AgentRunResult] object containing the final output (and related metadata).
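
As a minimal sketch (reusing the same illustrative prompt and model as the examples above; the file title is likewise illustrative):

```python {title="agent_iter_usage.py"}
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    with agent.iter('What is the capital of France?') as agent_run:
        async for _node in agent_run:
            pass  # drive the run to completion
        # Usage can be inspected at any point, even mid-run
        print(agent_run.usage())
    # Once the run has ended, the final result is available
    print(agent_run.result.data)
```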

---

### Additional Configuration

#### Usage Limits
@@ -177,7 +303,7 @@ except UsageLimitExceeded as e:
2. This run will error after 3 requests, preventing the infinite tool calling.

!!! note
This is especially relevant if you're registered a lot of tools, `request_limit` can be used to prevent the model from choosing to make too many of these calls.
This is especially relevant if you've registered many tools. The `request_limit` can be used to prevent the model from calling them in a loop too many times.
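
For reference, a minimal sketch of capping the number of model requests with `UsageLimits` (the limit value, prompt, and file title are illustrative):

```python {title="usage_request_limit.py"}
from pydantic_ai import Agent
from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits

agent = Agent('openai:gpt-4o')

try:
    result = agent.run_sync(
        'Use your tools as many times as you like.',
        usage_limits=UsageLimits(request_limit=3),  # abort after 3 model requests
    )
    print(result.data)
except UsageLimitExceeded as e:
    print(e)
```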

#### Model (Run) Settings

@@ -441,7 +567,7 @@ If models behave unexpectedly (e.g., the retry limit is exceeded, or their API r

In these cases, [`capture_run_messages`][pydantic_ai.capture_run_messages] can be used to access the messages exchanged during the run to help diagnose the issue.

```python
```python {title="agent_model_errors.py"}
from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehavior, capture_run_messages

agent = Agent('openai:gpt-4o')
4 changes: 3 additions & 1 deletion docs/api/agent.md
@@ -4,6 +4,8 @@
options:
members:
- Agent
- AgentRun
- AgentRunResult
- EndStrategy
- RunResultData
- RunResultDataT
- capture_run_messages
5 changes: 4 additions & 1 deletion docs/api/result.md
@@ -2,4 +2,7 @@

::: pydantic_ai.result
options:
inherited_members: true
inherited_members: true
members:
- ResultDataT
- StreamedRunResult
