diff --git a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
new file mode 100644
index 00000000..e0042baa
--- /dev/null
+++ b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
@@ -0,0 +1,19 @@
+template:
+  path: glue-job-run-great-expectations.j2
+dependencies:
+  - develop/glue-job-role.yaml
+stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectations"
+parameters:
+  Namespace: {{ stack_group_config.namespace }}
+  JobDescription: Runs Great Expectations on a set of data
+  JobRole: !stack_output_external glue-job-role::RoleArn
+  TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }}
+  S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
+  S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
+  ExpectationSuiteKey: '{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json'
+  GlueVersion: "{{ stack_group_config.json_to_parquet_glue_version }}"
+  AdditionalPythonModules: great_expectations~=0.17
+stack_tags:
+  {{ stack_group_config.default_stack_tags }}
+sceptre_user_data:
+  dataset_schemas: !file src/glue/resources/table_columns.yaml
diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py
new file mode 100644
index 00000000..d31a6409
--- /dev/null
+++ b/src/glue/jobs/run_great_expectations_on_parquet.py
@@ -0,0 +1,232 @@
+import json
+import logging
+import sys
+
+import great_expectations as gx
+from awsglue.context import GlueContext
+from awsglue.utils import getResolvedOptions
+from great_expectations.core.batch import RuntimeBatchRequest
+from great_expectations.core.expectation_configuration import ExpectationConfiguration
+from great_expectations.core.yaml_handler import YAMLHandler
+from great_expectations.exceptions import DataContextError
+from pyspark.context import SparkContext
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+handler = logging.StreamHandler(sys.stdout)
+handler.setLevel(logging.DEBUG)
+formatter = logging.Formatter("%(levelname)s:%(name)s:%(message)s")
+handler.setFormatter(formatter)
+logger.addHandler(handler)
+
+
+def read_args() -> dict:
+    """Returns the specific params that our code needs to run"""
+    args = getResolvedOptions(
+        sys.argv,
+        [
+            "data-type",
+            "namespace",
+            "parquet-bucket",
+            "expectation-suite-path",
+            "report-prefix",
+        ],
+    )
+    for arg in args:
+        validate_args(args[arg])
+    return args
+
+
+def validate_args(value: str) -> None:
+    """Checks to make sure none of the input command line arguments are empty strings
+
+    Args:
+        value (str): the value of the command line argument parsed by getResolvedOptions
+
+    Raises:
+        ValueError: when value is an empty string
+    """
+    if value == "":
+        raise ValueError("Argument value cannot be an empty string")
+    else:
+        return None
+
+
+def create_context(s3_bucket: str, namespace: str, report_prefix: str):
+    """Creates a Great Expectations data context with a Spark datasource and
+    S3-backed validation result store and DataDocs site."""
+    context = gx.get_context()
+    context.add_expectation_suite(expectation_suite_name="my_expectation_suite")
+    yaml = YAMLHandler()
+    context.add_datasource(
+        **yaml.load(
+            """
+            name: my_spark_datasource
+            class_name: Datasource
+            execution_engine:
+                class_name: SparkDFExecutionEngine
+                force_reuse_spark_context: true
+            data_connectors:
+                my_runtime_data_connector:
+                    class_name: RuntimeDataConnector
+                    batch_identifiers:
+                        - my_batch_identifier
+            """
+        )
+    )
+    # Programmatically configure the validation result store and
+    # DataDocs to use S3
+    context.add_store(
"validation_result_store", + { + "class_name": "ValidationsStore", + "store_backend": { + "class_name": "TupleS3StoreBackend", + "bucket": s3_bucket, + "prefix": f"{namespace}/{report_prefix}", + }, + }, + ) + + # Add DataDocs site configuration to output results to S3 + context.add_site_builder( + site_name="s3_site", + site_config={ + "class_name": "SiteBuilder", + "store_backend": { + "class_name": "TupleS3StoreBackend", + "bucket": s3_bucket, + "prefix": f"{namespace}/{report_prefix}", + }, + "site_index_builder": { + "class_name": "DefaultSiteIndexBuilder", + }, + }, + ) + + return context + + +def get_spark_df(glue_context, parquet_bucket, namespace, datatype): + s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{datatype}/" + dynamic_frame = glue_context.create_dynamic_frame_from_options( + connection_type="s3", + connection_options={"paths": [s3_parquet_path]}, + format="parquet", + ) + spark_df = dynamic_frame.toDF() + return spark_df + + +def get_batch_request(spark_dataset): + batch_request = RuntimeBatchRequest( + datasource_name="my_spark_datasource", + data_connector_name="my_runtime_data_connector", + data_asset_name="my-parquet-data-asset", + runtime_parameters={"batch_data": spark_dataset}, + batch_identifiers={"my_batch_identifier": "okaybatchidentifier"}, + ) + return batch_request + + +def add_expectations(data_context, expectation_suite_name): + existing_expectations = data_context.get_expectation_suite( + expectation_suite_name + ).expectations + non_null_expectation = ExpectationConfiguration( + expectation_type="expect_column_values_to_not_be_null", + kwargs={"column": "ParticipantIdentifier"}, + ) + data_context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, + expectations=[non_null_expectation, *existing_expectations], + ) + + +import json + + +def add_expectations_from_json(data_context, expectations_path: str, data_type: str): + # Load the JSON file with the expectations + with open(expectations_path, "r") as file: + expectations_data = json.load(file) + + # Ensure the dataset exists in the JSON file + if data_type not in expectations_data: + raise ValueError(f"Dataset '{data_type}' not found in the JSON file.") + + # Extract the expectation suite and expectations for the dataset + suite_data = expectations_data[data_type] + expectation_suite_name = suite_data["expectation_suite_name"] + new_expectations = suite_data["expectations"] + + # Fetch existing expectations from the data context + existing_suite = data_context.get_expectation_suite(expectation_suite_name) + existing_expectations = existing_suite.expectations + + # Convert new expectations from JSON format to ExpectationConfiguration objects + new_expectations_configs = [ + ExpectationConfiguration( + expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + ) + for exp in new_expectations + ] + + # Combine existing expectations with new ones + updated_expectations = existing_expectations + new_expectations_configs + + # Update the expectation suite in the data context + data_context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, expectations=updated_expectations + ) + + +def main(): + # args = read_args() + import pdb + + pdb.set_trace() + args = { + "parquet_bucket": "recover-dev-processed-data", + "namespace": "etl-616", + "data_type": "healthkitv2heartbeats_expectations", + "expectation_suite_path": "recover-dev-cloudformation/etl-616/src/glue/resources/data_values_expectations.json", + } + context = 
+    context = create_context(
+        s3_bucket=args["parquet_bucket"],
+        namespace=args["namespace"],
+        report_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
+    )
+    glue_context = GlueContext(SparkContext.getOrCreate())
+    logger.info("get_spark_df")
+    spark_df = get_spark_df(
+        glue_context=glue_context,
+        parquet_bucket=args["parquet_bucket"],
+        namespace=args["namespace"],
+        datatype=args["data_type"],
+    )
+    logger.info("isNull")
+    null_rows = spark_df.ParticipantIdentifier.isNull()
+    logger.info("filter")
+    filtered_results = spark_df.filter(null_rows)
+    logger.info("collect")
+    result = filtered_results.collect()
+    logger.info("null_rows: %s", result)
+    logger.info("get_batch_request")
+    batch_request = get_batch_request(spark_df)
+    logger.info("add_expectations")
+    # add_expectations(
+    #     data_context=context,
+    #     expectation_suite_name="my_expectation_suite"
+    # )
+    expectation_suite_name = add_expectations_from_json(
+        data_context=context,
+        expectations_path=args["expectation_suite_path"],
+        data_type=args["data_type"],
+    )
+    logger.info("get_validator")
+    validator = context.get_validator(
+        batch_request=batch_request, expectation_suite_name=expectation_suite_name
+    )
+    logger.info("validator.validate")
+    validation_result = validator.validate()
+    logger.info("validation_result: %s", validation_result)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/glue/resources/data_values_expectations.json b/src/glue/resources/data_values_expectations.json
new file mode 100644
index 00000000..19773820
--- /dev/null
+++ b/src/glue/resources/data_values_expectations.json
@@ -0,0 +1,13 @@
+{
+  "healthkitv2heartbeats": {
+    "expectation_suite_name": "healthkitv2heartbeats_expectations",
+    "expectations": [
+      {
+        "expectation_type": "expect_column_values_to_be_between",
+        "kwargs": {
+          "column": "HealthKitHeartbeatKey"
+        }
+      }
+    ]
+  }
+}
diff --git a/templates/glue-job-run-great-expectations.j2 b/templates/glue-job-run-great-expectations.j2
new file mode 100644
index 00000000..10ce4f18
--- /dev/null
+++ b/templates/glue-job-run-great-expectations.j2
@@ -0,0 +1,113 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Description: >-
+  An AWS Glue job in the data catalog. An AWS Glue job encapsulates a script
+  that connects to your source data, processes it, and then writes it out
+  to your data target.
+
+Parameters:
+  Namespace:
+    Type: String
+    Description: >-
+      The namespace string used for the individual glue job names
+
+  JobDescription:
+    Type: String
+    Description: A fuller description of what the job does.
+    Default: ''
+
+  JobRole:
+    Type: String
+    Description: The name or ARN of the IAM role that will run this job.
+
+  TempS3Bucket:
+    Type: String
+    Description: The name of the S3 bucket where temporary files and logs are written.
+
+  S3ScriptBucket:
+    Type: String
+    Description: The name of the S3 bucket where the script file is located.
+
+  S3ScriptKey:
+    Type: String
+    Description: The bucket key where the script file is located.
+
+  ExpectationSuiteKey:
+    Type: String
+    Description: The bucket key where the expectation suite is located.
+
+  GlueVersion:
+    Type: String
+    Description: The version of Glue to use for this job.
+
+  DefaultWorkerType:
+    Type: String
+    Description: >-
+      Which worker type to use for this job.
+    Default: 'Standard'
+
+  DefaultNumberOfWorkers:
+    Type: Number
+    Description: >-
+      How many DPUs to allot to this job. This parameter is not used for types
+      FitbitIntradayCombined and HealthKitV2Samples.
+    Default: 1
+
+  LargeJobWorkerType:
+    Type: String
+    Description: >-
+      Which worker type to use for the larger FitbitIntradayCombined and
+      HealthKitV2Samples data types.
+    Default: 'G.2X'  # placeholder default; adjust to match the json-to-parquet jobs
+
+  LargeJobNumberOfWorkers:
+    Type: Number
+    Description: >-
+      How many DPUs to allot to the jobs for the FitbitIntradayCombined and
+      HealthKitV2Samples data types.
+    Default: 4  # placeholder default; adjust to match the json-to-parquet jobs
+
+  MaxRetries:
+    Type: Number
+    Description: How many times to retry the job if it fails (integer).
+    Default: 0 # TODO change this to 1 after initial development
+
+  TimeoutInMinutes:
+    Type: Number
+    Description: The job timeout in minutes (integer).
+    Default: 1200
+
+  AdditionalPythonModules:
+    Type: String
+    Description: >-
+      Additional python packages to install as a comma-delimited list.
+      Any format supported by pip3 is supported here.
+
+Resources:
+
+  {% set datasets = [] %}
+  {% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %}
+    {% set dataset = {} %}
+    {% do dataset.update({"type": v}) %}
+    {% do dataset.update({"table_name": "dataset_" + v.lower()})%}
+    {% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %}
+    {% do datasets.append(dataset) %}
+  {% endfor %}
+
+  {% for dataset in datasets %}
+  {{ dataset["stackname_prefix"] }}ParquetJob:
+    Type: AWS::Glue::Job
+    Properties:
+      Command:
+        Name: glueetl
+        ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey}
+      DefaultArguments:
+        --TempDir: !Sub s3://${TempS3Bucket}/tmp
+        --enable-continuous-cloudwatch-log: true
+        --enable-metrics: true
+        --enable-spark-ui: true
+        --spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/
+        --job-bookmark-option: job-bookmark-disable
+        --job-language: python
+        --glue-table: {{ dataset["table_name"] }}
+        --additional-python-modules: !Ref AdditionalPythonModules
+        # --conf spark.sql.adaptive.enabled
+      Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
+      GlueVersion: !Ref GlueVersion
+      MaxRetries: !Ref MaxRetries
+      Name: !Sub "${Namespace}-{{ dataset["stackname_prefix"] }}-Job"
+      {% if dataset["type"] == "HealthKitV2Samples" or dataset["type"] == "FitbitIntradayCombined" -%}
+      WorkerType: !Ref LargeJobWorkerType
+      NumberOfWorkers: !Ref LargeJobNumberOfWorkers
+      {% else -%}
+      WorkerType: !Ref DefaultWorkerType
+      NumberOfWorkers: !Ref DefaultNumberOfWorkers
+      {%- endif %}
+      Role: !Ref JobRole
+      Timeout: !Ref TimeoutInMinutes
+  {% endfor %}
diff --git a/tests/Dockerfile.aws_glue_4 b/tests/Dockerfile.aws_glue_4
index 485ecf55..0785b73f 100644
--- a/tests/Dockerfile.aws_glue_4
+++ b/tests/Dockerfile.aws_glue_4
@@ -1,4 +1,4 @@
 FROM amazon/aws-glue-libs:glue_libs_4.0.0_image_01
-RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0
+RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.17
 ENTRYPOINT ["bash", "-l"]
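Not part of the diff above: a minimal pytest sketch of how the pure-Python helpers in run_great_expectations_on_parquet.py could be exercised in the Glue 4 test image (which now installs great_expectations). The test file name, the src.glue.jobs import path, and the fixture contents are assumptions for illustration, not files introduced by this change.

# tests/test_run_great_expectations_on_parquet.py (hypothetical)
import json

import pytest

# Assumed import path; adjust to match how the other Glue job tests import their modules.
from src.glue.jobs import run_great_expectations_on_parquet as run_gx


def test_validate_args_rejects_empty_string():
    with pytest.raises(ValueError):
        run_gx.validate_args("")


def test_validate_args_accepts_non_empty_string():
    assert run_gx.validate_args("recover-dev-processed-data") is None


def test_add_expectations_from_json_rejects_unknown_data_type(tmp_path):
    # Write a small expectations file mirroring data_values_expectations.json
    expectations_file = tmp_path / "data_values_expectations.json"
    expectations_file.write_text(
        json.dumps(
            {
                "healthkitv2heartbeats": {
                    "expectation_suite_name": "healthkitv2heartbeats_expectations",
                    "expectations": [],
                }
            }
        )
    )
    # The data type check fails before the data context is touched,
    # so passing None for data_context is safe here.
    with pytest.raises(ValueError):
        run_gx.add_expectations_from_json(
            data_context=None,
            expectations_path=str(expectations_file),
            data_type="not_a_real_data_type",
        )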