feat: Athena POC for querying predictions (#23)
kartikey-vyas authored Jun 23, 2024
1 parent 1b5d4eb commit c3a9556
Showing 4 changed files with 2,408 additions and 1,790 deletions.
319 changes: 319 additions & 0 deletions notebooks/athena-poc.ipynb
@@ -0,0 +1,319 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Testing the use of Athena for bulk querying of the cloud prediction store\n",
"\n",
"We want to enable bulk queries of the data lake/analytical cloud store of precalculations. Instead of having to run complex scans over DynamoDB, since we don't have strict latency requirements, we can use Athena to run queries over the structured precalculation data in S3."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Querying Athena\n",
"\n",
"We will use `awswrangler` as a python interface to query Athena, and also to read and write from S3. It's an open source package developed by AWS to make data lake interactions easier for data scientists/engineers."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import awswrangler as wr\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I've created a table called `eos3b5e` for now, to demonstrate how we can use Athena on top of data in S3. Going forward, the inference pipeline will take care of creating tables/inserting new data, but for now this one was click-ops only."
]
},
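{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of what that pipeline step might look like (rather than click-ops), `awswrangler` can register a Glue/Athena table over Parquet files already in S3. The S3 path below is an assumption for illustration; the column types match the schema we get back from the queries in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical sketch: register an Athena table over existing Parquet files.\n",
"# The S3 path is assumed; the column types match the schema seen below.\n",
"wr.catalog.create_parquet_table(\n",
"    database=\"precalcs_test\",\n",
"    table=\"eos3b5e\",\n",
"    path=\"s3://precalculations-bucket/out/eos3b5e/\",\n",
"    columns_types={\"key\": \"string\", \"input\": \"string\", \"mw\": \"float\"},\n",
")"
]
},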
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"df = wr.athena.read_sql_query(\"SELECT * FROM eos3b5e limit 20\", database=\"precalcs_test\")\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the test input, we can read it in and get a list of SMILEs strings. We read a CSV because this is how we expect users to interact with the Ersilia Model Hub."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"model_id = \"eos3b5e\"\n",
"df_input = pd.read_csv(\"../test_input.csv\", header=None)\n",
"df_input[\"input\"] = df_input[0].apply(lambda x: f\"'{x}'\")\n",
"smiles = \",\".join(df_input.input.values)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We collect all the smiles strings and use them to form a simple query with a WHERE clause."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [],
"source": [
"query = f\"select * from {model_id} where input in ({smiles})\""
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"df_out = wr.athena.read_sql_query(query, database=\"precalcs_test\")\n",
"df_out.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Testing the speed with and input size of 1000; we see its around 15s on average. This seems reasonable for our purposes.\n",
"\n",
"(`test_input_large.csv` is just a copy of the reference library)"
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"16.4 s ± 857 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"model_id = \"eos3b5e\"\n",
"dfi = pd.read_csv(\"../test_input_large.csv\", nrows=1000)\n",
"dfi[\"input\"] = dfi[0].apply(lambda x: f\"'{x}'\")\n",
"smiles = \",\".join(dfi.input.values)\n",
"query = f\"select * from {model_id} where input in ({smiles})\"\n",
"df_out_large = wr.athena.read_sql_query(query, database=\"precalcs_test\")"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 999 entries, 0 to 998\n",
"Data columns (total 3 columns):\n",
" # Column Non-Null Count Dtype \n",
"--- ------ -------------- ----- \n",
" 0 key 999 non-null string \n",
" 1 input 999 non-null string \n",
" 2 mw 999 non-null float32\n",
"dtypes: float32(1), string(2)\n",
"memory usage: 19.6 KB\n"
]
}
],
"source": [
"model_id = \"eos3b5e\"\n",
"dfi = pd.read_csv(\"../test_input_large.csv\", nrows=1000)\n",
"dfi[\"input\"] = dfi[0].apply(lambda x: f\"'{x}'\")\n",
"smiles = \",\".join(dfi.input.values)\n",
"query = f\"select * from {model_id} where input in ({smiles})\"\n",
"df_out_large = wr.athena.read_sql_query(query, database=\"precalcs_test\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We tried with 10k inputs and got ThrottlingErrors. This approach doesn't feel scalable and the condition itself is not best practice for most SQL engines. Instead, we can try to upload the input file to the data lake, then use an INNER JOIN within Athena to return the desired predictions."
]
},
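{
"cell_type": "markdown",
"metadata": {},
"source": [
"(Aside, a sketch only: `awswrangler` exposes a global botocore config, so retries could be tuned to soften the `ThrottlingError`s. That would only paper over the problem, though; the join approach below is the real fix.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: tune botocore retries globally for all awswrangler calls.\n",
"# This mitigates throttling but doesn't make giant IN lists scale.\n",
"import botocore.config\n",
"\n",
"wr.config.botocore_config = botocore.config.Config(\n",
"    retries={\"max_attempts\": 10, \"mode\": \"adaptive\"}\n",
")"
]
},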
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [],
"source": [
"model_id = \"eos3b5e\"\n",
"dfi = pd.read_csv(\"../test_input_large.csv\", nrows=input_size)"
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"request_id = \"test_request\"\n",
"dfi[\"request\"] = request_id\n",
"dfi[\"model\"] = model_id\n",
"dfi.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We add some supplementary information. This means that for every request to the API, we record:\n",
"- a unique identifier for that request\n",
"- the model requested\n",
"- the molecules requested\n",
"\n",
"We can play around with the partition scheme and add more information later (like a unique _user_ ID)."
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'paths': ['s3://precalculations-bucket/in/test/model=eos3b5e/request=test_request/fed3ae8f053349939b4bd878df8b8727.snappy.parquet'],\n",
" 'partitions_values': {'s3://precalculations-bucket/in/test/model=eos3b5e/request=test_request/': ['eos3b5e',\n",
" 'test_request']}}"
]
},
"execution_count": 64,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.s3.to_parquet(\n",
" df=dfi,\n",
" path=\"s3://precalculations-bucket/in/test\",\n",
" dataset=True,\n",
" database=\"precalcs_test\",\n",
" table=\"requests\",\n",
" partition_cols=[\"model\", \"request\"]\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The query is a simple inner join"
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [],
"source": [
"query = f\"\"\"\n",
"select\n",
" p.key,\n",
" p.input,\n",
" p.mw\n",
"from\n",
" {model_id} p\n",
" inner join requests r\n",
" on p.input = r.smiles\n",
"where \n",
" r.model = '{model_id}'\n",
" and r.request = '{request_id}';\n",
"\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [],
"source": [
"df_out_large = wr.athena.read_sql_query(query, database=\"precalcs_test\")"
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pandas.core.frame.DataFrame'>\n",
"RangeIndex: 10000 entries, 0 to 9999\n",
"Data columns (total 3 columns):\n",
" # Column Non-Null Count Dtype \n",
"--- ------ -------------- ----- \n",
" 0 key 10000 non-null string \n",
" 1 input 10000 non-null string \n",
" 2 mw 10000 non-null float32\n",
"dtypes: float32(1), string(2)\n",
"memory usage: 195.4 KB\n"
]
}
],
"source": [
"df_out_large.info()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can scale out to 10k and beyond (should the need arise), and we have enhanced auditability within our data lake. This information may seem redundant for now, but storage is cheap, and with a smart partitioning scheme like we've implemented, the compute requirements for individual requests won't increase over time. Keeping all this information enables analytics in the future."
]
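},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example (a sketch only, not something we need yet): because every request row carries `model` and `request` partition columns, usage analytics come down to plain SQL over the `requests` table."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: count how many inputs each model has been asked for.\n",
"# Relies only on the `requests` table and partition columns created above.\n",
"usage_query = \"\"\"\n",
"select\n",
"    model,\n",
"    count(*) as n_inputs,\n",
"    count(distinct request) as n_requests\n",
"from requests\n",
"group by model\n",
"\"\"\"\n",
"wr.athena.read_sql_query(usage_query, database=\"precalcs_test\")"
]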
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}