Skip to content

Commit

Permalink
Merge pull request #302 from crestalnetwork/feat/user-concept
Browse files Browse the repository at this point in the history
Feat: split agent create and update api
  • Loading branch information
hyacinthus authored Mar 4, 2025
2 parents ece0bc9 + 34bc1c3 commit 89f6fdd
Show file tree
Hide file tree
Showing 8 changed files with 484 additions and 153 deletions.
240 changes: 190 additions & 50 deletions app/admin/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AgentDataTable,
AgentResponse,
AgentTable,
AgentUpdate,
)
from models.db import get_db
from utils.middleware import create_jwt_middleware
Expand Down Expand Up @@ -67,117 +68,162 @@ async def _process_agent(
# Get the latest agent from create_or_update
latest_agent, is_new = await agent.create_or_update()

# Process common post-creation/update steps
agent_data = await _process_agent_post_actions(latest_agent, is_new, slack_message)

return latest_agent, agent_data


async def _process_agent_post_actions(
agent: Agent, is_new: bool = True, slack_message: str | None = None
) -> AgentData:
"""Process common actions after agent creation or update.
Args:
agent: The agent that was created or updated
is_new: Whether the agent is newly created
slack_message: Optional custom message for Slack notification
Returns:
AgentData: The processed agent data
"""
has_wallet = False
agent_data = None

if not is_new:
agent_data = await AgentData.get(latest_agent.id)
agent_data = await AgentData.get(agent.id)
if agent_data and agent_data.cdp_wallet_data:
has_wallet = True
wallet_data = json.loads(agent_data.cdp_wallet_data)
# Run clean_agent_memory in background
asyncio.create_task(clean_agent_memory(latest_agent.id, clean_agent=True))
asyncio.create_task(clean_agent_memory(agent.id, clean_agent=True))

if not has_wallet:
# create the wallet
Cdp.configure(
api_key_name=config.cdp_api_key_name,
private_key=config.cdp_api_key_private_key.replace("\\n", "\n"),
)
wallet = Wallet.create(network_id=latest_agent.cdp_network_id)
wallet = Wallet.create(network_id=agent.cdp_network_id)
wallet_data = wallet.export_data().to_dict()
wallet_data["default_address_id"] = wallet.default_address.address_id
if not agent_data:
agent_data = AgentData(
id=latest_agent.id, cdp_wallet_data=json.dumps(wallet_data)
)
agent_data = AgentData(id=agent.id, cdp_wallet_data=json.dumps(wallet_data))
else:
agent_data.cdp_wallet_data = json.dumps(wallet_data)
await agent_data.save()
logger.info(
"Wallet created for agent %s: %s",
latest_agent.id,
agent.id,
wallet_data["default_address_id"],
)

if agent.telegram_config:
tg_bot_token = agent.telegram_config.get("token")
if tg_bot_token:
try:
bot = Bot(token=tg_bot_token)
bot_info = await bot.get_me()
agent_data.telegram_id = str(bot_info.id)
agent_data.telegram_username = bot_info.username
agent_data.telegram_name = bot_info.first_name
await agent_data.save()
try:
await bot.close()
except Exception:
pass
except (
TelegramUnauthorizedError,
TelegramConflictError,
TokenValidationError,
) as req_err:
raise HTTPException(
status_code=400,
detail=f"Unauthorized err getting telegram bot username with token {tg_bot_token}: {req_err}",
)
except Exception as e:
raise Exception(
f"Error getting telegram bot username with token {tg_bot_token}: {e}"
)
await _process_telegram_config(agent, agent_data)

# Send Slack notification
slack_message = slack_message or ("Agent Created" if is_new else "Agent Updated")
await _send_agent_notification(agent, wallet_data, slack_message)

return agent_data


async def _process_telegram_config(agent: Agent, agent_data: AgentData) -> None:
"""Process telegram configuration for an agent.
Args:
agent: The agent with telegram configuration
agent_data: The agent data to update
"""
if not hasattr(agent, "telegram_config") or not agent.telegram_config:
return

tg_bot_token = agent.telegram_config.get("token")
if tg_bot_token:
try:
bot = Bot(token=tg_bot_token)
bot_info = await bot.get_me()
agent_data.telegram_id = str(bot_info.id)
agent_data.telegram_username = bot_info.username
agent_data.telegram_name = bot_info.first_name
await agent_data.save()
try:
await bot.close()
except Exception:
pass
except (
TelegramUnauthorizedError,
TelegramConflictError,
TokenValidationError,
) as req_err:
raise HTTPException(
status_code=400,
detail=f"Unauthorized err getting telegram bot username with token {tg_bot_token}: {req_err}",
)
except Exception as e:
raise Exception(
f"Error getting telegram bot username with token {tg_bot_token}: {e}"
)


async def _send_agent_notification(
agent: Agent, wallet_data: dict, message: str
) -> None:
"""Send a notification about agent creation or update.
Args:
agent: The agent that was created or updated
wallet_data: The agent's wallet data
message: The notification message
"""
send_slack_message(
slack_message,
message,
attachments=[
{
"color": "good",
"fields": [
{"title": "ENV", "short": True, "value": config.env},
{"title": "Total", "short": True, "value": await Agent.count()},
{"title": "ID", "short": True, "value": latest_agent.id},
{"title": "Name", "short": True, "value": latest_agent.name},
{"title": "Model", "short": True, "value": latest_agent.model},
{"title": "ID", "short": True, "value": agent.id},
{"title": "Name", "short": True, "value": agent.name},
{"title": "Model", "short": True, "value": agent.model},
{
"title": "GOAT Enabled",
"short": True,
"value": str(latest_agent.goat_enabled),
"value": str(agent.goat_enabled),
},
{
"title": "CDP Enabled",
"short": True,
"value": str(latest_agent.cdp_enabled),
"value": str(agent.cdp_enabled),
},
{
"title": "CDP Network",
"short": True,
"value": latest_agent.cdp_network_id or "Default",
"value": agent.cdp_network_id or "Default",
},
{
"title": "Autonomous",
"short": True,
"value": str(latest_agent.autonomous_enabled),
"value": str(agent.autonomous_enabled),
},
{
"title": "Autonomous Interval",
"short": True,
"value": str(latest_agent.autonomous_minutes),
"value": str(agent.autonomous_minutes),
},
{
"title": "Twitter Entrypoint",
"short": True,
"value": str(latest_agent.twitter_entrypoint_enabled),
"value": str(agent.twitter_entrypoint_enabled),
},
{
"title": "Telegram Entrypoint",
"short": True,
"value": str(latest_agent.telegram_entrypoint_enabled),
"value": str(agent.telegram_entrypoint_enabled),
},
{
"title": "Twitter Skills",
"value": str(latest_agent.twitter_skills),
"value": str(agent.twitter_skills),
},
{
"title": "CDP Wallet Address",
Expand All @@ -188,18 +234,22 @@ async def _process_agent(
],
)

return latest_agent, agent_data


@admin_router.post(
"/agents", tags=["Agent"], status_code=201, operation_id="post_agent"
"/agents",
tags=["Agent"],
status_code=201,
operation_id="post_agent_deprecated",
deprecated=True,
)
async def create_agent(
async def create_or_update_agent(
agent: AgentCreate = Body(AgentCreate, description="Agent configuration"),
subject: str = Depends(verify_jwt),
) -> AgentResponse:
"""Create or update an agent.
THIS ENDPOINT IS DEPRECATED. Please use POST /agents/v2 for creating new agents.
This endpoint:
1. Validates agent ID format
2. Creates or updates agent configuration
Expand All @@ -221,6 +271,96 @@ async def create_agent(
return AgentResponse.from_agent(latest_agent, agent_data)


@admin_router.post(
"/agents/v2", tags=["Agent"], status_code=201, operation_id="post_agent_v2"
)
async def create_agent(
agent: AgentCreate = Body(AgentCreate, description="Agent configuration"),
subject: str = Depends(verify_jwt),
) -> AgentResponse:
"""Create a new agent.
This endpoint:
1. Validates agent ID format
2. Creates a new agent configuration (returns 400 error if agent ID already exists)
3. Masks sensitive data in response
**Request Body:**
* `agent` - Agent configuration
**Returns:**
* `AgentResponse` - Created agent configuration with additional processed data
**Raises:**
* `HTTPException`:
- 400: Invalid agent ID format or agent ID already exists
- 500: Database error
"""
if subject:
agent.owner = subject

# Check if agent with this ID already exists
existing_agent = await Agent.get(agent.id)
if existing_agent:
raise HTTPException(
status_code=400, detail=f"Agent with id {agent.id} already exists"
)

# Create new agent
await agent.check_upstream_id()
agent.check_prompt()
latest_agent = await agent.create()

# Process common post-creation actions
agent_data = await _process_agent_post_actions(latest_agent, True, "Agent Created")

return AgentResponse.from_agent(latest_agent, agent_data)


@admin_router.patch(
"/agents/{agent_id}", tags=["Agent"], status_code=200, operation_id="update_agent"
)
async def update_agent(
agent_id: str = Path(..., description="ID of the agent to update"),
agent: AgentUpdate = Body(AgentUpdate, description="Agent update configuration"),
subject: str = Depends(verify_jwt),
) -> AgentResponse:
"""Update an existing agent.
This endpoint:
1. Validates agent ID format
2. Updates the agent configuration if it exists
3. Reinitializes agent if already in cache
4. Masks sensitive data in response
**Path Parameters:**
* `agent_id` - ID of the agent to update
**Request Body:**
* `agent` - Agent update configuration
**Returns:**
* `AgentResponse` - Updated agent configuration with additional processed data
**Raises:**
* `HTTPException`:
- 400: Invalid agent ID format
- 404: Agent not found
- 403: Permission denied (if owner mismatch)
- 500: Database error
"""
if subject:
agent.owner = subject

# Update agent
latest_agent = await agent.update(agent_id)

# Process common post-update actions
agent_data = await _process_agent_post_actions(latest_agent, False, "Agent Updated")

return AgentResponse.from_agent(latest_agent, agent_data)


@admin_router_readonly.get(
"/agents",
tags=["Agent"],
Expand Down
2 changes: 1 addition & 1 deletion app/entrypoints/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ async def get_skill_history(
select(ChatMessageTable)
.where(
ChatMessageTable.agent_id == aid,
ChatMessageTable.author_type == AuthorType.SKILL
ChatMessageTable.author_type == AuthorType.SKILL,
)
.order_by(desc(ChatMessageTable.created_at))
.limit(50)
Expand Down
Loading

0 comments on commit 89f6fdd

Please sign in to comment.