Skip to content

Commit

Permalink
Address Ian's feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 13, 2024
1 parent 387025d commit 5b40ead
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.util.Base64

import org.joda.time.DateTime

import cats.data.{NonEmptyList, ValidatedNel}
import cats.data.{Ior, NonEmptyList, ValidatedNel}
import cats.{Monad, Parallel}
import cats.implicits._

Expand Down Expand Up @@ -170,7 +170,7 @@ object Enrich {
case None =>
Sync[F].unit
}
} yield (List(Left((badRow, None))), collectorTstamp)
} yield (List(Ior.left(badRow)), collectorTstamp)

/** Build a `generic_error` bad row for unhandled runtime errors */
def genericBadRow(
Expand All @@ -197,9 +197,9 @@ object Enrich {
case (previous, item) =>
val (bad, enriched, incomplete) = previous
item match {
case Right(e) => (bad, e :: enriched, incomplete)
case Left((br, Some(i))) => (br :: bad, enriched, i :: incomplete)
case Left((br, _)) => (br :: bad, enriched, incomplete)
case Ior.Right(e) => (bad, e :: enriched, incomplete)
case Ior.Left(br) => (br :: bad, enriched, incomplete)
case Ior.Both(br, i) => (br :: bad, enriched, i :: incomplete)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common

import cats.data.{EitherT, ValidatedNel}
import cats.data.{EitherT, Ior, ValidatedNel}

import com.snowplowanalytics.snowplow.badrows.BadRow

Expand All @@ -25,7 +25,7 @@ package object fs2 {
type ByteSink[F[_]] = List[Array[Byte]] => F[Unit]
type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit]

type Enriched = Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent]
type Enriched = Ior[BadRow, EnrichedEvent]
type Result = (List[Enriched], Option[Long])

/** Function to transform an origin raw payload into good and/or bad rows */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
package com.snowplowanalytics.snowplow.enrich.common

import cats.Monad
import cats.data.{Validated, ValidatedNel}
import cats.data.{Ior, Validated, ValidatedNel}
import cats.effect.kernel.Sync
import cats.implicits._

Expand Down Expand Up @@ -61,7 +61,7 @@ object EtlPipeline {
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
): F[List[(Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent])]] =
): F[List[Ior[BadRow, EnrichedEvent]]] =
input match {
case Validated.Valid(Some(payload)) =>
adapterRegistry
Expand All @@ -83,10 +83,10 @@ object EtlPipeline {
)
}
case Validated.Invalid(badRow) =>
Monad[F].pure(List((Left((badRow, None)))))
Monad[F].pure(List(Ior.left(badRow)))
}
case Validated.Invalid(badRows) =>
Monad[F].pure(badRows.toList.map(br => (Left((br, None)))))
Monad[F].pure(badRows.toList.map(br => Ior.left(br)))
case Validated.Valid(None) =>
Monad[F].pure(Nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import java.time.Instant
import org.joda.time.DateTime
import io.circe.Json
import cats.{Applicative, Monad}
import cats.data.{EitherT, NonEmptyList, OptionT, StateT, Validated}
import cats.data.{EitherT, Ior, NonEmptyList, OptionT, StateT}
import cats.effect.kernel.Sync
import cats.implicits._

Expand Down Expand Up @@ -66,11 +66,14 @@ object EnrichmentManager {
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields,
emitIncomplete: Boolean = true
): F[Either[(BadRow, Option[EnrichedEvent]), EnrichedEvent]] =
): F[Ior[BadRow, EnrichedEvent]] =
for {
validatedInput <- mapAndValidateInput(raw, etlTstamp, processor, client, registryLookup)
(schemaViolations, enrichedEvent, extractResult) = validatedInput
enriched <- if (schemaViolations.isEmpty || emitIncomplete)
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
validatedInput <- mapAndValidateInput(raw, enrichedEvent, etlTstamp, processor, client, registryLookup)
(schemaViolations, extractResult) = validatedInput
enriched <- runEnrichingStep(
List(schemaViolations),
emitIncomplete,
enrich(
enrichedEvent,
registry,
Expand All @@ -81,48 +84,62 @@ object EnrichmentManager {
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder,
registryLookup
)
else
Sync[F].pure((None, Nil))
),
(None, Nil)
)
(enrichFailures, enrichmentsContexts) = enriched
validationFailures <- if ((schemaViolations.isEmpty && enrichFailures.isEmpty) || emitIncomplete)
validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields)
else
Sync[F].pure(None)
validationFailures <- runEnrichingStep(
List(schemaViolations, enrichFailures),
emitIncomplete,
validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields),
None
)
badRows = List(schemaViolations, enrichFailures, validationFailures).flatten
output = badRows match {
case Nil =>
Right(enrichedEvent)
Ior.right(enrichedEvent)
case head :: _ =>
if (!emitIncomplete)
Left((head, None))
Ior.left(head)
else {
val failuresContext = createFailuresContext(badRows)
ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts)
.foreach(c => enrichedEvent.derived_contexts = c)
Left((head, Some(enrichedEvent)))
Ior.both(head, enrichedEvent)
}
}
} yield output

/**
 * Decides whether an enriching step gets run or is replaced by a default value.
 *
 * @param errors bad rows possibly produced by the previous steps
 * @param emitIncomplete when true, `step` is used even if previous steps produced a bad row
 * @param step effect to use when no error is defined in `errors` or when `emitIncomplete` is true
 * @param fallback value lifted into `F` with `pure` when `step` is not used
 * @return `step` itself, or `fallback` wrapped in `F`
 */
def runEnrichingStep[F[_]: Sync, A](
  errors: List[Option[BadRow]],
  emitIncomplete: Boolean,
  step: F[A],
  fallback: A
): F[A] = {
  // Equivalent to "every element of errors is None"
  val noErrorSoFar = !errors.exists(_.isDefined)
  if (emitIncomplete || noErrorSoFar) step else Sync[F].pure(fallback)
}

// TODO: aggregate all the errors inside the same SchemaViolations
def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
enrichedEvent: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): F[(Option[BadRow], EnrichedEvent, IgluUtils.EventExtractResult)] =
): F[(Option[BadRow], IgluUtils.EventExtractResult)] =
for {
mapped <- Sync[F].delay(setupEnrichedEvent(raw, etlTstamp, processor))
(enrichmentFailures, enrichedEvent) = mapped
setupViolations <- Sync[F].delay(setupEnrichedEvent(raw, enrichedEvent, etlTstamp, processor))
validated <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, raw, processor, registryLookup)
(schemaViolations, sdjs) = validated
maybeBadRow = aggregateBadRows(List(enrichmentFailures, schemaViolations))
} yield (maybeBadRow, enrichedEvent, sdjs)
maybeBadRow = aggregateBadRows(setupViolations, schemaViolations)
} yield (maybeBadRow, sdjs)

private def aggregateBadRows(badRows: List[Option[BadRow]]): Option[BadRow] =
badRows.flatten.headOption
// TODO: take FailureDetails as input
/**
 * Keeps at most one bad row out of the two candidates.
 *
 * @return `schemaViolations` when it is defined, otherwise `more` (which may itself be `None`)
 */
private def aggregateBadRows(schemaViolations: Option[BadRow], more: Option[BadRow]): Option[BadRow] =
  schemaViolations.orElse(more)

/**
* @return Valid enrichments contexts and bad row if one or several contexts are invalid
Expand Down Expand Up @@ -309,10 +326,10 @@ object EnrichmentManager {
// TODO create SchemaViolations instead of EnrichmentsFailures
private def setupEnrichedEvent(
raw: RawEvent,
e: EnrichedEvent,
etlTstamp: DateTime,
processor: Processor
): (Option[BadRow.EnrichmentFailures], EnrichedEvent) = {
val e = new EnrichedEvent()
): Option[BadRow.EnrichmentFailures] = {
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
e.v_etl = ME.etlVersion(processor)
Expand All @@ -329,17 +346,13 @@ object EnrichmentManager {
// Map/validate/transform input fields to enriched event fields
val transformed = Transform.transform(raw, e)

(collectorTstamp |+| transformed) match {
case Validated.Invalid(enrichmentFailures) =>
val badRow = EnrichmentManager.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(e),
RawEvent.toRawEvent(raw),
processor
)
(Some(badRow), e)
case _ =>
(None, e)
(collectorTstamp |+| transformed).swap.toOption.map { enrichmentFailures =>
EnrichmentManager.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(e),
RawEvent.toRawEvent(raw),
processor
)
}
}

Expand Down Expand Up @@ -836,10 +849,7 @@ object EnrichmentManager {
// We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator
.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
.map {
case Left(br) => Some(br)
case _ => None
}
.map(_.swap.toOption)

// TODO
private def createFailuresContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,13 @@ object IgluUtils {
case Left(err) => List(err)
case _ => Nil
}
maybeBadRow = (invalidContexts ::: invalidUnstruct) match {
case Nil => None
case head :: tail =>
buildSchemaViolationsBadRow(
NonEmptyList.of(head, tail: _*),
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
).some
maybeBadRow = (invalidContexts ::: invalidUnstruct).toNel.map { nel =>
buildSchemaViolationsBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
}
validationInfo = (validContexts.flatMap(_.validationInfo) ::: unstructEvent.flatMap(_.validationInfo).toList).distinct.map(_.toSdj)
output = EventExtractResult(
Expand Down Expand Up @@ -168,24 +166,21 @@ object IgluUtils {
.map(_.separate)
.map {
case (errors, valid) =>
val maybeBadRow = errors match {
case Nil => None
case head :: tail =>
val enrichmentFailures = NonEmptyList.of(head, tail: _*).map {
case (schemaKey, clientError) =>
FailureDetails.EnrichmentFailure(
FailureDetails.EnrichmentInformation(schemaKey, "enrichments-contexts-validation").some,
FailureDetails.EnrichmentFailureMessage.IgluError(schemaKey, clientError)
)
}
EnrichmentManager
.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
val maybeBadRow = errors.toNel.map { nel =>
val enrichmentFailures = nel.map {
case (schemaKey, clientError) =>
FailureDetails.EnrichmentFailure(
FailureDetails.EnrichmentInformation(schemaKey, "enrichments-contexts-validation").some,
FailureDetails.EnrichmentFailureMessage.IgluError(schemaKey, clientError)
)
.some
}
EnrichmentManager
.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
}
(maybeBadRow, valid)
}
Expand All @@ -209,7 +204,7 @@ object IgluUtils {
.parse(json)
.leftMap(FailureDetails.SchemaViolation.NotIglu(json, _))
.toEitherT[F]
// Check thant the schema of SelfDescribingData[Json] is the expected one
// Check that the schema of SelfDescribingData[Json] is the expected one
_ <- if (validateCriterion(sdj, expectedCriterion))
EitherT.rightT[F, FailureDetails.SchemaViolation](sdj)
else
Expand Down Expand Up @@ -250,12 +245,10 @@ object IgluUtils {
registryLookup: RegistryLookup[F]
): F[List[Either[(SchemaKey, ClientError), SelfDescribingData[Json]]]] =
sdjs
.traverse(check(client, _, registryLookup).value)
.map(_.zip(sdjs))
.map(_.map {
case (Left(err), _) => Left(err)
case (_, sdj) => Right(sdj)
})
.traverse { sdj =>
check(client, sdj, registryLookup).value
.map(_.as(sdj))
}

/** Parse a Json as a SDJ and check that it's valid */
private def parseAndValidateSDJ_sv[F[_]: Monad: Clock]( // _sv for SchemaViolation
Expand Down

0 comments on commit 5b40ead

Please sign in to comment.