add integration tests, remove null rows code, add dep for urllib3<2
rxu17 committed Sep 6, 2024
1 parent 2dae40e commit 6a07092
Showing 4 changed files with 148 additions and 41 deletions.
@@ -11,7 +11,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
GlueVersion: "{{ stack_group_config.json_to_parquet_glue_version }}"
AdditionalPythonModules: great_expectations~=0.18
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
{{ stack_group_config.default_stack_tags }}
sceptre_user_data:
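
Pinning urllib3<2 next to great_expectations is most likely a workaround for urllib3 2.x's stricter OpenSSL requirements, which can break botocore on Glue's runtime images; Glue hands AdditionalPythonModules to pip as a plain comma-separated requirement list, so the equivalent of pip install "great_expectations~=0.18" "urllib3<2" runs at job start. A hypothetical fail-fast check near the top of the job would confirm the pin took effect:

import urllib3

# Illustrative assertion only; the message and placement are assumptions.
assert urllib3.__version__.startswith("1."), (
    f"expected urllib3<2 inside the Glue job, got {urllib3.__version__}"
)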
20 changes: 4 additions & 16 deletions src/glue/jobs/run_great_expectations_on_parquet.py
@@ -145,14 +145,6 @@ def add_validation_stores(
},
},
)

# Add evaluation parameter store
context.add_store(
"evaluation_parameter_store",
{
"class_name": "EvaluationParameterStore",
},
)
return context


@@ -283,6 +275,9 @@ def add_expectations_from_json(
Raises:
ValueError: thrown when no expectations exist for this data type
Returns:
EphemeralDataContext: context object with expectations added
"""
# Ensure the dataset exists in the JSON file
if data_type not in expectations_data:
@@ -306,7 +301,7 @@
expectation_suite_name=expectation_suite_name,
expectations=new_expectations_configs,
)
return "EphemeralDataContext"
return context


def add_validation_results_to_store(
@@ -369,13 +364,6 @@ def main():
namespace=args["namespace"],
data_type=args["data_type"],
)
logger.info("isNull")
null_rows = spark_df.ParticipantIdentifier.isNull()
logger.info("filter")
filtered_results = spark_df.filter(null_rows)
logger.info("collect")
result = filtered_results.collect()
logger.info("null_rows: %s", result)
logger.info("get_batch_request")
batch_request = get_batch_request(spark_df, args["data_type"], run_id)
logger.info("add_expectations")
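
The block deleted from main() collected every row with a null ParticipantIdentifier to the driver purely to log it, which is expensive at Parquet-dataset scale. If an equivalent check is ever needed again, a minimal sketch that keeps the work on the executors (column name taken from the removed code; spark_df and logger as in the job):

from pyspark.sql import functions as F

# Count null identifiers without collecting rows to the driver.
null_count = spark_df.filter(F.col("ParticipantIdentifier").isNull()).count()
logger.info("rows with null ParticipantIdentifier: %d", null_count)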
2 changes: 1 addition & 1 deletion templates/glue-workflow.j2
@@ -322,7 +322,7 @@ Resources:
"--parquet-bucket": !Ref ParquetBucketName
"--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName
"--expectation-suite-key-prefix": !Sub "${Namespace}/src/glue/resources/data_values_expectations.json"
"--additional-python-modules": "great_expectations~=0.18"
"--additional-python-modules": "great_expectations~=0.18,urllib3<2"
{% endfor %}
Description: This trigger runs the great expectation parquet jobs after completion of all JSON to Parquet jobs
Type: CONDITIONAL
165 changes: 142 additions & 23 deletions tests/test_run_great_expectations_on_parquet.py
@@ -1,20 +1,33 @@
from unittest.mock import MagicMock, patch

import great_expectations
import great_expectations as gx
import pytest
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from pyspark.sql import SparkSession

from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq


@pytest.fixture(scope="function")
def test_context():
context = gx.get_context()
yield context


@pytest.fixture(scope="function")
def test_spark():
yield SparkSession.builder.appName("BatchRequestTest").getOrCreate()


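
The tests below introduce a @pytest.mark.integration marker. Registering it, for example in a hypothetical conftest.py, silences unknown-mark warnings and lets CI run pytest -m "not integration" for fast unit runs and pytest -m integration for the real-context tests:

# conftest.py (sketch; the marker description text is an assumption)
def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "integration: tests that exercise a real GX context or SparkSession",
    )
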
def test_create_context():
with (
patch.object(great_expectations, "get_context") as mock_get_context,
patch.object(gx, "get_context") as mock_get_context,
patch.object(run_gx_on_pq, "add_datasource") as mock_add_datasource,
patch.object(
run_gx_on_pq, "add_validation_stores"
@@ -44,18 +57,36 @@ def test_create_context():
)


def test_add_datasource():
def test_that_add_datasource_calls_correctly():
mock_context = MagicMock()
yaml_handler = YAMLHandler()

result_context = run_gx_on_pq.add_datasource(mock_context)

# Verify that the datasource was added
mock_context.add_datasource.assert_called_once()
assert result_context == mock_context


def test_add_validation_stores():
@pytest.mark.integration
def test_that_add_datasource_adds_correctly(test_context):
    # Add the datasource, then list the registered datasources to verify it
run_gx_on_pq.add_datasource(test_context)
datasources = test_context.list_datasources()

# Define the expected datasource name
expected_datasource_name = "spark_datasource"

# Check that the expected datasource is present and other details are correct
assert any(
ds["name"] == expected_datasource_name for ds in datasources
), f"Datasource '{expected_datasource_name}' was not added correctly."
datasource = next(
ds for ds in datasources if ds["name"] == expected_datasource_name
)
assert datasource["class_name"] == "Datasource"
assert "SparkDFExecutionEngine" in datasource["execution_engine"]["class_name"]


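
For reference, the minimum datasource entry those assertions imply (reconstructed from the asserts; any further keys, such as data_connectors, are not checked here):

expected_datasource = {
    "name": "spark_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
}
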
def test_add_validation_stores_has_expected_calls():
mock_context = MagicMock()
s3_bucket = "test-bucket"
namespace = "test-namespace"
@@ -68,7 +99,7 @@
)

# Verify that the validation store is added
mock_add_store.assert_any_call(
mock_add_store.assert_called_once_with(
"validation_result_store",
{
"class_name": "ValidationsStore",
@@ -80,18 +111,36 @@
},
)

# Verify that the evaluation parameter store is added
mock_add_store.assert_any_call(
"evaluation_parameter_store",
{
"class_name": "EvaluationParameterStore",
},
)

assert result_context == mock_context


def test_get_spark_df():
@pytest.mark.integration
def test_validation_store_details(test_context):
    # Add the validation store to a real ephemeral context
run_gx_on_pq.add_validation_stores(
test_context,
s3_bucket="test-bucket",
namespace="test",
key_prefix="test_folder/",
)

    # Verify the store was registered with the expected configuration
stores = test_context.list_stores()
expected_store_name = "validation_result_store"

assert any(store["name"] == expected_store_name for store in stores)
# pulls the store we want
store_config = [store for store in stores if store["name"] == expected_store_name][
0
]

assert store_config["class_name"] == "ValidationsStore"
assert store_config["store_backend"]["class_name"] == "TupleS3StoreBackend"
assert store_config["store_backend"]["bucket"] == "test-bucket"
assert store_config["store_backend"]["prefix"] == "test/test_folder/"


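
Note the asserted prefix test/test_folder/: add_validation_stores evidently composes the TupleS3StoreBackend prefix from namespace and key_prefix. The store config the assertions imply (the prefix composition is an inference):

expected_store = {
    "class_name": "ValidationsStore",
    "store_backend": {
        "class_name": "TupleS3StoreBackend",
        "bucket": "test-bucket",
        "prefix": "test/test_folder/",  # presumably f"{namespace}/{key_prefix}"
    },
}
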
def test_get_spark_df_has_expected_calls():
glue_context = MagicMock()
mock_dynamic_frame = MagicMock()
mock_spark_df = MagicMock()
@@ -111,9 +160,7 @@
)

# Verify the S3 path and the creation of the DynamicFrame
expected_path = (
f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{data_type}/"
)
expected_path = f"s3://test-bucket/test-namespace/parquet/dataset_test-data/"
mock_create_dynamic_frame.assert_called_once_with(
connection_type="s3",
connection_options={"paths": [expected_path]},
@@ -140,7 +187,33 @@ def test_get_batch_request():
assert batch_request.runtime_parameters == {"batch_data": spark_dataset}


def test_read_json():
@pytest.mark.integration
def test_that_get_batch_request_details_are_correct(test_spark):
# Create a simple PySpark DataFrame to simulate the dataset
data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]
columns = ["name", "age"]
spark_dataset = test_spark.createDataFrame(data, columns)

# Create a RunIdentifier
run_id = RunIdentifier(run_name="test_run_2023")

# Call the function and get the RuntimeBatchRequest
data_type = "user_data"
batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id)

# Assertions to check that the batch request is properly populated
assert isinstance(batch_request, RuntimeBatchRequest)
assert batch_request.datasource_name == "spark_datasource"
assert batch_request.data_connector_name == "runtime_data_connector"
assert batch_request.data_asset_name == "user_data-parquet-data-asset"
assert (
batch_request.batch_identifiers["batch_identifier"]
== "user_data_test_run_2023_batch"
)
assert batch_request.runtime_parameters["batch_data"] == spark_dataset


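
Taken together, these assertions fully determine the request object; a sketch of the construction get_batch_request presumably performs (inferred from the asserts, not copied from the job source):

from great_expectations.core.batch import RuntimeBatchRequest

def build_batch_request(spark_dataset, data_type, run_id):
    # Field values mirror the assertions above; the helper name is hypothetical.
    return RuntimeBatchRequest(
        datasource_name="spark_datasource",
        data_connector_name="runtime_data_connector",
        data_asset_name=f"{data_type}-parquet-data-asset",
        runtime_parameters={"batch_data": spark_dataset},
        batch_identifiers={
            "batch_identifier": f"{data_type}_{run_id.run_name}_batch"
        },
    )
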
def test_read_json_correctly_returns_expected_values():
s3_bucket = "test-bucket"
key_prefix = "test-prefix"

@@ -169,7 +242,7 @@ def test_read_json():
assert result == {"test_key": "test_value"}


def test_add_expectations_from_json():
def test_that_add_expectations_from_json_has_expected_call():
mock_context = MagicMock()

# Sample expectations data
@@ -194,7 +267,54 @@
mock_context.add_or_update_expectation_suite.assert_called_once()


def test_add_validation_results_to_store():
@pytest.mark.integration
def test_add_expectations_from_json_adds_details_correctly(test_context):
    # Sample expectations data
expectations_data = {
"user_data": {
"expectation_suite_name": "user_data_suite",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {"column": "user_id"},
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {"column": "age", "min_value": 18, "max_value": 65},
},
],
}
}

data_type = "user_data"

# Call the function to add expectations
test_context = run_gx_on_pq.add_expectations_from_json(
expectations_data, test_context, data_type
)

# Retrieve the expectation suite to verify that expectations were added
expectation_suite = test_context.get_expectation_suite("user_data_suite")

assert expectation_suite.expectation_suite_name == "user_data_suite"
assert len(expectation_suite.expectations) == 2

# Verify the details of the first expectation
first_expectation = expectation_suite.expectations[0]
assert first_expectation.expectation_type == "expect_column_to_exist"
assert first_expectation.kwargs == {"column": "user_id"}

# Verify the details of the second expectation
second_expectation = expectation_suite.expectations[1]
assert second_expectation.expectation_type == "expect_column_values_to_be_between"
assert second_expectation.kwargs == {
"column": "age",
"min_value": 18,
"max_value": 65,
}


def test_that_add_validation_results_to_store_has_expected_calls():
# Mock the EphemeralDataContext and the necessary components
mock_context = MagicMock()
mock_expectation_suite = MagicMock()
@@ -220,7 +340,6 @@ def test_add_validation_results_to_store():
# Assert that the expectation suite was retrieved correctly
mock_context.get_expectation_suite.assert_called_once_with("test_suite")

# Create expected ExpectationSuiteIdentifier and ValidationResultIdentifier
expected_expectation_suite_identifier = ExpectationSuiteIdentifier(
expectation_suite_name="test_suite"
)
