Skip to content

Commit

Permalink
get GX config template from S3
Browse files · Browse the repository at this point in the history
  • Loading branch information
philerooski committed Oct 30, 2024
1 parent 9b8abbf commit 83b8397
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 113 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/cleanup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,9 @@ jobs:
run: pipenv run sceptre --debug --var namespace=${{ github.event.ref }} delete develop/namespaced --yes

- name: Remove artifacts
run: pipenv run python src/scripts/manage_artifacts/artifacts.py --remove --namespace ${{ github.event.ref }} --cfn_bucket ${{ vars.CFN_BUCKET }}
run: |
pipenv run python src/scripts/manage_artifacts/artifacts.py
--remove
--namespace ${{ github.event.ref }}
--cfn_bucket ${{ vars.CFN_BUCKET }}
--shareable-artifacts-bucket ${{ vars.SHAREABLE_ARTIFACTS_BUCKET }}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
Expand Down
113 changes: 58 additions & 55 deletions src/glue/jobs/run_great_expectations_on_parquet.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
import json
import logging
import os
import subprocess
import sys
from datetime import datetime
from typing import Dict

import boto3
import great_expectations as gx
import pyspark
import yaml
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from pyspark.context import SparkContext

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand All @@ -41,7 +32,7 @@ def read_args() -> dict:
"namespace",
"data-type",
"expectation-suite-key",
"gx-resources-key-prefix",
"gx-config-key",
],
)
for arg in args:
def validate_args(value: str) -> None:
    """Validate a single command-line argument value.

    Args:
        value (str): The argument value to check.

    Raises:
        ValueError: If `value` is an empty string.

    Returns:
        None
    """
    if value == "":
        raise ValueError("Argument value cannot be an empty string")
    return None


def configure_gx_config(
    gx_config_bucket: str,
    gx_config_key: str,
    shareable_artifacts_bucket: str,
    namespace: str,
) -> dict:
    """Download and configure a `great_expectations.yml` file locally.

    Downloads a `great_expectations.yml` template from S3 into a local `gx`
    directory, then rewrites its validations store and data docs site to
    point at this namespace's prefix in the shareable artifacts bucket.
    The resulting file is automatically used to configure the GX data
    context when calling `gx.get_context()`.

    Args:
        gx_config_bucket (str): S3 bucket containing the `great_expectations.yml` file.
        gx_config_key (str): S3 key where this file is located.
        shareable_artifacts_bucket (str): S3 bucket where shareable artifacts are written.
        namespace (str): The current namespace.

    Returns:
        dict: The parsed and updated GX configuration.
    """
    gx_config_path = "gx/great_expectations.yml"
    os.makedirs("gx", exist_ok=True)
    s3_client = boto3.client("s3")
    logger.info(
        f"Downloading s3://{gx_config_bucket}/{gx_config_key} to {gx_config_path}"
    )
    s3_client.download_file(
        Bucket=gx_config_bucket, Key=gx_config_key, Filename=gx_config_path
    )
    with open(gx_config_path, "rb") as gx_config_obj:
        gx_config = yaml.safe_load(gx_config_obj)
    # Both the validations store and the data docs site share the same
    # bucket/prefix rewrite; apply it once per store backend rather than
    # duplicating the assignments.
    for store_backend in (
        gx_config["stores"]["validations_store"]["store_backend"],
        gx_config["data_docs_sites"]["s3_site"]["store_backend"],
    ):
        store_backend["bucket"] = shareable_artifacts_bucket
        # The template's prefix contains a `{namespace}` placeholder.
        store_backend["prefix"] = store_backend["prefix"].format(namespace=namespace)
    # Persist the updated config so `gx.get_context()` picks it up.
    with open(gx_config_path, "w", encoding="utf-8") as gx_config_obj:
        yaml.dump(gx_config, gx_config_obj)
    return gx_config


def get_spark_df(
Expand Down Expand Up @@ -195,7 +206,7 @@ def add_expectations_from_json(

# Convert new expectations from dict to ExpectationConfiguration objects
new_expectations_configs = [
ExpectationConfiguration(
gx.core.expectation_configuration.ExpectationConfiguration(
expectation_type=exp["expectation_type"], kwargs=exp["kwargs"]
)
for exp in new_expectations
Expand All @@ -212,28 +223,20 @@ def add_expectations_from_json(
def main():
args = read_args()
s3 = boto3.client("s3")
# Download GX stores and configuration
subprocess.run(
args=[
"aws",
"s3",
"sync",
f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}",
".",
],
check=True,
run_id = gx.core.run_identifier.RunIdentifier(
run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
expectation_suite_name = f"{args['data_type']}_expectations"

# Set up Great Expectations
gx_context = gx.get_context()
logger.info("update_data_docs_site")
gx_context = update_data_docs_sites(
context=gx_context,
s3_bucket=args["shareable_artifacts_bucket"],
logger.info("configure_gx_config")
configure_gx_config(
gx_config_bucket=args["cfn_bucket"],
gx_config_key=args["gx_config_key"],
shareable_artifacts_bucket=args["shareable_artifacts_bucket"],
namespace=args["namespace"],
)
gx_context = gx.get_context()
logger.info("reads_expectations_from_json")
expectations_data = read_json(
s3=s3,
Expand All @@ -247,7 +250,7 @@ def main():
)

# Set up Spark
glue_context = GlueContext(SparkContext.getOrCreate())
glue_context = GlueContext(pyspark.context.SparkContext.getOrCreate())
logger.info("get_spark_df")
spark_df = get_spark_df(
glue_context=glue_context,
Expand Down
34 changes: 13 additions & 21 deletions src/glue/resources/great_expectations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,43 @@ stores:
validations_store:
class_name: ValidationsStore
store_backend:
class_name: TupleFilesystemStoreBackend
base_directory: uncommitted/validations/
class_name: TupleS3StoreBackend
suppress_store_backend_id: true
bucket: "{shareable_artifacts_bucket}"
prefix: "{namespace}/great_expectation_reports/parquet/validations/"
evaluation_parameter_store:
class_name: EvaluationParameterStore
checkpoint_store:
class_name: CheckpointStore
store_backend:
class_name: TupleFilesystemStoreBackend
suppress_store_backend_id: true
base_directory: checkpoints/
profiler_store:
class_name: ProfilerStore
store_backend:
class_name: TupleFilesystemStoreBackend
suppress_store_backend_id: true
base_directory: profilers/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store
datasources:
spark_datasource:
class_name: Datasource
execution_engine:
class_name: SparkDFExecutionEngine
force_reuse_spark_context: true
data_connectors:
runtime_data_connector:
class_name: RuntimeDataConnector
batch_identifiers:
- batch_identifier
fluent_datasources:
parquet:
type: spark
assets: {}
data_docs_sites:
s3_site:
class_name: SiteBuilder
store_backend:
class_name: TupleS3StoreBackend
bucket: recover-shareable-artifacts-vpn
prefix: main/great_expectation_reports/parquet
bucket: "{shareable_artifacts_bucket}"
prefix: "{namespace}/great_expectation_reports/parquet/"
site_index_builder:
class_name: DefaultSiteIndexBuilder
include_rendered_content:
globally: false
expectation_suite: false
expectation_validation_result: false
fluent_datasources:
my_parquet_datasource:
type: spark
assets:
my_dataframe_asset:
type: dataframe
batch_metadata: {}
35 changes: 1 addition & 34 deletions src/scripts/manage_artifacts/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,6 @@ def upload(namespace: str, cfn_bucket: str):
execute_command(cmd)


def sync(namespace: str, shareable_artifacts_bucket: str):
"""Sync resources which are not version controlled to this namespace.
In some cases, we do not want to version control some data (like Great Expectations artifacts)
but we need to duplicate this data from the main namespace to a development namespace.
Args:
namespace (str): The development namespace
shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts
"""
# Copy Great Expectations artifacts to this namespace
source_gx_artifacts = os.path.join(
"s3://", shareable_artifacts_bucket, "main/great_expectation_resources/"
)
target_gx_artifacts = os.path.join(
"s3://", shareable_artifacts_bucket, namespace, "great_expectation_resources/"
)
gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts]
execute_command(gx_artifacts_clean_up_cmd)
gx_artifacts_sync_cmd = [
"aws",
"s3",
"sync",
source_gx_artifacts,
target_gx_artifacts,
]
execute_command(gx_artifacts_sync_cmd)


def delete(namespace: str, cfn_bucket: str):
"""Removes all files recursively for namespace"""
s3_path = os.path.join("s3://", cfn_bucket, namespace)
Expand All @@ -93,13 +64,9 @@ def list_namespaces(cfn_bucket: str):
def main(args):
    """Dispatch to the requested artifact operation.

    Args:
        args: Parsed CLI arguments. Expected attributes: `upload` (bool),
            `remove` (bool), `namespace` (str), `cfn_bucket` (str), and
            `shareable_artifacts_bucket` (str).
    """
    if args.upload:
        upload(args.namespace, args.cfn_bucket)
    elif args.remove:
        # Clean up this namespace's files from both buckets.
        delete(args.namespace, args.cfn_bucket)
        delete(args.namespace, args.shareable_artifacts_bucket)
    else:
        list_namespaces(args.cfn_bucket)

Expand Down
8 changes: 6 additions & 2 deletions templates/glue-job-run-great-expectations-on-parquet.j2
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ Parameters:

ExpectationSuiteKey:
Type: String
Description: The s3 key prefix of the expectation suite.
Description: The S3 key of the GX expectation file.

GXConfigKey:
Type: String
Description: The S3 key of the GX configuration file/template.

MaxRetries:
Type: Number
Expand Down Expand Up @@ -106,7 +110,7 @@ Resources:
--parquet-bucket: !Ref ParquetBucket
--shareable-artifacts-bucket: !Ref ShareableArtifactsBucket
--expectation-suite-key: !Ref ExpectationSuiteKey
--gx-resources-key-prefix: !Sub "${Namespace}/great_expectation_resources/parquet/"
--gx-config-key: !Ref GXConfigKey
Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
GlueVersion: !Ref GlueVersion
MaxRetries: !Ref MaxRetries
Expand Down
Loading

0 comments on commit 83b8397

Please sign in to comment.