Commit

add complete script
rxu17 committed Sep 5, 2024
1 parent 4610016 commit c4b85f4
Showing 1 changed file with 99 additions and 69 deletions.
168 changes: 99 additions & 69 deletions src/glue/jobs/run_great_expectations_on_parquet.py
@@ -1,12 +1,22 @@
+import json
 import logging
 import sys
+from datetime import datetime
+from typing import Dict, List
 
+import boto3
 import great_expectations as gx
 from awsglue.context import GlueContext
 from awsglue.utils import getResolvedOptions
 from great_expectations.core.batch import RuntimeBatchRequest
 from great_expectations.core.expectation_configuration import ExpectationConfiguration
+from great_expectations.core.run_identifier import RunIdentifier
 from great_expectations.core.yaml_handler import YAMLHandler
+from great_expectations.data_context.types.base import DataContextConfig
+from great_expectations.data_context.types.resource_identifiers import (
+    ExpectationSuiteIdentifier,
+    ValidationResultIdentifier,
+)
 from pyspark.context import SparkContext
 
 logger = logging.getLogger(__name__)
@@ -23,10 +33,12 @@ def read_args() -> dict:
     args = getResolvedOptions(
         sys.argv,
         [
-            "data-type",
-            "namespace",
             "parquet-bucket",
-            "expectation-suite-path" "report-prefix",
+            "shareable-artifacts-bucket",
+            "cfn-bucket",
+            "namespace",
+            "data-type",
+            "expectation-suite-key-prefix",
         ],
     )
     for arg in args:
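(A note on these names: Glue passes job parameters on the command line as "--data-type <value>", "--cfn-bucket <value>", and so on, and getResolvedOptions parses them with argparse, so the hyphenated entries listed here should come back as underscore keys such as args["data_type"] and args["cfn_bucket"], which is how the rest of the script reads them.)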
@@ -51,21 +63,20 @@ def validate_args(value: str) -> None:

 def create_context(s3_bucket: str, namespace: str, report_prefix: str):
     context = gx.get_context()
-    context.add_expectation_suite(expectation_suite_name="my_expectation_suite")
     yaml = YAMLHandler()
     context.add_datasource(
         **yaml.load(
             """
-            name: my_spark_datasource
+            name: spark_datasource
             class_name: Datasource
             execution_engine:
                 class_name: SparkDFExecutionEngine
                 force_reuse_spark_context: true
             data_connectors:
-                my_runtime_data_connector:
+                runtime_data_connector:
                     class_name: RuntimeDataConnector
                     batch_identifiers:
-                        - my_batch_identifier
+                        - batch_identifier
             """
         )
     )
@@ -83,22 +94,28 @@ def create_context(s3_bucket: str, namespace: str, report_prefix: str):
         },
     )
 
-    # Add DataDocs site configuration to output results to S3
-    context.add_site_builder(
-        site_name="s3_site",
-        site_config={
+    # Add evaluation parameter store
+    context.add_store(
+        "evaluation_parameter_store",
+        {
+            "class_name": "EvaluationParameterStore",
+        },
+    )
+
+    # Update DataDocs sites in the context's configuration
+    data_context_config = DataContextConfig()
+    data_context_config["data_docs_sites"] = {
+        "s3_site": {
             "class_name": "SiteBuilder",
             "store_backend": {
                 "class_name": "TupleS3StoreBackend",
                 "bucket": s3_bucket,
                 "prefix": f"{namespace}/{report_prefix}",
             },
-            "site_index_builder": {
-                "class_name": "DefaultSiteIndexBuilder",
-            },
-        },
-    )
-
+            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
+        }
+    }
+    context._project_config["data_docs_sites"] = data_context_config["data_docs_sites"]
     return context


@@ -113,39 +130,43 @@ def get_spark_df(glue_context, parquet_bucket, namespace, datatype):
     return spark_df
 
 
-def get_batch_request(spark_dataset):
+def get_batch_request(spark_dataset, data_type, run_id):
     batch_request = RuntimeBatchRequest(
-        datasource_name="my_spark_datasource",
-        data_connector_name="my_runtime_data_connector",
-        data_asset_name="my-parquet-data-asset",
+        datasource_name="spark_datasource",
+        data_connector_name="runtime_data_connector",
+        data_asset_name=f"{data_type}-parquet-data-asset",
         runtime_parameters={"batch_data": spark_dataset},
-        batch_identifiers={"my_batch_identifier": "okaybatchidentifier"},
+        batch_identifiers={"batch_identifier": f"{data_type}_{run_id.run_name}_batch"},
     )
     return batch_request
 
 
-def add_expectations(data_context, expectation_suite_name):
-    existing_expectations = data_context.get_expectation_suite(
-        expectation_suite_name
-    ).expectations
-    non_null_expectation = ExpectationConfiguration(
-        expectation_type="expect_column_values_to_not_be_null",
-        kwargs={"column": "ParticipantIdentifier"},
-    )
-    data_context.add_or_update_expectation_suite(
-        expectation_suite_name=expectation_suite_name,
-        expectations=[non_null_expectation, *existing_expectations],
-    )
-
-
-import json
-
-
-def add_expectations_from_json(data_context, expectations_path: str, data_type: str):
+def read_json(
+    s3: boto3.client,
+    s3_bucket: str,
+    expectations_key_prefix: str,
+) -> Dict:
+    """Read the expectations JSON file from S3 and parse it into a dict."""
+    s3_response_object = s3.get_object(Bucket=s3_bucket, Key=expectations_key_prefix)
+    json_content = s3_response_object["Body"].read().decode("utf-8")
+    expectations = json.loads(json_content)
+    return expectations
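For reference, the file fetched by read_json is keyed by data type, which is how add_expectations_from_json consumes it below. A minimal sketch of that layout, with hypothetical values modeled on the expectation the old add_expectations function hardcoded:

{
    "healthkitv2heartbeats": {
        "expectation_suite_name": "healthkitv2heartbeats_expectations",
        "expectations": [
            {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "ParticipantIdentifier"}
            }
        ]
    }
}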


+def add_expectations_from_json(
+    s3_client: boto3.client,
+    cfn_bucket: str,
+    data_context,
+    expectations_key_prefix: str,
+    data_type: str,
+):
     # Load the JSON file with the expectations
-    with open(expectations_path, "r") as file:
-        expectations_data = json.load(file)
-
+    expectations_data = read_json(
+        s3=s3_client,
+        s3_bucket=cfn_bucket,
+        expectations_key_prefix=expectations_key_prefix,
+    )
     # Ensure the dataset exists in the JSON file
     if data_type not in expectations_data:
         raise ValueError(f"Dataset '{data_type}' not found in the JSON file.")
@@ -155,10 +176,6 @@ def add_expectations_from_json(data_context, expectations_path: str, data_type:
     expectation_suite_name = suite_data["expectation_suite_name"]
     new_expectations = suite_data["expectations"]
 
-    # Fetch existing expectations from the data context
-    existing_suite = data_context.get_expectation_suite(expectation_suite_name)
-    existing_expectations = existing_suite.expectations
-
     # Convert new expectations from JSON format to ExpectationConfiguration objects
     new_expectations_configs = [
         ExpectationConfiguration(
@@ -167,28 +184,19 @@ def add_expectations_from_json(data_context, expectations_path: str, data_type:
         for exp in new_expectations
     ]
 
-    # Combine existing expectations with new ones
-    updated_expectations = existing_expectations + new_expectations_configs
-
     # Update the expectation suite in the data context
     data_context.add_or_update_expectation_suite(
-        expectation_suite_name=expectation_suite_name, expectations=updated_expectations
+        expectation_suite_name=expectation_suite_name,
+        expectations=new_expectations_configs,
     )
 
 
 def main():
-    # args = read_args()
-    import pdb
-
-    pdb.set_trace()
-    args = {
-        "parquet_bucket": "recover-dev-processed-data",
-        "namespace": "etl-616",
-        "data_type": "healthkitv2heartbeats_expectations",
-        "expectation_suite_path": "recover-dev-cloudformation/etl-616/src/glue/resources/data_values_expectations.json",
-    }
+    args = read_args()
+    run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
+    s3 = boto3.client("s3")
     context = create_context(
-        s3_bucket=args["parquet_bucket"],
+        s3_bucket=args["shareable_artifacts_bucket"],
         namespace=args["namespace"],
         report_prefix=f"great_expectation_reports/{args['data_type']}/parquet/",
     )
@@ -208,25 +216,47 @@ def main():
     result = filtered_results.collect()
     logger.info("null_rows: %s", result)
     logger.info("get_batch_request")
-    batch_request = get_batch_request(spark_df)
+    batch_request = get_batch_request(spark_df, args["data_type"], run_id)
     logger.info("add_expectations")
-    # add_expectations(
-    #     data_context=context,
-    #     expectation_suite_name="my_expectation_suite"
-    # )
 
     add_expectations_from_json(
+        s3_client=s3,
+        cfn_bucket=args["cfn_bucket"],
         data_context=context,
-        json_file_path=args["expectation_suite_path"],
+        expectations_key_prefix=args["expectation_suite_key_prefix"],
         data_type=args["data_type"],
     )
     logger.info("get_validator")
     validator = context.get_validator(
-        batch_request=batch_request, expectation_suite_name="my_expectation_suite"
+        batch_request=batch_request,
+        expectation_suite_name=f"{args['data_type']}_expectations",
     )
     logger.info("validator.validate")
     validation_result = validator.validate()
 
     logger.info("validation_result: %s", validation_result)
 
+    batch_identifier = batch_request["batch_identifiers"]["batch_identifier"]
+    suite = context.get_expectation_suite(f"{args['data_type']}_expectations")
+    # Create an ExpectationSuiteIdentifier
+    expectation_suite_identifier = ExpectationSuiteIdentifier(
+        expectation_suite_name=suite.expectation_suite_name
+    )
+
+    # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier
+    validation_result_identifier = ValidationResultIdentifier(
+        expectation_suite_identifier=expectation_suite_identifier,
+        batch_identifier=batch_identifier,
+        run_id=run_id,
+    )
+
+    context.validations_store.set(validation_result_identifier, validation_result)
+
+    context.build_data_docs(
+        site_names=["s3_site"],
+    )
+    logger.info("data docs saved!")
 
 
 if __name__ == "__main__":
     main()
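With the hardcoded debug arguments gone, a run has to supply every parameter that read_args lists. A minimal launch sketch using boto3; the job name and the shareable-artifacts bucket value here are illustrative assumptions, not taken from this commit, while the other values echo the debug arguments the commit removed:

import boto3

glue = boto3.client("glue")
glue.start_job_run(
    JobName="run_great_expectations_on_parquet",  # assumed job name
    Arguments={
        "--data-type": "healthkitv2heartbeats",
        "--namespace": "etl-616",
        "--parquet-bucket": "recover-dev-processed-data",
        "--shareable-artifacts-bucket": "recover-dev-shareable-artifacts",  # assumed bucket
        "--cfn-bucket": "recover-dev-cloudformation",
        "--expectation-suite-key-prefix": "etl-616/src/glue/resources/data_values_expectations.json",
    },
)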
