Skip to content

Commit 087e9e7

Browse files
author
Denys Fakhritdinov
committed
extract consumer/producer metrics creation in separate method
1 parent 941e551 commit 087e9e7

File tree

2 files changed

+16
-50
lines changed

2 files changed

+16
-50
lines changed

modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala

+8-21
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,7 @@ object ConsumerMetricsOf {
2828
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
2929
} yield { (clientId: ClientId) =>
3030
val source = sourceOf(clientId)
31-
32-
new ConsumerMetrics[F] {
33-
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
34-
source.call(name, topic, latency, success)
35-
36-
override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
37-
source.poll(topic, bytes, records, age)
38-
39-
override def count(name: String, topic: Topic): F[Unit] =
40-
source.count(name, topic)
41-
42-
override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
43-
source.rebalance(name, topicPartition)
44-
45-
override def topics(latency: FiniteDuration): F[Unit] =
46-
source.topics(latency)
47-
48-
override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
49-
registry.register(consumer.clientMetrics)
50-
}
31+
consumerMetricsOf(source, registry)
5132
}
5233

5334
/**
@@ -65,7 +46,13 @@ object ConsumerMetricsOf {
6546
): Resource[F, ConsumerMetrics[F]] =
6647
for {
6748
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
68-
} yield new ConsumerMetrics[F] {
49+
} yield consumerMetricsOf(source, registry)
50+
51+
private def consumerMetricsOf[F[_]](
52+
source: ConsumerMetrics[F],
53+
registry: KafkaMetricsRegistry[F],
54+
): ConsumerMetrics[F] =
55+
new ConsumerMetrics[F] {
6956
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
7057
source.call(name, topic, latency, success)
7158

modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala

+8-29
Original file line numberDiff line numberDiff line change
@@ -28,34 +28,7 @@ object ProducerMetricsOf {
2828
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
2929
} yield { (clientId: ClientId) =>
3030
val source = sourceOf(clientId)
31-
32-
new ProducerMetrics[F] {
33-
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)
34-
35-
override def beginTransaction: F[Unit] = source.beginTransaction
36-
37-
override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] =
38-
source.sendOffsetsToTransaction(latency)
39-
40-
override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency)
41-
42-
override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency)
43-
44-
override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] =
45-
source.send(topic, latency, bytes)
46-
47-
override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency)
48-
49-
override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency)
50-
51-
override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency)
52-
53-
override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)
54-
55-
override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] =
56-
registry.register(producer.clientMetrics)
57-
58-
}
31+
producerMetricsOf(source, registry)
5932
}
6033

6134
/**
@@ -73,7 +46,13 @@ object ProducerMetricsOf {
7346
): Resource[F, ProducerMetrics[F]] =
7447
for {
7548
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
76-
} yield new ProducerMetrics[F] {
49+
} yield producerMetricsOf(source, registry)
50+
51+
private def producerMetricsOf[F[_]](
52+
source: ProducerMetrics[F],
53+
registry: KafkaMetricsRegistry[F],
54+
): ProducerMetrics[F] =
55+
new ProducerMetrics[F] {
7756
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)
7857

7958
override def beginTransaction: F[Unit] = source.beginTransaction

0 commit comments

Comments
 (0)