This is an end-to-end AWS Cloud ETL project. The data pipeline is orchestrated with Apache Airflow on AWS EC2 and uses Snowpipe for ingestion.
It demonstrates how to build an ETL data pipeline that performs data transformation with Python on Apache Airflow and automatically ingests the transformed data into a Snowflake data warehouse via Snowpipe.
The data is then visualized using Microsoft Power BI.
This is a Redfin Real Estate Data Analytics Python ETL data engineering project using Apache Airflow, Snowpipe, Snowflake and AWS services. The project shows how to connect to the Redfin data source, extract real estate data using Python and land the raw data in an Amazon S3 bucket, then transform the raw data using Pandas and load it into a second Amazon S3 bucket.
As soon as the transformed data lands in the second S3 bucket, Snowpipe is triggered and automatically runs a COPY command to load the transformed data into a Snowflake data warehouse table. Power BI connects to the Snowflake data warehouse to visualize the data and obtain insights.
Apache Airflow orchestrates and automates this process.
Apache Airflow is an open-source platform for orchestrating and scheduling workflows of tasks and data pipelines. It is installed on an EC2 instance to orchestrate this pipeline.
The project was inspired by Dr. Yemi Olanipekun, whose tutorials have benefitted me greatly.
- Fundamental knowledge of SQL, Python, CSV/JSON, AWS Cloud, Apache Airflow, DW & ETL concepts
- Familiarity with the fundamentals of the Snowflake DW and S3-Snowpipe auto-ingestion concepts such as the SQS queue
- Redfin real estate City houses dataset (to serve as the source of the real estate data)
- AWS EC2 instance with at least 20 GB storage (t3.xlarge), Ubuntu; and AWS S3 buckets as the data lake
- Code editor (I used VSCode) for connecting to the EC2 instance to write code (the DAG file) for Airflow
- Apache Airflow for orchestration (authoring of the ETL workflow via a DAG) & service connections
- Good knowledge and skills in data analytics and visualization with the Microsoft Power BI tool
- Perseverance to troubleshoot and resolve errors and failures, read logs, rebuild components, etc.
The following account of the project development process may not be enough to enable the reader to code along or replicate the whole process from start to finish. For instance, there is no detailing of the steps involved in creating an AWS account, creating the IAM user whose credentials are used by Airflow, setting up the S3 buckets, deploying Snowpipe along with the SQS queue configuration on the S3 bucket, setting up the Snowflake data warehouse and connecting Power BI to it, spinning up the AWS EC2 instance from scratch and preparing it to work with Airflow (firewall settings for HTTP/HTTPS/SSH and attaching the IAM role), connecting VSCode to the EC2 instance, or accessing Airflow via a web browser.
However, anyone with Data Engineering skills should be familiar with how these are set up. Readers who are not familiar with these areas are encouraged to do their own research, enroll in a Data Engineering bootcamp, learn from data engineering tutorials available online or on some great YouTube channels (such as Tuple Spectra), or reach out to me for clarification. With that out of the way, let’s go over the key steps in this project.
Having satisfied all 8 requirements in the preceding section, I proceeded to carry out the following setup:
The source data is real estate data obtained from https://www.redfin.com/news/data-center/.
The particular dataset of interest is the US Housing Market data by City, obtained from the following link:
The data comes in a gzip-compressed (GZ) format.
The first part of the development process was done in an online Python notebook environment (Google Colab, colab.google) in order to examine the source data and see its fields and records. This also served as a means to code out the data transform function and other preliminary code.
The source dataset originally comprises 5,779,946 records and 58 fields.
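To get that first look, the compressed file can be loaded directly with Pandas. Below is a minimal sketch; the download URL is a placeholder for the link referenced above, and the file is assumed to be a gzip-compressed, tab-separated dump.

```python
import pandas as pd

# Placeholder for the Redfin "US Housing Market data by City" download link above.
SOURCE_URL = "<redfin-city-housing-market-data-url>.gz"

# Assumed to be a gzip-compressed, tab-separated file.
raw_df = pd.read_csv(SOURCE_URL, sep="\t", compression="gzip")

print(raw_df.shape)              # the snapshot used here showed (5779946, 58)
print(raw_df.columns.tolist())   # inspect the 58 available fields
raw_df.head()
```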
The data transformation included the following activities (a condensed Pandas sketch of these steps follows below):
First, the following 24 columns were selected to be of interest:
period_begin, period_end, period_duration, region_type, region_type_id, table_id, is_seasonally_adjusted, city, state, state_code, property_type, property_type_id, median_sale_price, median_list_price, median_ppsf, median_list_ppsf, homes_sold, inventory, months_of_supply, median_dom, avg_sale_to_list, sold_above_list, parent_metro_region_metro_code, last_updated
Next, as part of data cleaning, these actions were taken on the resulting dataset:
- Drop all records that have null values.
- Typecast the period_begin and period_end columns to the DATETIME data type.
- Extract the year from period_begin and period_end into 2 new columns, period_begin_in_years and period_end_in_years.
- Extract the month from period_begin and period_end into 2 new columns, period_begin_in_months and period_end_in_months.
- Map each month number to its respective month name.
The transformed dataset was then reported to have 4,500,063 records and 28 fields.
See the Proof-of-Concept (PoC) Python Notebook file here.
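For reference, below is a condensed Pandas sketch of the transformation steps described above. The linked notebook remains the authoritative version; in particular, the exact month-name format used there is an assumption here.

```python
import pandas as pd

def transform_redfin_data(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Condensed version of the transform described above (see the PoC notebook)."""
    # 1. Keep only the 24 columns of interest (full list above).
    cols_of_interest = [
        "period_begin", "period_end", "period_duration", "region_type", "region_type_id",
        "table_id", "is_seasonally_adjusted", "city", "state", "state_code",
        "property_type", "property_type_id", "median_sale_price", "median_list_price",
        "median_ppsf", "median_list_ppsf", "homes_sold", "inventory", "months_of_supply",
        "median_dom", "avg_sale_to_list", "sold_above_list",
        "parent_metro_region_metro_code", "last_updated",
    ]
    df = raw_df[cols_of_interest].copy()

    # 2. Drop all records that have null values.
    df = df.dropna()

    # 3. Typecast period_begin and period_end to datetime.
    df["period_begin"] = pd.to_datetime(df["period_begin"])
    df["period_end"] = pd.to_datetime(df["period_end"])

    # 4. Extract year and month into 4 new columns (24 + 4 = 28 fields).
    df["period_begin_in_years"] = df["period_begin"].dt.year
    df["period_end_in_years"] = df["period_end"].dt.year
    df["period_begin_in_months"] = df["period_begin"].dt.month
    df["period_end_in_months"] = df["period_end"].dt.month

    # 5. Map month numbers to month names (abbreviated names assumed).
    month_names = {
        1: "Jan", 2: "Feb", 3: "Mar", 4: "Apr", 5: "May", 6: "Jun",
        7: "Jul", 8: "Aug", 9: "Sep", 10: "Oct", 11: "Nov", 12: "Dec",
    }
    df["period_begin_in_months"] = df["period_begin_in_months"].map(month_names)
    df["period_end_in_months"] = df["period_end_in_months"].map(month_names)

    return df
```

Calling transform_redfin_data(raw_df) on the loaded dataset should yield the 28-field frame described above.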
- Landing zone bucket: redfin-analytics-landing-zone-raw-data
- Transformed data bucket: redfin-analytics-transformed-data-bucket
Based on the 28 selected columns of the transformed dataset, the destination table schema was designed with the appropriate data types. The data warehouse, destination table, staging area, and Snowpipe were then created using a prepared SQL script.
See the SQL file here.
Verified the table schema:
DESC TABLE redfin_analytics_database.redfin_analytics_schema.redfin_analytics_table;
Checked the properties of the pipe created:
DESC PIPE redfin_analytics_database.snowpipe_schema.redfin_analytics_snowpipe;
Took note of the Notification Channel ARN string.
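The ARN was simply copied from the DESC PIPE output in the worksheet. If preferred, it could also be fetched programmatically; a small sketch using the snowflake-connector-python package is shown below (account and credential values are placeholders).

```python
# Optional alternative: read the Snowpipe notification channel (SQS ARN)
# programmatically instead of copying it from the worksheet output.
# Requires: pip install snowflake-connector-python
import snowflake.connector

conn = snowflake.connector.connect(
    account="xxxxxxx-xxxxxxx",        # placeholder account identifier
    user="<SNOWFLAKE_USERNAME>",      # placeholder
    password="<SNOWFLAKE_PASSWORD>",  # placeholder
)
cur = conn.cursor()
cur.execute(
    "DESC PIPE redfin_analytics_database.snowpipe_schema.redfin_analytics_snowpipe"
)
row = dict(zip([col[0] for col in cur.description], cur.fetchone()))
# The notification_channel column holds the SQS ARN used in the S3 event notification.
print(row["notification_channel"])
conn.close()
```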
First tested the external stage created on Snowflake by uploading a test CSV file to the redfin-analytics-transformed-data-bucket S3 bucket.
See the test file here.
The stage was then verified by running the following command in the Snowflake SQL Workspace:
list @redfin_analytics_database.external_stage_schema.redfin_dw_ext_stage;
Next, configured the S3 bucket to trigger Snowpipe. This was done by creating an Event Notification on the S3 bucket via the bucket Properties settings (a boto3 equivalent is sketched after the list):
- Event name: redfin-analytics-snowpipe-event
- Object creation: All object create events
- Destination: SQS Queue
- Specify SQS Queue (Enter SQS Queue ARN): <<Entered the Notification Channel ARN of Snowpipe>>
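For completeness, here is a hedged boto3 sketch of the same configuration done programmatically rather than in the console. The queue ARN is a placeholder for the Snowpipe notification channel noted earlier.

```python
# Programmatic equivalent of the console Event Notification above, using boto3.
# Note: this call replaces the bucket's entire notification configuration.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket="redfin-analytics-transformed-data-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "Id": "redfin-analytics-snowpipe-event",
                "QueueArn": "<SNOWPIPE_NOTIFICATION_CHANNEL_ARN>",  # placeholder
                "Events": ["s3:ObjectCreated:*"],  # "All object create events"
            }
        ]
    },
)
```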
Checked the destination table in Snowflake to see if Snowpipe is working:
SELECT *
FROM redfin_analytics_database.redfin_analytics_schema.redfin_analytics_table
LIMIT 10;
SELECT COUNT(*) AS Number_Of_Records
FROM redfin_analytics_database.redfin_analytics_schema.redfin_analytics_table;
In preparation for the rest of the project, the S3 bucket was emptied and the Snowflake table re-created:
- Instance name: redfin-analytics-etl-pipeline-computer
- Instance type: t3.xlarge, 20 GiB storage (instead of the default 8 GiB)
- EC2 role name: ec2-access-to-s3-role
- Details: AWS Service, Use case is EC2
- Permissions: AmazonS3FullAccess
- These required dependencies were then installed on EC2 for this project:
sudo apt update
sudo apt install python3-pip
python3 --version
sudo apt install python3.12-venv
sudo apt install -y libxml2-dev libxmlsec1-dev libxmlsec1-openssl pkg-config
python3.12 -m venv redfin_analytics_etl_venv
source redfin_analytics_etl_venv/bin/activate
pip install pandas
pip install apache-airflow
pip install apache-airflow-providers-amazon
airflow standalone
The DAG was written to orchestrate the workflow once every month.
See the finished DAG file here.
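For orientation, below is a minimal sketch of the shape such a monthly two-task DAG might take. The DAG ID, start date and function bodies are placeholders; the linked DAG file is the actual implementation, and its S3 writes use the aws_new_conn connection described next.

```python
# Minimal sketch of a monthly extract-then-transform DAG (placeholders only;
# see the linked DAG file for the real implementation).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_redfin_data(**context):
    # Placeholder: download the gzipped Redfin city dataset and write the raw
    # file to the landing-zone bucket (redfin-analytics-landing-zone-raw-data),
    # e.g. via S3Hook using the "aws_new_conn" connection.
    ...


def transform_redfin_data(**context):
    # Placeholder: read the raw file, apply the Pandas transformation shown
    # earlier, and write the result to redfin-analytics-transformed-data-bucket,
    # where Snowpipe picks it up automatically.
    ...


with DAG(
    dag_id="redfin_analytics_etl_dag",   # assumed name
    start_date=datetime(2024, 1, 1),     # assumed start date
    schedule="@monthly",                 # orchestrated once every month
    catchup=False,
) as dag:
    extract_task = PythonOperator(
        task_id="extract_redfin_data",
        python_callable=extract_redfin_data,
    )
    transform_task = PythonOperator(
        task_id="transform_redfin_data",
        python_callable=transform_redfin_data,
    )

    extract_task >> transform_task
```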
This orchestration required an AWS connection in Airflow, which was added via the Airflow GUI:
- Connection ID: aws_new_conn
- Connection Type: Amazon Web Services
- AWS Access Key: <<THE IAM USER ACCESS KEY>>
- AWS Secret Access Key: <<THE IAM USER SECRET ACCESS KEY>>
- Extra: { "region_name": "af-south-1" }
The DAG tasks ready to be triggered:
The state of the S3 buckets and the Snowflake table before triggering the DAG.
The first task was a success and it took about 10 minutes to complete.
The second task was also a success and it took a little more than 4 minutes to complete.
The buckets were then observed to contain data:
The Snowflake data warehouse table was also seen to contain the roughly 4.5 million transformed records.
This confirms that the entire orchestration was indeed successful.
- Server: xxxxxxx-xxxxxxx.snowflakecomputing.com
- Warehouse: redfin_analytics_warehouse
- Username: <<MY SNOWFLAKE ACCOUNT USERNAME>>
- Password: <<MY SNOWFLAKE ACCOUNT PASSWORD>>
Loaded the data via DirectQuery.
I did not encounter any real challenges in this project. The orchestration was quite straightforward and building it was really enjoyable.
I am thankful to Dr. Opeyemi ‘Yemi’ Olanipekun for inspiring me to carry out this project. His passionate way of teaching and guidance is second to none.
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro
https://docs.snowflake.com/en/sql-reference/sql/create-file-format
https://docs.snowflake.com/en/sql-reference/sql/create-stage
https://docs.snowflake.com/en/sql-reference/sql/copy-into-table
https://docs.snowflake.com/en/sql-reference/sql/create-pipe
https://docs.snowflake.com/en/sql-reference/sql/desc-table
Cover image credited to the Tuple Spectra channel on YouTube.