Skip to content

Commit 6c8dfd6

Browse files
Merge pull request #433 from evolution-gaming/df/java-client-metrics-of
Expose Java client metrics via factory from ClientId
2 parents 004160d + 087e9e7 commit 6c8dfd6

File tree

3 files changed

+57
-5
lines changed

3 files changed

+57
-5
lines changed

modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala

+28-2
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,34 @@ package com.evolutiongaming.skafka.consumer
33
import cats.effect.{Resource, Sync}
44
import cats.effect.std.UUIDGen
55
import com.evolutiongaming.catshelper.ToTry
6-
import com.evolutiongaming.skafka.{Topic, TopicPartition}
6+
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
77
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
88
import io.prometheus.client.CollectorRegistry
99

1010
import scala.concurrent.duration.FiniteDuration
1111

1212
object ConsumerMetricsOf {
1313

14+
/**
15+
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
16+
*
17+
* @param sourceOf original [[ConsumerMetrics]] factory
18+
* @param prometheus instance of Prometheus registry
19+
* @param prefix metric name prefix
20+
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
21+
*/
22+
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
23+
sourceOf: ClientId => ConsumerMetrics[F],
24+
prometheus: CollectorRegistry,
25+
prefix: Option[String],
26+
): Resource[F, ClientId => ConsumerMetrics[F]] =
27+
for {
28+
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
29+
} yield { (clientId: ClientId) =>
30+
val source = sourceOf(clientId)
31+
consumerMetricsOf(source, registry)
32+
}
33+
1434
/**
1535
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
1636
*
@@ -26,7 +46,13 @@ object ConsumerMetricsOf {
2646
): Resource[F, ConsumerMetrics[F]] =
2747
for {
2848
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
29-
} yield new ConsumerMetrics[F] {
49+
} yield consumerMetricsOf(source, registry)
50+
51+
private def consumerMetricsOf[F[_]](
52+
source: ConsumerMetrics[F],
53+
registry: KafkaMetricsRegistry[F],
54+
): ConsumerMetrics[F] =
55+
new ConsumerMetrics[F] {
3056
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
3157
source.call(name, topic, latency, success)
3258

modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala

+28-2
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,34 @@ package com.evolutiongaming.skafka.producer
33
import cats.effect.{Resource, Sync}
44
import cats.effect.std.UUIDGen
55
import com.evolutiongaming.catshelper.ToTry
6-
import com.evolutiongaming.skafka.Topic
6+
import com.evolutiongaming.skafka.{ClientId, Topic}
77
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
88
import io.prometheus.client.CollectorRegistry
99

1010
import scala.concurrent.duration.FiniteDuration
1111

1212
object ProducerMetricsOf {
1313

14+
/**
15+
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
16+
*
17+
* @param sourceOf original [[ProducerMetrics]] factory
18+
* @param prometheus instance of Prometheus registry
19+
* @param prefix metric name prefix
20+
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
21+
*/
22+
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
23+
sourceOf: ClientId => ProducerMetrics[F],
24+
prometheus: CollectorRegistry,
25+
prefix: Option[String],
26+
): Resource[F, ClientId => ProducerMetrics[F]] =
27+
for {
28+
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
29+
} yield { (clientId: ClientId) =>
30+
val source = sourceOf(clientId)
31+
producerMetricsOf(source, registry)
32+
}
33+
1434
/**
1535
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
1636
*
@@ -26,7 +46,13 @@ object ProducerMetricsOf {
2646
): Resource[F, ProducerMetrics[F]] =
2747
for {
2848
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
29-
} yield new ProducerMetrics[F] {
49+
} yield producerMetricsOf(source, registry)
50+
51+
private def producerMetricsOf[F[_]](
52+
source: ProducerMetrics[F],
53+
registry: KafkaMetricsRegistry[F],
54+
): ProducerMetrics[F] =
55+
new ProducerMetrics[F] {
3056
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)
3157

3258
override def beginTransaction: F[Unit] = source.beginTransaction

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "16.2.1-SNAPSHOT"
1+
ThisBuild / version := "16.3.0"

0 commit comments

Comments
 (0)