From 583aa63f6cc3dd6b5d8800d82f6a6727aaa9f987 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 14:40:44 +1000 Subject: [PATCH 01/14] refactor prediction writing --- .github/workflows/predict.yml | 18 +-- config/app.py | 10 ++ notebooks/athena-poc.ipynb | 2 +- precalculator/fetcher.py | 25 +--- precalculator/models.py | 45 +++++- precalculator/writer.py | 133 ++++++++++++++++-- scripts/generate_predictions.py | 87 +++++++----- ...nerate_predictions_with_aws_and_ersilia.py | 100 ------------- 8 files changed, 245 insertions(+), 175 deletions(-) delete mode 100644 scripts/generate_predictions_with_aws_and_ersilia.py diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 7d09ae9..8463072 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -12,12 +12,12 @@ on: model-id: required: true type: string + sha: + required: true + type: string sample-only: required: false type: string - SHA: - required: true - type: string jobs: infer-and-upload: @@ -70,9 +70,9 @@ jobs: - name: Run Python script to generate predictions and upload to S3 env: - MODEL_ID: ${{ inputs.model-id }} - SHA: ${{ inputs.SHA }} - numerator: ${{ inputs.numerator }} - sample-only: ${{ inputs.sample-only }} - GITHUB_REPOSITORY: ${{ github.event.repository.full_name }} - run: .venv/bin/python scripts/generate_predictions_with_aws_and_ersilia.py ci \ No newline at end of file + INPUT_MODEL_ID: ${{ inputs.model-id }} + INPUT_SHA: ${{ inputs.SHA }} + INPUT_NUMERATOR: ${{ inputs.numerator }} + INPUT_DENOMINATOR: ${{ inputs.denominator }} + INPUT_SAMPLE_ONLY: ${{ inputs.sample-only }} + run: .venv/bin/python scripts/generate_predictions.py prod \ No newline at end of file diff --git a/config/app.py b/config/app.py index 8d25310..ceb2e91 100644 --- a/config/app.py +++ b/config/app.py @@ -1,3 +1,6 @@ +from typing import Optional + +from pydantic import BaseModel from pydantic_settings import BaseSettings, SettingsConfigDict @@ -12,3 +15,10 @@ class DataLakeConfig(BaseSettings): athena_database: str athena_prediction_table: str athena_request_table: str + + +class WorkerConfig(BaseModel): + git_sha: str + denominator: int # the total number of workers to split data over + numerator: int # the number assigned to this worker + sample: Optional[int] = None # sample size of reference library (in percentage terms, 0-100) diff --git a/notebooks/athena-poc.ipynb b/notebooks/athena-poc.ipynb index 04ca846..81a8515 100644 --- a/notebooks/athena-poc.ipynb +++ b/notebooks/athena-poc.ipynb @@ -263,7 +263,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.9" + "version": "3.10.13" } }, "nbformat": 4, diff --git a/precalculator/fetcher.py b/precalculator/fetcher.py index 043b1bf..a59f63c 100644 --- a/precalculator/fetcher.py +++ b/precalculator/fetcher.py @@ -7,27 +7,20 @@ from config.app import DataLakeConfig -# CLI ensures that input is correctly formatted: -# | input key | -# on the -# presignurl -> a specific S3 bucket, object name is the request ID, prefix is the model ID -# -# s3://bucket/model-id/request-id.csv - class PredictionFetcher: def __init__(self, config: DataLakeConfig, user_id: str, request_id: str, model_id: str, dev: bool = False): self.config = config self.user_id = user_id self.request_id = request_id - # TODO: decide on multi model implementation, for now assume a list of 1 model ID self.model_id = model_id self.dev = dev + self.logger = logging.getLogger("PredictionFetcher") + self.logger.setLevel(logging.INFO) + if self.dev: logging.basicConfig(stream=sys.stdout, level=logging.INFO) - self.logger = logging.getLogger(__name__) - self.logger.setLevel(logging.INFO) logging.getLogger("botocore").setLevel(logging.WARNING) def check_availability(self) -> str: @@ -50,18 +43,10 @@ def fetch(self, path_to_input: str) -> pd.DataFrame: input_df = self._read_input_data(path_to_input) logger.info("writing input to athena") - try: - self._write_inputs_s3(input_df) - except Exception as e: - print(f"error {e}") - raise (e) + self._write_inputs_s3(input_df) logger.info("fetching outputs from athena") - try: - output_df = self._read_predictions_from_s3() - except Exception as e: - print(f"error {e}") - raise (e) + output_df = self._read_predictions_from_s3() return output_df diff --git a/precalculator/models.py b/precalculator/models.py index 28d8070..9b1b834 100644 --- a/precalculator/models.py +++ b/precalculator/models.py @@ -1,15 +1,15 @@ -from typing import Any +import pandas as pd from pydantic import BaseModel, Field class Prediction(BaseModel): """Dataclass to represent a single prediction""" - model_id: str input_key: str smiles: str - output: list[Any] + output: dict + model_id: str class Metadata(BaseModel): @@ -22,3 +22,42 @@ class Metadata(BaseModel): pipeline_latest_start_time: int = Field(default=0) pipeline_latest_duration: int = Field(default=0) pipeline_meta_s3_uri: str = Field(default="") + + +class SchemaValidationError(Exception): + def __init__(self, errors: list[str]): + self.errors = errors + error_message = "\n".join(errors) + super().__init__(f"Schema validation failed with the following errors:\n{error_message}\n") + + +def validate_dataframe_schema(df: pd.DataFrame, model: BaseModel) -> None: + errors = [] + schema = model.model_fields + + for field_name, field in schema.items(): + if field_name not in df.columns: + errors.append(f"Missing column: {field_name}") + else: + pandas_dtype = df[field_name].dtype + pydantic_type = field.annotation + if not check_type_compatibility(pandas_dtype, pydantic_type): + errors.append(f"Column {field_name} has type {pandas_dtype}, expected {pydantic_type}") + + for column in df.columns: + if column not in schema: + errors.append(f"Unexpected column: {column}") + + if errors: + raise SchemaValidationError(errors) + + +def check_type_compatibility(pandas_dtype, pydantic_type): + type_map = { + "object": [str, dict, list], + "int64": [int], + "float64": [float], + "bool": [bool], + "datetime64": [pd.Timestamp], + } + return pydantic_type in type_map.get(str(pandas_dtype)) diff --git a/precalculator/writer.py b/precalculator/writer.py index f65ec50..9073db3 100644 --- a/precalculator/writer.py +++ b/precalculator/writer.py @@ -1,17 +1,134 @@ -import json +# import json +import logging +import os +import subprocess +import sys +import awswrangler as wr import boto3 +import pandas as pd -from config.app import DataLakeConfig -from precalculator.models import Metadata +from config.app import DataLakeConfig, WorkerConfig +from precalculator.models import ( + Prediction, + validate_dataframe_schema, +) -s3_client = boto3.client("s3") +INPUT_NAME = "reference_library" +INPUT_FILE_NAME = f"{INPUT_NAME}.csv" +PROCESSED_FILE_NAME = "input.csv" +OUTPUT_FILE_NAME = "output.csv" class PredictionWriter: - def __init__(self, config: DataLakeConfig, model_id: str): - self.config = config + def __init__(self, data_config: DataLakeConfig, worker_config: WorkerConfig, model_id: str, dev: bool): + self.data_config = data_config + self.worker_config = worker_config self.model_id = model_id + self.s3 = boto3.client("s3") - def write_metadata(self, bucket: str, metadata_key: str, metadata: Metadata) -> None: - s3_client.put_object(Bucket=bucket, Key=metadata_key, Body=json.dumps(metadata.model_dump_json())) + self.logger = logging.getLogger("PredictionWriter") + self.logger.setLevel(logging.INFO) + + if self.dev: + logging.basicConfig(stream=sys.stdout, level=logging.INFO) + logging.getLogger("botocore").setLevel(logging.WARNING) + + # def write_metadata(self, bucket: str, metadata_key: str, metadata: Metadata) -> None: + # self.s3.put_object(Bucket=bucket, Key=metadata_key, Body=json.dumps(metadata.model_dump_json())) + + def fetch(self) -> str: + """Fetch and split inputs for this worker, ready to pass to Ersilia CLI""" + logger = self.logger + + if self.worker_config.sample: + input_filename = f"{INPUT_NAME}_{self.worker_config.sample}.csv" + else: + input_filename = INPUT_FILE_NAME + + self.s3.download_file( + self.data_config.s3_bucket_name, + input_filename, + INPUT_FILE_NAME, + ) + + logger.info(f"Downloaded {input_filename} from S3") + + partition_metadata = self._split_csv() + + logger.info(f"Predicting for rows {partition_metadata[0]} to {partition_metadata[1]}") + + return PROCESSED_FILE_NAME + + def predict(self, input_file_path: str) -> str: + """Calls Ersilia CLI to generate predictions for provided input CSV. + + This method gets Ersilia to pull and serve the relevant model container. + + Args: + input_file_path (str): path to input CSV + """ + logger = self.logger + + logger.info(f"Calling Ersilia CLI for model {self.model_id}") + + subprocess.run([".venv/bin/ersilia", "-v", "serve", self.model_id]) # type: ignore + subprocess.run([".venv/bin/ersilia", "-v", "run", "-i", input_file_path, "-o", OUTPUT_FILE_NAME]) + + return OUTPUT_FILE_NAME + + def postprocess(self, ersilia_output_path: str) -> pd.DataFrame: + """Postprocessing for output file from Ersilia CLI + + Args: + ersilia_output_path (str): location of output CSV + + Returns: + pd.DataFrame: postprocessed dataframe of outputs + """ + + logger = self.logger + logger.info("Postprocessing outputs from Ersilia model") + + df = pd.read_csv(ersilia_output_path) + + output_cols = df.columns[2:] + output_records = df[output_cols].to_dict(orient="records") + + df["output"] = output_records + df["model_id"] = self.model_id + df = df[["key", "input", "output", "model_id"]] + df = df.rename(columns={"key": "input_key", "input": "smiles"}) + + return df + + def write_to_lake(self, outputs: pd.DataFrame): + validate_dataframe_schema(outputs, Prediction) + + wr.s3.to_parquet( + df=outputs, + path=os.path.join( + "s3://", + self.data_config.s3_bucket_name, + self.data_config.athena_prediction_table, + ), + dataset=True, + database=self.data_config.athena_database, + table=self.data_config.athena_prediction_table, + partition_cols=["model_id"], + ) + + def _split_csv(self) -> tuple[int, int]: + """Partition CSV file such that the worker has the correct set of rows to predict on""" + df = pd.read_csv(INPUT_FILE_NAME) + + total_length = len(df) + chunk_size = total_length // self.worker_config.denominator + + start_row = (self.worker_config.numerator - 1) * chunk_size + end_row = start_row + chunk_size + + df = df.iloc[start_row:end_row] + df.to_csv(PROCESSED_FILE_NAME, index=False) + + return start_row, end_row diff --git a/scripts/generate_predictions.py b/scripts/generate_predictions.py index 7690308..5b7507c 100644 --- a/scripts/generate_predictions.py +++ b/scripts/generate_predictions.py @@ -1,44 +1,63 @@ +import argparse import logging -import sys -import time -from typing import List +import os -from ersilia import ErsiliaModel # type: ignore +from config.app import DataLakeConfig, WorkerConfig +from precalculator.writer import PredictionWriter -EXAMPLE_MODEL_ID = "eos2zmb" +TEST_ENV = { + "INPUT_MODEL_ID": "eos2zmb", + "INPUT_SHA": "1234", + "INPUT_NUMERATOR": 1, + "INPUT_DENOMINATOR": 2, + "INPUT_SAMPLE_ONLY": 10, +} -logger = logging.Logger("logger") +logger = logging.getLogger("GeneratePredictionsScript") +logger.setLevel(logging.INFO) +parser = argparse.ArgumentParser() +parser.add_argument("-e", "--env", choices=["dev", "ci", "prod"], default="dev", help="Specify environment") -def read_input_from_file(path_to_input: str = "reference_library.csv") -> List[str]: - start = time.time() - with open(path_to_input, "r") as file: - contents = file.readlines() - logger.info(f"Reading took {time.time() - start :2f} seconds") +if __name__ == "__main__": + args = parser.parse_args() - logger.info(f"Input file has {len(contents)} rows") + env_source = os.environ + dev = False - return contents + if args.env == "dev": + env_source = TEST_ENV + dev = True + logger.info(f"Environment: {args.env}") -if __name__ == "__main__": - input_path = sys.argv[1] - output_path = sys.argv[2] if len(sys.argv) > 2 else "prediction_output" - model_id = sys.argv[3] if len(sys.argv) > 3 else EXAMPLE_MODEL_ID - format = sys.argv[4] if len(sys.argv) > 4 else "csv" - - input_items = read_input_from_file(input_path) - - with ErsiliaModel(model_id) as mdl: - logger.info(f"Fetched model {model_id}") - - start = time.time() - predictions = mdl.run(input_items, output="pandas") - logger.info(f"Inference took {time.time() - start :2f} seconds") - - if format == "csv": - predictions.to_csv(output_path + ".csv") # type: ignore - elif format == "parquet": - predictions.to_parquet(output_path + ".parquet") # type: ignore - else: - print("unsupported format") + logger.info("Setting up writer configuration") + model_id = env_source.get("INPUT_MODEL_ID") # type: ignore + sha = env_source.get("INPUT_SHA") # type: ignore + numerator = int(env_source.get("INPUT_NUMERATOR")) # type: ignore + denominator = int(env_source.get("INPUT_DENOMINATOR")) # type: ignore + sample_only = env_source.get("INPUT_SAMPLE_ONLY") # type: ignore + + data_config = DataLakeConfig() + worker_config = WorkerConfig( + git_sha=sha, + denominator=denominator, + numerator=numerator, + sample=sample_only, + ) + + logger.debug( + "Configured writer with following settings: \n%s", + "\n".join(f"{k}: {v}" for k, v in data_config.model_dump().items()), + "\n".join(f"{k}: {v}" for k, v in worker_config.model_dump().items()), + ) + + writer = PredictionWriter(data_config=data_config, worker_config=worker_config, model_id=model_id, dev=dev) + + input_file = writer.fetch() + + output_file = writer.predict(input_file) + + df_predictions = writer.postprocess(output_file) + + writer.write_to_lake(df_predictions) diff --git a/scripts/generate_predictions_with_aws_and_ersilia.py b/scripts/generate_predictions_with_aws_and_ersilia.py deleted file mode 100644 index ede4229..0000000 --- a/scripts/generate_predictions_with_aws_and_ersilia.py +++ /dev/null @@ -1,100 +0,0 @@ -import argparse -import logging -import os -import subprocess - -import awswrangler as wr -import boto3 -import pandas as pd - -TEST_ENV = { - "MODEL_ID": "eos2zmb", - "SHA": "1234", - "numerator": 1, - "denominator": 2, - "sample-only": 10, - "GITHUB_REPOSITORY": "precalculations-bucket", -} - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - - -def split_csv(input_path: str, numerator: int, denominator: int) -> str: - df = pd.read_csv(input_path) - - total_length = len(df) - chunk_size = total_length // denominator - - start_row = (numerator - 1) * chunk_size - end_row = start_row + chunk_size - - df = df.iloc[start_row:end_row] - df.to_csv("input.csv", index=False) - - return "input.csv" - - -def fetch_input_from_s3(bucket_name: str, filename: str, local_filename: str) -> None: - s3 = boto3.client("s3") - s3.download_file(bucket_name, filename, local_filename) - logger.info(f"Downloaded {filename} from S3") - - -def upload_to_s3_via_cli(local_filename: str, s3_destination: str) -> None: - subprocess.run(["aws", "s3", "cp", local_filename, s3_destination]) - - -parser = argparse.ArgumentParser() -parser.add_argument("-e", "--env", choices=["dev", "ci", "prod"], default="dev", help="Specify environment") - -if __name__ == "__main__": - args = parser.parse_args() - - env_source = TEST_ENV if args.env == "dev" else os.environ - logger.info(f"environment: {args.env}") - - model_id = env_source.get("MODEL_ID") # type: ignore - sha = env_source.get("SHA") # type: ignore - numerator = int(env_source.get("numerator")) # type: ignore - denominator = int(env_source.get("denominator")) # type: ignore - sample_only = env_source.get("sample-only") # type: ignore - bucket_name = env_source.get("GITHUB_REPOSITORY").replace("/", "-") # type: ignore - - # sample-only defines size of smaller reference library files from s3 - input_filename = f"reference_library_{sample_only}.csv" if sample_only else "reference_library.csv" - - logger.info(f"fetching input {bucket_name, input_filename} from s3") - fetch_input_from_s3(bucket_name, input_filename, input_filename) - - partitioned_input = split_csv(input_filename, numerator, denominator) - - logger.info(f"calling ersilia for model {model_id}") - subprocess.run([".venv/bin/ersilia", "-v", "serve", model_id]) # type: ignore - subprocess.run([".venv/bin/ersilia", "-v", "run", "-i", partitioned_input, "-o", "output.csv"]) - - s3_destination = f"s3://precalculations-bucket/out/{model_id}/{sha}/{sha}_{numerator - 1:04d}.csv" - - logger.info("postprocessing predicitons") - df = pd.read_csv("output.csv") - columns_to_use = df.columns[-2:] - output = df[columns_to_use].to_dict(orient="records") - df["output"] = output - df["model_id"] = model_id - df = df[["key", "input", "output", "model_id"]] - df = df.rename(columns={"key": "input_key", "input": "smiles"}) - - logger.info("writing predicitons to s3") - - wr.s3.to_parquet( - df=df, - path=os.path.join( - "s3://", - "precalculations-bucket", - "predictions", - ), - dataset=True, - database="precalcs_test", - table="predictions", - partition_cols=["model_id"], - ) From bbb3452fb4d21d19ada7f92fba1e1522a6333e9d Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 14:51:03 +1000 Subject: [PATCH 02/14] fix environment set up for writing --- .gitignore | 4 +++- Makefile | 2 +- poetry.lock | 22 +++++++++++++++++++++- precalculator/writer.py | 1 + pyproject.toml | 1 + 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 7c6d9d8..9028e98 100644 --- a/.gitignore +++ b/.gitignore @@ -172,4 +172,6 @@ data/ .DS_Store # VSCode -.vscode/ \ No newline at end of file +.vscode/ + +*.csv \ No newline at end of file diff --git a/Makefile b/Makefile index 8e1f4be..10a2574 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ install: @if [ "$(shell which poetry)" = "" ]; then \ $(MAKE) install-poetry; \ fi - @$(MAKE) setup-poetry install-hooks + @$(MAKE) install-ersilia setup-poetry install-hooks install-prod: @if [ "$(shell which poetry)" = "" ]; then \ diff --git a/poetry.lock b/poetry.lock index 087ba98..e30beb8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2954,6 +2954,26 @@ files = [ [package.dependencies] typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" +[[package]] +name = "pydantic-settings" +version = "2.4.0" +description = "Settings management using Pydantic" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pydantic_settings-2.4.0-py3-none-any.whl", hash = "sha256:bb6849dc067f1687574c12a639e231f3a6feeed0a12d710c1382045c5db1c315"}, + {file = "pydantic_settings-2.4.0.tar.gz", hash = "sha256:ed81c3a0f46392b4d7c0a565c05884e6e54b3456e6f0fe4d8814981172dc9a88"}, +] + +[package.dependencies] +pydantic = ">=2.7.0" +python-dotenv = ">=0.21.0" + +[package.extras] +azure-key-vault = ["azure-identity (>=1.16.0)", "azure-keyvault-secrets (>=4.8.0)"] +toml = ["tomli (>=2.0.1)"] +yaml = ["pyyaml (>=6.0.1)"] + [[package]] name = "pygments" version = "2.18.0" @@ -4221,4 +4241,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "13c9f75df84d7ea1fa7aa3ab78c7be8cb7bf3048f49f5e05ab305e6e0c844fff" +content-hash = "ea27b6292d43a907457011d608e71e13c877ee57f58f9ab1aa0a24772b320ad2" diff --git a/precalculator/writer.py b/precalculator/writer.py index 9073db3..9f69652 100644 --- a/precalculator/writer.py +++ b/precalculator/writer.py @@ -26,6 +26,7 @@ def __init__(self, data_config: DataLakeConfig, worker_config: WorkerConfig, mod self.worker_config = worker_config self.model_id = model_id self.s3 = boto3.client("s3") + self.dev = dev self.logger = logging.getLogger("PredictionWriter") self.logger.setLevel(logging.INFO) diff --git a/pyproject.toml b/pyproject.toml index 2dfdca5..c74f477 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ awswrangler = "^3.9.0" numpy = "^1.26.4" pyairtable = "^2.3.3" ersilia = {path = "ersilia", develop = true} +pydantic-settings = "^2.4.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" From 80d46a157d57f594edd8ababb2aa66d5aee5587e Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 15:07:16 +1000 Subject: [PATCH 03/14] lint and format --- config/app.py | 6 +++--- precalculator/models.py | 7 +++---- precalculator/writer.py | 6 +++--- scripts/generate_predictions.py | 25 +++++++++++++++---------- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/config/app.py b/config/app.py index ceb2e91..dd8d3bf 100644 --- a/config/app.py +++ b/config/app.py @@ -19,6 +19,6 @@ class DataLakeConfig(BaseSettings): class WorkerConfig(BaseModel): git_sha: str - denominator: int # the total number of workers to split data over - numerator: int # the number assigned to this worker - sample: Optional[int] = None # sample size of reference library (in percentage terms, 0-100) + denominator: int # the total number of workers to split data over + numerator: int # the number assigned to this worker + sample: Optional[str] = None # sample size of reference library (in number of rows) diff --git a/precalculator/models.py b/precalculator/models.py index 9b1b834..ccebbb5 100644 --- a/precalculator/models.py +++ b/precalculator/models.py @@ -1,4 +1,3 @@ - import pandas as pd from pydantic import BaseModel, Field @@ -41,7 +40,7 @@ def validate_dataframe_schema(df: pd.DataFrame, model: BaseModel) -> None: else: pandas_dtype = df[field_name].dtype pydantic_type = field.annotation - if not check_type_compatibility(pandas_dtype, pydantic_type): + if not _check_type_compatibility(pandas_dtype, pydantic_type): errors.append(f"Column {field_name} has type {pandas_dtype}, expected {pydantic_type}") for column in df.columns: @@ -52,7 +51,7 @@ def validate_dataframe_schema(df: pd.DataFrame, model: BaseModel) -> None: raise SchemaValidationError(errors) -def check_type_compatibility(pandas_dtype, pydantic_type): +def _check_type_compatibility(pandas_dtype, pydantic_type) -> bool: # noqa: ANN001 type_map = { "object": [str, dict, list], "int64": [int], @@ -60,4 +59,4 @@ def check_type_compatibility(pandas_dtype, pydantic_type): "bool": [bool], "datetime64": [pd.Timestamp], } - return pydantic_type in type_map.get(str(pandas_dtype)) + return pydantic_type in type_map.get(str(pandas_dtype)) # type: ignore diff --git a/precalculator/writer.py b/precalculator/writer.py index 9f69652..59fc586 100644 --- a/precalculator/writer.py +++ b/precalculator/writer.py @@ -57,7 +57,7 @@ def fetch(self) -> str: partition_metadata = self._split_csv() - logger.info(f"Predicting for rows {partition_metadata[0]} to {partition_metadata[1]}") + logger.info(f"Partitioned rows {partition_metadata[0]} to {partition_metadata[1]}") return PROCESSED_FILE_NAME @@ -103,8 +103,8 @@ def postprocess(self, ersilia_output_path: str) -> pd.DataFrame: return df - def write_to_lake(self, outputs: pd.DataFrame): - validate_dataframe_schema(outputs, Prediction) + def write_to_lake(self, outputs: pd.DataFrame) -> None: + validate_dataframe_schema(outputs, Prediction) # type: ignore wr.s3.to_parquet( df=outputs, diff --git a/scripts/generate_predictions.py b/scripts/generate_predictions.py index 5b7507c..9040598 100644 --- a/scripts/generate_predictions.py +++ b/scripts/generate_predictions.py @@ -1,6 +1,7 @@ import argparse import logging import os +import sys from config.app import DataLakeConfig, WorkerConfig from precalculator.writer import PredictionWriter @@ -14,7 +15,7 @@ } logger = logging.getLogger("GeneratePredictionsScript") -logger.setLevel(logging.INFO) +logging.basicConfig(stream=sys.stdout, level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("-e", "--env", choices=["dev", "ci", "prod"], default="dev", help="Specify environment") @@ -26,29 +27,33 @@ dev = False if args.env == "dev": - env_source = TEST_ENV + env_source = TEST_ENV # type: ignore dev = True logger.info(f"Environment: {args.env}") logger.info("Setting up writer configuration") - model_id = env_source.get("INPUT_MODEL_ID") # type: ignore + model_id = str(env_source.get("INPUT_MODEL_ID")) # type: ignore sha = env_source.get("INPUT_SHA") # type: ignore numerator = int(env_source.get("INPUT_NUMERATOR")) # type: ignore denominator = int(env_source.get("INPUT_DENOMINATOR")) # type: ignore sample_only = env_source.get("INPUT_SAMPLE_ONLY") # type: ignore - data_config = DataLakeConfig() - worker_config = WorkerConfig( - git_sha=sha, - denominator=denominator, - numerator=numerator, + data_config = DataLakeConfig() # type: ignore + worker_config = WorkerConfig( # type: ignore + git_sha=sha, # type: ignore + denominator=denominator, # type: ignore + numerator=numerator, # type: ignore sample=sample_only, ) - logger.debug( - "Configured writer with following settings: \n%s", + logger.info( + "Configured writer with following DataLake settings: \n%s", "\n".join(f"{k}: {v}" for k, v in data_config.model_dump().items()), + ) + + logger.info( + "Configured writer with following Worker settings: \n%s", "\n".join(f"{k}: {v}" for k, v in worker_config.model_dump().items()), ) From 32a053a2f0c66c3c2e33b68b356edef842f9fbee Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 15:13:00 +1000 Subject: [PATCH 04/14] parameterise matrix in predict parallel --- .github/workflows/predict-parallel.yml | 30 ++++++++++++++++++-------- .github/workflows/predict.yml | 2 +- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index bec8819..59e9305 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -8,6 +8,11 @@ on: sample-only: required: false type: string + n-workers: + description: 'number of workers to use (max 50)' + required: true + default: '50' + type: string permissions: contents: read @@ -21,25 +26,32 @@ jobs: - name: Get start time id: start-time run: echo "start-time=$(date +%s)" >> "$GITHUB_OUTPUT" + + + generate-matrix: + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.set-matrix.outputs.matrix }} + steps: + - id: set-matrix + run: | + start=1 + end=${{ inputs.n-workers }} + matrix=$(seq -s ',' $start $end) + echo "matrix=[${matrix}]" >> $GITHUB_OUTPUT + matrix-inference: if: github.repository != 'ersilia-os/eos-template' strategy: matrix: - # numerator: [ - # 1,2,3,4,5,6,7,8,9,10, - # 11,12,13,14,15,16,17,18,19,20, - # 21,22,23,24,25,26,27,28,29,30, - # 31,32,33,34,35,36,37,38,39,40, - # 41,42,43,44,45,46,47,48,49,50 - # ] - numerator: [1] + numerator: ${{ fromJson(needs.generate-matrix.outputs.matrix) }} uses: ./.github/workflows/predict.yml with: numerator: ${{ matrix.numerator }} - denominator: 50 + denominator: ${{ inputs.n-workers }} model-id: ${{ inputs.model-id }} sample-only: ${{ inputs.sample-only }} SHA: ${{ github.sha }} diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 8463072..2753e59 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -75,4 +75,4 @@ jobs: INPUT_NUMERATOR: ${{ inputs.numerator }} INPUT_DENOMINATOR: ${{ inputs.denominator }} INPUT_SAMPLE_ONLY: ${{ inputs.sample-only }} - run: .venv/bin/python scripts/generate_predictions.py prod \ No newline at end of file + run: .venv/bin/python scripts/generate_predictions.py --env prod \ No newline at end of file From c7a523472de482bb2ab824c233b32f5f3cc6bfdf Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 15:33:13 +1000 Subject: [PATCH 05/14] fix syntax to retrieve variable --- .github/workflows/predict-parallel.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index 59e9305..f4164a9 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -35,13 +35,14 @@ jobs: steps: - id: set-matrix run: | - start=1 + start='1' end=${{ inputs.n-workers }} matrix=$(seq -s ',' $start $end) echo "matrix=[${matrix}]" >> $GITHUB_OUTPUT matrix-inference: + needs: generate-matrix if: github.repository != 'ersilia-os/eos-template' strategy: From d876aa06c3a3c9265e9ffd1ec48f497c9ce513e0 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 15:35:22 +1000 Subject: [PATCH 06/14] use string params --- .github/workflows/predict.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 2753e59..35ab627 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -5,10 +5,10 @@ on: inputs: numerator: required: true - type: number + type: string denominator: required: true - type: number + type: string model-id: required: true type: string From cff81adfaeb5b283418f60389fa77c5ae1a4bee3 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 16:35:29 +1000 Subject: [PATCH 07/14] cache docker image --- .github/workflows/predict-parallel.yml | 38 +++++++++++++++++++++++++- .github/workflows/predict.yml | 19 ++++++++++--- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index f4164a9..8289caa 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -27,7 +27,43 @@ jobs: id: start-time run: echo "start-time=$(date +%s)" >> "$GITHUB_OUTPUT" - + pull-model-image: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + repository: ersilia-os/ersilia + + - uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: set up ersilia + run: | + echo $CONDA/bin >> $GITHUB_PATH + source $CONDA/etc/profile.d/conda.sh + conda install -y python=3.10 + conda init + conda install git-lfs -c conda-forge + git-lfs install + conda install gh -c conda-forge + python -m pip install -e .[test] + pip install pyairtable + + - name: fetch and save requested model image + run: | + source activate + ersilia fetch ${{ inputs.model-id }} + docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar + + - name: cache model Docker image + uses: actions/cache@v3 + with: + path: /tmp/${{ inputs.model-id }}.tar + key: ${{ runner.os }}-docker-${{ inputs.model-id }} + restore-keys: | + ${{ runner.os }}-docker-${{ inputs.model-id }} + generate-matrix: runs-on: ubuntu-latest outputs: diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 35ab627..a8510ec 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -44,12 +44,11 @@ jobs: run: sudo apt-get update && sudo apt-get install -y make - name: Set up Python environment - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: '3.10' - - - name: Run make install - run: make install-prod + cache: 'poetry' + - run: make install-prod # we need this step as ersilia will use the default conda environment to run the example model during `ersilia serve` # could get around this eventually if we only use conda for env management, but there are complexities around referencing a dev @@ -67,6 +66,18 @@ jobs: - name: Activate virtual environment run: source .venv/bin/activate + + - name: Restore cached Docker image + uses: actions/cache@v3 + with: + path: /tmp/${{ inputs.model-id }}.tar + key: ${{ runner.os }}-docker-${{ inputs.model-id }} + restore-keys: | + ${{ runner.os }}-docker-${{ inputs.model-id }} + + - name: Load Docker image + run: | + docker load -i /tmp/${{ inputs.model-id }}.tar - name: Run Python script to generate predictions and upload to S3 env: From 3da555c53a8b3ed0148b73e11a8ef18e87b39703 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 16:47:20 +1000 Subject: [PATCH 08/14] try to cache docker image and only pull on cache miss --- .github/workflows/predict-parallel.yml | 18 +++++++++++------- .github/workflows/predict.yml | 2 ++ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index 8289caa..1a1e086 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -49,14 +49,9 @@ jobs: conda install gh -c conda-forge python -m pip install -e .[test] pip install pyairtable - - - name: fetch and save requested model image - run: | - source activate - ersilia fetch ${{ inputs.model-id }} - docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar - + - name: cache model Docker image + id: cache uses: actions/cache@v3 with: path: /tmp/${{ inputs.model-id }}.tar @@ -64,7 +59,16 @@ jobs: restore-keys: | ${{ runner.os }}-docker-${{ inputs.model-id }} + - name: fetch and save requested model image + if: steps.cache.outputs.cache-hit != 'true' + run: | + source activate + ersilia fetch --from_dockerhub ${{ inputs.model-id }} + docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar + + generate-matrix: + needs: pull-model-image runs-on: ubuntu-latest outputs: matrix: ${{ steps.set-matrix.outputs.matrix }} diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index a8510ec..6ddb26f 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -68,6 +68,7 @@ jobs: run: source .venv/bin/activate - name: Restore cached Docker image + id: cache uses: actions/cache@v3 with: path: /tmp/${{ inputs.model-id }}.tar @@ -76,6 +77,7 @@ jobs: ${{ runner.os }}-docker-${{ inputs.model-id }} - name: Load Docker image + if: steps.cache.outputs.cache-hit != 'true' run: | docker load -i /tmp/${{ inputs.model-id }}.tar From 510efb4187e3584e2d991673f6530ce62db210a1 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 16:53:16 +1000 Subject: [PATCH 09/14] pull image directly from docker, don't use ersilia --- .github/workflows/predict-parallel.yml | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index 1a1e086..85810ab 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -30,26 +30,6 @@ jobs: pull-model-image: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 - with: - repository: ersilia-os/ersilia - - - uses: actions/setup-python@v5 - with: - python-version: '3.10' - - - name: set up ersilia - run: | - echo $CONDA/bin >> $GITHUB_PATH - source $CONDA/etc/profile.d/conda.sh - conda install -y python=3.10 - conda init - conda install git-lfs -c conda-forge - git-lfs install - conda install gh -c conda-forge - python -m pip install -e .[test] - pip install pyairtable - - name: cache model Docker image id: cache uses: actions/cache@v3 @@ -59,11 +39,10 @@ jobs: restore-keys: | ${{ runner.os }}-docker-${{ inputs.model-id }} - - name: fetch and save requested model image + - name: pull and save requested model image if: steps.cache.outputs.cache-hit != 'true' run: | - source activate - ersilia fetch --from_dockerhub ${{ inputs.model-id }} + docker pull ersiliaos/${{ inputs.model-id }}:latest docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar From da66c361a6cdd160989654d2ef623d5cd7cfcb95 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 17:02:25 +1000 Subject: [PATCH 10/14] remove unnecessary files to free up space --- .github/workflows/predict-parallel.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index 85810ab..032b587 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -30,6 +30,12 @@ jobs: pull-model-image: runs-on: ubuntu-latest steps: + - name: Remove unnecessary files + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + sudo rm -rf /opt/ghc + sudo rm -rf "/usr/local/share/boost" - name: cache model Docker image id: cache uses: actions/cache@v3 From a83c1b183bc7e51f063bbcf7607d1f972ece76e9 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 22:13:20 +1000 Subject: [PATCH 11/14] cache pip instead of poetry --- .github/workflows/predict.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 6ddb26f..fad8e99 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -47,7 +47,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: '3.10' - cache: 'poetry' + cache: 'pip' - run: make install-prod # we need this step as ersilia will use the default conda environment to run the example model during `ersilia serve` From d25b9d6b469be194b0118f0572234479481a6751 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 22:18:24 +1000 Subject: [PATCH 12/14] run load docker image on cache hit --- .github/workflows/predict.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index fad8e99..a9212f7 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -77,7 +77,7 @@ jobs: ${{ runner.os }}-docker-${{ inputs.model-id }} - name: Load Docker image - if: steps.cache.outputs.cache-hit != 'true' + if: steps.cache.outputs.cache-hit == 'true' run: | docker load -i /tmp/${{ inputs.model-id }}.tar From 7d7ca1b54c47db421dbf52bbabbb3fd85b60fa35 Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 22:26:49 +1000 Subject: [PATCH 13/14] clear space on worker --- .github/workflows/predict.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index a9212f7..4e82219 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -67,6 +67,13 @@ jobs: - name: Activate virtual environment run: source .venv/bin/activate + - name: Remove unnecessary files + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf "$AGENT_TOOLSDIRECTORY" + sudo rm -rf /opt/ghc + sudo rm -rf "/usr/local/share/boost" + - name: Restore cached Docker image id: cache uses: actions/cache@v3 From 7fbe229b9b8fef86c322ece1e94e7c507d529e1c Mon Sep 17 00:00:00 2001 From: Kartikey Vyas Date: Sun, 25 Aug 2024 22:43:13 +1000 Subject: [PATCH 14/14] comment out image caching --- .github/workflows/predict-parallel.yml | 46 +++++++++++++------------- .github/workflows/predict.yml | 24 +++++++------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/.github/workflows/predict-parallel.yml b/.github/workflows/predict-parallel.yml index 032b587..c0e24d4 100644 --- a/.github/workflows/predict-parallel.yml +++ b/.github/workflows/predict-parallel.yml @@ -27,33 +27,33 @@ jobs: id: start-time run: echo "start-time=$(date +%s)" >> "$GITHUB_OUTPUT" - pull-model-image: - runs-on: ubuntu-latest - steps: - - name: Remove unnecessary files - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf "$AGENT_TOOLSDIRECTORY" - sudo rm -rf /opt/ghc - sudo rm -rf "/usr/local/share/boost" - - name: cache model Docker image - id: cache - uses: actions/cache@v3 - with: - path: /tmp/${{ inputs.model-id }}.tar - key: ${{ runner.os }}-docker-${{ inputs.model-id }} - restore-keys: | - ${{ runner.os }}-docker-${{ inputs.model-id }} + # pull-model-image: + # runs-on: ubuntu-latest + # steps: + # - name: Remove unnecessary files + # run: | + # sudo rm -rf /usr/share/dotnet + # sudo rm -rf "$AGENT_TOOLSDIRECTORY" + # sudo rm -rf /opt/ghc + # sudo rm -rf "/usr/local/share/boost" + # - name: cache model Docker image + # id: cache + # uses: actions/cache@v3 + # with: + # path: /tmp/${{ inputs.model-id }}.tar + # key: ${{ runner.os }}-docker-${{ inputs.model-id }} + # restore-keys: | + # ${{ runner.os }}-docker-${{ inputs.model-id }} - - name: pull and save requested model image - if: steps.cache.outputs.cache-hit != 'true' - run: | - docker pull ersiliaos/${{ inputs.model-id }}:latest - docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar + # - name: pull and save requested model image + # if: steps.cache.outputs.cache-hit != 'true' + # run: | + # docker pull ersiliaos/${{ inputs.model-id }}:latest + # docker save ersiliaos/${{ inputs.model-id }} -o /tmp/${{ inputs.model-id }}.tar generate-matrix: - needs: pull-model-image + # needs: pull-model-image runs-on: ubuntu-latest outputs: matrix: ${{ steps.set-matrix.outputs.matrix }} diff --git a/.github/workflows/predict.yml b/.github/workflows/predict.yml index 4e82219..a0daffe 100644 --- a/.github/workflows/predict.yml +++ b/.github/workflows/predict.yml @@ -74,19 +74,19 @@ jobs: sudo rm -rf /opt/ghc sudo rm -rf "/usr/local/share/boost" - - name: Restore cached Docker image - id: cache - uses: actions/cache@v3 - with: - path: /tmp/${{ inputs.model-id }}.tar - key: ${{ runner.os }}-docker-${{ inputs.model-id }} - restore-keys: | - ${{ runner.os }}-docker-${{ inputs.model-id }} + # - name: Restore cached Docker image + # id: cache + # uses: actions/cache@v3 + # with: + # path: /tmp/${{ inputs.model-id }}.tar + # key: ${{ runner.os }}-docker-${{ inputs.model-id }} + # restore-keys: | + # ${{ runner.os }}-docker-${{ inputs.model-id }} - - name: Load Docker image - if: steps.cache.outputs.cache-hit == 'true' - run: | - docker load -i /tmp/${{ inputs.model-id }}.tar + # - name: Load Docker image + # if: steps.cache.outputs.cache-hit == 'true' + # run: | + # docker load -i /tmp/${{ inputs.model-id }}.tar - name: Run Python script to generate predictions and upload to S3 env: