From 7f9d4e548e2d3ad1ee1febce9afbe6dfbfdfe75e Mon Sep 17 00:00:00 2001
From: rdsharma26 <65777064+rdsharma26@users.noreply.github.com>
Date: Tue, 17 Dec 2024 13:44:41 -0500
Subject: [PATCH] Fix row level bug when composing outcome (#594)

* Fix row level bug when composing outcome

- When a check fails due to a precondition failure, the row level results
  are not evaluated correctly.
- For example, let's say a check has a completeness constraint which
  passes, and a minimum constraint which fails due to a precondition
  failure.
- The row level results will be the results for just the completeness
  constraint. There will be no results generated for the minimum
  constraint, and therefore the row level results will be incorrect.
- We fix this by adding a default outcome for when the row level result
  column is not provided by the analyzer.

* Added similar logic to RowLevelConstraint as well

Skipped RowLevelGroupedConstraint because only UniqueValueRatio/Uniqueness
use it, and they don't use preconditions.
---
 .../com/amazon/deequ/VerificationResult.scala |  6 +-
 .../amazon/deequ/VerificationSuiteTest.scala  | 56 +++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/amazon/deequ/VerificationResult.scala b/src/main/scala/com/amazon/deequ/VerificationResult.scala
index b9b450f2..a6b4b37c 100644
--- a/src/main/scala/com/amazon/deequ/VerificationResult.scala
+++ b/src/main/scala/com/amazon/deequ/VerificationResult.scala
@@ -31,6 +31,7 @@ import com.amazon.deequ.repository.SimpleResultSerde
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.functions.{col, monotonically_increasing_id}
 
 import java.util.UUID
@@ -144,9 +145,9 @@
       val constraint = constraintResult.constraint
       constraint match {
         case asserted: RowLevelAssertedConstraint =>
-          constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_))
+          constraintResult.metric.flatMap(metricToColumn).map(asserted.assertion(_)).orElse(Some(lit(false)))
         case _: RowLevelConstraint =>
-          constraintResult.metric.flatMap(metricToColumn)
+          constraintResult.metric.flatMap(metricToColumn).orElse(Some(lit(false)))
         case _: RowLevelGroupedConstraint =>
           constraintResult.metric.flatMap(metricToColumn)
         case _ => None
@@ -160,7 +161,6 @@
     }
   }
 
-
   private[this] def getSimplifiedCheckResultOutput(
     verificationResult: VerificationResult)
   : Seq[SimpleCheckResultOutput] = {
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index 146579e8..49ab00aa 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -1996,6 +1996,62 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
   }
 
   "Verification Suite's Row Level Results" should {
+    "yield correct results for invalid column type" in withSparkSession { sparkSession =>
+      import sparkSession.implicits._
+      val df = Seq(
+        ("1", 1, "blue"),
+        ("2", 2, "green"),
+        ("3", 3, "blue"),
+        ("4", 4, "red"),
+        ("5", 5, "purple")
+      ).toDF("id", "id2", "color")
+
+      val idColumn = "id"
+      val id2Column = "id2"
+
+      val minCheckOnInvalidColumnDescription = s"min check on $idColumn"
+      val minCheckOnValidColumnDescription = s"min check on $id2Column"
+      val patternMatchCheckOnInvalidColumnDescription = s"pattern check on $id2Column"
+      val patternMatchCheckOnValidColumnDescription = s"pattern check on $idColumn"
+
+      val minCheckOnInvalidColumn = Check(CheckLevel.Error, minCheckOnInvalidColumnDescription)
+        .hasMin(idColumn, _ >= 3)
+        .isComplete(idColumn)
+      val minCheckOnValidColumn = Check(CheckLevel.Error, minCheckOnValidColumnDescription)
+        .hasMin(id2Column, _ >= 3)
+        .isComplete(id2Column)
+
+      val patternMatchCheckOnInvalidColumn = Check(CheckLevel.Error, patternMatchCheckOnInvalidColumnDescription)
+        .hasPattern(id2Column, "[0-3]+".r)
+      val patternMatchCheckOnValidColumn = Check(CheckLevel.Error, patternMatchCheckOnValidColumnDescription)
+        .hasPattern(idColumn, "[0-3]+".r)
+
+      val checks = Seq(
+        minCheckOnInvalidColumn,
+        minCheckOnValidColumn,
+        patternMatchCheckOnInvalidColumn,
+        patternMatchCheckOnValidColumn
+      )
+
+      val verificationResult = VerificationSuite().onData(df).addChecks(checks).run()
+      val rowLevelResultsDF = VerificationResult.rowLevelResultsAsDataFrame(sparkSession, verificationResult, df)
+      val rowLevelResults = rowLevelResultsDF.collect()
+
+      val minCheckOnInvalidColumnRowLevelResults =
+        rowLevelResults.map(_.getAs[Boolean](minCheckOnInvalidColumnDescription))
+      val minCheckOnValidColumnRowLevelResults =
+        rowLevelResults.map(_.getAs[Boolean](minCheckOnValidColumnDescription))
+      val patternMatchCheckOnInvalidColumnRowLevelResults =
+        rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnInvalidColumnDescription))
+      val patternMatchCheckOnValidColumnRowLevelResults =
+        rowLevelResults.map(_.getAs[Boolean](patternMatchCheckOnValidColumnDescription))
+
+      minCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false)
+      minCheckOnValidColumnRowLevelResults shouldBe Seq(false, false, true, true, true)
+      patternMatchCheckOnInvalidColumnRowLevelResults shouldBe Seq(false, false, false, false, false)
+      patternMatchCheckOnValidColumnRowLevelResults shouldBe Seq(true, true, true, false, false)
+    }
+
     "yield correct results for satisfies check" in withSparkSession { sparkSession =>
       import sparkSession.implicits._
       val df = Seq(
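
Aside, for illustration (this note is not part of the patch): the heart of
the fix is the orElse(Some(lit(false))) defaulting on the optional row-level
outcome column. The standalone Scala sketch below shows that pattern in
isolation; maybeOutcome is a hypothetical stand-in for
constraintResult.metric.flatMap(metricToColumn) evaluating to None after a
precondition failure.

    import org.apache.spark.sql.{Column, SparkSession}
    import org.apache.spark.sql.functions.lit

    object RowLevelDefaultSketch extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("row-level-default-sketch")
        .getOrCreate()
      import spark.implicits._

      val df = Seq(("1", 1), ("2", 2), ("3", 3)).toDF("id", "id2")

      // A constraint whose precondition failed produces no row-level
      // outcome column (stand-in for the real metric-to-column lookup).
      val maybeOutcome: Option[Column] = None

      // Before the fix, None propagated and the constraint was silently
      // omitted from the row-level results. With the default, every row
      // is explicitly marked as failing the constraint instead.
      val outcome: Column = maybeOutcome.orElse(Some(lit(false))).get

      df.withColumn("min check on id", outcome).show() // all rows read false

      spark.stop()
    }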