diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 1e104892..36c57643 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -241,6 +241,20 @@ case class Check( uniquenessConstraint(Seq(column), Check.IsOne, filter, hint, analyzerOptions) } } + /** + * Creates a constraint that asserts on uniqueness of the combination of the given columns (built with Check.IsOne). + * + * @param columns Columns whose combination the uniqueness assertion runs on + * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow) + * @return The result of addFilterableConstraint for this constraint (a CheckWithLastConstraintFilterable) + */ + def areUnique(columns: Seq[String], hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = { + addFilterableConstraint { filter => + uniquenessConstraint(columns, Check.IsOne, filter, hint, analyzerOptions) } + } + /** * Creates a constraint that asserts on a column(s) primary key characteristics. 
* Currently only checks uniqueness, but reserved for primary key checks if there is another diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 49ab00aa..5e19a25f 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -237,44 +237,55 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val isComplete = new Check(CheckLevel.Error, "rule1").isComplete("att1") val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7) val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item") - val minLength = new Check(CheckLevel.Error, "rule3") + val minLength = new Check(CheckLevel.Error, "rule4") .hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) - val maxLength = new Check(CheckLevel.Error, "rule4") + val maxLength = new Check(CheckLevel.Error, "rule5") .hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail))) val patternMatch = new Check(CheckLevel.Error, "rule6").hasPattern("att2", "[a-z]".r) val min = new Check(CheckLevel.Error, "rule7").hasMin("val1", _ > 1) val max = new Check(CheckLevel.Error, "rule8").hasMax("val1", _ <= 3) val compliance = new Check(CheckLevel.Error, "rule9") .satisfies("item < 1000", "rule9", columns = List("item")) + val areUniqueTrue = new Check(CheckLevel.Error, "rule10") + .areUnique(Seq("item", "att1")) // att1 is not unique but is unique with item + val areUniqueFalse = new Check(CheckLevel.Error, "rule11") + .areUnique(Seq("att1", "att2")) // non unique for rows 1,4,6 + val expectedColumn1 = isComplete.description val expectedColumn2 = completeness.description - val expectedColumn3 = minLength.description - val expectedColumn4 = maxLength.description - val expectedColumn5 = patternMatch.description - val expectedColumn6 = 
min.description - val expectedColumn7 = max.description - val expectedColumn8 = compliance.description + val expectedColumn3 = isPrimaryKey.description + val expectedColumn4 = minLength.description + val expectedColumn5 = maxLength.description + val expectedColumn6 = patternMatch.description + val expectedColumn7 = min.description + val expectedColumn8 = max.description + val expectedColumn9 = compliance.description + val expectedColumn10 = areUniqueTrue.description + val expectedColumn11 = areUniqueFalse.description val suite = new VerificationSuite().onData(data) .addCheck(isComplete) .addCheck(completeness) + .addCheck(isPrimaryKey) .addCheck(minLength) .addCheck(maxLength) .addCheck(patternMatch) .addCheck(min) .addCheck(max) .addCheck(compliance) + .addCheck(areUniqueTrue) + .addCheck(areUniqueFalse) val result: VerificationResult = suite.run() assert(result.status == CheckStatus.Error) - val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data) + val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data).orderBy("item") resultData.show() val expectedColumns: Set[String] = - data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 + - expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8 + data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 + expectedColumn5 + + expectedColumn6 + expectedColumn7 + expectedColumn8 + expectedColumn9 + expectedColumn10 + expectedColumn11 assert(resultData.columns.toSet == expectedColumns) @@ -288,19 +299,30 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3)) val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0)) - assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4)) + assert(Seq(true, true, true, true, true, 
true).sameElements(rowLevel4)) - val rowLevel5 = resultData.select(expectedColumn5).collect().map(r => r.getAs[Boolean](0)) - assert(Seq(true, true, false, true, false, true).sameElements(rowLevel5)) + val rowLevel5 = resultData.select(expectedColumn5).collect().map(r => r.getBoolean(0)) + assert(Seq(true, false, false, false, false, false).sameElements(rowLevel5)) val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Boolean](0)) - assert(Seq(false, true, true, true, true, true).sameElements(rowLevel6)) + assert(Seq(true, true, false, true, false, true).sameElements(rowLevel6)) val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Boolean](0)) - assert(Seq(true, true, true, false, false, false).sameElements(rowLevel7)) + assert(Seq(false, true, true, true, true, true).sameElements(rowLevel7)) val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Boolean](0)) assert(Seq(true, true, true, false, false, false).sameElements(rowLevel8)) + + val rowLevel9 = resultData.select(expectedColumn9).collect().map(r => r.getAs[Boolean](0)) + assert(Seq(true, true, true, false, false, false).sameElements(rowLevel9)) + + // Multiple Uniqueness for item and att1 - att1 is not unique but is unique with item + val rowLevel10 = resultData.select(expectedColumn10).collect().map(r => r.getAs[Boolean](0)) + assert(Seq(true, true, true, true, true, true).sameElements(rowLevel10)) + + // Multiple Uniqueness for att1 and att2 - non unique for rows 1,4,6 + val rowLevel11 = resultData.select(expectedColumn11).collect().map(r => r.getAs[Boolean](0)) + assert(Seq(false, true, true, false, true, false).sameElements(rowLevel11)) } "generate a result that contains row-level results with true for filtered rows" in withSparkSession { session => diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala index eab056f3..1988537b 100644 --- 
a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala +++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala @@ -176,6 +176,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix .isUnique("halfUniqueCombinedWithNonUnique").where("nonUnique > 0") .isUnique("nonUnique") .isUnique("nonUniqueWithNulls") + .areUnique(Seq("nonUnique", "onlyUniqueWithOtherNonUnique")) + .areUnique(Seq("nonUnique", "halfUniqueCombinedWithNonUnique")) val context = runChecks(getDfWithUniqueColumns(sparkSession), check) val result = check.evaluate(context) @@ -187,6 +189,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix assert(constraintStatuses(2) == ConstraintStatus.Success) assert(constraintStatuses(3) == ConstraintStatus.Failure) assert(constraintStatuses(4) == ConstraintStatus.Failure) + assert(constraintStatuses(5) == ConstraintStatus.Success) + assert(constraintStatuses(6) == ConstraintStatus.Failure) } "return the correct check status for primary key" in withSparkSession { sparkSession =>