Skip to content

Commit

Permalink
Switch to Ember client (close #)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jan 18, 2024
1 parent 896761e commit 3ee04b3
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2
import java.util.concurrent.TimeoutException

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

import cats.Show
import cats.data.EitherT
Expand Down Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}

Expand Down Expand Up @@ -139,35 +140,41 @@ object Environment {
final case class Enrichments[F[_]: Async](
registry: EnrichmentRegistry[F],
configs: List[EnrichmentConf],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) {

/** Initialize same enrichments, specified by configs (in case DB files updated) */
def reinitialize(blockingEC: ExecutionContext, shifter: ShiftExecution[F]): F[Enrichments[F]] =
Enrichments.buildRegistry(configs, blockingEC, shifter, httpClient).map(registry => Enrichments(registry, configs, httpClient))
Enrichments
.buildRegistry(configs, blockingEC, shifter, httpApiEnrichment)
.map(registry => Enrichments(registry, configs, httpApiEnrichment))
}

object Enrichments {
def make[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
shifter: ShiftExecution[F]
): Resource[F, Ref[F, Enrichments[F]]] =
Resource.eval {
for {
registry <- buildRegistry[F](configs, blockingEC, shifter, httpClient)
ref <- Ref.of(Enrichments[F](registry, configs, httpClient))
} yield ref
}
for {
// We don't want the HTTP client of API enrichment to be reinitialized each time the assets are refreshed
httpClient <- configs.collectFirst { case api: ApiRequestConf => api.api.timeout } match {
case Some(timeout) =>
Clients.mkHttp(readTimeout = timeout.millis).map(HttpClient.fromHttp4sClient[F])
case None =>
Resource.pure[F, HttpClient[F]](HttpClient.noop[F])
}
registry <- Resource.eval(buildRegistry[F](configs, blockingEC, shifter, httpClient))
ref <- Resource.eval(Ref.of(Enrichments[F](registry, configs, httpClient)))
} yield ref

def buildRegistry[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) =
EnrichmentRegistry.build[F](configs, shifter, httpClient, blockingEC).value.flatMap {
EnrichmentRegistry.build[F](configs, shifter, httpApiEnrichment, blockingEC).value.flatMap {
case Right(reg) => Async[F].pure(reg)
case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error))
}
Expand Down Expand Up @@ -198,7 +205,6 @@ object Environment {
bad <- sinkBad
pii <- sinkPii.sequence
http4s <- Clients.mkHttp()
http = HttpClient.fromHttp4sClient(http4s)
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
Expand All @@ -210,7 +216,7 @@ object Environment {
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](sem, clts, assets))
shifter <- ShiftExecution.ofSingleThread[F]
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter, http)
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter)
} yield Environment[F, A](
igluClient,
Http4sRegistryLookup(http4s),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import fs2.Stream
import org.http4s.{Headers, Request, Uri}
import org.http4s.client.defaults
import org.http4s.client.{Client => Http4sClient}
import org.http4s.client.middleware.{Retry, RetryPolicy}
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.pipeline.Command
import org.http4s.client.middleware.Retry
import org.http4s.ember.client.EmberClientBuilder

import Clients._

Expand Down Expand Up @@ -67,29 +66,16 @@ object Clients {
def mkHttp[F[_]: Async](
connectionTimeout: FiniteDuration = defaults.ConnectTimeout,
readTimeout: FiniteDuration = defaults.RequestTimeout,
maxConnections: Int = 10 // http4s uses 10 by default
): Resource[F, Http4sClient[F]] =
BlazeClientBuilder[F]
.withConnectTimeout(connectionTimeout)
.withRequestTimeout(readTimeout)
.withMaxTotalConnections(maxConnections)
.resource
.map(Retry[F](retryPolicy, redactHeadersWhen))

/**
 * Retry policy combining the `backoff` schedule with a predicate that only
 * treats a `Left(Command.EOF)` result as retriable.
 */
private def retryPolicy[F[_]] =
  RetryPolicy[F](
    backoff,
    retriable = (_, result) =>
      result match {
        // EOF has to be retried explicitly with the blaze client,
        // see https://github.com/snowplow/enrich/issues/692
        case Left(Command.EOF) => true
        case _ => false
      }
  )

// Retry once after 100 milliseconds: attempt numbers up to 1 get a 100 ms delay,
// any later attempt gets None.
private def backoff(attemptNumber: Int): Option[FiniteDuration] =
  attemptNumber match {
    case n if n > 1 => None
    case _ => Some(100.millis)
  }
maxConnections: Int = 100 // default of Ember client
): Resource[F, Http4sClient[F]] = {
val builder = EmberClientBuilder
.default[F]
.withTimeout(readTimeout)
.withIdleConnectionTime(connectionTimeout)
.withMaxTotal(maxConnections)
val retryPolicy = builder.retryPolicy
builder.build.map(Retry[F](retryPolicy, redactHeadersWhen))
}

/**
 * Returns true when `header` is one of `Headers.SensitiveHeaders` or the
 * `apikey` header. Passed to the `Retry` middleware in `mkHttp`.
 */
private def redactHeadersWhen(header: CIString) = {
  val redactedHeaders = Headers.SensitiveHeaders + CIString("apikey")
  redactedHeaders.contains(header)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import cats.effect.unsafe.implicits.global

import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

Expand Down Expand Up @@ -116,9 +116,7 @@ class AssetsSpec extends Specification with CatsEffect with ScalaCheck {
for {
shiftExecution <- ShiftExecution.ofSingleThread[IO]
sem <- Resource.eval(Semaphore[IO](1L))
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution, http)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution)
_ <- SpecHelpers.filesResource(TestFiles)
} yield (shiftExecution, sem, enrichments)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
import com.snowplowanalytics.snowplow.enrich.common.fs2.enrichments.ApiRequestEnrichmentSpec.unstructEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec
import com.snowplowanalytics.snowplow.enrich.common.fs2.test._
import com.snowplowanalytics.snowplow.badrows.BadRow

class ApiRequestEnrichmentSpec extends Specification with CatsEffect {

Expand Down Expand Up @@ -84,9 +85,47 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
(bad must be empty)
(pii must be empty)
(good.map(_.derived_contexts) must contain(exactly(expected)))
bad must beEmpty
pii must beEmpty
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
}

// Sends `nbEvents` events through an API request enrichment pointing at
// `http://foo/...` (expected by this test to be unreachable) with
// `ignoreOnError = false`, and checks that every event ends up as an
// enrichment-failure bad row mentioning "Connection refused".
"generate bad rows if API server is not available" in {
  val nbEvents = 1000
  val input = Stream((1 to nbEvents).toList: _*)
    .map { i =>
      json"""{
        "schema": "iglu:com.acme/test/jsonschema/1-0-1",
        "data": {"path": {"id": $i}}
      }"""
    }
    .map { ue =>
      EnrichSpec.collectorPayload.copy(
        querystring = new BasicNameValuePair("ue_px", unstructEvent(ue)) :: EnrichSpec.collectorPayload.querystring
      )
    }
    .map(_.toRaw)

  val enrichment = ApiRequestConf(
    SchemaKey("com.acme", "enrichment", "jsonschema", SchemaVer.Full(1, 0, 0)),
    List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path.id")),
    HttpApi("GET", "http://foo/{{key1}}", 2000, Authentication(None)),
    List(RegistryOutput("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))),
    Cache(1, 1000),
    ignoreOnError = false
  )

  TestEnvironment.make(input, List(enrichment)).use { test =>
    test.run().map {
      case (bad, pii, good) =>
        good must beEmpty
        pii must beEmpty
        bad must haveSize(nbEvents)
        val failureMessages = bad.collect {
          case ef: BadRow.EnrichmentFailures => ef.failure.messages.head.message.toString
        }
        // `collect` silently drops bad rows that are not EnrichmentFailures; without this
        // size check the message assertion below could pass on an empty collection.
        failureMessages must haveSize(nbEvents)
        failureMessages.map(_ must contain("Connection refused"))
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLook

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.iglu.core.SelfDescribingData

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment}
import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings}
Expand Down Expand Up @@ -112,7 +114,6 @@ object TestEnvironment extends CatsEffect {
def make(source: Stream[IO, Array[Byte]], enrichments: List[EnrichmentConf] = Nil): Resource[IO, TestEnvironment[Array[Byte]]] =
for {
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
_ <- SpecHelpers.filesResource(enrichments.flatMap(_.filesToCache).map(p => Path(p._2)))
counter <- Resource.eval(Counter.make[IO])
metrics = Counter.mkCounterMetrics[IO](counter)
Expand All @@ -122,7 +123,7 @@ object TestEnvironment extends CatsEffect {
sem <- Resource.eval(Semaphore[IO](1L))
assetsState <- Resource.eval(Assets.State.make(sem, clients, enrichments.flatMap(_.filesToCache)))
shifter <- ShiftExecution.ofSingleThread[IO]
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter, http)
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter)
goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty))
Expand Down Expand Up @@ -168,7 +169,11 @@ object TestEnvironment extends CatsEffect {
.parse(badRowStr)
.getOrElse(throw new RuntimeException(s"Error parsing bad row json: $badRowStr"))
parsed
.as[BadRow]
.getOrElse(throw new RuntimeException(s"Error decoding bad row: $parsed"))
.as[SelfDescribingData[BadRow]] match {
case Left(e) =>
throw new RuntimeException(s"Error decoding bad row $parsed", e)
case Right(sdj) =>
sdj.data
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ object EnrichmentRegistry {
def build[F[_]: Async](
confs: List[EnrichmentConf],
shifter: ShiftExecution[F],
httpClient: HttpClient[F],
httpApiEnrichment: HttpClient[F],
blockingEC: ExecutionContext
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
case c: ApiRequestConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](httpClient))
enrichment <- EitherT.right(c.enrichment[F](httpApiEnrichment))
registry <- er
} yield registry.copy(apiRequest = enrichment.some)
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,16 @@ object HttpClient {
.handleError(_.asLeft[String])
}
}

/**
 * Placeholder [[HttpClient]] that never performs a request: every call to
 * `getResponse` fails inside `F` with an `IllegalStateException`.
 */
def noop[F[_]: Sync]: HttpClient[F] =
  new HttpClient[F] {
    override def getResponse(
      uri: String,
      authUser: Option[String],
      authPassword: Option[String],
      body: Option[String],
      method: String
    ): F[Either[Throwable, String]] = {
      // The failure is raised in F rather than returned as a Left
      val notImplemented = new IllegalStateException("HTTP client not implemented")
      Sync[F].raiseError[Either[Throwable, String]](notImplemented)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global

import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder

import cats.effect.testing.specs2.CatsEffect

Expand Down Expand Up @@ -91,7 +91,7 @@ object SpecHelpers extends CatsEffect {
val registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO]

val blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
private val http4sClient = BlazeClientBuilder[IO].resource
private val http4sClient = EmberClientBuilder.default[IO].build
val httpClient = http4sClient.map(HttpClient.fromHttp4sClient[IO])

private type NvPair = (String, String)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object Dependencies {
.exclude("software.amazon.glue", "schema-registry-serde")
val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime
val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val http4sClient = "org.http4s" %% "http4s-ember-client" % V.http4s
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s
val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
Expand Down

0 comments on commit 3ee04b3

Please sign in to comment.