Skip to content

Commit

Permalink
Are unique check (#599)
Browse files Browse the repository at this point in the history
* Add areUnique check for uniqueness of multiple columns
  • Loading branch information
eycho-am authored Feb 5, 2025
1 parent 82537bd commit ac3ae50
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
14 changes: 14 additions & 0 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ case class Check(
uniquenessConstraint(Seq(column), Check.IsOne, filter, hint, analyzerOptions) }
}

/**
* Creates a constraint that asserts on Uniqueness in a combined set of columns.
*
* @param columns Columns to run the assertion on
* @param hint A hint to provide additional context why a constraint could have failed
* @param analyzerOptions Options to configure analyzer behavior (NullTreatment, FilteredRow)
* @return
*/
def areUnique(columns: Seq[String], hint: Option[String] = None,
analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = {
addFilterableConstraint { filter =>
uniquenessConstraint(columns, Check.IsOne, filter, hint, analyzerOptions) }
}

/**
* Creates a constraint that asserts on a column(s) primary key characteristics.
* Currently only checks uniqueness, but reserved for primary key checks if there is another
Expand Down
54 changes: 38 additions & 16 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,44 +237,55 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val isComplete = new Check(CheckLevel.Error, "rule1").isComplete("att1")
val completeness = new Check(CheckLevel.Error, "rule2").hasCompleteness("att2", _ > 0.7)
val isPrimaryKey = new Check(CheckLevel.Error, "rule3").isPrimaryKey("item")
val minLength = new Check(CheckLevel.Error, "rule3")
val minLength = new Check(CheckLevel.Error, "rule4")
.hasMinLength("item", _ >= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
val maxLength = new Check(CheckLevel.Error, "rule4")
val maxLength = new Check(CheckLevel.Error, "rule5")
.hasMaxLength("item", _ <= 1, analyzerOptions = Option(AnalyzerOptions(NullBehavior.Fail)))
val patternMatch = new Check(CheckLevel.Error, "rule6").hasPattern("att2", "[a-z]".r)
val min = new Check(CheckLevel.Error, "rule7").hasMin("val1", _ > 1)
val max = new Check(CheckLevel.Error, "rule8").hasMax("val1", _ <= 3)
val compliance = new Check(CheckLevel.Error, "rule9")
.satisfies("item < 1000", "rule9", columns = List("item"))
val areUniqueTrue = new Check(CheckLevel.Error, "rule10")
.areUnique(Seq("item", "att1")) // att1 is not unique but is unique with item
val areUniqueFalse = new Check(CheckLevel.Error, "rule11")
.areUnique(Seq("att1", "att2")) // non unique for rows 1,4,6

val expectedColumn1 = isComplete.description
val expectedColumn2 = completeness.description
val expectedColumn3 = minLength.description
val expectedColumn4 = maxLength.description
val expectedColumn5 = patternMatch.description
val expectedColumn6 = min.description
val expectedColumn7 = max.description
val expectedColumn8 = compliance.description
val expectedColumn3 = isPrimaryKey.description
val expectedColumn4 = minLength.description
val expectedColumn5 = maxLength.description
val expectedColumn6 = patternMatch.description
val expectedColumn7 = min.description
val expectedColumn8 = max.description
val expectedColumn9 = compliance.description
val expectedColumn10 = areUniqueTrue.description
val expectedColumn11 = areUniqueFalse.description

val suite = new VerificationSuite().onData(data)
.addCheck(isComplete)
.addCheck(completeness)
.addCheck(isPrimaryKey)
.addCheck(minLength)
.addCheck(maxLength)
.addCheck(patternMatch)
.addCheck(min)
.addCheck(max)
.addCheck(compliance)
.addCheck(areUniqueTrue)
.addCheck(areUniqueFalse)

val result: VerificationResult = suite.run()

assert(result.status == CheckStatus.Error)

val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data).orderBy("item")

resultData.show()
val expectedColumns: Set[String] =
data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 +
expectedColumn5 + expectedColumn6 + expectedColumn7 + expectedColumn8
data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3 + expectedColumn4 + expectedColumn5 +
expectedColumn6 + expectedColumn7 + expectedColumn8 + expectedColumn9 + expectedColumn10 + expectedColumn11
assert(resultData.columns.toSet == expectedColumns)


Expand All @@ -288,19 +299,30 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3))

val rowLevel4 = resultData.select(expectedColumn4).collect().map(r => r.getBoolean(0))
assert(Seq(true, false, false, false, false, false).sameElements(rowLevel4))
assert(Seq(true, true, true, true, true, true).sameElements(rowLevel4))

val rowLevel5 = resultData.select(expectedColumn5).collect().map(r => r.getAs[Boolean](0))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel5))
val rowLevel5 = resultData.select(expectedColumn5).collect().map(r => r.getBoolean(0))
assert(Seq(true, false, false, false, false, false).sameElements(rowLevel5))

val rowLevel6 = resultData.select(expectedColumn6).collect().map(r => r.getAs[Boolean](0))
assert(Seq(false, true, true, true, true, true).sameElements(rowLevel6))
assert(Seq(true, true, false, true, false, true).sameElements(rowLevel6))

val rowLevel7 = resultData.select(expectedColumn7).collect().map(r => r.getAs[Boolean](0))
assert(Seq(true, true, true, false, false, false).sameElements(rowLevel7))
assert(Seq(false, true, true, true, true, true).sameElements(rowLevel7))

val rowLevel8 = resultData.select(expectedColumn8).collect().map(r => r.getAs[Boolean](0))
assert(Seq(true, true, true, false, false, false).sameElements(rowLevel8))

val rowLevel9 = resultData.select(expectedColumn9).collect().map(r => r.getAs[Boolean](0))
assert(Seq(true, true, true, false, false, false).sameElements(rowLevel9))

// Multiple Uniqueness for item and att1 - att1 is not unique but is unique with item
val rowLevel10 = resultData.select(expectedColumn10).collect().map(r => r.getAs[Boolean](0))
assert(Seq(true, true, true, true, true, true).sameElements(rowLevel10))

// Multiple Uniqueness for att1 and att2 - non unique for rows 1,4,6
val rowLevel11 = resultData.select(expectedColumn11).collect().map(r => r.getAs[Boolean](0))
assert(Seq(false, true, true, false, true, false).sameElements(rowLevel11))
}

"generate a result that contains row-level results with true for filtered rows" in withSparkSession { session =>
Expand Down
4 changes: 4 additions & 0 deletions src/test/scala/com/amazon/deequ/checks/CheckTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
.isUnique("halfUniqueCombinedWithNonUnique").where("nonUnique > 0")
.isUnique("nonUnique")
.isUnique("nonUniqueWithNulls")
.areUnique(Seq("nonUnique", "onlyUniqueWithOtherNonUnique"))
.areUnique(Seq("nonUnique", "halfUniqueCombinedWithNonUnique"))

val context = runChecks(getDfWithUniqueColumns(sparkSession), check)
val result = check.evaluate(context)
Expand All @@ -187,6 +189,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
assert(constraintStatuses(2) == ConstraintStatus.Success)
assert(constraintStatuses(3) == ConstraintStatus.Failure)
assert(constraintStatuses(4) == ConstraintStatus.Failure)
assert(constraintStatuses(5) == ConstraintStatus.Success)
assert(constraintStatuses(6) == ConstraintStatus.Failure)
}

"return the correct check status for primary key" in withSparkSession { sparkSession =>
Expand Down

0 comments on commit ac3ae50

Please sign in to comment.