Skip to content

Commit

Permalink
Merge pull request #299 from crestalnetwork/feat/multi-autonomous
Browse files Browse the repository at this point in the history
Feat: multi autonomous
  • Loading branch information
taiyangc authored Mar 3, 2025
2 parents 997f1bd + ca7b17b commit f00ecd7
Show file tree
Hide file tree
Showing 30 changed files with 2,567 additions and 1,398 deletions.
126 changes: 56 additions & 70 deletions app/admin/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
)
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.exc import NoResultFound
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from yaml import safe_load

from app.config.config import config
from app.core.engine import clean_agent_memory
from models.agent import Agent, AgentData, AgentResponse
from models.agent import (
Agent,
AgentCreate,
AgentData,
AgentDataTable,
AgentResponse,
AgentTable,
)
from models.db import get_db
from utils.middleware import create_jwt_middleware
from utils.slack_alert import send_slack_message
Expand All @@ -42,7 +49,7 @@


async def _process_agent(
agent: Agent, subject: str | None = None, slack_message: str | None = None
agent: AgentCreate, subject: str | None = None, slack_message: str | None = None
) -> tuple[Agent, AgentData]:
"""Shared function to process agent creation or update.
Expand Down Expand Up @@ -188,7 +195,7 @@ async def _process_agent(
"/agents", tags=["Agent"], status_code=201, operation_id="post_agent"
)
async def create_agent(
agent: Agent = Body(Agent, description="Agent configuration"),
agent: AgentCreate = Body(AgentCreate, description="Agent configuration"),
subject: str = Depends(verify_jwt),
) -> AgentResponse:
"""Create or update an agent.
Expand Down Expand Up @@ -227,18 +234,23 @@ async def get_agents(db: AsyncSession = Depends(get_db)) -> list[AgentResponse]:
* `list[AgentResponse]` - List of agents with their quota information and additional processed data
"""
# Query all agents first
agents = (await db.exec(select(Agent))).all()
agents = (await db.scalars(select(AgentTable))).all()

# Batch get agent data
agent_ids = [agent.id for agent in agents]
agent_data_list = (
await db.exec(select(AgentData).where(AgentData.id.in_(agent_ids)))
).all()
agent_data_list = await db.scalars(
select(AgentDataTable).where(AgentDataTable.id.in_(agent_ids))
)
agent_data_map = {data.id: data for data in agent_data_list}

# Convert to AgentResponse objects
return [
AgentResponse.from_agent(agent, agent_data_map.get(agent.id))
AgentResponse.from_agent(
Agent.model_validate(agent),
AgentData.model_validate(agent_data_map.get(agent.id))
if agent.id in agent_data_map
else None,
)
for agent in agents
]

Expand All @@ -251,7 +263,6 @@ async def get_agents(db: AsyncSession = Depends(get_db)) -> list[AgentResponse]:
)
async def get_agent(
agent_id: str = Path(..., description="ID of the agent to retrieve"),
db: AsyncSession = Depends(get_db),
) -> AgentResponse:
"""Get a single agent by ID.
Expand All @@ -265,17 +276,10 @@ async def get_agent(
* `HTTPException`:
- 404: Agent not found
"""
agent = (await db.exec(select(Agent).where(Agent.id == agent_id))).first()
if not agent:
raise HTTPException(
status_code=404,
detail=f"Agent with id {agent_id} not found",
)
agent = await Agent.get(agent_id)

# Get agent data
agent_data = (
await db.exec(select(AgentData).where(AgentData.id == agent_id))
).first()
agent_data = await AgentData.get(agent_id)

return AgentResponse.from_agent(agent, agent_data)

Expand Down Expand Up @@ -307,7 +311,6 @@ async def clean_memory(
request: MemCleanRequest = Body(
MemCleanRequest, description="Agent memory cleanup request"
),
db: AsyncSession = Depends(get_db),
) -> str:
"""Clear an agent memory.
Expand Down Expand Up @@ -374,22 +377,16 @@ async def export_agent(
* `HTTPException`:
- 404: Agent not found
"""
try:
agent = await Agent.get(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")

yaml_content = agent.to_yaml()
return Response(
content=yaml_content,
media_type="application/x-yaml",
headers={"Content-Disposition": f'attachment; filename="{agent_id}.yaml"'},
)
except NoResultFound:
agent = await Agent.get(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
except Exception as e:
logger.error(f"Error exporting agent {agent_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))

yaml_content = agent.to_yaml()
return Response(
content=yaml_content,
media_type="application/x-yaml",
headers={"Content-Disposition": f'attachment; filename="{agent_id}.yaml"'},
)


@admin_router.put(
Expand Down Expand Up @@ -423,42 +420,31 @@ async def import_agent(
- 404: Agent not found
- 500: Server error
"""
try:
# First check if agent exists
existing_agent = await Agent.get(agent_id)
if not existing_agent:
raise HTTPException(status_code=404, detail="Agent not found")

# Read and parse YAML
content = await file.read()
try:
yaml_data = safe_load(content)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid YAML format: {e}")

# Ensure agent ID matches
if yaml_data.get("id") != agent_id:
raise HTTPException(
status_code=400, detail="Agent ID in YAML does not match URL parameter"
)
# First check if agent exists
existing_agent = await Agent.get(agent_id)
if not existing_agent:
raise HTTPException(status_code=404, detail="Agent not found")

# Create Agent instance from YAML
try:
agent = Agent(**yaml_data)
except ValidationError as e:
raise HTTPException(
status_code=400, detail=f"Invalid agent configuration: {e}"
)
# Read and parse YAML
content = await file.read()
try:
yaml_data = safe_load(content)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid YAML format: {e}")

# Process the agent
await _process_agent(
agent, subject, slack_message="Agent Updated via YAML Import"
# Ensure agent ID matches
if yaml_data.get("id") != agent_id:
raise HTTPException(
status_code=400, detail="Agent ID in YAML does not match URL parameter"
)

return "Agent import successful"
# Create Agent instance from YAML
try:
agent = AgentCreate.model_validate(yaml_data)
except ValidationError as e:
raise HTTPException(status_code=400, detail=f"Invalid agent configuration: {e}")

except HTTPException:
raise
except Exception as e:
logger.error(f"Error importing agent {agent_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Process the agent
await _process_agent(agent, subject, slack_message="Agent Updated via YAML Import")

return "Agent import successful"
14 changes: 8 additions & 6 deletions app/admin/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlmodel import update
from sqlalchemy import update

from app.config.config import config
from app.services.twitter.oauth2_refresh import refresh_expiring_tokens
from models.agent import AgentQuota
from models.agent import AgentQuotaTable
from models.db import get_session, init_db


Expand All @@ -17,8 +17,10 @@ async def reset_daily_quotas():
Resets message_count_daily and twitter_count_daily to 0.
"""
async with get_session() as session:
stmt = update(AgentQuota).values(message_count_daily=0, twitter_count_daily=0)
await session.exec(stmt)
stmt = update(AgentQuotaTable).values(
message_count_daily=0, twitter_count_daily=0
)
await session.execute(stmt)
await session.commit()


Expand All @@ -27,10 +29,10 @@ async def reset_monthly_quotas():
Resets message_count_monthly and autonomous_count_monthly to 0.
"""
async with get_session() as session:
stmt = update(AgentQuota).values(
stmt = update(AgentQuotaTable).values(
message_count_monthly=0, autonomous_count_monthly=0
)
await session.exec(stmt)
await session.execute(stmt)
await session.commit()


Expand Down
92 changes: 88 additions & 4 deletions app/autonomous.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@
import logging
import signal
import sys
from datetime import datetime
from typing import Dict

import sentry_sdk
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select

from app.config.config import config
from app.entrypoints.autonomous import run_autonomous_agents
from models.db import init_db
from app.entrypoints.autonomous import run_autonomous_agents, run_autonomous_task
from models.agent import Agent, AgentTable
from models.db import get_session, init_db

logger = logging.getLogger(__name__)

# Global dictionary to store task_id and last updated time
autonomous_tasks_updated_at: Dict[str, datetime] = {}

if config.sentry_dsn:
sentry_sdk.init(
dsn=config.sentry_dsn,
Expand All @@ -23,6 +31,72 @@
server_name="intent-autonomous",
)


async def schedule_agent_autonomous_tasks(scheduler: AsyncIOScheduler):
"""
Find all agents with autonomous tasks and schedule them.
This function is called periodically to update the scheduler with new or modified tasks.
"""
logger.info("Checking for agent autonomous tasks...")

async with get_session() as db:
# Get all agents with autonomous configuration
query = select(AgentTable).where(AgentTable.autonomous != None) # noqa: E711
agents = await db.scalars(query)

for item in agents:
agent = Agent.model_validate(item)
if not agent.autonomous or len(agent.autonomous) == 0:
continue

for autonomous in agent.autonomous:
if not autonomous.enabled:
continue

# Create a unique task ID for this autonomous task
task_id = f"{agent.id}-{autonomous.id}"

# Check if task exists and needs updating
if (
task_id in autonomous_tasks_updated_at
and autonomous_tasks_updated_at[task_id] >= agent.updated_at
):
# Task exists and agent hasn't been updated, skip
continue

# Schedule new job based on minutes or cron
if autonomous.cron:
logger.info(
f"Scheduling cron task {task_id} with cron: {autonomous.cron}"
)
scheduler.add_job(
run_autonomous_task,
CronTrigger.from_crontab(autonomous.cron),
id=task_id,
args=[agent.id, autonomous.id, autonomous.prompt],
replace_existing=True,
)
elif autonomous.minutes is not None:
logger.info(
f"Scheduling interval task {task_id} every {autonomous.minutes} minutes"
)
scheduler.add_job(
run_autonomous_task,
"interval",
id=task_id,
args=[agent.id, autonomous.id, autonomous.prompt],
minutes=autonomous.minutes,
replace_existing=True,
)
else:
logger.error(
f"Invalid autonomous configuration for task {task_id}: {autonomous}"
)

# Update the last updated time
autonomous_tasks_updated_at[task_id] = agent.updated_at


if __name__ == "__main__":

async def main():
Expand All @@ -32,8 +106,18 @@ async def main():
# Initialize scheduler
scheduler = AsyncIOScheduler()

# Add job to run every minute
scheduler.add_job(run_autonomous_agents, "interval", minutes=1)
# Add job to run legacy autonomous agents every minute
scheduler.add_job(run_autonomous_agents, "interval", minutes=100)

# Add job to schedule agent autonomous tasks every 5 minutes
# Run it immediately on startup and then every 5 minutes
scheduler.add_job(
schedule_agent_autonomous_tasks,
"interval",
args=[scheduler],
minutes=5,
next_run_time=datetime.now(),
)

# Signal handler for graceful shutdown
def signal_handler(signum, frame):
Expand Down
1 change: 1 addition & 0 deletions app/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(self):
self.openai_api_key = self.load("OPENAI_API_KEY")
self.deepseek_api_key = self.load("DEEPSEEK_API_KEY")
self.xai_api_key = self.load("XAI_API_KEY")
self.eternal_api_key = self.load("ETERNAL_API_KEY")
self.system_prompt = self.load("SYSTEM_PROMPT")
self.input_token_limit = int(self.load("INPUT_TOKEN_LIMIT", "60000"))
# Telegram server settings
Expand Down
Loading

0 comments on commit f00ecd7

Please sign in to comment.