diff --git a/examples/rag.py b/examples/rag.py index 3300d5c7b..e2ae6bbe8 100644 --- a/examples/rag.py +++ b/examples/rag.py @@ -128,7 +128,8 @@ async def worker(): while True: s = cast(DocsSection, await queue.get()) try: - await insert_doc_section(openai, pool, s) + with logfire.span('inserting {queue_size=} {url=}', queue_size=queue.qsize(), url=s.url()): + await insert_doc_section(openai, pool, s) except Exception: logfire.exception('Error inserting {url=}', url=s.url()) raise @@ -136,14 +137,13 @@ async def worker(): queue.task_done() with logfire.span('inserting doc sections'): - tasks = [asyncio.create_task(worker()) for _ in range(30)] + tasks = [asyncio.create_task(worker()) for _ in range(10)] await queue.join() for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) -@logfire.instrument() async def insert_doc_section(openai: AsyncOpenAI, pool: asyncpg.Pool, section: DocsSection) -> bool: url = section.url() exists = await pool.fetchval('SELECT 1 FROM doc_sections WHERE url = $1', url)