Collect highway traffic data from various streaming/non-streaming sources and load into PostgreSQL & MySQL databases for analysis.

BJTangerine/Airflow-Kafka-ETL-Road-Data-Project

Apache Airflow/Kafka ETL on Static & Streaming Road Traffic Data

Technologies: Python (Apache Airflow, Apache Kafka, pandas, numpy, BeautifulSoup, requests, SQLAlchemy), Bash/shell, PostgreSQL, MySQL.

Airflow ETL

Collects road traffic data from various sources in different formats and combines them into one denormalized dataset for upload to an RDBMS for analysis. Currency exchange rates are pulled from an API using requests and BeautifulSoup.
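The actual endpoint and page markup live in TransformationsEngine.py; as a rough sketch (the URL, the `<span class="rate">` selector, and the `payment_amount` column name are all placeholders), the rate scrape and payment conversion might look like:

```python
import pandas as pd
import requests
from bs4 import BeautifulSoup


def parse_rate(html: str) -> float:
    """Scrape the exchange rate out of a page snippet.

    The <span class="rate"> selector is hypothetical; the real markup
    is whatever the API used in TransformationsEngine.py returns.
    """
    soup = BeautifulSoup(html, "html.parser")
    return float(soup.find("span", class_="rate").text)


def fetch_rate(url: str) -> float:
    """Download the page (URL is a placeholder) and parse the rate."""
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return parse_rate(resp.text)


def convert_payments(df: pd.DataFrame, rate: float) -> pd.DataFrame:
    """Apply the exchange rate to the payments column (name assumed)."""
    out = df.copy()
    out["payment_amount"] = out["payment_amount"] * rate
    return out
```

Keeping the parsing separate from the HTTP call makes the scrape testable without hitting the API.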

Project Files:

  • dag.py - Apache Airflow DAG definition and pipeline. Uses BashOperator tasks throughout, since every step is launched via a Bash command.
  • submit-dag-bash.sh - Shell script to submit the DAG to Apache Airflow.
  • transform.py - Python script which executes the transformations on the consolidated CSV file.
  • TransformationsEngine.py - Python script containing the engine used to perform the transformations called in the transform.py script. Here the API call is made if a different currency is specified, pulling the new currency's exchange rate and applying it to the payments column.
  • load.py - Python script that loads the CSV into PostgreSQL.
  • LoadPostgresqlEngine.py - Python script containing the engine for performing the load methods called in the load.py script.
  • road-traffic-project-log.csv - Logs written to this CSV by SimpleLogEngine.py.
  • SimpleLogEngine.py - Simple user-defined logging function.
  • sql_create_table.sql - SQL to create the PostgreSQL table.
  • sql_view.sql - SQL to create view of month and day_of_week pairs with total payments above average.
  • sample data files - folder containing some of the data sources and files used/created during this ETL process.
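dag.py itself is not reproduced above; a minimal sketch of a BashOperator pipeline of this shape (the DAG id, task ids, script paths, and schedule are illustrative, not the repository's actual values) might be:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "road-traffic",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="road_traffic_etl",  # illustrative name
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # Every stage shells out to a script, hence BashOperator throughout.
    extract = BashOperator(
        task_id="extract",
        bash_command="python /opt/etl/extract.py",  # hypothetical path
    )
    transform = BashOperator(
        task_id="transform",
        bash_command="python /opt/etl/transform.py",  # runs TransformationsEngine
    )
    load = BashOperator(
        task_id="load",
        bash_command="python /opt/etl/load.py",  # runs LoadPostgresqlEngine
    )

    extract >> transform >> load
```

The `>>` chaining gives the linear extract-transform-load dependency order; submit-dag-bash.sh would then hand this file to Airflow.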

Kafka ETL

ETL of KafkaProducer streaming data into a MySQL database using KafkaConsumer.

Python simulates streaming highway data with a KafkaProducer; a KafkaConsumer then ETLs each message into MySQL.
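The simulator's exact schema lives in kafka-producer.py; as an illustration (the field names and value ranges here are assumptions), each message might be a JSON-serialized record like:

```python
import json
import random
import time


def make_vehicle_record(rng, now=None):
    """Generate one simulated highway toll record (fields are hypothetical)."""
    if now is None:
        now = time.time()
    return {
        "timestamp": int(now),
        "vehicle_id": rng.randint(10000, 99999),
        "vehicle_type": rng.choice(["car", "truck", "van"]),
        "toll_plaza_id": rng.randint(4000, 4010),
    }


def serialize(record):
    """Encode a record the way a KafkaProducer value serializer might."""
    return json.dumps(record).encode("utf-8")
```

In the real script these bytes would be handed to the producer's send() call on the topic created in execution-bash.sh.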

Project files:

  • setup-bash.sh - Bash commands for setting up Apache Kafka, installing Python modules, and creating the MySQL database.
  • kafka-zookeeper.sh - Starts the Kafka ZooKeeper service.
  • kafka-broker.sh - Starts the Kafka broker service.
  • execution-bash.sh - Creates the Kafka topic and executes the Python KafkaProducer/KafkaConsumer scripts.
  • kafka-producer.py - Python script containing streaming data simulator with KafkaProducer.
  • kafka-consumer.py - Python script that ETLs each message from the KafkaProducer into a MySQL db using KafkaConsumer.
  • SimpleLogEngine.py - For logging.
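kafka-consumer.py's insert logic is not shown above; a hedged sketch of turning one consumed message into a parameterized MySQL INSERT (the table and column names are assumptions, not the repository's actual schema) could be:

```python
import json

# Hypothetical table/column names; the real DDL is created in setup-bash.sh.
INSERT_SQL = (
    "INSERT INTO livetolldata "
    "(record_timestamp, vehicle_id, vehicle_type, toll_plaza_id) "
    "VALUES (%s, %s, %s, %s)"
)


def message_to_row(raw):
    """Decode one Kafka message payload into a row tuple for cursor.execute."""
    msg = json.loads(raw.decode("utf-8"))
    return (
        msg["timestamp"],
        msg["vehicle_id"],
        msg["vehicle_type"],
        msg["toll_plaza_id"],
    )
```

In the consumer loop, each decoded row would go through cursor.execute(INSERT_SQL, row) followed by a commit, so every streamed message lands in MySQL as it arrives.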

Images

Airflow: transformations engine (code preview)

Airflow: PostgreSQL table ERD

Airflow: SQL view of total payment amount by month/day-of-week pairs above average

Kafka: consumer (code snippet)
