Skip to content

Commit

Permalink
Update index fields with ParticipantIdentifier (#110)
Browse files Browse the repository at this point in the history
Also includes a small update to the FitbitEcg table schema that should have
been included when this data type was originally added.
  • Loading branch information
philerooski authored Apr 11, 2024
1 parent 8a81bec commit f312f6a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 39 deletions.
80 changes: 41 additions & 39 deletions src/glue/jobs/json_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,42 +37,42 @@
"enrolledparticipants": ["ParticipantIdentifier"],
"fitbitprofiles": ["ParticipantIdentifier", "ModifiedDate"],
"fitbitdevices": ["ParticipantIdentifier", "Date"],
"fitbitactivitylogs": ["LogId"],
"fitbitactivitylogs": ["ParticipantIdentifier", "LogId"],
"fitbitdailydata": ["ParticipantIdentifier", "Date"],
"fitbitecg": ["FitbitEcgKey"],
"fitbitecg": ["ParticipantIdentifier", "FitbitEcgKey"],
"fitbitintradaycombined": ["ParticipantIdentifier", "Type", "DateTime"],
"fitbitrestingheartrates": ["ParticipantIdentifier", "Date"],
"fitbitsleeplogs": ["LogId"],
"healthkitv2characteristics": ["HealthKitCharacteristicKey"],
"healthkitv2samples": ["HealthKitSampleKey"],
"healthkitv2heartbeat": ["HealthKitHeartbeatSampleKey"],
"healthkitv2statistics": ["HealthKitStatisticKey"],
"healthkitv2clinicalrecords": ["HealthKitClinicalRecordKey"],
"healthkitv2electrocardiogram": ["HealthKitECGSampleKey"],
"healthkitv2workouts": ["HealthKitWorkoutKey"],
"healthkitv2activitysummaries": ["HealthKitActivitySummaryKey"],
"garminactivitydetailssummary": ["ParticipantID", "SummaryId"],
"garminactivitysummary": ["ParticipantID", "SummaryId"],
"garminbloodpressuresummary": ["ParticipantID", "SummaryId"],
"garmindailysummary": ["ParticipantID", "StartTimeInSeconds"],
"garminepochsummary": ["ParticipantID", "SummaryId"],
"garminhealthsnapshotsummary": ["ParticipantID", "StartTimeInSeconds"],
"garminhrvsummary": ["ParticipantID", "StartTimeInSeconds"],
"garminmanuallyupdatedactivitysummary": ["ParticipantID", "SummaryId"],
"garminmoveiqactivitysummary": ["ParticipantID", "SummaryId"],
"garminpulseoxsummary": ["ParticipantID", "SummaryId"],
"garminrespirationsummary": ["ParticipantID", "SummaryId"],
"fitbitsleeplogs": ["ParticipantIdentifier", "LogId"],
"healthkitv2characteristics": ["ParticipantIdentifier", "HealthKitCharacteristicKey"],
"healthkitv2samples": ["ParticipantIdentifier", "HealthKitSampleKey"],
"healthkitv2heartbeat": ["ParticipantIdentifier", "HealthKitHeartbeatSampleKey"],
"healthkitv2statistics": ["ParticipantIdentifier", "HealthKitStatisticKey"],
"healthkitv2clinicalrecords": ["ParticipantIdentifier", "HealthKitClinicalRecordKey"],
"healthkitv2electrocardiogram": ["ParticipantIdentifier", "HealthKitECGSampleKey"],
"healthkitv2workouts": ["ParticipantIdentifier", "HealthKitWorkoutKey"],
"healthkitv2activitysummaries": ["ParticipantIdentifier", "HealthKitActivitySummaryKey"],
"garminactivitydetailssummary": ["ParticipantIdentifier", "SummaryId"],
"garminactivitysummary": ["ParticipantIdentifier", "SummaryId"],
"garminbloodpressuresummary": ["ParticipantIdentifier", "SummaryId"],
"garmindailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
"garminepochsummary": ["ParticipantIdentifier", "SummaryId"],
"garminhealthsnapshotsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
"garminhrvsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
"garminmanuallyupdatedactivitysummary": ["ParticipantIdentifier", "SummaryId"],
"garminmoveiqactivitysummary": ["ParticipantIdentifier", "SummaryId"],
"garminpulseoxsummary": ["ParticipantIdentifier", "SummaryId"],
"garminrespirationsummary": ["ParticipantIdentifier", "SummaryId"],
"garminsleepsummary": [
"ParticipantID",
"ParticipantIdentifier",
"StartTimeInSeconds",
"DurationInSeconds",
"Validation",
],
"garminstressdetailsummary": ["ParticipantID", "StartTimeInSeconds"],
"garminthirdpartydailysummary": ["ParticipantID", "StartTimeInSeconds"],
"garminusermetricssummary": ["ParticipantID", "CalenderDate"],
"googlefitsamples": ["GoogleFitSampleKey"],
"symptomlog": ["DataPointKey"],
"garminstressdetailsummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
"garminthirdpartydailysummary": ["ParticipantIdentifier", "StartTimeInSeconds"],
"garminusermetricssummary": ["ParticipantIdentifier", "CalenderDate"],
"googlefitsamples": ["ParticipantIdentifier", "GoogleFitSampleKey"],
"symptomlog": ["ParticipantIdentifier", "DataPointKey"],
}


Expand Down Expand Up @@ -533,11 +533,14 @@ def add_index_to_table(
"""Add partition and index fields to a DynamicFrame.
A DynamicFrame containing the top-level fields already includes the index
fields, but DynamicFrames which were flattened as a result of the
DynamicFrame.relationalize operation need to inherit the index and partition
fields from their parent. In order for this function to execute successfully,
the table's parent must already have the index fields and be included in
`processed_tables`.
fields -- `ParticipantIdentifier` and a primary key which is particular to
each data type (see global var `INDEX_FIELD_MAP`) -- but DynamicFrames which
were flattened as a result of the DynamicFrame.relationalize operation need
to inherit the index and partition fields from their parent. We also propagate
the `ParticipantID` field where present, although this field is not included
in the SymptomLog data type. In order for this function to execute
successfully, the table's parent must already have the index fields and be
included in `processed_tables`.
In addition to adding the index fields, this function formats the names
of the (non-index) fields which were manipulated by the call to
Expand Down Expand Up @@ -582,12 +585,11 @@ def add_index_to_table(
else:
selectable_original_field_name = original_field_name
logger.info(f"Adding index to {original_field_name}")
parent_index = parent_table.select(
(
[selectable_original_field_name, "cohort"]
+ INDEX_FIELD_MAP[table_data_type]
)
).distinct()
index_fields = INDEX_FIELD_MAP[table_data_type]
additional_fields = [selectable_original_field_name, "cohort"]
if "ParticipantID" in parent_table.columns:
additional_fields.append("ParticipantID")
parent_index = parent_table.select(index_fields + additional_fields).distinct()
this_index = parent_index.withColumnRenamed(original_field_name, "id")
df_with_index = this_table.join(this_index, on="id", how="inner")
# remove prefix from field names
Expand Down
4 changes: 4 additions & 0 deletions src/glue/resources/table_columns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ tables:
Type: string
- Name: FirmwareVersion
Type: string
- Name: export_start_date
Type: string
- Name: export_end_date
Type: string
partition_keys:
- Name: cohort
Type: string
Expand Down

0 comments on commit f312f6a

Please sign in to comment.