Skip to content

Commit

Permalink
Let users define tracing options in all cases (#259)
Browse files Browse the repository at this point in the history
When creating a producer/consumer (or the stream equivalent) from an existing Kafka client producer/consumer, default tracing options were used.

Consequently, the policy was always set to PROPAGATE.

Besides, when creating a shared producer, the user-provided config was used to create the Kafka producer, but then default options were used for tracing.

Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
  • Loading branch information
tsegismont authored Dec 13, 2023
1 parent e56773a commit 4093d81
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 33 deletions.
28 changes: 20 additions & 8 deletions src/main/java/io/vertx/kafka/client/consumer/KafkaConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,11 @@
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;

import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;

/**
* Vert.x Kafka consumer.
* <p>
Expand All @@ -55,16 +53,30 @@ public interface KafkaConsumer<K, V> extends ReadStream<KafkaConsumerRecord<K, V
/**
 * Create a new KafkaConsumer instance from a native {@link Consumer}.
 *
 * @param vertx Vert.x instance to use
 * @param consumer the Kafka consumer to wrap
 * @return an instance of the KafkaConsumer
 */
@GenIgnore(PERMITTED_TYPE)
static <K, V> KafkaConsumer<K, V> create(Vertx vertx, Consumer<K, V> consumer) {
  // No options given: the wrapped stream is created with default tracing settings.
  KafkaReadStream<K, V> stream = KafkaReadStream.create(vertx, consumer);
  return new KafkaConsumerImpl<>(stream);
}

/**
 * Create a new KafkaConsumer instance from a native {@link Consumer}.
 *
 * @param vertx Vert.x instance to use
 * @param consumer the Kafka consumer to wrap
 * @param options options used only for tracing settings
 * @return an instance of the KafkaConsumer
 */
@GenIgnore(PERMITTED_TYPE)
static <K, V> KafkaConsumer<K, V> create(Vertx vertx, Consumer<K, V> consumer, KafkaClientOptions options) {
  // Wrap the native consumer in a read stream carrying the caller's tracing options.
  return new KafkaConsumerImpl<>(KafkaReadStream.create(vertx, consumer, options));
}

/**
* Create a new KafkaConsumer instance
*
Expand Down
24 changes: 15 additions & 9 deletions src/main/java/io/vertx/kafka/client/consumer/KafkaReadStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import io.vertx.kafka.client.serialization.VertxSerdes;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -230,7 +224,19 @@ static <K, V> KafkaReadStream<K, V> create(Vertx vertx, KafkaClientOptions optio
* @return an instance of the KafkaReadStream
*/
static <K, V> KafkaReadStream<K, V> create(Vertx vertx, Consumer<K, V> consumer) {
  // Delegate to the options-aware overload with default options,
  // so there is a single construction path for wrapped consumers.
  return create(vertx, consumer, new KafkaClientOptions());
}

/**
 * Create a new KafkaReadStream instance
 *
 * @param vertx Vert.x instance to use
 * @param consumer native Kafka consumer instance
 * @param options options used only for tracing settings
 * @return an instance of the KafkaReadStream
 */
static <K, V> KafkaReadStream<K, V> create(Vertx vertx, Consumer<K, V> consumer, KafkaClientOptions options) {
  // Build the implementation around the caller-provided native consumer.
  KafkaReadStreamImpl<K, V> stream = new KafkaReadStreamImpl<>(vertx, consumer, options);
  return stream;
}

/**
Expand Down
19 changes: 17 additions & 2 deletions src/main/java/io/vertx/kafka/client/producer/KafkaProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Map;
import java.util.Properties;

import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;

/**
* Vert.x Kafka producer.
* <p>
Expand Down Expand Up @@ -201,9 +203,22 @@ static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, KafkaCl
* @param producer the Kafka producer to wrap
* @return an instance of the KafkaProducer
*/
@GenIgnore(PERMITTED_TYPE)
static <K, V> KafkaProducer<K, V> create(Vertx vertx, Producer<K, V> producer) {
  // Delegate to the options-aware overload with default options,
  // keeping a single construction path for wrapped producers.
  return create(vertx, producer, new KafkaClientOptions());
}

/**
 * Create a new KafkaProducer instance from a native {@link Producer}.
 *
 * @param vertx Vert.x instance to use
 * @param producer the Kafka producer to wrap
 * @param options options used only for tracing settings
 * @return an instance of the KafkaProducer
 */
@GenIgnore(PERMITTED_TYPE)
static <K, V> KafkaProducer<K, V> create(Vertx vertx, Producer<K, V> producer, KafkaClientOptions options) {
  // Wrap the native producer in a write stream carrying the caller's tracing options.
  return new KafkaProducerImpl<>(vertx, KafkaWriteStream.create(vertx, producer, options));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,18 @@ static <K, V> KafkaWriteStream<K, V> create(Vertx vertx, KafkaClientOptions opti
* @param producer native Kafka producer instance
*/
static <K, V> KafkaWriteStream<K, V> create(Vertx vertx, Producer<K, V> producer) {
  // Delegate to the options-aware overload with default options,
  // so there is a single construction path for wrapped producers.
  return create(vertx, producer, new KafkaClientOptions());
}

/**
 * Create a new KafkaWriteStream instance.
 *
 * @param vertx Vert.x instance to use
 * @param producer native Kafka producer instance
 * @param options options used only for tracing settings
 */
static <K, V> KafkaWriteStream<K, V> create(Vertx vertx, Producer<K, V> producer, KafkaClientOptions options) {
  // Build the implementation around the caller-provided native producer.
  KafkaWriteStreamImpl<K, V> stream = new KafkaWriteStreamImpl<>(vertx, producer, options);
  return stream;
}

@Fluent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.impl.CloseHandler;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.RecordMetadata;
import io.vertx.kafka.client.serialization.VertxSerdes;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

Expand All @@ -45,52 +46,86 @@
public class KafkaProducerImpl<K, V> implements KafkaProducer<K, V> {

/**
 * Create a shared producer from raw {@link Properties}.
 * Tracing options are derived from the same user-supplied config
 * (not from defaults), so the user's tracing policy is honored.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Properties config) {
  return createShared(
    vertx,
    name,
    () -> new org.apache.kafka.clients.producer.KafkaProducer<>(config),
    KafkaClientOptions.fromProperties(config, true));
}

/**
 * Create a shared producer from a string config map.
 * A single copy of the config feeds both the native producer factory and
 * the tracing options, keeping the two views of the config consistent.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config) {
  Map<String, Object> copy = new HashMap<>(config);
  return createShared(
    vertx,
    name,
    () -> new org.apache.kafka.clients.producer.KafkaProducer<>(copy),
    KafkaClientOptions.fromMap(copy, true));
}

/**
 * Create a shared producer from {@link KafkaClientOptions}.
 * The options' config (if any) builds the native producer, and the SAME
 * options object carries the tracing settings — previously defaults were used.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, KafkaClientOptions options) {
  Map<String, Object> config = new HashMap<>();
  if (options.getConfig() != null) {
    config.putAll(options.getConfig());
  }
  return createShared(vertx, name, () -> new org.apache.kafka.clients.producer.KafkaProducer<>(config), options);
}

/**
 * Create a shared producer from {@link Properties} with key/value classes.
 * Resolves Vert.x serializers for the requested types, then delegates to the
 * serializer-based overload so tracing options come from the user config.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Properties config, Class<K> keyType, Class<V> valueType) {
  Serializer<K> keySerializer = VertxSerdes.serdeFrom(keyType).serializer();
  Serializer<V> valueSerializer = VertxSerdes.serdeFrom(valueType).serializer();
  return createShared(vertx, name, config, keySerializer, valueSerializer);
}

/**
 * Create a shared producer from {@link Properties} with explicit serializers.
 * Tracing options are derived from the user-supplied config rather than defaults.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Properties config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  KafkaClientOptions options = KafkaClientOptions.fromProperties(config, true);
  return createShared(
    vertx,
    name,
    () -> new org.apache.kafka.clients.producer.KafkaProducer<>(config, keySerializer, valueSerializer),
    options);
}

/**
 * Create a shared producer from a string config map with key/value classes.
 * Resolves Vert.x serializers for the requested types, then delegates to the
 * serializer-based overload so tracing options come from the user config.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config, Class<K> keyType, Class<V> valueType) {
  Serializer<K> keySerializer = VertxSerdes.serdeFrom(keyType).serializer();
  Serializer<V> valueSerializer = VertxSerdes.serdeFrom(valueType).serializer();
  return createShared(vertx, name, config, keySerializer, valueSerializer);
}

/**
 * Create a shared producer from a string config map with explicit serializers.
 * A single copy of the config feeds both the native producer factory and
 * the tracing options, keeping the two views of the config consistent.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Map<String, String> config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  Map<String, Object> copy = new HashMap<>(config);
  return createShared(
    vertx,
    name,
    () -> new org.apache.kafka.clients.producer.KafkaProducer<>(copy, keySerializer, valueSerializer),
    KafkaClientOptions.fromMap(copy, true));
}

/**
 * Create a shared producer from {@link KafkaClientOptions} with key/value classes.
 * Resolves Vert.x serializers for the requested types, then delegates to the
 * serializer-based overload so the user's tracing settings are honored.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, KafkaClientOptions options, Class<K> keyType, Class<V> valueType) {
  Serializer<K> keySerializer = VertxSerdes.serdeFrom(keyType).serializer();
  Serializer<V> valueSerializer = VertxSerdes.serdeFrom(valueType).serializer();
  return createShared(vertx, name, options, keySerializer, valueSerializer);
}

/**
 * Create a shared producer from {@link KafkaClientOptions} with explicit serializers.
 * The options' config (if any) builds the native producer, and the SAME
 * options object carries the tracing settings — previously defaults were used.
 */
public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, KafkaClientOptions options, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  Map<String, Object> config = new HashMap<>();
  if (options.getConfig() != null) {
    config.putAll(options.getConfig());
  }
  return createShared(vertx, name, () -> new org.apache.kafka.clients.producer.KafkaProducer<>(config, keySerializer, valueSerializer), options);
}

private static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Supplier<KafkaWriteStream> streamFactory) {
private static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String name, Supplier<Producer<K, V>> producerFactory, KafkaClientOptions options) {
CloseFuture closeFuture = new CloseFuture();
Producer<K, V> s = ((VertxInternal)vertx).createSharedResource("__vertx.shared.kafka.producer", name, closeFuture, cf -> {
Producer<K, V> producer = streamFactory.get().unwrap();
Producer<K, V> producer = producerFactory.get();
cf.add(completion -> vertx.<Void>executeBlocking(() -> {
producer.close();
return null;
}).onComplete(completion));
return producer;
});
KafkaProducerImpl<K, V> producer = new KafkaProducerImpl<>(vertx, KafkaWriteStream.create(vertx, s), new CloseHandler((timeout, ar) -> {
KafkaWriteStream<K, V> kafkaWriteStream = KafkaWriteStream.create(vertx, s, options);
KafkaProducerImpl<K, V> producer = new KafkaProducerImpl<>(vertx, kafkaWriteStream, new CloseHandler((timeout, ar) -> {
closeFuture.close().onComplete(ar);
}));
producer.registerCloseHook();
Expand Down

0 comments on commit 4093d81

Please sign in to comment.