Commit

initial commit for testing
rxu17 committed Sep 4, 2024
1 parent fbd83a6 commit ac378fd
Showing 5 changed files with 378 additions and 1 deletion.
@@ -0,0 +1,19 @@
template:
path: glue-job-run-great-expectations.j2
dependencies:
- develop/glue-job-role.yaml
stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectations"
parameters:
Namespace: {{ stack_group_config.namespace }}
JobDescription: Runs great expectations on a set of data
JobRole: !stack_output_external glue-job-role::RoleArn
TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }}
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
ExpectationSuiteKey: '{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json'
GlueVersion: "{{ stack_group_config.json_to_parquet_glue_version }}"
AdditionalPythonModules: great_expectations~=0.17
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
dataset_schemas: !file src/glue/resources/table_columns.yaml
232 changes: 232 additions & 0 deletions src/glue/jobs/run_great_expectations_on_parquet.py
@@ -0,0 +1,232 @@
import json
import logging
import sys

import great_expectations as gx
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.yaml_handler import YAMLHandler
from pyspark.context import SparkContext

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)


def read_args() -> dict:
"""Returns the specific params that our code needs to run"""
args = getResolvedOptions(
sys.argv,
[
"data-type",
"namespace",
"parquet-bucket",
"expectation-suite-path" "report-prefix",
],
)
for arg in args:
validate_args(args[arg])
return args


def validate_args(value: str) -> None:
"""Checks to make sure none of the input command line arguments are empty strings
Args:
value (str): the value of the command line argument parsed by argparse
Raises:
ValueError: when value is an empty string
"""
if value == "":
raise ValueError("Argument value cannot be an empty string")
else:
return None


def create_context(s3_bucket: str, namespace: str, report_prefix: str):
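    """Create a Great Expectations data context configured for this job.

    Registers a Spark datasource with a runtime data connector and points the
    validation result store and data docs site at the given S3 bucket under
    ``{namespace}/{report_prefix}``.
    """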
context = gx.get_context()
context.add_expectation_suite(expectation_suite_name="my_expectation_suite")
yaml = YAMLHandler()
context.add_datasource(
**yaml.load(
"""
name: my_spark_datasource
class_name: Datasource
execution_engine:
class_name: SparkDFExecutionEngine
force_reuse_spark_context: true
data_connectors:
my_runtime_data_connector:
class_name: RuntimeDataConnector
batch_identifiers:
- my_batch_identifier
"""
)
)
# Programmatically configure the validation result store and
# DataDocs to use S3
context.add_store(
"validation_result_store",
{
"class_name": "ValidationsStore",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/{report_prefix}",
},
},
)

# Add DataDocs site configuration to output results to S3
context.add_site_builder(
site_name="s3_site",
site_config={
"class_name": "SiteBuilder",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/{report_prefix}",
},
"site_index_builder": {
"class_name": "DefaultSiteIndexBuilder",
},
},
)

return context


def get_spark_df(glue_context, parquet_bucket, namespace, datatype):
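    """Load the parquet dataset for `datatype` from S3 and return it as a Spark DataFrame."""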
s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{datatype}/"
dynamic_frame = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
connection_options={"paths": [s3_parquet_path]},
format="parquet",
)
spark_df = dynamic_frame.toDF()
return spark_df


def get_batch_request(spark_dataset):
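    """Build a RuntimeBatchRequest that wraps the in-memory Spark DataFrame."""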
batch_request = RuntimeBatchRequest(
datasource_name="my_spark_datasource",
data_connector_name="my_runtime_data_connector",
data_asset_name="my-parquet-data-asset",
runtime_parameters={"batch_data": spark_dataset},
batch_identifiers={"my_batch_identifier": "okaybatchidentifier"},
)
return batch_request


def add_expectations(data_context, expectation_suite_name):
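    """Add a not-null expectation on ParticipantIdentifier to the named expectation
    suite, keeping any expectations the suite already contains."""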
existing_expectations = data_context.get_expectation_suite(
expectation_suite_name
).expectations
non_null_expectation = ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "ParticipantIdentifier"},
)
data_context.add_or_update_expectation_suite(
expectation_suite_name=expectation_suite_name,
expectations=[non_null_expectation, *existing_expectations],
)


def add_expectations_from_json(data_context, expectations_path: str, data_type: str):
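    """Read the expectations for `data_type` from a JSON file and merge them into the
    expectation suite named in that file, keeping any existing expectations."""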
# Load the JSON file with the expectations
with open(expectations_path, "r") as file:
expectations_data = json.load(file)

# Ensure the dataset exists in the JSON file
if data_type not in expectations_data:
raise ValueError(f"Dataset '{data_type}' not found in the JSON file.")

# Extract the expectation suite and expectations for the dataset
suite_data = expectations_data[data_type]
expectation_suite_name = suite_data["expectation_suite_name"]
new_expectations = suite_data["expectations"]

# Fetch existing expectations from the data context
existing_suite = data_context.get_expectation_suite(expectation_suite_name)
existing_expectations = existing_suite.expectations

# Convert new expectations from JSON format to ExpectationConfiguration objects
new_expectations_configs = [
ExpectationConfiguration(
expectation_type=exp["expectation_type"], kwargs=exp["kwargs"]
)
for exp in new_expectations
]

# Combine existing expectations with new ones
updated_expectations = existing_expectations + new_expectations_configs

# Update the expectation suite in the data context
data_context.add_or_update_expectation_suite(
expectation_suite_name=expectation_suite_name, expectations=updated_expectations
)


def main():
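    """Validate the parquet dataset against its expectation suite and log the results."""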
    # TODO: restore reading arguments from the job run once testing is complete
    # args = read_args()
    # Arguments hardcoded for initial testing
    args = {
        "parquet_bucket": "recover-dev-processed-data",
        "namespace": "etl-616",
        "data_type": "healthkitv2heartbeats",
        "expectation_suite_path": "recover-dev-cloudformation/etl-616/src/glue/resources/data_values_expectations.json",
    }
context = create_context(
s3_bucket=args["parquet_bucket"],
namespace=args["namespace"],
report_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
)
glue_context = GlueContext(SparkContext.getOrCreate())
logger.info("get_spark_df")
spark_df = get_spark_df(
glue_context=glue_context,
parquet_bucket=args["parquet_bucket"],
namespace=args["namespace"],
datatype=args["data_type"],
)
logger.info("isNull")
null_rows = spark_df.ParticipantIdentifier.isNull()
logger.info("filter")
filtered_results = spark_df.filter(null_rows)
logger.info("collect")
result = filtered_results.collect()
logger.info("null_rows: %s", result)
logger.info("get_batch_request")
batch_request = get_batch_request(spark_df)
logger.info("add_expectations")
# add_expectations(
# data_context=context,
# expectation_suite_name="my_expectation_suite"
# )
add_expectations_from_json(
data_context=context,
        expectations_path=args["expectation_suite_path"],
data_type=args["data_type"],
)
logger.info("get_validator")
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="my_expectation_suite"
)
logger.info("validator.validate")
validation_result = validator.validate()
logger.info("validation_result: %s", validation_result)


if __name__ == "__main__":
main()
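
For reference, a minimal sketch (not part of this commit) of how the arguments expected by read_args could be supplied when starting the Glue job with boto3; the job name and argument values below are placeholders:

import boto3

glue = boto3.client("glue")
glue.start_job_run(
    JobName="etl-616-RunGreatExpectations-Job",  # placeholder job name
    Arguments={
        "--data-type": "healthkitv2heartbeats",
        "--namespace": "etl-616",
        "--parquet-bucket": "recover-dev-processed-data",
        "--expectation-suite-path": "<script-bucket>/<namespace>/src/glue/resources/data_values_expectations.json",
        "--report-prefix": "great_expectation_reports/healthkitv2heartbeats/parquet/",
    },
)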
13 changes: 13 additions & 0 deletions src/glue/resources/data_values_expectations.json
@@ -0,0 +1,13 @@
{
"healthkitv2heartbeats": {
"expectation_suite_name": "healthkitv2heartbeats_expectations",
"expectations": [
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "HealthKitHeartbeatKey"
}
}
]
}
}
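
The expect_column_values_to_be_between entry above carries no bounds yet; once real limits are known they would be supplied through the expectation's kwargs. An illustrative sketch of the equivalent ExpectationConfiguration (the bounds are placeholders, not values from this commit):

from great_expectations.core.expectation_configuration import ExpectationConfiguration

ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "HealthKitHeartbeatKey", "min_value": 0, "max_value": 300},
)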
113 changes: 113 additions & 0 deletions templates/glue-job-run-great-expectations.j2
@@ -0,0 +1,113 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: >-
An AWS Glue job in the data catalog. An AWS Glue job encapsulates a script
that connects to your source data, processes it, and then writes it out
to your data target.

Parameters:
Namespace:
Type: String
Description: >-
      The namespace string used for the individual Glue job names.

JobDescription:
Type: String
Description: A fuller description of what the job does.
Default: ''

JobRole:
Type: String
Description: The name or ARN of the IAM role that will run this job.

TempS3Bucket:
Type: String
Description: The name of the S3 bucket where temporary files and logs are written.

S3ScriptBucket:
Type: String
Description: The name of the S3 bucket where the script file is located.

S3ScriptKey:
Type: String
Description: The bucket key where the script file is located.

  ExpectationSuiteKey:
Type: String
Description: The bucket key where the expectation suite is located.

GlueVersion:
Type: String
    Description: The version of AWS Glue to use for this job.

DefaultWorkerType:
Type: String
Description: >-
Which worker type to use for this job.
Default: 'Standard'

DefaultNumberOfWorkers:
Type: Number
Description: >-
How many DPUs to allot to this job. This parameter is not used for types
FitbitIntradayCombined and HealthKitV2Samples.
Default: 1

MaxRetries:
Type: Number
Description: How many times to retry the job if it fails (integer).
Default: 0 # TODO change this to 1 after initial development

TimeoutInMinutes:
Type: Number
Description: The job timeout in minutes (integer).
Default: 1200

AdditionalPythonModules:
Type: String
Description: >-
Additional python packages to install as a comma-delimited list.
Any format supported by pip3 is supported here.

Resources:

{% set datasets = [] %}
{% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %}
{% set dataset = {} %}
{% do dataset.update({"type": v}) %}
{% do dataset.update({"table_name": "dataset_" + v.lower()})%}
{% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %}
{% do datasets.append(dataset) %}
{% endfor %}

{% for dataset in datasets %}
{{ dataset["stackname_prefix"] }}ParquetJob:
Type: AWS::Glue::Job
Properties:
Command:
Name: glueetl
ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey}
DefaultArguments:
--TempDir: !Sub s3://${TempS3Bucket}/tmp
--enable-continuous-cloudwatch-log: true
--enable-metrics: true
--enable-spark-ui: true
--spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/
--job-bookmark-option: job-bookmark-disable
--job-language: python
--glue-table: {{ dataset["table_name"] }}
--additional-python-modules: !Ref AdditionalPythonModules
# --conf spark.sql.adaptive.enabled
Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
GlueVersion: !Ref GlueVersion
MaxRetries: !Ref MaxRetries
Name: !Sub "${Namespace}-{{ dataset["stackname_prefix"] }}-Job"
{% if dataset["type"] == "HealthKitV2Samples" or dataset["type"] == "FitbitIntradayCombined" -%}
WorkerType: !Ref LargeJobWorkerType
NumberOfWorkers: !Ref LargeJobNumberOfWorkers
{% else -%}
WorkerType: !Ref DefaultWorkerType
NumberOfWorkers: !Ref DefaultNumberOfWorkers
{%- endif %}
Role: !Ref JobRole
Timeout: !Ref TimeoutInMinutes
{% endfor %}
2 changes: 1 addition & 1 deletion tests/Dockerfile.aws_glue_4
@@ -1,4 +1,4 @@
FROM amazon/aws-glue-libs:glue_libs_4.0.0_image_01

RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0
RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.17
ENTRYPOINT ["bash", "-l"]
