refactor gx code, add tests, adjust gx version
rxu17 committed Sep 6, 2024
1 parent 26e31e9 commit 8e45790
Showing 5 changed files with 394 additions and 45 deletions.
@@ -11,7 +11,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
GlueVersion: "{{ stack_group_config.json_to_parquet_glue_version }}"
AdditionalPythonModules: great_expectations~=0.17
AdditionalPythonModules: great_expectations~=0.18
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
242 changes: 200 additions & 42 deletions src/glue/jobs/run_great_expectations_on_parquet.py
@@ -2,7 +2,7 @@
import logging
import sys
from datetime import datetime
from typing import Dict, List
from typing import Dict

import boto3
import great_expectations as gx
@@ -61,8 +61,38 @@ def validate_args(value: str) -> None:
return None


def create_context(s3_bucket: str, namespace: str, report_prefix: str):
def create_context(
s3_bucket: str, namespace: str, key_prefix: str
) -> "EphemeralDataContext":
"""Creates the data context and adds stores,
datasource and data docs configurations
Args:
s3_bucket (str): name of s3 bucket to store to
namespace (str): namespace
key_prefix (str): s3 key prefix
Returns:
EphemeralDataContext: context object with all
configurations
"""
context = gx.get_context()
add_datasource(context)
add_validation_stores(context, s3_bucket, namespace, key_prefix)
add_data_docs_sites(context, s3_bucket, namespace, key_prefix)
return context


def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext":
"""Adds the spark datasource
Args:
context (EphemeralDataContext): data context to add to
Returns:
EphemeralDataContext: data context object with datasource configuration
added
"""
yaml = YAMLHandler()
context.add_datasource(
**yaml.load(
@@ -80,6 +110,28 @@ def create_context(s3_bucket: str, namespace: str, report_prefix: str):
"""
)
)
return context


def add_validation_stores(
context: "EphemeralDataContext",
s3_bucket: str,
namespace: str,
key_prefix: str,
) -> "EphemeralDataContext":
"""Adds the validation store configurations to the context object
Args:
context (EphemeralDataContext): data context to add to
s3_bucket (str): name of the s3 bucket to save validation results to
namespace (str): name of the namespace
key_prefix (str): s3 key prefix to save the
validation results to
Returns:
EphemeralDataContext: data context object with validation stores'
configuration added
"""
# Programmatically configure the validation result store and
# DataDocs to use S3
context.add_store(
@@ -89,7 +141,7 @@ def create_context(s3_bucket: str, namespace: str, report_prefix: str):
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/{report_prefix}",
"prefix": f"{namespace}/{key_prefix}",
},
},
)
@@ -101,16 +153,39 @@ def create_context(s3_bucket: str, namespace: str, report_prefix: str):
"class_name": "EvaluationParameterStore",
},
)
return context


# Update DataDocs sites in the context's configuration
def add_data_docs_sites(
context: "EphemeralDataContext",
s3_bucket: str,
namespace: str,
key_prefix: str,
) -> "EphemeralDataContext":
"""Adds the data docs sites configuration to the context object
so data docs can be saved to an s3 location. This is a special
workaround to add the data docs because we're using EphemeralDataContext
objects, which only store their configuration in memory.
Args:
context (EphemeralDataContext): data context to add to
s3_bucket (str): name of the s3 bucket to save gx docs to
namespace (str): name of the namespace
key_prefix (str): s3 key prefix to save the
gx docs to
Returns:
EphemeralDataContext: data context object with data docs sites'
configuration added
"""
data_context_config = DataContextConfig()
data_context_config["data_docs_sites"] = {
"s3_site": {
"class_name": "SiteBuilder",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/{report_prefix}",
"prefix": f"{namespace}/{key_prefix}",
},
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
}
@@ -119,8 +194,22 @@ def create_context(s3_bucket: str, namespace: str, report_prefix: str):
return context


def get_spark_df(glue_context, parquet_bucket, namespace, datatype):
s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{datatype}/"
def get_spark_df(
glue_context: GlueContext, parquet_bucket: str, namespace: str, data_type: str
) -> "pyspark.sql.dataframe.DataFrame":
"""Reads in the parquet dataset as a Dynamic Frame and converts it
to a spark dataframe
Args:
glue_context (GlueContext): the aws glue context object
parquet_bucket (str): the name of the bucket holding parquet files
namespace (str): the namespace
data_type (str): the data type name
Returns:
pyspark.sql.dataframe.DataFrame: the parquet dataset read in as a spark dataframe
"""
s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{data_type}/"
dynamic_frame = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
connection_options={"paths": [s3_parquet_path]},
@@ -130,7 +219,23 @@ def get_spark_df(glue_context, parquet_bucket, namespace, datatype):
return spark_df


def get_batch_request(spark_dataset, data_type, run_id):
def get_batch_request(
spark_dataset: "pyspark.sql.dataframe.DataFrame",
data_type: str,
run_id: RunIdentifier,
) -> RuntimeBatchRequest:
"""Retrieves the unique metadata for this batch request
Args:
spark_dataset (pyspark.sql.dataframe.DataFrame): parquet dataset as spark df
data_type (str): data type name
run_id (RunIdentifier): contains the run name and
run time metadata of this batch run
Returns:
RuntimeBatchRequest: contains metadata for the batch run request
to identify this great expectations run
"""
batch_request = RuntimeBatchRequest(
datasource_name="spark_datasource",
data_connector_name="runtime_data_connector",
@@ -145,8 +250,18 @@ def read_json(
s3: boto3.client,
s3_bucket: str,
expectations_key_prefix: str,
) -> List[str]:
""" """
) -> Dict[str, str]:
"""Reads in a json object
Args:
s3 (boto3.client): s3 client connection
s3_bucket (str): name of the s3 bucket to read from
expectations_key_prefix (str): s3 key prefix of the
location of the expectations json to read from
Returns:
Dict[str, str]: the expectations suite read in from json
"""
# read in the expectations json file
s3_response_object = s3.get_object(Bucket=s3_bucket, Key=expectations_key_prefix)
json_content = s3_response_object["Body"].read().decode("utf-8")
@@ -155,21 +270,23 @@ def read_json(


def add_expectations_from_json(
s3_client: boto3.client,
cfn_bucket: str,
data_context,
expectations_key_prefix: str,
expectations_data: Dict[str, str],
context: "EphemeralDataContext",
data_type: str,
):
# Load the JSON file with the expectations
expectations_data = read_json(
s3=s3_client,
s3_bucket=cfn_bucket,
expectations_key_prefix=expectations_key_prefix,
)
) -> "EphemeralDataContext":
"""Adds in the read in expectations to the context object
Args:
expectations_data (Dict[str, str]): expectations
context (EphemeralDataContext): context object
data_type (str): name of the data type
Raises:
ValueError: thrown when no expectations exist for this data type
"""
# Ensure the dataset exists in the JSON file
if data_type not in expectations_data:
raise ValueError(f"Dataset '{data_type}' not found in the JSON file.")
raise ValueError(f"No expectations found for dataset '{data_type}'")

# Extract the expectation suite and expectations for the dataset
suite_data = expectations_data[data_type]
@@ -185,15 +302,59 @@ def add_expectations_from_json(
]

# Update the expectation suite in the data context
data_context.add_or_update_expectation_suite(
context.add_or_update_expectation_suite(
expectation_suite_name=expectation_suite_name,
expectations=new_expectations_configs,
)
return "EphemeralDataContext"


def add_validation_results_to_store(
context: "EphemeralDataContext",
expectation_suite_name: str,
validation_result: Dict[str, str],
batch_identifier: str,
run_identifier: RunIdentifier,
) -> "EphemeralDataContext":
"""Adds the validation results manually to the validation store.
This is a workaround for an EphemeralDataContext context object,
so we can avoid complicating our folder structure with
checkpoints/other more persistent data context object types
until we need that feature
Args:
context (EphemeralDataContext): context object to add results to
expectation_suite_name (str): name of expectation suite
validation_result (Dict[str, str]): results outputted by gx
validator to be stored
batch_identifier (str): the identifier of the batch
the validation was run on
run_identifier (RunIdentifier): metadata containing details of the gx run
Returns:
EphemeralDataContext: context object with the validation results added
"""
expectation_suite = context.get_expectation_suite(expectation_suite_name)
# Create an ExpectationSuiteIdentifier
expectation_suite_identifier = ExpectationSuiteIdentifier(
expectation_suite_name=expectation_suite.expectation_suite_name
)

# Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier
validation_result_identifier = ValidationResultIdentifier(
expectation_suite_identifier=expectation_suite_identifier,
batch_identifier=batch_identifier,
run_id=run_identifier,
)

context.validations_store.set(validation_result_identifier, validation_result)
return context


def main():
args = read_args()
run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
expectation_suite_name = f"{args['data_type']}_expectations"
s3 = boto3.client("s3")
context = create_context(
s3_bucket=args["shareable_artifacts_bucket"],
@@ -219,39 +380,36 @@ def main():
batch_request = get_batch_request(spark_df, args["data_type"], run_id)
logger.info("add_expectations")

# Load the JSON file with the expectations
logger.info("reads_expectations_from_json")
expectations_data = read_json(
s3=s3,
s3_bucket=args["cfn_bucket"],
expectations_key_prefix=args["expectation_suite_key_prefix"],
)
logger.info("adds_expectations_from_json")
add_expectations_from_json(
s3_client=s3,
cfn_bucket=args["cfn_bucket"],
expectations_data=expectations_data,
data_context=context,
expectations_key_prefix=args["expectation_suite_key_prefix"],
data_type=args["data_type"],
)
logger.info("get_validator")
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=f"{args['data_type']}_expectations",
expectation_suite_name=expectation_suite_name,
)
logger.info("validator.validate")
validation_result = validator.validate()

logger.info("validation_result: %s", validation_result)

batch_identifier = batch_request["batch_identifiers"]["batch_identifier"]
suite = context.get_expectation_suite(f"{args['data_type']}_expectations")
# Create an ExpectationSuiteIdentifier
expectation_suite_identifier = ExpectationSuiteIdentifier(
expectation_suite_name=suite.expectation_suite_name
add_validation_results_to_store(
context,
expectation_suite_name,
validation_result,
batch_identifier=batch_request["batch_identifiers"]["batch_identifier"],
run_identifier=run_id,
)

# Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier
validation_result_identifier = ValidationResultIdentifier(
expectation_suite_identifier=expectation_suite_identifier,
batch_identifier=batch_identifier,
run_id=run_id,
)

context.validations_store.set(validation_result_identifier, validation_result)

context.build_data_docs(
site_names=["s3_site"],
)
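
The commit message mentions added tests, but the new test file is not shown in this view. Below is a minimal sketch of how the new add_validation_results_to_store helper could be unit tested with unittest.mock; the module import path, suite name, and identifier strings are hypothetical examples, not taken from this commit.

from unittest import mock

from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.data_context.types.resource_identifiers import (
    ValidationResultIdentifier,
)

# hypothetical import path; adjust to where the Glue job module actually lives
from src.glue.jobs import run_great_expectations_on_parquet as run_gx


def test_add_validation_results_to_store_writes_to_validations_store():
    # a mocked context stands in for the EphemeralDataContext
    context = mock.MagicMock()
    context.get_expectation_suite.return_value = mock.MagicMock(
        expectation_suite_name="heartrate_expectations"
    )
    run_id = RunIdentifier(run_name="run_20240906_000000")
    validation_result = {"success": True}

    returned = run_gx.add_validation_results_to_store(
        context,
        expectation_suite_name="heartrate_expectations",
        validation_result=validation_result,
        batch_identifier="batch_20240906_000000",
        run_identifier=run_id,
    )

    # the result should be stored under a ValidationResultIdentifier
    identifier, stored_result = context.validations_store.set.call_args[0]
    assert isinstance(identifier, ValidationResultIdentifier)
    assert stored_result == validation_result
    assert returned is context

Because the context is mocked, the sketch only checks that the helper wraps the result in a ValidationResultIdentifier, writes it to the validations store, and returns the same context it was given.
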
2 changes: 1 addition & 1 deletion templates/glue-workflow.j2
@@ -322,7 +322,7 @@ Resources:
"--parquet-bucket": !Ref ParquetBucketName
"--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName
"--expectation-suite-key-prefix": !Sub "${Namespace}/src/glue/resources/data_values_expectations.json"
"--additional-python-modules": "great_expectations~=0.17"
"--additional-python-modules": "great_expectations~=0.18"
{% endfor %}
Description: This trigger runs the great expectation parquet jobs after completion of all JSON to Parquet jobs
Type: CONDITIONAL
2 changes: 1 addition & 1 deletion tests/Dockerfile.aws_glue_4
@@ -1,4 +1,4 @@
FROM amazon/aws-glue-libs:glue_libs_4.0.0_image_01

RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.17
RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.18
ENTRYPOINT ["bash", "-l"]
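
Since this image installs moto alongside great_expectations~=0.18, the Glue job's S3 read path can be exercised without real AWS access. A hedged sketch of a moto-based test for read_json follows; the module import path, bucket name, and example expectations content are assumptions, while the expectations key mirrors the one configured in glue-workflow.j2.

import json

import boto3
from moto import mock_s3

# hypothetical import path; adjust to where the Glue job module actually lives
from src.glue.jobs import run_great_expectations_on_parquet as run_gx


@mock_s3
def test_read_json_returns_expectations_dict():
    # stand up a fake S3 bucket holding an example expectations file
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-cfn-bucket")
    expectations = {
        "healthkitv2heartbeat": {
            "expectation_suite_name": "healthkitv2heartbeat_expectations",
            "expectations": [],
        }
    }
    key = "namespace/src/glue/resources/data_values_expectations.json"
    s3.put_object(Bucket="test-cfn-bucket", Key=key, Body=json.dumps(expectations))

    result = run_gx.read_json(
        s3=s3,
        s3_bucket="test-cfn-bucket",
        expectations_key_prefix=key,
    )

    # read_json should hand back the parsed expectations suite as a dict
    assert result == expectations
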
