Skip to content

Commit

Permalink
kafka-tracing: injects init context on forward
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
Adrian Cole committed Jan 15, 2024
1 parent 66f8df3 commit 86f6d15
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public <K, V> Producer<K, V> producer(Producer<K, V> producer) {
* one couldn't be extracted.
*/
public Span nextSpan(ConsumerRecord<?, ?> record) {
// Eventhough the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll"
// Even though the type is ConsumerRecord, this is not a (remote) consumer span. Only "poll"
// events create consumer spans. Since this is a processor span, we use the normal sampler.
TraceContextOrSamplingFlags extracted =
extractAndClearTraceIdHeaders(processorExtractor, record.headers(), record.headers());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2020 The OpenZipkin Authors
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,12 +16,12 @@
import brave.propagation.Propagation.Getter;
import brave.propagation.Propagation.Setter;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessingContext;

final class KafkaStreamsPropagation {
/**
* Used by {@link KafkaStreamsTracing#nextSpan(ProcessorContext)} to extract a trace context from
* a prior stage.
* Used by {@link KafkaStreamsTracing#nextSpan(ProcessingContext, Headers)} to extract a trace
* context from a prior stage.
*/
static final Getter<Headers, String> GETTER = new Getter<Headers, String>() {
@Override public String get(Headers headers, String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
*/
package brave.kafka.streams;

import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
final class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
BaseTracingProcessor<FixedKeyProcessorContext<KIn, VOut>, FixedKeyRecord<KIn, VIn>, FixedKeyProcessor<KIn, VIn, VOut>>
implements FixedKeyProcessor<KIn, VIn, VOut> {

Expand All @@ -31,13 +33,20 @@ class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
return record.headers();
}

@Override
void process(FixedKeyProcessor<KIn, VIn, VOut> delegate, FixedKeyRecord<KIn, VIn> record) {
@Override void process(FixedKeyProcessor<KIn, VIn, VOut> delegate,
FixedKeyRecord<KIn, VIn> record) {
delegate.process(record);
}

@Override public void init(FixedKeyProcessorContext<KIn, VOut> context) {
this.context = context;
CurrentTraceContext current =
kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext();
TraceContext traceContext = current.get();
if (traceContext != null) {
context =
new TracingFixedKeyProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext);
}
delegate.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

/** Injects the initialization tracing context to record headers on forward */
final class TracingFixedKeyProcessorContext<KForward, VForward>
extends TracingProcessingContext<FixedKeyProcessorContext<KForward, VForward>>
implements FixedKeyProcessorContext<KForward, VForward> {

TracingFixedKeyProcessorContext(FixedKeyProcessorContext<KForward, VForward> delegate,
TraceContext.Injector<Headers> injector, TraceContext context) {
super(delegate, injector, context);
}

@Override public <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> r) {
injector.inject(context, r.headers());
delegate.forward(r);
}

@Override
public <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> r, String s) {
injector.inject(context, r.headers());
delegate.forward(r, s);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Injector;
import java.io.File;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;

class TracingProcessingContext<C extends ProcessingContext> implements ProcessingContext {
final C delegate;
final Injector<Headers> injector;
final TraceContext context;

TracingProcessingContext(C delegate, Injector<Headers> injector,
TraceContext context) {
this.delegate = delegate;
this.injector = injector;
this.context = context;
}

@Override public String applicationId() {
return delegate.applicationId();
}

@Override public TaskId taskId() {
return delegate.taskId();
}

@Override public Optional<RecordMetadata> recordMetadata() {
return delegate.recordMetadata();
}

@Override public Serde<?> keySerde() {
return delegate.keySerde();
}

@Override public Serde<?> valueSerde() {
return delegate.valueSerde();
}

@Override public File stateDir() {
return delegate.stateDir();
}

@Override public StreamsMetrics metrics() {
return delegate.metrics();
}

@Override public <S extends StateStore> S getStateStore(String s) {
return delegate.getStateStore(s);
}

@Override public Cancellable schedule(Duration duration, PunctuationType punctuationType,
Punctuator punctuator) {
return delegate.schedule(duration, punctuationType, punctuator);
}

@Override public void commit() {
delegate.commit();
}

@Override public Map<String, Object> appConfigs() {
return delegate.appConfigs();
}

@Override public Map<String, Object> appConfigsWithPrefix(String s) {
return delegate.appConfigsWithPrefix(s);
}

@Override public long currentSystemTimeMs() {
return delegate.currentSystemTimeMs();
}

@Override public long currentStreamTimeMs() {
return delegate.currentStreamTimeMs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package brave.kafka.streams;

import brave.propagation.CurrentTraceContext;
import brave.propagation.TraceContext;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
Expand All @@ -37,6 +39,12 @@ final class TracingProcessor<KIn, VIn, KOut, VOut> extends

@Override public void init(ProcessorContext<KOut, VOut> context) {
this.context = context;
CurrentTraceContext current =
kafkaStreamsTracing.kafkaTracing.messagingTracing().tracing().currentTraceContext();
TraceContext traceContext = current.get();
if (traceContext != null) {
context = new TracingProcessorContext<>(context, kafkaStreamsTracing.injector, traceContext);
}
delegate.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2013-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave.kafka.streams;

import brave.propagation.TraceContext;
import brave.propagation.TraceContext.Injector;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

/** Injects the initialization tracing context to record headers on forward */
final class TracingProcessorContext<KForward, VForward>
extends TracingProcessingContext<ProcessorContext<KForward, VForward>>
implements ProcessorContext<KForward, VForward> {

TracingProcessorContext(ProcessorContext<KForward, VForward> delegate,
Injector<Headers> injector, TraceContext context) {
super(delegate, injector, context);
}

@Override public <K extends KForward, V extends VForward> void forward(Record<K, V> r) {
injector.inject(context, r.headers());
delegate.forward(r);
}

@Override
public <K extends KForward, V extends VForward> void forward(Record<K, V> r, String s) {
injector.inject(context, r.headers());
delegate.forward(r, s);
}
}

0 comments on commit 86f6d15

Please sign in to comment.