Merge pull request #19 from atalyaalon/main
add airflow dags for cbs and cache updates
atalyaalon authored Jan 5, 2022
2 parents 2e46041 + 3145b6d commit 5889bd0
Showing 4 changed files with 75 additions and 1 deletion.
22 changes: 22 additions & 0 deletions airflow_server/dags/cbs_from_s3
@@ -0,0 +1,22 @@
from airflow import DAG
from airflow.utils.dates import days_ago

from anyway_etl_airflow.operators.cli_bash_operator import CliBashOperator


dag_kwargs = dict(
    default_args={
        'owner': 'airflow',
    },
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(2),
)


with DAG('cbs-import-from-s3', **dag_kwargs) as cbs_import_from_s3:
    CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py process cbs --source s3'
        '{% if dag_run.conf.get("load_start_year") %} --load-start-year {{ dag_run.conf["load_start_year"] }}{% endif %}',
        task_id='cbs-import-from-s3'
    )
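
The {% if %} fragment in the command above appends --load-start-year only when the DAG run is triggered with a load_start_year value in its conf. A standalone sketch of that rendering behaviour, using plain jinja2 (the templating engine Airflow itself uses); FakeDagRun is a hypothetical stand-in for Airflow's real DagRun object and is not part of this commit:

from jinja2 import Template

COMMAND = (
    'anyway-etl anyway-kubectl-exec python3 main.py process cbs --source s3'
    '{% if dag_run.conf.get("load_start_year") %} --load-start-year {{ dag_run.conf["load_start_year"] }}{% endif %}'
)


class FakeDagRun:
    """Hypothetical stand-in exposing only the .conf attribute the template reads."""
    def __init__(self, conf):
        self.conf = conf


# Triggered with a conf: the flag is appended to the command.
print(Template(COMMAND).render(dag_run=FakeDagRun({'load_start_year': 2018})))
# Triggered without a conf: the flag is omitted.
print(Template(COMMAND).render(dag_run=FakeDagRun({})))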
31 changes: 31 additions & 0 deletions airflow_server/dags/import_email_to_s3_and_update_data.py
@@ -0,0 +1,31 @@
from airflow import DAG
from airflow.utils.dates import days_ago

from anyway_etl_airflow.operators.cli_bash_operator import CliBashOperator


dag_kwargs = dict(
    default_args={
        'owner': 'airflow',
    },
    schedule_interval='@weekly',
    catchup=False,
    start_date=days_ago(2),
)


with DAG('import_email_to_s3_and_update_data', **dag_kwargs) as fill_infographics_cache_dag_for_streets:
    CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py scripts importemail',
        task_id='import-email-to-s3'
    ) >> CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py process cbs --source s3'
        '{% if dag_run.conf.get("load_start_year") %} --load-start-year {{ dag_run.conf["load_start_year"] }}{% endif %}',
        task_id='cbs-import-from-s3'
    ) >> CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py process infographics-data-cache-for-road-segments',
        task_id='fill-infographics-cache-for-road-segments'
    ) >> CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py process cache update-street',
        task_id='fill-infographics-cache-for-streets'
    )
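
The >> chaining above defines a strictly linear pipeline: the e-mail import must finish before the CBS processing from S3, which must finish before the road-segment cache fill, which must finish before the street cache fill. A minimal sketch of the same dependency shape, assuming Airflow 2.x and using the built-in DummyOperator as a placeholder for the project's CliBashOperator:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

# Sketch only: same linear task chain, with placeholder tasks instead of the
# real CliBashOperator commands.
with DAG('linear-chain-sketch', schedule_interval=None, catchup=False,
         start_date=days_ago(2)) as sketch_dag:
    import_email = DummyOperator(task_id='import-email-to-s3')
    cbs_import = DummyOperator(task_id='cbs-import-from-s3')
    cache_segments = DummyOperator(task_id='fill-infographics-cache-for-road-segments')
    cache_streets = DummyOperator(task_id='fill-infographics-cache-for-streets')

    # a >> b is shorthand for a.set_downstream(b): b starts only after a succeeds.
    import_email >> cbs_import >> cache_segments >> cache_streets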
2 changes: 1 addition & 1 deletion airflow_server/dags/infographics_road_segments.py
@@ -8,7 +8,7 @@
     default_args={
         'owner': 'airflow',
     },
-    schedule_interval=None,
+    schedule_interval='@weekly',
     catchup=False,
     start_date=days_ago(2),
 )
21 changes: 21 additions & 0 deletions airflow_server/dags/infographics_streets.py
@@ -0,0 +1,21 @@
from airflow import DAG
from airflow.utils.dates import days_ago

from anyway_etl_airflow.operators.cli_bash_operator import CliBashOperator


dag_kwargs = dict(
    default_args={
        'owner': 'airflow',
    },
    schedule_interval='@weekly',
    catchup=False,
    start_date=days_ago(2),
)


with DAG('fill-infographics-cache-for-streets', **dag_kwargs) as fill_infographics_cache_dag_for_streets:
    CliBashOperator(
        'anyway-etl anyway-kubectl-exec python3 main.py process cache update-street',
        task_id='fill-infographics-cache-for-streets'
    )
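
CliBashOperator itself comes from the project's anyway_etl_airflow package and is not shown in this commit. A rough sketch of what such an operator could look like, under the unverified assumption that it is a thin wrapper around Airflow's BashOperator that takes the shell command as its first positional argument:

from airflow.operators.bash import BashOperator


class CliBashOperatorSketch(BashOperator):
    """Hypothetical stand-in for anyway_etl_airflow's CliBashOperator.

    The real operator may add environment variables, namespace settings,
    retries, etc.; this sketch only mirrors the call signature used above.
    """

    def __init__(self, cmd, **kwargs):
        super().__init__(bash_command=cmd, **kwargs)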
