The project integrates advanced machine learning techniques with MLOps practices to create a scalable, automated, and efficient pipeline for financial market forecasting. The approach focuses on stock market analysis, combining data from sources including FRED (Federal Reserve Economic Data), Fama-French factors, the ADS Index, and Yahoo Finance. The project utilizes modern tools and platforms such as Google Cloud Platform (GCP), Vertex AI, GitHub Actions, and Airflow, emphasizing automation, monitoring, and model lifecycle management.
Here's an overview of the project: we build a predictive model for stock price forecasting while implementing a complete MLOps workflow covering data collection, model training, deployment, and monitoring to ensure scalability, reproducibility, and maintainability.
To read all files, refer to Assignments Submissions. To read about the current phase, refer to Model Deployment.
- Directory Structure
- Data Sources
- Setup and Usage
- Environment Setup
- Running the Pipeline
- Test Functions
- Reproducibility and Data Versioning
- Airflow Implementation
- Pipeline Components
- CI/CD Prerequisites
- Model Serving and Deployment
- Monitoring and Maintenance
- Notifications
- UI Dashboard for Stock Price Prediction
.
├── artifacts
│ ├── drift_detection_log.txt
│ └── schema.pbtxt
├── assets # airflow*, gcp*, and github* images
│ ├── airflow_dags.png
│ ├── Model serving.png
│ ├── wandb_main_dashboard.png
│ └── VM_instance.png
├── Assignments_Submissions # Reports and documentation
│ ├── DataPipeline Phase # Includes README and pipeline documentation
│ ├── Model Deployment # Deployment phase documentation
│ ├── Model Development Pipeline Phase
│ ├── Scoping Phase # Scoping reports and user needs
├── data # Raw and preprocessed datasets
│ ├── raw # Unprocessed datasets
│ ├── preprocessed # Cleaned datasets
├── GCP # Google Cloud-related files and scripts
│ ├── application credentials, deployment configs
│ ├── gcpdeploy # Scripts for training and serving models
│ └── wandb # Weights & Biases logs and metadata
├── pipeline # Pipeline scripts and configurations
│ ├── airflow # DAGs, logs, and DAG-related scripts
│ ├── dags/data # Data source files for pipeline tasks
│ ├── artifacts # Artifacts generated from DAGs
│ ├── tests # Unit test scripts for pipeline tasks
│ └── wandb # Workflow and run logs
├── src # Core source code (Py script, Notebook, Model scripts)
│ ├── Data
│ ├── best_model.py
│ └── Datadrift_detection_updated.ipynb
├── requirements.txt # Python dependencies
├── dockerfile
├── LICENSE
└── README.md # Main documentation
To set up and run the pipeline, refer to Environment Setup and Running the Pipeline below.
To set up and run this project, ensure the following are installed:
- Python (3.8 or later)
- Docker (for running Apache Airflow)
- DVC (for data version control)
- Google Cloud SDK (for deploying on GCP)

- Clone the Repository: `git clone https://github.com/IE7374-MachineLearningOperations/StockPricePrediction.git`, then `cd Stock-Price-Prediction`.
- Install Python Dependencies: install all required packages listed in `requirements.txt` with `pip install -r requirements.txt`.
- Initialize DVC: set up DVC to manage large data files by pulling the tracked data with `dvc pull`.
To execute the data pipeline, follow these steps:
- Start Airflow Services: run Docker Compose to start the Airflow web server and scheduler with `cd pipeline/airflow/` followed by `docker-compose up`.
- Access the Airflow UI: open http://localhost:8080 in your browser, log into the Airflow UI, and enable the DAG.
- Trigger the DAG: trigger the DAG manually to start processing. The pipeline will:
  - Ingest raw data and preprocess it.
  - Perform correlation analysis to identify redundant features.
  - Execute PCA to reduce dimensionality.
  - Generate visualizations, such as time series plots and correlation matrices.
- Check Outputs: once completed, check the output files and images in the `artifacts/` folder.
Alternatively, run Airflow locally in a virtual environment:
```bash
# Step 1: Activate the virtual environment
cd airflow_env/        # go to the Airflow environment directory
source bin/activate

# Step 2: Install Airflow (not required if done before)
pip install apache-airflow

# Step 3: Initialize the Airflow database (not required if done before)
airflow db init

# Step 4: Start the Airflow web server and scheduler
airflow webserver -p 8080 & airflow scheduler

# Step 5: Access the Airflow UI in your browser at http://localhost:8080

# Step 6: Deactivate the virtual environment (after work completion)
deactivate
```
Run all tests in the `tests` directory with `pytest pipeline/airflow/tests/`.
We use DVC (Data Version Control) to manage data files.

- Initialize DVC (not required if already initialized): `dvc init`
- Pull Data Files: pull the DVC-tracked data files to ensure all required datasets are available: `dvc pull`
- Data Versioning: data files are tracked with `.dvc` files in the repository.
- Tracking New Data: if new files are added, track and push them, for example `dvc add <file-path>` followed by `dvc push`.
- Our Project Bucket
- ADS Index: Tracks economic trends and business cycles.
- Fama-French Factors: Provides historical data for financial research.
- FRED Variables: Includes various economic indicators, such as AMERIBOR, NIKKEI 225, and VIX.
- YFinance: Pulls historical stock data ('GOOGL') for financial time-series analysis.
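For example, the yfinance pull for GOOGL might look like the following sketch; the date range is an assumption, not the pipeline's exact parameter.

```python
# Illustrative only: fetch GOOGL history with yfinance (date range is an assumption).
import yfinance as yf

# Returns OHLCV columns (Open, High, Low, Close, Adj Close, Volume) indexed by date.
googl = yf.download("GOOGL", start="2015-01-01", end="2024-12-31", progress=False)
print(googl.tail())
```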
The Airflow DAG (`Group10_DataPipeline_MLOps`) was successfully implemented and tested, with all tasks executing as expected. The following details summarize the pipeline's performance and execution.
- Total Runs: 10
- Last Run: 2024-11-16 02:34:59 UTC
- Run Status: Success
- Run Duration: 00:03:37
The DAG consists of 19 tasks, each representing a step in the data processing, feature engineering, and model development pipeline. The key tasks are listed below, followed by a minimal sketch of how a few of them are wired together:
- `download_data_task`: Downloads initial datasets from multiple financial data sources.
- `convert_data_task`: Converts data types to ensure compatibility and efficiency.
- `keep_latest_data_task`: Filters datasets to keep only the most recent data.
- `remove_weekend_data_task`: Removes weekend data points to ensure consistency in the time-series analysis.
- `handle_missing_values_task`: Handles missing data using imputation techniques or removal where necessary.
- `plot_time_series_task`: Visualizes time series trends for better exploratory analysis.
- `removing_correlated_variables_task`: Eliminates highly correlated variables to prevent redundancy.
- `add_lagged_features_task`: Generates lagged features to capture temporal dependencies in the dataset.
- `add_feature_interactions_task`: Creates new interaction features between existing variables for improved predictive power.
- `add_technical_indicators_task`: Calculates financial technical indicators, such as moving averages and RSI.
- `scaler_task`: Scales the dataset features to a consistent range to enhance model training stability.
- `visualize_pca_components_task`: Performs PCA for dimensionality reduction and visualizes the key components.
- `upload_blob_task`: Uploads processed data or model artifacts to cloud storage for later access.
- `linear_regression_model_task`: Trains a linear regression model on the processed data.
- `lstm_model_task`: Trains an LSTM model for time-series predictions.
- `xgboost_model_task`: Trains an XGBoost model for predictive analysis.
- `sensitivity_analysis_task`: Analyzes model sensitivity to understand feature importance and impact on predictions.
- `detect_bias_task`: Detects any biases in the model by evaluating it across different data slices.
- `send_email_task`: Sends notifications regarding the DAG's completion status to the stakeholders.
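To make the wiring concrete, here is a minimal sketch of how a few of these tasks could be chained with `PythonOperator`. The callable bodies and the simple linear ordering are placeholders, not the project's actual 19-task graph.

```python
# Minimal Airflow sketch: task ids match the DAG above, function bodies are stand-ins.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def download_data(**_):
    print("download datasets from FRED, Fama-French, and yfinance")


def handle_missing_values(**_):
    print("impute or drop missing values")


def train_xgboost(**_):
    print("train the XGBoost model on the processed features")


with DAG(
    dag_id="Group10_DataPipeline_MLOps",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,   # triggered manually or from CI
    catchup=False,
) as dag:
    download_data_task = PythonOperator(
        task_id="download_data_task", python_callable=download_data
    )
    handle_missing_values_task = PythonOperator(
        task_id="handle_missing_values_task", python_callable=handle_missing_values
    )
    xgboost_model_task = PythonOperator(
        task_id="xgboost_model_task", python_callable=train_xgboost
    )

    download_data_task >> handle_missing_values_task >> xgboost_model_task
```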
All tasks completed successfully with minimal execution time per task, indicating efficient pipeline performance.
The Execution Graph confirms that tasks were executed sequentially and completed successfully (marked in green), showing no deferred, failed, or skipped tasks. The Gantt Chart illustrates the time taken by each task and confirms that the pipeline completed within the expected duration.
Detailed logs for each task provide insights into the processing steps, including correlation matrix updates, data handling operations, and confirmation of successful execution steps.
Anomaly Detection and Automated Alerts: automated email notifications were configured to inform the team of task success or failure. As shown in the sample emails, each run completed with a success message confirming full execution of the DAG tasks.
The pipeline scripts were validated with 46 unit tests using `pytest`. All tests passed with zero errors. These tests cover critical modules such as:

- `test_handle_missing.py`
- `test_feature_interactions.py`
- `test_plot_yfinance_time_series.py`
These tests ensure the stability and accuracy of data transformations, visualizations, and feature engineering processes.
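As an illustration of the testing style, a unit test similar to `test_handle_missing.py` might look like the sketch below. The `handle_missing` stand-in is defined inline here because the project's real import path is not shown in this README.

```python
# Illustrative pytest-style test; the real test imports the pipeline's handle_missing module.
import pandas as pd


def handle_missing(df: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for the pipeline's imputation step: forward-fill, then drop leftovers.
    return df.ffill().dropna()


def test_handle_missing_removes_all_nans():
    df = pd.DataFrame({"close": [100.0, None, 102.0], "volume": [None, 2e6, 3e6]})
    cleaned = handle_missing(df)
    assert cleaned.isna().sum().sum() == 0     # no missing values remain
    assert len(cleaned) <= len(df)             # rows are only ever dropped, never added
```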
- Data Extraction:
  - `download_data.py`: Downloads datasets from various financial sources and loads them into the `data` directory.
- Data Preprocessing:
  - `convert_column_dtype.py`: Converts data types for efficient processing.
  - `remove_weekend_data.py`: Filters out weekend data to maintain consistency in time-series analyses.
  - `handle_missing.py`: Handles missing data by imputation or removal.
- Feature Engineering:
  - `correlation.py`: Generates a correlation matrix to identify highly correlated features.
  - `pca.py`: Performs Principal Component Analysis to reduce dimensionality and capture key components.
  - `lagged_features.py`: Creates lagged versions of features for time-series analysis.
  - `technical_indicators.py`: Calculates common technical indicators used in financial analyses.
- Data Scaling:
  - `scaler.py`: Scales features to a standard range for better model performance.
- Analysis and Visualization:
  - `plot_yfinance_time_series.py`: Plots time-series data.
  - `feature_interactions.py`: Generates interaction terms between features.
- Hyperparameter Tuning:
  - Hyperparameter tuning was performed using grid search, random search, and Bayesian optimization. The best models were selected and saved as checkpoints in the `artifacts` directory. Visualizations of hyperparameter sensitivity, such as `Linear Regression - Hyperparameter Sensitivity: model__alpha.png` and `model__l1_ratio.png`, are also included in the `artifacts` directory (a grid-search sketch follows this list).
- Model Sensitivity Analysis:
  - Sensitivity analysis was conducted to understand how changes in inputs impact model performance. Techniques such as SHAP (SHapley Additive exPlanations) were used for feature importance analysis. Feature importance graphs, such as `Feature Importance for ElasticNet on Test Set.png`, were saved to provide insights into the key features driving model predictions.
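The artifact names `model__alpha` and `model__l1_ratio` suggest a scikit-learn `Pipeline` with a step named `model`. A minimal grid-search sketch under that assumption (grid values and data are synthetic placeholders, not the project's actual configuration):

```python
# Sketch of an ElasticNet grid search over a scikit-learn Pipeline (synthetic data).
import numpy as np
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)
X = rng.random((200, 5))                                     # stand-in for engineered features
y = X @ np.array([0.4, -0.2, 0.1, 0.0, 0.3]) + rng.normal(0, 0.05, 200)

pipe = Pipeline([("scaler", StandardScaler()), ("model", ElasticNet(max_iter=10_000))])
grid = GridSearchCV(
    pipe,
    param_grid={"model__alpha": [0.001, 0.01, 0.1, 1.0], "model__l1_ratio": [0.2, 0.5, 0.8]},
    cv=TimeSeriesSplit(n_splits=5),                          # respects temporal ordering
    scoring="neg_mean_squared_error",
)
grid.fit(X, y)
print(grid.best_params_)
```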
Steps for Replication
- Access to a Google Cloud Platform (GCP) account with billing enabled.
- Vertex AI and Cloud Build services activated within your GCP project.
- The `gcloud` CLI tool installed and configured with the appropriate permissions.
- A GitHub repository with access to GitHub Actions for automation.
- Required IAM roles for deploying models to Vertex AI and managing Cloud Build resources.
- Push changes to the main branch of the GitHub repository.
- GitHub Actions automatically triggers the CI/CD pipeline to initiate deployment using Cloud Build.
- The pipeline executes pre-defined steps, ensuring the model is correctly deployed to Vertex AI.
- Confirm that all dependencies are installed locally or in the CI/CD pipeline environment.
- Access the Vertex AI console to verify the deployment.
- Test the deployed model endpoint to confirm successful deployment and validate model predictions.
- Review monitoring dashboards to ensure no issues with prediction outputs or feature drift.
Please refer to Data drift Notebook
- The drift detection process identifies significant shifts in data patterns by analyzing key statistical metrics across features such as Volume, RSI, MACD, MA20, and SP500_VIXCLS_ratio. Metrics like mean, variance, and percentile differences reveal substantial deviations, indicating changes in data distribution. For example, Volume shows a mean difference of `1.083e+16` and a variance difference of `0.591`, while RSI highlights deviations in its 50th percentile and variance. MACD and MA20 exhibit notable shifts in percentiles, suggesting changes in trend-related features, and SP500_VIXCLS_ratio reveals variability in market volatility. These findings emphasize the need to monitor data sources, adjust preprocessing pipelines, and potentially retrain models to maintain prediction accuracy and ensure model reliability in dynamic environments.
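A minimal sketch of the statistic-comparison idea, using synthetic data in place of the project's baseline and serving windows, could look like this:

```python
# Compare simple distribution statistics between a baseline and a current window of one feature.
import numpy as np
import pandas as pd


def drift_metrics(baseline: pd.Series, current: pd.Series) -> dict:
    return {
        "mean_diff": float(abs(current.mean() - baseline.mean())),
        "variance_diff": float(abs(current.var() - baseline.var())),
        "p50_diff": float(abs(current.quantile(0.5) - baseline.quantile(0.5))),
    }


# Synthetic stand-ins for two windows of the "RSI" feature.
rng = np.random.default_rng(0)
baseline = pd.Series(rng.normal(50, 10, 500), name="RSI")
current = pd.Series(rng.normal(58, 14, 500), name="RSI")     # deliberately shifted

metrics = drift_metrics(baseline, current)
drifted = any(v > 5.0 for v in metrics.values())             # illustrative threshold
print(metrics, "drift detected" if drifted else "no drift")
```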
Workflows and setups for managing machine learning pipelines on Vertex AI in Google Cloud are as follows:
- Jupyter Notebooks in Vertex AI Workbench:
  - The setup includes instances like `group10-test-vy` and `mlops-group10`, both configured for NumPy/SciPy and scikit-learn environments. These notebooks are GPU-enabled, optimizing their utility for intensive ML operations.
- Training Pipelines:
  - Multiple training pipelines are orchestrated on Vertex AI, such as `mlops-group10` and `group10-model-train`. These are primarily custom training pipelines aimed at tasks like hyperparameter tuning, training, and validation, leveraging the scalability of Google Cloud's infrastructure.
- Metadata Management:
  - Metadata tracking is managed through the Vertex AI Metadata Store, with records such as `vertex_dataset`. This ensures reproducibility and streamlined monitoring of all artifacts produced during ML workflows.
- Model Registry:
  - Deployed models like `mlops-group10-deploy` and `group10-model` are maintained in the Model Registry. The registry supports versioning and deployment tracking for consistency and monitoring.
- Endpoints for Online Prediction:
  - Various endpoints, such as `mlops-group10-deploy` and `testt`, are active and ready for predictions. The setup is optimized for real-time online predictions, and monitoring can be enabled for anomaly or drift detection.
- Model Registration: Once a model is trained, register it in Vertex AI's Model Registry. Specify the model name, version, and any relevant metadata.
- Create an Endpoint:
- In Vertex AI, create an endpoint. This endpoint will act as the interface for serving predictions.
- Navigate to Vertex AI > Online prediction > Endpoints > Create.
- Assign a name and select the appropriate region.
- Deploy the Model to an Endpoint:
- Select the registered model and choose "Deploy to Endpoint".
- Configure the deployment settings such as machine type, traffic splitting among model versions, and whether to enable logging or monitoring.
- Confirm the deployment, which makes the model ready to serve predictions (see the SDK sketch after this list).
- Manage Versions: In Vertex AI, each model can have multiple versions, allowing easy rollback and version comparison.
- Update Versions: Upload new versions of the model to the Model Registry and adjust the endpoint configurations to direct traffic to the newer version.
- Automate Deployments: Use GitHub Actions and Google Cloud Build to automate the deployment of new model versions from a repository.
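The registration and deployment steps above can also be scripted with the Vertex AI Python SDK. The sketch below uses placeholder project, bucket, and container values and is not the project's exact deployment script.

```python
# Sketch: register a model, create an endpoint, and deploy with the Vertex AI SDK.
from google.cloud import aiplatform

aiplatform.init(project="your-gcp-project", location="us-east1")   # placeholders

# Register (upload) a trained model to the Model Registry.
model = aiplatform.Model.upload(
    display_name="group10-model",
    artifact_uri="gs://your-bucket/models/latest/",                # placeholder path
    serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
)

# Create an endpoint and route all traffic to this model version.
endpoint = aiplatform.Endpoint.create(display_name="mlops-group10-deploy")
model.deploy(endpoint=endpoint, machine_type="n1-standard-2", traffic_percentage=100)

# Validate the deployment with a test prediction (feature vector is illustrative).
print(endpoint.predict(instances=[[0.1, 0.2, 0.3]]))
```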
- CI/CD Pipeline Configuration:
  - GitHub Actions: Configure workflows in the `.github/workflows/` directory to automate testing, building, and deploying models.
  - Cloud Build: Create a `cloudbuild.yaml` file specifying steps to build, test, and deploy models based on changes in the repository.
- Script Functions:
  - Pull the Latest Model: Scripts should fetch the latest model version from the Vertex AI Model Registry or a specified repository.
  - Deploy or Update Model: Automate the deployment of the model to the configured Vertex AI endpoint.
  - Monitor and Log: Set up logging for deployment status to ensure visibility and troubleshooting capabilities.
- File: `airflowtrigger.yaml`
- Purpose: Initializes Airflow, starts required services, triggers a DAG (`Group10_DataPipeline_MLOps`), and monitors its execution.
- Key Steps:
  - Sets up Python and dependencies.
  - Initializes and starts Airflow services via Docker Compose.
  - Triggers the specified Airflow DAG and monitors its status until completion.
  - Stops services and removes containers after execution.
- Triggers: On push to `main`
- File: `model.yml`
- Purpose: Packages the trainer code, uploads it to GCS, and triggers a Vertex AI Custom Job for model training.
- Key Steps:
  - Prepares a Python trainer package (`trainer/task.py`) and packages it.
  - Uploads the package to a GCS bucket.
  - Triggers a Vertex AI Custom Job with specified training arguments (e.g., dataset path, epochs, batch size).
  - Notifies upon completion.
- File: `deploy_monitor.yaml`
- Purpose: Deploys the trained model to Vertex AI, creates an endpoint, and sets up monitoring for model performance and data drift.
- Key Steps:
  - Deploys the model to Vertex AI.
  - Configures a new or existing endpoint for predictions.
  - Enables monitoring for performance and feature/data drift.
- Triggers: On push to `main`
- File: `train_deploy.yaml`
- Purpose: Trains the ML model and deploys it to Vertex AI for online predictions.
- Key Steps:
  - Builds a Docker image of the training script and pushes it to the GCP Artifact Registry.
  - Triggers model training.
  - Deploys the trained model to a new or existing endpoint.
- Triggers: On push to `main`
- File: Not explicitly listed (part of the repository tests).
- Purpose: Runs automated tests on codebase changes.
- Key Steps:
  - Installs project dependencies.
  - Executes tests using `pytest` to ensure the codebase is robust and ready for deployment.
- Triggers: On push to `main`
- File: `syncgcp.yaml`
- Purpose: Synchronizes local artifacts and Airflow DAGs with a Google Cloud Storage bucket; builds Docker images, uploads artifacts to GCP, and deploys to endpoints.
- Steps:
  - Environment setup: Installs the Google Cloud CLI and authenticates with a service account.
  - File uploads: Uploads specific artifacts and files to predefined GCP bucket locations and synchronizes repository content with the bucket directory structure.
  - Verification: Lists uploaded files to confirm the sync.
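The upload step is equivalent to a small `google-cloud-storage` script along these lines; the bucket name is a placeholder and the file list is only illustrative.

```python
# Sketch: push repository artifacts to a GCS bucket with the google-cloud-storage client.
from google.cloud import storage

client = storage.Client()                      # uses the authenticated service account
bucket = client.bucket("your-project-bucket")  # placeholder bucket name

for local_path, remote_path in [
    ("artifacts/schema.pbtxt", "artifacts/schema.pbtxt"),
    ("artifacts/drift_detection_log.txt", "artifacts/drift_detection_log.txt"),
]:
    bucket.blob(remote_path).upload_from_filename(local_path)
    print(f"uploaded {local_path} -> gs://{bucket.name}/{remote_path}")
```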
These YAML workflows automate various aspects of the ML lifecycle:

- `airflowtrigger.yaml`: Airflow DAG management.
- `train_deploy.yaml`: Trains the ML model and deploys it to Vertex AI for online predictions.
- `model.yml`: Training pipeline and GCS uploads.
- `PyTest.yaml`: Testing and reporting.
- `syncgcp.yaml`: Artifact and DAG synchronization with GCP.
- `deploy_monitor.yaml`: Deploys the trained model and monitors model performance and data drift.

Each workflow is tailored to a specific task in CI/CD for ML operations, leveraging GitHub Actions and Google Cloud services.
- Monitoring:
- Vertex AI provides dashboards to monitor model performance and data drift.
- Alerts are configured to notify stakeholders when anomalies, such as feature attribution drift, are detected.
The provided images highlight the active setup and management of a Vertex AI model monitoring system. Files like `anomalies.json` and `anomalies.textproto` document identified issues in the input data. The structure also includes folders such as `baseline`, `logs`, and `metrics`, which organize monitoring data for future analysis. A notification email confirms the creation of a model monitoring job for a specific Vertex AI endpoint and provides essential details, such as the endpoint name, a link to the monitoring job, and the GCS bucket path where statistics and anomalies will be saved.
- Maintenance:
- Pre-configured thresholds for model performance trigger retraining or redeployment of updated models.
- Logs and alerts from Vertex AI and Cloud Build ensure the system remains reliable and scalable.
These alert emails cover two cases:
- Partial failure: sent when a Vertex AI Model Monitoring job partially fails, for example due to a drift detection issue or missing explanation specifications, with details about the type of failure and suggestions for resolving it.
- Data drift or anomalies: sent when the monitoring job detects data drift or anomalies in the deployed model, with links to detailed logs and monitoring jobs for debugging.
The Stock Price Predictor UI Dashboard is a user-friendly interface for analyzing stock trends and making informed financial decisions. The dashboard runs locally at http://127.0.0.1:5001 and lets users explore stock predictions and insights.
To launch the application:

- Run `python app/app.py`
- Access the dashboard at http://127.0.0.1:5001
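A minimal sketch of what `app/app.py` might contain is shown below; the route names, model checkpoint path, and request payload are assumptions rather than the project's exact implementation.

```python
# Illustrative Flask app serving predictions on port 5001.
from flask import Flask, jsonify, request
import joblib

app = Flask(__name__)
model = joblib.load("artifacts/best_model.pkl")   # hypothetical checkpoint path


@app.route("/")
def home():
    return "Stock Price Predictor UI Dashboard"


@app.route("/predict", methods=["POST"])
def predict():
    features = request.get_json()["features"]     # e.g. the scaled feature vector
    prediction = model.predict([features])[0]
    return jsonify({"predicted_price": float(prediction)})


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5001)
```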
This dashboard bridges the gap between raw stock data and actionable investment strategies, making financial decision-making easier and more efficient.
- Homepage:
  - Provides an overview of the tool's capabilities and purpose.
  - Highlights the integration of machine learning with financial decision-making.
- Prediction Page:
  - Displays the current stock price and the predicted price.
  - Includes options to "Buy" or "Sell" based on the predicted price trend.
  - Offers a gamified experience to engage users in testing their predictions.
- Visualization Dashboard:
  - Features dynamic and interactive charts to compare actual vs. predicted stock prices.
  - Provides insights into stock trends.
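For reference, the actual-vs-predicted comparison could be reproduced offline with a few lines of matplotlib; the series below are synthetic stand-ins for the dashboard's data.

```python
# Illustrative actual-vs-predicted chart (synthetic data, static matplotlib version).
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
dates = pd.date_range("2024-01-01", periods=60, freq="B")
actual = 150 + np.cumsum(rng.normal(0, 1, 60))        # synthetic GOOGL closes
predicted = actual + rng.normal(0, 1.5, 60)           # synthetic model output

plt.figure(figsize=(10, 4))
plt.plot(dates, actual, label="Actual close")
plt.plot(dates, predicted, label="Predicted close", linestyle="--")
plt.title("GOOGL: actual vs. predicted closing price")
plt.legend()
plt.tight_layout()
plt.show()
```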
This project is licensed under the MIT License. See the LICENSE file.