From 5ec94304a7b4f9fc90dfb92f0eebcf003690d9db Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Fri, 18 Oct 2024 15:23:24 -0700 Subject: [PATCH] consolidate GX docs to a single docsite --- .../jobs/run_great_expectations_on_parquet.py | 389 +++++++++--------- src/glue/resources/great_expectations.yml | 58 +++ 2 files changed, 252 insertions(+), 195 deletions(-) create mode 100644 src/glue/resources/great_expectations.yml diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py index 68c0e99..d285139 100644 --- a/src/glue/jobs/run_great_expectations_on_parquet.py +++ b/src/glue/jobs/run_great_expectations_on_parquet.py @@ -5,9 +5,11 @@ from typing import Dict import boto3 -import great_expectations as gx from awsglue.context import GlueContext from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext + +import great_expectations as gx from great_expectations.core.batch import RuntimeBatchRequest from great_expectations.core.expectation_configuration import ExpectationConfiguration from great_expectations.core.run_identifier import RunIdentifier @@ -17,7 +19,6 @@ ExpectationSuiteIdentifier, ValidationResultIdentifier, ) -from pyspark.context import SparkContext logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -77,83 +78,82 @@ def create_context( configurations """ context = gx.get_context() - add_datasource(context) - add_validation_stores(context, s3_bucket, namespace, key_prefix) - add_data_docs_sites(context, s3_bucket, namespace, key_prefix) - return context - - -def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext": - """Adds the spark datasource - - Args: - context (EphemeralDataContext): data context to add to - - Returns: - EphemeralDataContext: data context object with datasource configuration - added - """ - yaml = YAMLHandler() - context.add_datasource( - **yaml.load( - """ - name: spark_datasource - class_name: Datasource - execution_engine: - class_name: SparkDFExecutionEngine - force_reuse_spark_context: true - data_connectors: - runtime_data_connector: - class_name: RuntimeDataConnector - batch_identifiers: - - batch_identifier - """ - ) - ) + # add_datasource(context) + # add_validation_stores(context, s3_bucket, namespace, key_prefix) + add_data_docs_sites(context, s3_bucket, namespace) return context -def add_validation_stores( - context: "EphemeralDataContext", +# def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext": +# """Adds the spark datasource + +# Args: +# context (EphemeralDataContext): data context to add to + +# Returns: +# EphemeralDataContext: data context object with datasource configuration +# added +# """ +# yaml = YAMLHandler() +# context.add_datasource( +# **yaml.load( +# """ +# name: spark_datasource +# class_name: Datasource +# execution_engine: +# class_name: SparkDFExecutionEngine +# force_reuse_spark_context: true +# data_connectors: +# runtime_data_connector: +# class_name: RuntimeDataConnector +# batch_identifiers: +# - batch_identifier +# """ +# ) +# ) +# return context + + +# def add_validation_stores( +# context: "EphemeralDataContext", +# s3_bucket: str, +# namespace: str, +# key_prefix: str, +# ) -> "EphemeralDataContext": +# """Adds the validation store configurations to the context object + +# Args: +# context (EphemeralDataContext): data context to add to +# s3_bucket (str): name of the s3 bucket to save validation results to +# namespace (str): name of the namespace +# key_prefix 
(str): s3 key prefix to save the +# validation results to + +# Returns: +# EphemeralDataContext: data context object with validation stores' +# configuration added +# """ +# # Programmatically configure the validation result store and +# # DataDocs to use S3 +# context.add_store( +# "validation_result_store", +# { +# "class_name": "ValidationsStore", +# "store_backend": { +# "class_name": "TupleS3StoreBackend", +# "bucket": s3_bucket, +# "prefix": f"{namespace}/{key_prefix}", +# }, +# }, +# ) +# return context + + +def update_data_docs_sites( + context: gx.data_context.AbstractDataContext, s3_bucket: str, namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": - """Adds the validation store configurations to the context object - - Args: - context (EphemeralDataContext): data context to add to - s3_bucket (str): name of the s3 bucket to save validation results to - namespace (str): name of the namespace - key_prefix (str): s3 key prefix to save the - validation results to - - Returns: - EphemeralDataContext: data context object with validation stores' - configuration added - """ - # Programmatically configure the validation result store and - # DataDocs to use S3 - context.add_store( - "validation_result_store", - { - "class_name": "ValidationsStore", - "store_backend": { - "class_name": "TupleS3StoreBackend", - "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", - }, - }, - ) - return context - - -def add_data_docs_sites( - context: "EphemeralDataContext", - s3_bucket: str, - namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": +) -> gx.data_context.AbstractDataContext: """Adds the data docs sites configuration to the context object so data docs can be saved to a s3 location. This is a special workaround to add the data docs because we're using EphemeralDataContext @@ -170,19 +170,18 @@ def add_data_docs_sites( EphemeralDataContext: data context object with data docs sites' configuration added """ - data_context_config = DataContextConfig() - data_context_config["data_docs_sites"] = { - "s3_site": { + context.update_data_docs_site( + site_name="s3_site", + site_config={ "class_name": "SiteBuilder", "store_backend": { "class_name": "TupleS3StoreBackend", "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", + "prefix": f"{namespace}/great_expectation_reports/parquet/", }, "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"}, - } - } - context._project_config["data_docs_sites"] = data_context_config["data_docs_sites"] + }, + ) return context @@ -212,29 +211,23 @@ def get_spark_df( def get_batch_request( + gx_context: gx.data_context.AbstractDataContext, spark_dataset: "pyspark.sql.dataframe.DataFrame", data_type: str, - run_id: RunIdentifier, -) -> RuntimeBatchRequest: +) -> gx.datasource.fluent.batch_request: """Retrieves the unique metadata for this batch request Args: spark_dataset (pyspark.sql.dataframe.DataFrame): parquet dataset as spark df data_type (str): data type name - run_id (RunIdentifier): contains the run name and - run time metadata of this batch run Returns: RuntimeBatchRequest: contains metadata for the batch run request to identify this great expectations run """ - batch_request = RuntimeBatchRequest( - datasource_name="spark_datasource", - data_connector_name="runtime_data_connector", - data_asset_name=f"{data_type}-parquet-data-asset", - runtime_parameters={"batch_data": spark_dataset}, - batch_identifiers={"batch_identifier": f"{data_type}_{run_id.run_name}_batch"}, - ) + data_source = 
gx_context.sources.add_or_update_spark(name="parquet") + data_asset = data_source.add_dataframe_asset(name=f"{data_type}_spark_dataframe") + batch_request = data_asset.build_batch_request(dataframe=spark_dataset) return batch_request @@ -263,87 +256,78 @@ def read_json( def add_expectations_from_json( expectations_data: Dict[str, str], - context: "EphemeralDataContext", - data_type: str, -) -> "EphemeralDataContext": + context: gx.data_context.AbstractDataContext, +) -> gx.data_context.AbstractDataContext: """Adds in the read in expectations to the context object Args: expectations_data (Dict[str, str]): expectations context (EphemeralDataContext): context object - data_type (str): name of the data type - - Raises: - ValueError: thrown when no expectations exist for this data type Returns: - EphemeralDataContext: context object with expectations added + AbstractDataContext: context object with expectations added """ - # Ensure the data type exists in the JSON file - if data_type not in expectations_data: - raise ValueError(f"No expectations found for data type '{data_type}'") - - # Extract the expectation suite and expectations for the dataset - suite_data = expectations_data[data_type] - expectation_suite_name = suite_data["expectation_suite_name"] - new_expectations = suite_data["expectations"] - - # Convert new expectations from JSON format to ExpectationConfiguration objects - new_expectations_configs = [ - ExpectationConfiguration( - expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + for data_type in expectations_data: + suite_data = expectations_data[data_type] + expectation_suite_name = suite_data["expectation_suite_name"] + new_expectations = suite_data["expectations"] + + # Convert new expectations from dict to ExpectationConfiguration objects + new_expectations_configs = [ + ExpectationConfiguration( + expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + ) + for exp in new_expectations + ] + + # Update the expectation suite in the data context + context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, + expectations=new_expectations_configs, ) - for exp in new_expectations - ] - - # Update the expectation suite in the data context - context.add_or_update_expectation_suite( - expectation_suite_name=expectation_suite_name, - expectations=new_expectations_configs, - ) return context -def add_validation_results_to_store( - context: "EphemeralDataContext", - expectation_suite_name: str, - validation_result: Dict[str, str], - batch_identifier: RuntimeBatchRequest, - run_identifier: RunIdentifier, -) -> "EphemeralDataContext": - """Adds the validation results manually to the validation store. 
- This is a workaround for a EphemeralDataContext context object, - and for us to avoid complicating our folder structure to include - checkpoints/other more persistent data context object types - until we need that feature - - Args: - context (EphemeralDataContext): context object to add results to - expectation_suite_name (str): name of expectation suite - validation_result (Dict[str, str]): results outputted by gx - validator to be stored - batch_identifier (RuntimeBatchRequest): metadata containing details of - the batch request - run_identifier (RunIdentifier): metadata containing details of the gx run - - Returns: - EphemeralDataContext: context object with validation results added to - """ - expectation_suite = context.get_expectation_suite(expectation_suite_name) - # Create an ExpectationSuiteIdentifier - expectation_suite_identifier = ExpectationSuiteIdentifier( - expectation_suite_name=expectation_suite.expectation_suite_name - ) - - # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier - validation_result_identifier = ValidationResultIdentifier( - expectation_suite_identifier=expectation_suite_identifier, - batch_identifier=batch_identifier, - run_id=run_identifier, - ) - - context.validations_store.set(validation_result_identifier, validation_result) - return context +# def add_validation_results_to_store( +# context: "EphemeralDataContext", +# expectation_suite_name: str, +# validation_result: Dict[str, str], +# batch_identifier: RuntimeBatchRequest, +# run_identifier: RunIdentifier, +# ) -> "EphemeralDataContext": +# """Adds the validation results manually to the validation store. +# This is a workaround for a EphemeralDataContext context object, +# and for us to avoid complicating our folder structure to include +# checkpoints/other more persistent data context object types +# until we need that feature + +# Args: +# context (EphemeralDataContext): context object to add results to +# expectation_suite_name (str): name of expectation suite +# validation_result (Dict[str, str]): results outputted by gx +# validator to be stored +# batch_identifier (RuntimeBatchRequest): metadata containing details of +# the batch request +# run_identifier (RunIdentifier): metadata containing details of the gx run + +# Returns: +# EphemeralDataContext: context object with validation results added to +# """ +# expectation_suite = context.get_expectation_suite(expectation_suite_name) +# # Create an ExpectationSuiteIdentifier +# expectation_suite_identifier = ExpectationSuiteIdentifier( +# expectation_suite_name=expectation_suite.expectation_suite_name +# ) + +# # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier +# validation_result_identifier = ValidationResultIdentifier( +# expectation_suite_identifier=expectation_suite_identifier, +# batch_identifier=batch_identifier, +# run_id=run_identifier, +# ) + +# context.validations_store.set(validation_result_identifier, validation_result) +# return context def main(): @@ -351,24 +335,15 @@ def main(): run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}") expectation_suite_name = f"{args['data_type']}_expectations" s3 = boto3.client("s3") - context = create_context( + + # Set up Great Expectations + gx_context = gx.get_context() + logger.info("update data docs site") + gx_context = update_data_docs_sites( + context=gx_context, s3_bucket=args["shareable_artifacts_bucket"], namespace=args["namespace"], - 
key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/", - ) - glue_context = GlueContext(SparkContext.getOrCreate()) - logger.info("get_spark_df") - spark_df = get_spark_df( - glue_context=glue_context, - parquet_bucket=args["parquet_bucket"], - namespace=args["namespace"], - data_type=args["data_type"], ) - logger.info("get_batch_request") - batch_request = get_batch_request(spark_df, args["data_type"], run_id) - logger.info("add_expectations") - - # Load the JSON file with the expectations logger.info("reads_expectations_from_json") expectations_data = read_json( s3=s3, @@ -376,31 +351,55 @@ def main(): key=args["expectation_suite_key"], ) logger.info("adds_expectations_from_json") - add_expectations_from_json( + gx_context = add_expectations_from_json( expectations_data=expectations_data, - context=context, - data_type=args["data_type"], + context=gx_context, ) - logger.info("get_validator") - validator = context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, - ) - logger.info("validator.validate") - validation_result = validator.validate() - logger.info("validation_result: %s", validation_result) + # Set up Spark + glue_context = GlueContext(SparkContext.getOrCreate()) + logger.info("get_spark_df") + spark_df = get_spark_df( + glue_context=glue_context, + parquet_bucket=args["parquet_bucket"], + namespace=args["namespace"], + data_type=args["data_type"], + ) - add_validation_results_to_store( - context, - expectation_suite_name, - validation_result, - batch_identifier=batch_request["batch_identifiers"]["batch_identifier"], - run_identifier=run_id, + # Put the two together and validate the GX expectations + logger.info("get_batch_request") + batch_request = get_batch_request( + gx_context=gx_context, spark_dataset=spark_df, data_type=args["data_type"] ) - context.build_data_docs( - site_names=["s3_site"], + logger.info("add checkpoint") + checkpoint = gx_context.add_or_update_checkpoint( + name=f"{args['data_type']}-checkpoint", + expectation_suite_name=expectation_suite_name, + batch_request=batch_request, ) + logger.info("run checkpoint") + checkpoint_result = checkpoint.run(run_name=f"run_{datetime.now()}") + # logger.info("get_validator") + # validator = gx_context.get_validator( + # batch_request=batch_request, + # expectation_suite_name=expectation_suite_name, + # ) + # logger.info("validator.validate") + # validation_result = validator.validate() + + # logger.info("validation_result: %s", validation_result) + + # Use checkpoint action instead + # add_validation_results_to_store( + # gx_context, + # expectation_suite_name, + # validation_result, + # batch_identifier=batch_request["batch_identifiers"]["batch_identifier"], + # run_identifier=run_id, + # ) + # gx_context.build_data_docs( + # site_names=["s3_site"], + # ) logger.info("data docs saved!") diff --git a/src/glue/resources/great_expectations.yml b/src/glue/resources/great_expectations.yml new file mode 100644 index 0000000..9ce83cd --- /dev/null +++ b/src/glue/resources/great_expectations.yml @@ -0,0 +1,58 @@ +config_version: 3.0 +stores: + expectations_store: + class_name: ExpectationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: expectations/ + validations_store: + class_name: ValidationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: uncommitted/validations/ + evaluation_parameter_store: + class_name: EvaluationParameterStore + checkpoint_store: + class_name: CheckpointStore + 
store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: checkpoints/ + profiler_store: + class_name: ProfilerStore + store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: profilers/ +expectations_store_name: expectations_store +validations_store_name: validations_store +evaluation_parameter_store_name: evaluation_parameter_store +checkpoint_store_name: checkpoint_store +datasources: + spark_datasource: + class_name: Datasource + execution_engine: + class_name: SparkDFExecutionEngine + force_reuse_spark_context: true + data_connectors: + runtime_data_connector: + class_name: RuntimeDataConnector + batch_identifiers: + - batch_identifier +fluent_datasources: + parquet: + type: spark + assets: {} +data_docs_sites: + s3_site: + class_name: SiteBuilder + store_backend: + class_name: TupleS3StoreBackend + bucket: recover-shareable-artifacts-vpn + prefix: main/great_expectation_reports/parquet + site_index_builder: + class_name: DefaultSiteIndexBuilder +include_rendered_content: + globally: false + expectation_suite: false + expectation_validation_result: false
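
A minimal end-to-end sketch of the flow this patch sets up follows: the file-backed great_expectations.yml, a fluent Spark dataframe asset, and a checkpoint that stores validation results and publishes data docs to S3. It is meant to be run locally rather than in Glue, and it only uses Great Expectations calls that already appear in run_great_expectations_on_parquet.py above (update_data_docs_site, sources.add_or_update_spark, add_dataframe_asset, build_batch_request, add_or_update_expectation_suite, add_or_update_checkpoint). The SparkSession, the context_root_dir argument, and the bucket/namespace/"dataset" names are placeholder assumptions, not values taken from the repo.

"""Minimal sketch of the consolidated GX flow, runnable outside Glue.

Assumptions (placeholders, not repo values): a local SparkSession stands in
for the Glue-provided one, context_root_dir points at the directory holding
great_expectations.yml, and the bucket/namespace/"dataset" names are
illustrative. Publishing data docs requires write access to the named bucket.
"""
from datetime import datetime

from pyspark.sql import SparkSession

import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("p1", 42)], ["ParticipantIdentifier", "Steps"])

# Load the file-backed context defined by src/glue/resources/great_expectations.yml
context = gx.get_context(context_root_dir="src/glue/resources")

# Point the s3_site from great_expectations.yml at a namespaced prefix,
# mirroring update_data_docs_sites() in the Glue job.
context.update_data_docs_site(
    site_name="s3_site",
    site_config={
        "class_name": "SiteBuilder",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": "example-shareable-artifacts-bucket",  # placeholder
            "prefix": "example-namespace/great_expectation_reports/parquet/",  # placeholder
        },
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    },
)

# Register the dataframe as a fluent Spark asset and build a batch request,
# as get_batch_request() does.
data_source = context.sources.add_or_update_spark(name="parquet")
data_asset = data_source.add_dataframe_asset(name="dataset_spark_dataframe")
batch_request = data_asset.build_batch_request(dataframe=df)

# One suite with a single expectation, in the same shape that
# add_expectations_from_json() builds from the expectations JSON on S3.
suite_name = "dataset_expectations"
context.add_or_update_expectation_suite(
    expectation_suite_name=suite_name,
    expectations=[
        ExpectationConfiguration(
            expectation_type="expect_column_to_exist",
            kwargs={"column": "ParticipantIdentifier"},
        )
    ],
)

# The checkpoint's default actions store the validation result and update data docs.
checkpoint = context.add_or_update_checkpoint(
    name="dataset-checkpoint",
    expectation_suite_name=suite_name,
    batch_request=batch_request,
)
result = checkpoint.run(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
print(result.success)

The checkpoint's default action list is what stores the validation result and rebuilds the configured data docs sites, which is why the manual add_validation_results_to_store() and build_data_docs() calls are retired from main() in this patch.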