-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzillow_analytics.py
90 lines (71 loc) · 3.18 KB
/
zillow_analytics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.python import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
import json
import requests
# Load the JSON config once at import; the DAG passes the parsed object as the
# HTTP headers of the extract request.
with open("/home/ubuntu/airflow/config_api.json", "r") as cfg:
    api_host_key = json.load(cfg)

# Timestamp suffix (ddmmYYYYHHMMSS) handed to the extract task as date_string.
# NOTE(review): this is evaluated whenever the module is imported, not per
# DAG run - confirm that is the intended source of the file-name suffix.
now = datetime.now()
dt_now_string = now.strftime("%d%m%Y%H%M%S")

# Bucket the S3 sensor polls and the Redshift load reads from.
s3_bucket = "cleaned-data-zone-csv-bucket"
def extract_zillow_data(**kwargs):
    """Fetch data from an HTTP endpoint and save the JSON reply to disk.

    Keyword Args:
        url: Endpoint the GET request is sent to.
        headers: HTTP headers for the request.
        querystring: Query parameters for the request.
        date_string: Suffix used to build both output file names.
        request_timeout: Optional timeout in seconds for the HTTP call
            (defaults to 30).

    Returns:
        A two-item list: the path of the JSON file written under
        ``/home/ubuntu`` and a ``.csv`` file name carrying the same suffix.

    Raises:
        KeyError: If one of the required keyword arguments is missing.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    url = kwargs['url']
    headers = kwargs['headers']
    querystring = kwargs['querystring']
    dt_string = kwargs['date_string']
    # Without a timeout a stalled connection would block the task forever.
    request_timeout = kwargs.get('request_timeout', 30)

    response = requests.get(
        url,
        headers=headers,
        params=querystring,
        timeout=request_timeout,
    )
    # Fail the task on an error status instead of persisting the error
    # payload as if it were real data.
    response.raise_for_status()
    response_data = response.json()

    # Specify the output file path
    output_file_path = f"/home/ubuntu/response_data_{dt_string}.json"
    file_str = f'response_data_{dt_string}.csv'

    with open(output_file_path, "w") as output_file:
        json.dump(response_data, output_file, indent=4)  # indent for pretty formatting

    return [output_file_path, file_str]
# Arguments handed to the DAG as ``default_args``.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 5, 1),
    # A recipient is configured, but both e-mail triggers are switched off.
    "email": ["bhavanachitraghar80097@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    # Retry settings: 2 retries, 15 seconds apart.
    "retries": 2,
    "retry_delay": timedelta(seconds=15),
}
# Daily pipeline: extract -> move JSON to S3 -> wait for CSV -> load Redshift.
with DAG(
    "zillow_analytics_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Step 1: call the API and write the reply to a local JSON file. The
    # callable's return value ([json_path, csv_name]) is pulled by later steps.
    # NOTE(review): date_string is the module-level dt_now_string, which is
    # computed at import time rather than per run - confirm this is intended.
    extract_zillow_data_var = PythonOperator(
        task_id="tsk_extract_zillow_data",
        python_callable=extract_zillow_data,
        op_kwargs={
            "url": "https://zillow56.p.rapidapi.com/search",
            "querystring": {"location": "houston, tx"},
            "headers": api_host_key,
            "date_string": dt_now_string,
        },
    )

    # Step 2: move the local JSON file (element 0 of the extract result) to S3.
    # NOTE(review): the target bucket name contains underscores; verify it
    # against the real bucket, since S3 naming rules disallow them.
    load_to_s3 = BashOperator(
        task_id="tsk_load_to_s3",
        bash_command='aws s3 mv {{ ti.xcom_pull("tsk_extract_zillow_data")[0]}} s3://zillow_raw_data/',
    )

    # Step 3: wait until the CSV key (element 1 of the extract result) exists
    # in s3_bucket. Nothing in this file writes that CSV - presumably an
    # external process produces it; TODO confirm.
    is_file_in_s3_available = S3KeySensor(
        task_id="tsk_is_file_in_s3_available",
        bucket_key='{{ti.xcom_pull("tsk_extract_zillow_data")[1]}}',
        bucket_name=s3_bucket,
        aws_conn_id="aws_s3_conn",
        wildcard_match=False,  # set to True to allow wildcards in bucket_key
        timeout=60,  # sensor timeout, in seconds
        poke_interval=5,  # seconds between S3 checks
    )

    # Step 4: load that CSV into PUBLIC.zillowdata, skipping the header row.
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id="tsk_transfer_s3_to_redshift",
        aws_conn_id="aws_s3_conn",
        redshift_conn_id="conn_id_redshift",
        s3_bucket=s3_bucket,
        s3_key='{{ti.xcom_pull("tsk_extract_zillow_data")[1]}}',
        schema="PUBLIC",
        table="zillowdata",
        copy_options=["csv IGNOREHEADER 1"],
    )

    # Strictly linear ordering of the four steps.
    extract_zillow_data_var >> load_to_s3 >> is_file_in_s3_available >> transfer_s3_to_redshift