As a developer, I want to get a feel for what it is like to create an AI solution using LangGraph for the well-known use case of Q&A based on website content. I want to explore the following:
- Jump-starting the solution (e.g., using ready-made solution templates)
- Extending the solution
- Debugging the solution
- And finally, deploying the solution in the cloud
LangGraph has a ready-made template for Retrieval-Augmented Generation (RAG) solutions. So I started with that and extended it for my specific requirements:
- Crawl from a starter_urls list, within a certain number of hops, to build the vector index
- Use an open-source vector DB, Milvus, as the retriever
Here's a snapshot of how I modified the template solution.
Install the LangGraph CLI:
pip install langgraph-cli --upgrade
Create a new app from a template:
langgraph new
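The CLI prompts you to pick a template interactively. If you already know which one you want, newer versions of the CLI also accept it directly as a flag; the command below assumes the Python retrieval agent template is still published under this name:
langgraph new ./my-rag-app --template retrieval-agent-python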
Add starter_urls and hops to IndexConfiguration in configuration.py. Let's also add a method to get the list of URLs from the comma-separated starter_urls string.
starter_urls: str = field(
    default="https://zohlar.com",
    metadata={
        "description": "Comma-separated string of starter URLs to crawl for indexing web pages."
    },
)
hops: int = field(
    default=2,
    metadata={
        "description": "Maximum number of hops to traverse pages linked to the starter URLs."
    },
)

def parse_starter_urls(self) -> list[str]:
    """Parse the starter URLs into a list.

    Returns:
        list[str]: A list of URLs parsed from the comma-separated string.
    """
    return [url.strip() for url in self.starter_urls.split(",") if url.strip()]
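For example, with hypothetical values (user_id is another field the template's IndexConfiguration defines):
config = IndexConfiguration(
    user_id="tenant-123",
    starter_urls="https://zohlar.com, https://example.com",
)
print(config.parse_starter_urls())
# ['https://zohlar.com', 'https://example.com']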
Let's also add milvus to the retriever_provider list:
retriever_provider: Annotated[
    Literal["elastic", "elastic-local", "pinecone", "mongodb", "milvus"],
    {"__template_metadata__": {"kind": "retriever"}},
] = field(
    default="milvus",
    metadata={
        "description": "The vector store provider to use for retrieval. Options are 'elastic', 'elastic-local', 'pinecone', 'mongodb', or 'milvus'."
    },
)
Let's begin by adding langchain-milvus as a dependency in pyproject.toml.
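The entry goes into the [project] dependencies list; the version pin below is my suggestion, not something prescribed by the template:
dependencies = [
    # ... existing template dependencies
    "langchain-milvus>=0.1.0",
]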
Let's add a new method to create a Milvus retriever in retrieval.py:
@contextmanager
def make_milvus_retriever(
    configuration: IndexConfiguration, embedding_model: Embeddings
) -> Generator[VectorStoreRetriever, None, None]:
    """Configure this agent to use a Milvus Lite file-based URI to store the vector index."""
    from langchain_milvus.vectorstores import Milvus

    vstore = Milvus(
        embedding_function=embedding_model,
        collection_name=configuration.user_id,
        connection_args={"uri": os.environ["MILVUS_DB"]},
        auto_id=True,
    )
    yield vstore.as_retriever()
and then use this in the factory method:
@contextmanager
def make_retriever(
    config: RunnableConfig,
) -> Generator[VectorStoreRetriever, None, None]:
    # ... same code as before
    match configuration.retriever_provider:
        # ... same code as before
        case "milvus":
            with make_milvus_retriever(configuration, embedding_model) as retriever:
                yield retriever
        case _:
            # ... as before
For Milvus Lite we'll use the following file URI to store the vector index:
## Milvus
MILVUS_DB=milvus.db
Note: While Milvus Lite (file-based URI) worked fine on macOS, it didn't work on Windows :(. Here's how I worked around that on Windows:
- Installed Docker Desktop.
- My Windows machine has an Nvidia GPU, so I downloaded milvus-io/milvus/deployments/docker/gpu/standalone/docker-compose.yml. As an alternative, one may use standalone/docker-compose.yml (without GPU).
- Run
docker compose up -d
to download all the required images and start the containers. This starts milvus standalone along with its dependencies etcd and minio. From this point on I used Docker Desktop to check logs, or stop/restart Milvus as needed.
In this case we'll set the following Milvus standalone URI:
MILVUS_DB=http://localhost:19530
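A quick way to sanity-check that the standalone server is reachable (pymilvus is pulled in as a dependency of langchain-milvus):
from pymilvus import connections, utility

connections.connect(uri="http://localhost:19530")
print(utility.get_server_version())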
The out-of-the-box implementation of index_graph (in index_graph.py) expects all the documents to be indexed as input. Since we are enhancing the graph to include an ingestion pipeline that crawls starting at the specified URLs, we'll modify the index_docs node to kick-start the crawl when the docs list in the state is empty and the starter_urls configuration has been provided.
async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    # ... as before
    with retrieval.make_retriever(config) as retriever:
        # code to kick-start the crawl if required
        configuration = IndexConfiguration.from_runnable_config(config)
        if not state.docs and configuration.starter_urls:
            print("starting crawl ...")
            state.docs = await crawl(
                configuration.user_id,
                configuration.parse_starter_urls(),
                configuration.hops,
            )
        # rest remains the same as before
        stamped_docs = ensure_docs_have_user_id(state.docs, config)
        if configuration.retriever_provider == "milvus":
            retriever.add_documents(stamped_docs)
        else:
            await retriever.aadd_documents(stamped_docs)
    return {"docs": "delete"}
Add the following functions that wrap the new Crawler component:
async def crawl(tenant: str, starter_urls: list, hops: int):
    allowed_domains = set(urlparse(url).netloc for url in starter_urls)
    crawler = WebCrawler(starter_urls, hops, allowed_domains, tenant)
    await crawler.crawl()
    return [
        Document(
            page_content=get_file_content(page["local_filepath"]),
            metadata={"url": page["url"]},
        )
        for page in crawler.crawled_pages
    ]

def get_file_content(file_path: str) -> str:
    with open(file_path, "r", encoding="utf-8") as file:
        return file.read()
The Crawler component uses playwright, which drives headless browsers. Let's add playwright and requests as dependencies in pyproject.toml.
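For reference, here's a minimal sketch of what such a Playwright-based crawler could look like. It is hypothetical (my actual WebCrawler component does more, e.g. error handling and politeness delays), but it matches the interface assumed by the crawl wrapper above:
from pathlib import Path
from urllib.parse import urlparse
from playwright.async_api import async_playwright

class WebCrawler:
    """Hypothetical sketch: BFS crawl up to `hops` levels, saving pages to disk."""

    def __init__(self, starter_urls, hops, allowed_domains, tenant):
        self.starter_urls = starter_urls
        self.hops = hops
        self.allowed_domains = allowed_domains
        self.tenant = tenant
        self.crawled_pages = []  # consumed by the crawl() wrapper above

    async def crawl(self):
        out_dir = Path("crawl") / self.tenant
        out_dir.mkdir(parents=True, exist_ok=True)
        seen, frontier = set(), list(self.starter_urls)
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            for _ in range(self.hops + 1):
                next_frontier = []
                for url in frontier:
                    # stay within the allowed domains and skip revisits
                    if url in seen or urlparse(url).netloc not in self.allowed_domains:
                        continue
                    seen.add(url)
                    await page.goto(url)
                    html = await page.content()
                    filepath = out_dir / f"page_{len(seen)}.html"
                    filepath.write_text(html, encoding="utf-8")
                    self.crawled_pages.append(
                        {"url": url, "local_filepath": str(filepath)}
                    )
                    # queue the outgoing links for the next hop
                    next_frontier += await page.eval_on_selector_all(
                        "a[href]", "els => els.map(e => e.href)"
                    )
                frontier = next_frontier
            await browser.close()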
Just adding the playwright package to the Python environment is not sufficient; the headless browsers and their dependencies also need to be installed. So we need to run playwright install and playwright install-deps as well:
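playwright install
playwright install-deps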
Note: You may use the LangGraph Studio app on Mac, which uses Docker to deploy your graphs locally. In this case playwright install and playwright install-deps need to happen inside the Docker image. For that we'll add the following to langgraph.json:
"dockerfile_lines": ["RUN pip install playwright", "RUN python -m playwright install", "RUN python -m playwright install-deps"],
While my Crawler component worked well on macOS, the Playwright event loop on Windows doesn't play well with the asyncio event loop used by LangGraph, and I wasn't able to resolve the Playwright issues there. The LangGraph Studio app is not available on Windows either, so instead of Docker I was using langgraph-cli for local deployment of the LangGraph API.
So I added an Apify-based crawl to index_graph.py. Here are the enhancements:
# new imports
import json
from langchain_community.utilities import ApifyWrapper
from langchain_community.document_loaders import ApifyDatasetLoader

# ... existing code

def load_site_dataset_map() -> dict:
    with open("sites_dataset_map.json", "r", encoding="utf-8") as file:
        return json.load(file)

def apify_crawl(tenant: str, starter_urls: list, hops: int):
    site_dataset_map = load_site_dataset_map()
    if dataset_id := site_dataset_map.get(tenant):
        # Reuse the Apify dataset from a previous crawl of this site
        loader = ApifyDatasetLoader(
            dataset_id=dataset_id,
            dataset_mapping_function=lambda item: Document(
                page_content=item["html"] or "", metadata={"url": item["url"]}
            ),
        )
    else:
        # Kick off a fresh crawl with Apify's website-content-crawler actor
        apify = ApifyWrapper()
        loader = apify.call_actor(
            actor_id="apify/website-content-crawler",
            run_input={
                "startUrls": starter_urls,
                "saveHtml": True,
                "htmlTransformer": "none",
            },
            dataset_mapping_function=lambda item: Document(
                page_content=item["html"] or "", metadata={"url": item["url"]}
            ),
        )
    print(f"Site: {tenant} crawled and loaded into Apify dataset: {loader.dataset_id}")
    return loader.load()
# ... existing code
async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    # ... as before
    with retrieval.make_retriever(config) as retriever:
        # code to kick-start the crawl if required
        configuration = IndexConfiguration.from_runnable_config(config)
        if not state.docs and configuration.starter_urls:
            print("starting crawl ...")
            # state.docs = await crawl(
            #     configuration.user_id,
            #     configuration.parse_starter_urls(),
            #     configuration.hops,
            # )
            state.docs = apify_crawl(
                configuration.user_id,
                [{"url": url} for url in configuration.parse_starter_urls()],
                configuration.hops,
            )
        # rest remains the same as before
        stamped_docs = ensure_docs_have_user_id(state.docs, config)
        if configuration.retriever_provider == "milvus":
            retriever.add_documents(stamped_docs)
        else:
            await retriever.aadd_documents(stamped_docs)
    return {"docs": "delete"}
# ... existing code
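Two notes on this code: ApifyWrapper expects an APIFY_API_TOKEN environment variable, and sites_dataset_map.json is just a hand-maintained map from tenant to Apify dataset id, so that re-indexing a site reuses the dataset from a previous run. For example (hypothetical ids):
{
  "tenant-123": "nQfk82mLx7dataset01"
}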
I changed the default values of the saveHtml and htmlTransformer parameters because OpenAI embedding models understand HTML very well, and the cleanup performed by the HTML transformer loses some useful information.
The index_docs node (in index_graph.py) takes IndexState (in state.py) as input. Let's add the following logic to the reducer function (reduce_docs) to empower the user to explicitly ask for a crawl.
def reduce_docs(
    existing: Optional[Sequence[Document]],
    new: Union[
        Sequence[Document],
        Sequence[dict[str, Any]],
        Sequence[str],
        str,
        Literal["delete"],
    ],
) -> Sequence[Document]:
    if new == "crawl":
        return []
    # rest is as before
With this enhancement, the user may simply set docs to "crawl" as follows:
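# A sketch: invoking the indexer graph with the "crawl" sentinel.
# The graph object and configurable keys follow the template's naming;
# the user_id value is hypothetical.
result = await graph.ainvoke(
    {"docs": "crawl"},
    config={"configurable": {"user_id": "tenant-123", "starter_urls": "https://zohlar.com"}},
)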