Merge pull request #71 from Sage-Bionetworks/etl-521
[ETL-521] Run JSON to Parquet jobs in groups
philerooski authored Aug 21, 2023
2 parents ad5d307 + b20964f commit fdf30bd
Showing 2 changed files with 104 additions and 6 deletions.
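The change below splits the JSON to Parquet jobs into groups that run one after another: non-device tables first, then the HealthKit, Fitbit, Google, and Garmin families in sequence, each gated on the previous group succeeding. The template does this grouping with Jinja2 substring filters over the dataset names; a minimal Python sketch of the same partitioning logic (the dataset names here are hypothetical examples, not the real schema list):

```python
# Sketch of the grouping the workflow template performs with Jinja2 loop
# filters: a data type belongs to the first device family whose name it
# contains, and everything else runs in the initial (non-device) group.
DEVICE_GROUPS = ["HealthKit", "Fitbit", "Google", "Garmin"]

def group_datasets(data_types):
    """Partition data types into the ordered trigger groups used by the workflow."""
    groups = {"General": [], **{g: [] for g in DEVICE_GROUPS}}
    for data_type in data_types:
        for device in DEVICE_GROUPS:
            if device in data_type:
                groups[device].append(data_type)
                break
        else:
            # No device substring matched: this table starts with the
            # S3ToJsonCompleteTrigger group.
            groups["General"].append(data_type)
    return groups

# Hypothetical dataset names for illustration only.
example = ["EnrolledParticipants", "SymptomLog", "HealthKitV2Samples",
           "FitbitSleepLogs", "GoogleFitSamples", "GarminDailySummary"]
```

Each group's trigger then lists the previous group's jobs as its `SUCCEEDED` conditions, which is what serializes the groups.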
2 changes: 1 addition & 1 deletion templates/glue-job-JSONToParquet.j2
@@ -40,7 +40,7 @@ Parameters:
TimeoutInMinutes:
Type: Number
Description: The job timeout in minutes (integer).
Default: 120
Default: 720

TempS3Bucket:
Type: String
108 changes: 103 additions & 5 deletions templates/glue-workflow.j2
@@ -1,6 +1,15 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: The primary workflow for processing RECOVER data
Description: >-
The primary workflow for processing RECOVER data. An outline of the workflow is below:

S3 to JSON ->
(JSON to Parquet) EnrolledParticipants and SymptomLog ->
(JSON to Parquet) HealthKit ->
(JSON to Parquet) Fitbit ->
(JSON to Parquet) Google ->
(JSON to Parquet) Garmin ->
CompareParquetJob

Parameters:

@@ -53,7 +62,8 @@ Resources:
{% set datasets = [] %}
{% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %}
{% set dataset = {} %}
{% do dataset.update({'table_name': 'dataset_' + v.lower()})%}
{% do dataset.update({'data_type': v}) %}
{% do dataset.update({'table_name': 'dataset_' + v.lower()}) %}
{% do dataset.update({'stackname_prefix': '{}'.format(v.replace('_',''))}) %}
{% do datasets.append(dataset) %}
{% endfor %}
@@ -88,11 +98,11 @@ Resources:
Properties:
Name: !Sub "${Namespace}-S3ToJsonCompleteTrigger"
Actions:
{% for dataset in datasets %}
{% for dataset in datasets if not "HealthKit" in dataset["data_type"] and not "Fitbit" in dataset["data_type"] and not "Google" in dataset["data_type"] and not "Garmin" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
Arguments: {"--glue-table": {{ "{}".format(dataset["table_name"]) }} }
{% endfor %}
Description: This trigger runs after completion of the S3 to JSON job.
Description: This trigger runs after completion of the S3 to JSON job and starts every JSON to Parquet job that is not associated with a device.
Type: CONDITIONAL
Predicate:
Conditions:
@@ -102,6 +112,94 @@ Resources:
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow

HealthKitTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-HealthKitTrigger"
Actions:
{% for dataset in datasets if "HealthKit" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
Arguments: {"--glue-table": {{ "{}".format(dataset["table_name"]) }} }
{% endfor %}
Description: This trigger kicks off every JSON to Parquet job which is associated with a HealthKit data type.
Type: CONDITIONAL
Predicate:
Conditions:
{% for dataset in datasets if not "HealthKit" in dataset["data_type"] and not "Fitbit" in dataset["data_type"] and not "Google" in dataset["data_type"] and not "Garmin" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
State: SUCCEEDED
LogicalOperator: EQUALS
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow

FitbitTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-FitbitTrigger"
Actions:
{% for dataset in datasets if "Fitbit" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
Arguments: {"--glue-table": {{ "{}".format(dataset["table_name"]) }} }
{% endfor %}
Description: This trigger kicks off every JSON to Parquet job which is associated with a Fitbit data type.
Type: CONDITIONAL
Predicate:
Conditions:
{% for dataset in datasets if "HealthKit" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
State: SUCCEEDED
LogicalOperator: EQUALS
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow

GoogleTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-GoogleTrigger"
Actions:
{% for dataset in datasets if "Google" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
Arguments: {"--glue-table": {{ "{}".format(dataset["table_name"]) }} }
{% endfor %}
Description: This trigger kicks off every JSON to Parquet job which is associated with a Google data type.
Type: CONDITIONAL
Predicate:
Conditions:
{% for dataset in datasets if "Fitbit" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
State: SUCCEEDED
LogicalOperator: EQUALS
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow

GarminTrigger:
Type: AWS::Glue::Trigger
Properties:
Name: !Sub "${Namespace}-GarminTrigger"
Actions:
{% for dataset in datasets if "Garmin" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
Arguments: {"--glue-table": {{ "{}".format(dataset["table_name"]) }} }
{% endfor %}
Description: This trigger kicks off every JSON to Parquet job which is associated with a Garmin data type.
Type: CONDITIONAL
Predicate:
Conditions:
{% for dataset in datasets if "Google" in dataset["data_type"] %}
- JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"]}}-Job
State: SUCCEEDED
LogicalOperator: EQUALS
{% endfor %}
Logical: AND
StartOnCreation: true
WorkflowName: !Ref PrimaryWorkflow

JsontoParquetCompleteTrigger:
Type: AWS::Glue::Trigger
Condition: IsStagingNamespace
@@ -121,7 +219,7 @@ Resources:
Type: CONDITIONAL
Predicate:
Conditions:
{% for dataset in datasets %}
{% for dataset in datasets if "Garmin" in dataset["data_type"] %}
- JobName: !Sub "${Namespace}-{{ dataset["stackname_prefix"] }}-Job"
State: SUCCEEDED
LogicalOperator: EQUALS
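Once rendered, each of the device triggers above has the same shape: a CONDITIONAL trigger whose actions are that group's jobs and whose predicate requires every job in the previous group to have succeeded. A sketch of that shape as the request a boto3 `glue.create_trigger` call would take (the namespace, job names, and table names below are hypothetical placeholders, and the workflow name is illustrative):

```python
# Illustrative shape of one rendered device trigger, modeled on the
# HealthKitTrigger resource above. This only builds the request dict;
# it does not call AWS.
def make_device_trigger(namespace, device, device_jobs, upstream_jobs):
    """Build a CONDITIONAL trigger that starts `device_jobs` once every
    job in `upstream_jobs` has succeeded."""
    return {
        "Name": f"{namespace}-{device}Trigger",
        "WorkflowName": f"{namespace}-PrimaryWorkflow",
        "Type": "CONDITIONAL",
        "StartOnCreation": True,
        "Actions": [
            {"JobName": job, "Arguments": {"--glue-table": table}}
            for job, table in device_jobs
        ],
        "Predicate": {
            "Logical": "AND",
            "Conditions": [
                {"JobName": job, "State": "SUCCEEDED",
                 "LogicalOperator": "EQUALS"}
                for job in upstream_jobs
            ],
        },
    }

trigger = make_device_trigger(
    "main", "HealthKit",
    [("main-HealthKitV2Samples-Job", "dataset_healthkitv2samples")],
    ["main-EnrolledParticipants-Job", "main-SymptomLog-Job"],
)
```

In the template this wiring is produced by CloudFormation from the `AWS::Glue::Trigger` resources rather than API calls, but the same dict could be passed as keyword arguments to a boto3 Glue client's `create_trigger`.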
