Skip to content

Commit

Permalink
Storing report in the DB instead of writing to file (#675)
Browse files Browse the repository at this point in the history
* Storing report in the DB instead of writing to file

* Fixes before PR review

* typo

* Update prediction_market_agent/agents/microchain_agent/nft_treasury_game/scripts/generate_report.py

Co-authored-by: Peter Jung <peter@jung.ninja>

* Update prediction_market_agent/agents/microchain_agent/nft_treasury_game/scripts/generate_report.py

Co-authored-by: Peter Jung <peter@jung.ninja>

* Update prediction_market_agent/agents/microchain_agent/nft_treasury_game/scripts/generate_report.py

Co-authored-by: Peter Jung <peter@jung.ninja>

* Implemented PR comments

---------

Co-authored-by: Peter Jung <peter@jung.ninja>
  • Loading branch information
gabrielfior and kongzii authored Feb 6, 2025
1 parent ad7f254 commit aeff41e
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from datetime import datetime
from pathlib import Path
import typing as t

from langchain_core.prompts import PromptTemplate
from prediction_market_agent_tooling.gtypes import ChecksumAddress, xdai_type
from prediction_market_agent_tooling.gtypes import ChecksumAddress, xDai
from prediction_market_agent_tooling.loggers import logger
from prediction_market_agent_tooling.tools.balances import get_balances
from prediction_market_agent_tooling.tools.contract import (
ContractOwnableERC721OnGnosisChain,
)
from prediction_market_agent_tooling.tools.datetime_utc import DatetimeUTC
from prediction_market_agent_tooling.tools.parallelism import par_map
from tabulate import tabulate
from tenacity import retry, stop_after_attempt, wait_fixed
from web3 import Web3

from prediction_market_agent.agents.identifiers import AgentIdentifier
from prediction_market_agent.agents.microchain_agent.memory import DatedChatMessage
from prediction_market_agent.agents.microchain_agent.memory_functions import (
fetch_memories_from_last_run,
Expand All @@ -24,16 +25,18 @@
DEPLOYED_NFT_AGENTS,
)
from prediction_market_agent.agents.utils import _summarize_learnings
from prediction_market_agent.db.models import LongTermMemories
from prediction_market_agent.db.models import LongTermMemories, ReportNFTGame
from prediction_market_agent.db.report_table_handler import ReportNFTGameTableHandler

SUMMARY_PROMPT_TEMPLATE = """
Summarize the memories below. They represent the actions taken by AI agents competing on an NFT game.
You must include the key transfers in the summary, and also the messages which led to these keys being transferred.
Memories:
{memories}"""

FINAL_SUMMARY_PROMPT_TEMPLATE = """
Make a final summary of a collection of memories from each agent. Describe the main activities that took place on the game.
Make a final summary of a collection of memories from each agent. Describe the main activities that took place on the game, specially which keys were transferred and which messages yielded best results.
Memories:
{memories}
Expand Down Expand Up @@ -61,36 +64,60 @@ def summarize_past_actions_from_agent(agent_memories: list[LongTermMemories]) ->
return learnings_per_agent


def summarize_prompts_from_all_agents() -> str:
def store_all_learnings_in_db(
final_summary: str, learnings_per_agent: dict[AgentIdentifier, str]
) -> None:
table_handler = ReportNFTGameTableHandler()
for agent_id, learnings in learnings_per_agent.items():
report = ReportNFTGame(
agent_id=agent_id, learnings=learnings, datetime_=DatetimeUTC.now()
)
logger.info(f"Saving report from {agent_id}")
table_handler.save_report(report)

final_report = ReportNFTGame(
agent_id=None, learnings=final_summary, datetime_=DatetimeUTC.now()
)

logger.info(f"Saving final summary")
table_handler.save_report(final_report)


def summarize_prompts_from_all_agents() -> tuple[dict[AgentIdentifier, str], str]:
memories_last_run = fetch_memories_from_last_run(
agent_identifiers=[i.identifier for i in DEPLOYED_NFT_AGENTS]
)
# We generate the learnings from each agent's memories.
learnings: list[str] = par_map(
items=list(memories_last_run.values()), func=summarize_past_actions_from_agent
)

# We combine each agent's memories into a final summary.
final_summary = _summarize_learnings(
memories=learnings,
prompt_template=PromptTemplate.from_template(FINAL_SUMMARY_PROMPT_TEMPLATE),
)
return final_summary

learnings_per_agent = {
agent.identifier: learnings_from_agent
for agent, learnings_from_agent in zip(DEPLOYED_NFT_AGENTS, learnings)
}
return learnings_per_agent, final_summary

def generate_report(
rpc_url: str, output_dir: Path, initial_xdai_balance_per_agent: int = 200
) -> None:

def calculate_nft_and_xdai_balances_diff(
rpc_url: str, initial_balance: xDai
) -> list[dict[str, t.Any]]:
w3 = Web3(Web3.HTTPProvider(rpc_url))
lookup = {agent.wallet_address: agent.identifier for agent in DEPLOYED_NFT_AGENTS}
# Initial balance of each agent at the beginning of the game.
initial_balance = xdai_type(initial_xdai_balance_per_agent)
# We retry the functions below due to RPC errors that can occur.
get_balances_retry = retry(stop=stop_after_attempt(3), wait=wait_fixed(1))(
get_balances
)
get_nft_balance_retry = retry(stop=stop_after_attempt(3), wait=wait_fixed(1))(
get_nft_balance
)

balances_diff = []
for agent_address, agent_id in lookup.items():
balance = get_balances_retry(
Expand All @@ -108,11 +135,22 @@ def generate_report(
"nft_balance_end": nft_balance,
}
)
return balances_diff

learnings = summarize_prompts_from_all_agents()
current_utc_datetime = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
with open(output_dir / f"report_{current_utc_datetime}.md", "w") as file:
file.write(
tabulate([x.values() for x in balances_diff], list(balances_diff[0].keys()))
)
file.write("\n\n---\n" + learnings)

def generate_report(rpc_url: str, initial_xdai_balance_per_agent: xDai) -> None:
balances_diff = calculate_nft_and_xdai_balances_diff(
rpc_url=rpc_url, initial_balance=initial_xdai_balance_per_agent
)

(
learnings_per_agent,
final_summary,
) = summarize_prompts_from_all_agents()
balances_data = tabulate(
[x.values() for x in balances_diff], list(balances_diff[0].keys())
)
final_summary = balances_data + "\n\n---\n" + final_summary
store_all_learnings_in_db(
final_summary=final_summary, learnings_per_agent=learnings_per_agent
)
16 changes: 16 additions & 0 deletions prediction_market_agent/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,19 @@ def __str__(self) -> str:
Value: {wei_to_xdai(self.value_wei_parsed)} xDai
Message: {self.data_field}
"""


class ReportNFTGame(SQLModel, table=True):
"""Reports summarizing activities that took place during the NFT game."""

__tablename__ = "report_nft_game"
__table_args__ = {
"extend_existing": True,
}
id: Optional[int] = Field(default=None, primary_key=True)
agent_id: Optional[
str
] = None # we keep it optional to allow for the final summary (involving all agents) to be stored in this table
# as well.
learnings: str
datetime_: DatetimeUTC
16 changes: 16 additions & 0 deletions prediction_market_agent/db/report_table_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from prediction_market_agent.db.models import ReportNFTGame
from prediction_market_agent.db.sql_handler import SQLHandler


class ReportNFTGameTableHandler:
def __init__(
self,
sqlalchemy_db_url: str | None = None,
):
self.sql_handler = SQLHandler(
model=ReportNFTGame, sqlalchemy_db_url=sqlalchemy_db_url
)

def save_report(self, report: ReportNFTGame) -> None:
"""Save item to storage."""
self.sql_handler.save_multiple([report])
6 changes: 2 additions & 4 deletions prediction_market_agent/run_reset_game.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
python run_reset_game.py <RPC_URL> <OUTPUT_DIR> <xDai_balance_per_agent> <new_balance_treasury_xdai>
"""
import time
from pathlib import Path
from typing import Annotated

import typer
from prediction_market_agent_tooling.gtypes import xdai_type
from prediction_market_agent_tooling.loggers import logger

from prediction_market_agent.agents.microchain_agent.nft_treasury_game.scripts.generate_report import (
Expand All @@ -25,7 +25,6 @@
@APP.command()
def main(
rpc_url: str,
output_dir: Path,
xdai_balance_per_agent: Annotated[int, typer.Argument()] = 200,
new_balance_treasury_xdai: Annotated[int, typer.Argument()] = 100,
) -> None:
Expand All @@ -38,8 +37,7 @@ def main(

generate_report(
rpc_url=rpc_url,
output_dir=output_dir,
initial_xdai_balance_per_agent=xdai_balance_per_agent,
initial_xdai_balance_per_agent=xdai_type(xdai_balance_per_agent),
)
reset_balances(
rpc_url=rpc_url,
Expand Down

0 comments on commit aeff41e

Please sign in to comment.