Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shorten intermediate outputs of the companion #358

Merged
merged 18 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/agents/common/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
IS_LAST_STEP,
MESSAGES,
MY_TASK,
SUBTASKS,
SUMMARIZATION,
)
from agents.common.state import BaseAgentState, SubTaskStatus
Expand Down Expand Up @@ -180,7 +181,7 @@ def _finalizer_node(self, state: BaseAgentState, config: RunnableConfig) -> Any:
if state.my_task is not None:
state.my_task.complete()
# clean all agent messages to avoid populating the checkpoint with unnecessary messages.
return {MESSAGES: [state.agent_messages[-1]]}
return {MESSAGES: [state.agent_messages[-1]], SUBTASKS: state.subtasks}

def _build_graph(self, state_class: type) -> Any:
# Define a new graph
Expand Down
10 changes: 8 additions & 2 deletions src/agents/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
PLANNER = "Planner"

COMMON = "Common"

SUMMARIZATION = "Summarization"

FINALIZER = "Finalizer"
Expand Down Expand Up @@ -42,8 +40,16 @@

K8S_AGENT = "KubernetesAgent"

K8S_AGENT_TASK_DESCRIPTION = "Fetching data from Kubernetes"

KYMA_AGENT = "KymaAgent"

KYMA_AGENT_TASK_DESCRIPTION = "Fetching data from Kyma"

COMMON = "Common"

COMMON_TASK_DESCRIPTION = "Answering general queries"

RESPONSE_CONVERTER = "ResponseConverter"

NEW_YAML = "New"
Expand Down
5 changes: 4 additions & 1 deletion src/agents/common/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class SubTask(BaseModel):
description: str = Field(
description="user query with original wording for the assigned agent"
)
task_title: str = Field(
description="""Generate a title of 4 to 5 words, only use these:
'Retrieving', 'Fetching', 'Extracting' or 'Checking'. Never use 'Creating'."""
)
assigned_to: Literal[KYMA_AGENT, K8S_AGENT, COMMON] # type: ignore
status: str = Field(default=SubTaskStatus.PENDING)
result: str | None = None

def complete(self) -> None:
"""Update the result of the task."""
Expand Down
8 changes: 6 additions & 2 deletions src/agents/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
COMMON,
MESSAGES,
MESSAGES_SUMMARY,
SUBTASKS,
SUMMARIZATION,
)
from agents.common.data import Message
Expand Down Expand Up @@ -174,6 +175,7 @@ async def _common_node(self, state: CompanionState) -> dict[str, Any]:
name=COMMON,
)
],
SUBTASKS: state.subtasks,
}
except Exception as e:
logger.error(f"Error in common node: {e}")
Expand All @@ -183,15 +185,17 @@ async def _common_node(self, state: CompanionState) -> dict[str, Any]:
content="Sorry, I am unable to process the request.",
name=COMMON,
)
]
],
SUBTASKS: state.subtasks,
}
return {
MESSAGES: [
AIMessage(
content="All my subtasks are already completed.",
name=COMMON,
)
]
],
SUBTASKS: state.subtasks,
}

def _build_graph(self) -> CompiledGraph:
Expand Down
2 changes: 2 additions & 0 deletions src/agents/supervisor/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ async def _plan(self, state: SupervisorState) -> dict[str, Any]:
if plan.response:
return create_node_output(
message=AIMessage(content=plan.response, name=PLANNER),
subtasks=[], # empty subtask to make the companion response consistent
next=END,
)

Expand All @@ -194,6 +195,7 @@ async def _plan(self, state: SupervisorState) -> dict[str, Any]:
raise SubtasksMissingError(str(state.messages[-1].content))
# return the plan with the subtasks to be dispatched by the Router
return create_node_output(
message=AIMessage(content="", name=PLANNER),
next=ROUTER,
subtasks=plan.subtasks,
)
Expand Down
26 changes: 12 additions & 14 deletions src/agents/supervisor/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,22 @@
- **Be Context-Aware**: Always consider the broader context of the conversation to ensure responses are relevant and accurate.

# SAMPLE QUERIES AND RESPONSES:
- Kyma or Kubernetes related queries:
Query: "What is Kyma serverless? what is the status of my cluster?"
Query: "What is Kyma serverless? what is the status of my cluster?"

"response": None,
"subtasks": [
("description": "What is Kyma serverless?","assigned_to": "KymaAgent") ,
("description": "what is the status of my cluster?","assigned_to": "KubernetesAgent")
]

"response": None,
"subtasks": [
("description": "What is Kyma serverless?","assigned_to": "KymaAgent" , "task_title" : "Fetching info about Kyma serverless") ,
("description": "what is the status of my cluster?","assigned_to": "KubernetesAgent", "task_title" : "Checking status of cluster")]


Query: "What is kubernetes and Create a hello world app and deploy it with Kyma?"

- Common and Kyma related queries:
Query: "parse the json script in python and deploy it with Kyma?"

"response": None,
"subtasks": [
("description": "parse the json script in python", "assigned_to": "Common"),
("description": "deploy the app with Kyma","assigned_to": "KymaAgent")
]
("description": "What is kubernetes", "assigned_to": "KubernetesAgent"),
("description": "Create a hello world app", "assigned_to": "Common"),
("description": "deploy the app with Kyma","assigned_to": "KymaAgent")
]

- General query and can be answered directly:
Query: "Where is Nils river located?"
Expand Down
4 changes: 3 additions & 1 deletion src/routers/conversations.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ async def messages(

return StreamingResponse(
(
prepare_chunk_response(chunk) + b"\n"
chunk_response + b"\n"
async for chunk in conversation_service.handle_request(
conversation_id, message, k8s_client
)
for chunk_response in (prepare_chunk_response(chunk),)
if chunk_response is not None
),
media_type="text/event-stream",
)
Expand Down
55 changes: 50 additions & 5 deletions src/utils/response.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
import json
from typing import Any

from agents.common.constants import PLANNER
from agents.common.constants import (
FINALIZER,
NEXT,
PLANNER,
SUMMARIZATION,
)
from agents.common.state import SubTaskStatus
from agents.supervisor.agent import SUPERVISOR
from utils.logging import get_logger

logger = get_logger(__name__)


def reformat_subtasks(subtasks: list[dict[Any, Any]]) -> list[dict[str, Any]]:
"""Reformat subtasks list for companion response"""
tasks = []
if subtasks:
for i, subtask in enumerate(subtasks):
task = {
"task_id": i,
"task_name": subtask["task_title"],
"status": subtask["status"],
"agent": subtask["assigned_to"],
}

tasks.append(task)
return tasks


def process_response(data: dict[str, Any], agent: str) -> dict[str, Any]:
"""Process agent data and return the last message only."""
agent_data = data[agent]
Expand All @@ -16,19 +38,30 @@ def process_response(data: dict[str, Any], agent: str) -> dict[str, Any]:
return {"agent": agent, "error": agent_data["error"]}

answer = {}

if "messages" in agent_data and agent_data["messages"]:
answer["content"] = agent_data["messages"][-1].get("content")

if agent == PLANNER:
answer["subtasks"] = agent_data.get("subtasks")
answer["tasks"] = reformat_subtasks(agent_data.get("subtasks"))

if agent == SUPERVISOR:
answer["next"] = agent_data.get("next")
answer[NEXT] = agent_data.get(NEXT)
else:
if agent_data.get("subtasks"):
pending_subtask = [
subtask["assigned_to"]
for subtask in agent_data.get("subtasks")
if subtask["status"] == SubTaskStatus.PENDING
]
if pending_subtask:
answer[NEXT] = pending_subtask[0]
else:
answer[NEXT] = FINALIZER

return {"agent": agent, "answer": answer}


def prepare_chunk_response(chunk: bytes) -> bytes:
def prepare_chunk_response(chunk: bytes) -> bytes | None:
"""Converts and prepares a final chunk response."""
try:
data = json.loads(chunk)
Expand All @@ -39,12 +72,24 @@ def prepare_chunk_response(chunk: bytes) -> bytes:
).encode()

agent = next(iter(data.keys()), None)

# skip summarization node
if agent == SUMMARIZATION:
return None

if not agent:
logger.error(f"Agent {agent} is not found in the json data")
return json.dumps(
{"event": "unknown", "data": {"error": "No agent found"}}
).encode()

agent_data = data[agent]
if agent_data.get("messages"):
last_agent = agent_data["messages"][-1].get("name")
# skip all intermediate supervisor response
if agent == SUPERVISOR and last_agent != PLANNER and last_agent != FINALIZER:
return None

new_data = process_response(data, agent)

return json.dumps(
Expand Down
23 changes: 20 additions & 3 deletions tests/integration/agents/kyma/test_kyma_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,14 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "What is wrong with ApiRule?",
"task_title": "What is wrong with ApiRule?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What is wrong with API rule?", assigned_to="KymaAgent"
description="What is wrong with API rule?",
task_title="What is wrong with API rule?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient),
is_last_step=False,
Expand All @@ -156,11 +159,14 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "What is wrong with ApiRule?",
"task_title": "What is wrong with ApiRule?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What is wrong with api rule?", assigned_to="KymaAgent"
description="What is wrong with api rule?",
task_title="What is wrong with api rule?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
is_last_step=False,
Expand Down Expand Up @@ -204,11 +210,14 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "What is wrong with ApiRule?",
"task_title": "What is wrong with ApiRule?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What is wrong with ApiRule?", assigned_to="KymaAgent"
description="What is wrong with ApiRule?",
task_title="What is wrong with ApiRule?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
is_last_step=False,
Expand Down Expand Up @@ -268,11 +277,13 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "What is wrong with Function 'func1' in namespace 'kyma-app-serverless-syntax-err' with api version 'serverless.kyma-project.io/v1alpha2'?",
"task_title": "What is wrong with Function 'func1' in namespace 'kyma-app-serverless-syntax-err' with api version 'serverless.kyma-project.io/v1alpha2'?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What is wrong with Function 'func1' in namespace 'kyma-app-serverless-syntax-err' with api version 'serverless.kyma-project.io/v1alpha2'?",
task_title="What is wrong with Function 'func1' in namespace 'kyma-app-serverless-syntax-err' with api version 'serverless.kyma-project.io/v1alpha2'?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
Expand Down Expand Up @@ -330,11 +341,13 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "Why the pod of the serverless Function is not ready?",
"task_title": "Why the pod of the serverless Function is not ready?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="Why the pod of the serverless Function is not ready?",
task_title="Why the pod of the serverless Function is not ready?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
Expand Down Expand Up @@ -376,11 +389,13 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "what are the BTP Operator features?",
"task_title": "what are the BTP Operator features?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What are the BTP Operator features?",
task_title="What are the BTP Operator features?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
Expand Down Expand Up @@ -420,11 +435,13 @@ def kyma_agent(app_models):
subtasks=[
{
"description": "what are the BTP Operator features?",
"task_title": "what are the BTP Operator features?",
"assigned_to": "KymaAgent",
}
],
my_task=SubTask(
description="What are the BTP Operator features?",
task_title="What are the BTP Operator features?",
assigned_to="KymaAgent",
),
k8s_client=Mock(spec_set=IK8sClient), # noqa
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/agents/supervisor/test_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def planner_correctness_metric(evaluator_model):
),
HumanMessage(content="What is the capital of Germany?"),
],
'{"subtasks": null, "response": "Berlin" }',
'{"subtasks":null,"response":"The capital of Germany is Berlin."}',
True,
),
(
Expand Down Expand Up @@ -88,7 +88,7 @@ def planner_correctness_metric(evaluator_model):
),
HumanMessage(content="why the pod is failing?"),
],
"{'subtasks': [{ 'description': 'why the pod is failing?', 'assigned_to': 'KubernetesAgent' ,'status' : 'pending'}] , 'response': null}",
"{'subtasks': None , 'response': 'pods is failing due to a context cancellation.'}",
False,
),
(
Expand All @@ -110,10 +110,10 @@ def planner_correctness_metric(evaluator_model):
content="The user query is related to: "
"{'resource_api_version': 'v1', 'resource_namespace': 'nginx-oom'}"
),
HumanMessage(content="What is Kubernetes? Explain Kyma function"),
HumanMessage(content="What is Kubernetes and Explain Kyma function"),
],
'{"subtasks": [{ "description": "What is Kubernetes?", "assigned_to": "KubernetesAgent","status" : "pending"},'
'{"description": "Explain Kyma function", "assigned_to": "KymaAgent","status" : "pending"}] , "response": null}',
'{"subtasks": [{ "description": "What is Kubernetes", "assigned_to": "KubernetesAgent","status" : "pending"},'
'{"description": "Explain Kyma function", "assigned_to": "KymaAgent","status" : "pending"}] , "response": null}',
False,
),
(
Expand Down
Loading
Loading