Technologies: Python (Apache Airflow/Kafka, pandas, numpy, BeautifulSoup, requests, sqlalchemy), Bash/shell, PostgreSQL, MySQL.
Collects road traffic data from various sources in different formats and combines it into one denormalized dataset, which is then uploaded to an RDBMS for analysis. Currency exchange rates are pulled from an API using requests and BeautifulSoup.
Project Files:
- dag.py - Apache Airflow DAG definition and pipeline. Uses a BashOperator for every task, since each step is started via a Bash command (see the DAG sketch after this list).
- submit-dag-bash.sh - Shell script to submit the DAG to Apache Airflow.
- transform.py - Python script which executes the transformations on the consolidated CSV file.
- TransformationsEngine.py - Python script containing the engine that performs the transformations called from transform.py. This is where the API call is made when a different currency is specified, pulling that currency's exchange rate and applying it to the payments column (see the exchange-rate sketch after this list).
- load.py - Python script that loads the CSV into PostgreSQL.
- LoadPostgresqlEngine.py - Python script containing the engine that performs the load methods called from load.py (see the load sketch after this list).
- road-traffic-project-log.csv - Logs written to this CSV by SimpleLogEngine.py.
- SimpleLogEngine.py - Simple user-defined logging function that writes to road-traffic-project-log.csv (see the logging sketch after this list).
- sql_create_table.sql - SQL to create the PostgreSQL table.
- sql_view.sql - SQL to create a view of month and day_of_week pairs whose total payments are above the average.
- sample data files - folder containing some of the data sources and files used/created during this ETL process.
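A minimal sketch of how dag.py might wire the pipeline together. The DAG id, schedule, script paths, and Bash commands below are illustrative assumptions, not the project's actual values; the structure simply shows an all-BashOperator pipeline:

```python
# Illustrative DAG sketch: every task is a BashOperator.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator in Airflow 1.x

default_args = {
    "owner": "road-traffic",
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="road_traffic_etl",        # hypothetical DAG id
    default_args=default_args,
    schedule_interval="@daily",
) as dag:
    # Each step shells out to a command, hence BashOperator throughout.
    extract = BashOperator(
        task_id="extract",
        bash_command="tar -xzf /data/traffic_sources.tgz -C /data/staging",  # assumed path
    )
    consolidate = BashOperator(
        task_id="consolidate",
        bash_command="cat /data/staging/*.csv > /data/consolidated.csv",     # assumed path
    )
    transform = BashOperator(
        task_id="transform",
        bash_command="python /opt/etl/transform.py",                         # assumed path
    )
    load = BashOperator(
        task_id="load",
        bash_command="python /opt/etl/load.py",                              # assumed path
    )

    extract >> consolidate >> transform >> load
```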
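The currency step in TransformationsEngine.py could look roughly like the sketch below. The URL, HTML structure, default currency, and column name are assumptions for illustration only:

```python
# Illustrative sketch of the exchange-rate transformation.
import pandas as pd
import requests
from bs4 import BeautifulSoup

def get_exchange_rate(currency: str) -> float:
    """Scrape the exchange rate for `currency` from a rates page (hypothetical URL/markup)."""
    response = requests.get("https://example.com/exchange-rates")  # assumed endpoint
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    cell = soup.find("td", attrs={"data-currency": currency})      # assumed page structure
    return float(cell.text.strip())

def convert_payments(df: pd.DataFrame, currency: str = "USD") -> pd.DataFrame:
    """Apply the exchange rate to the payments column when a non-default currency is requested."""
    if currency != "USD":                       # default currency is an assumption
        rate = get_exchange_rate(currency)
        df["payments"] = df["payments"] * rate  # column name assumed
    return df
```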
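load.py and LoadPostgresqlEngine.py presumably boil down to a pandas/sqlalchemy load along these lines; the connection string, table name, and file name are placeholders:

```python
# Illustrative sketch of the PostgreSQL load step.
import pandas as pd
from sqlalchemy import create_engine

def load_csv_to_postgres(csv_path: str, table: str, conn_str: str) -> int:
    """Read the transformed CSV and append it to a PostgreSQL table; returns the row count."""
    engine = create_engine(conn_str)
    df = pd.read_csv(csv_path)
    df.to_sql(table, engine, if_exists="append", index=False)
    return len(df)

if __name__ == "__main__":
    # All values below are placeholders, not the project's real settings.
    rows = load_csv_to_postgres(
        "transformed.csv",
        "road_traffic",
        "postgresql://user:password@localhost:5432/traffic_db",
    )
    print(f"Loaded {rows} rows")
```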
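SimpleLogEngine.py is described only as a simple user-defined logger writing to road-traffic-project-log.csv; a minimal version might look like this (the entry format is assumed):

```python
# Illustrative sketch of a simple CSV logger.
from datetime import datetime

LOG_FILE = "road-traffic-project-log.csv"

def log(message: str) -> None:
    """Append a timestamped message to the project log CSV."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as f:
        f.write(f"{timestamp},{message}\n")
```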
ETL: streaming data from a KafkaProducer into a MySQL database with a KafkaConsumer.
Python is used to simulate streaming highway data with a KafkaProducer; a KafkaConsumer then ETLs each message into MySQL.
Project files:
- setup-bash.sh - Bash commands for setting up the project: installing Apache Kafka and the required Python modules, and creating the MySQL database.
- kafka-zookeeper.sh - Starts the Kafka ZooKeeper service.
- kafka-broker.sh - Starts the Kafka broker service.
- execution-bash.sh - Creates the Kafka topic and runs the Python KafkaProducer/KafkaConsumer scripts.
- kafka-producer.py - Python script containing the streaming-data simulator built on KafkaProducer (see the producer sketch after this list).
- kafka-consumer.py - Python script that uses KafkaConsumer to ETL each message from the producer into a MySQL database (see the consumer sketch after this list).
- SimpleLogEngine.py - For logging.
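A rough sketch of what the kafka-producer.py simulator might do with kafka-python's KafkaProducer; the topic name, event fields, and timing are illustrative assumptions:

```python
# Illustrative sketch of the streaming-data simulator.
import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

TOPIC = "highway_traffic"  # topic name is an assumption

while True:
    # Simulated highway traffic event; field names are assumptions.
    event = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "vehicle_id": random.randint(10000, 99999),
        "vehicle_type": random.choice(["car", "truck", "van"]),
        "plaza_id": random.randint(1, 50),
    }
    producer.send(TOPIC, value=event)
    time.sleep(random.random() * 2)  # random delay to mimic a live stream
```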
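kafka-consumer.py presumably pairs KafkaConsumer with a MySQL driver along these lines; the connection details, table, and columns are placeholders, and mysql-connector-python is assumed as the client library:

```python
# Illustrative sketch of the consumer-side ETL into MySQL.
import json

import mysql.connector
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "highway_traffic",                         # topic name is an assumption
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
)

conn = mysql.connector.connect(
    host="localhost", user="etl_user", password="password", database="traffic"  # placeholders
)
cursor = conn.cursor()

for message in consumer:
    record = message.value
    # Table and column names mirror the simulated event and are assumptions.
    cursor.execute(
        "INSERT INTO live_traffic (timestamp, vehicle_id, vehicle_type, plaza_id) "
        "VALUES (%s, %s, %s, %s)",
        (record["timestamp"], record["vehicle_id"], record["vehicle_type"], record["plaza_id"]),
    )
    conn.commit()
```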
Screenshots:
- Airflow: transformations engine
- Airflow: PostgreSQL table ERD
- Airflow: SQL view of total payment amount by month/day_of_week pairs above the average
- Kafka: consumer