Kafka streams processor tracing support on context.forward #1382

Closed
frosiere opened this issue Oct 24, 2023 · 17 comments · Fixed by #1409

@frosiere
Contributor

frosiere commented Oct 24, 2023

In previous versions of Kafka Streams, it was possible to use a transformer to convert an input into an output after transformation.

Since Kafka Streams 3.4, these transformers are deprecated in favor of the new processor API, where a record is forwarded to child processors using context.forward.

The Kafka flow itself works fine, but tracing support seems incomplete with this method: baggage values updated in a processor are not propagated.

So, shouldn't we review the tracing to also handle baggage propagation when using the forward method?
Maybe I missed something, but I was not able to propagate any baggage.

Remark:

  • The TracingContext contains the correct baggage values, but it is not consulted when doing a forward.
  • The ProcessorContext contains the original baggage values

Maybe KafkaStreamsTracing should provide a forward method copying the tracing context on the record headers?

Any help or feedback is more than welcome.

@frosiere
Contributor Author

frosiere commented Oct 24, 2023

The problem has been reproduced and solved locally by copying the tracing context from the current trace into the record headers just before forwarding the record. A transparent solution would be to create a TracingProcessorContext that performs this copy when a record is transmitted. This tracing context would be created in the init method of the tracing processor.

...
// from the tracing processor
tracing.propagation().injector(<HEADER_SETTER>).inject(tracing.currentTraceContext().get(), recordToForward.headers());
forward(recordToForward);
...
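
The TracingProcessorContext idea described above can be read as a decorator around forward. The following is a minimal plain-Java sketch of that pattern only: `Rec`, `Forwarder`, `TraceState`, and `TracingForwarder` are hypothetical stand-ins, not Kafka Streams or Brave classes. A real implementation would presumably wrap Kafka Streams' ProcessorContext and use Brave's injector as in the snippet above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: every type here is a hypothetical stand-in, not a Kafka Streams or Brave API. */
final class TracingForwardSketch {
  /** Stand-in for a record carrying mutable headers. */
  static final class Rec {
    final String key;
    final String value;
    final Map<String, String> headers = new HashMap<>();
    Rec(String key, String value) { this.key = key; this.value = value; }
  }

  /** Stand-in for the part of a processor context that forwards records downstream. */
  interface Forwarder {
    void forward(Rec record);
  }

  /** Stand-in for the tracer's current state (trace ID plus baggage). */
  static final class TraceState {
    final Map<String, String> fields = new HashMap<>();
  }

  /** Decorator: copies the current trace state into the record headers, then delegates. */
  static final class TracingForwarder implements Forwarder {
    private final Forwarder delegate;
    private final TraceState current;
    TracingForwarder(Forwarder delegate, TraceState current) {
      this.delegate = delegate;
      this.current = current;
    }
    @Override public void forward(Rec record) {
      // Overwrite stale header values with the current ones before forwarding.
      record.headers.putAll(current.fields);
      delegate.forward(record);
    }
  }

  /** Scenario from the issue: baggage is updated in a processor, then the record is forwarded. */
  static Map<String, String> demo() {
    List<Rec> downstream = new ArrayList<>();
    TraceState state = new TraceState();
    state.fields.put("traceId", "abc");
    state.fields.put("baggage-user", "original");

    Rec rec = new Rec("k", "v");
    rec.headers.putAll(state.fields);            // headers as received from upstream

    state.fields.put("baggage-user", "updated"); // the processor updates a baggage value
    new TracingForwarder(downstream::add, state).forward(rec);
    return downstream.get(0).headers;            // headers seen by the child processor
  }
}
```

Because the copy happens inside the wrapper, processor code keeps calling forward as usual and does not need to know about tracing.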

If we agree on the proposal, I may contribute by dropping a PR.

@codefromthecrypt
Member

@frosiere In cases where a library updates their pattern, or adds a new function, we need to support it for the reasons you mentioned. There seems to be a TracingV2Processor added recently. Can you verify this works or clarify the issue about what's missing?

@frosiere
Contributor Author

frosiere commented Jan 9, 2024

TracingV2Processor is not useful in this case, as the forward method is defined on the ProcessorContext that needs to be traced.
I can contribute by dropping a PR to add the missing support.

@codefromthecrypt
Member

@frosiere thanks for the help. Brave has dropped all deprecated functions in the last release, so it should be easier to navigate now.

@frosiere
Contributor Author

frosiere commented Jan 10, 2024

Thank you for your quick reply. The forward method is defined on the ProcessorContext, so the latest release suffers from the same issue: tracing details are lost when using ProcessorContext#forward.

Having a TracingProcessorContext and a FixedKeyProcessorContext would solve that issue.

See https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor for more details about that forward method.

@codefromthecrypt
Member

I have to mark this help wanted as I don't do anything like tracing at my day job and am about 100hrs over budget lately. Hope you can move this forward, or someone else can take over with the tips you mention!

@codefromthecrypt
Member

nobody took this so I will!

@codefromthecrypt
Member

#1408 begins consolidation, so we only need to instrument forward in one place

@codefromthecrypt
Member

@frosiere can you validate this branch and suggest a failing integration test? #1409

@codefromthecrypt
Member

@frosiere don't worry about suggesting the test, as I can figure one out if the branch works for you. I just don't want to merge unless we have instructions and confirmation that it actually does what's necessary.

@frosiere
Contributor Author

frosiere commented Jan 29, 2024

Sorry for the long delay, I missed the first post.

The test should be defined as follows:

  1. Using a StreamsBuilder, define a stream consuming from a single topic and processing records using 2 processors.
  2. The first processor simply forwards records to the second processor using the context.
  3. The second processor may log and check the tracing context.
  4. Be sure to initialize a trace before generating records in the topic consumed by the stream.
  5. If the trace is not lost between the 2 processors, the problem can be considered solved.
  6. The solution should work for both fixed-key and regular processors. So, we certainly need 2 tests.
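
The steps above can be modelled in plain Java to show the shape of the assertion. This is only a sketch under stated assumptions: the header name `traceId`, the class `ForwardTraceTestSketch`, and the `instrumentedForward` flag are hypothetical, and no Kafka Streams or Brave API is used. It assumes the first processor forwards a freshly built record with empty headers, which is one way the trace can get lost. A real test would build the topology with a StreamsBuilder and repeat the check for fixed-key processors.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java model of the test steps; hypothetical stand-ins, not Kafka Streams or Brave APIs. */
final class ForwardTraceTestSketch {
  static final String TRACE_HEADER = "traceId"; // hypothetical header name

  /**
   * Pipes one record through two chained processors and returns the trace ID
   * observed by the second processor, or null if the trace was lost.
   */
  static String traceSeenBySecondProcessor(boolean instrumentedForward) {
    // Step 4: start a trace before producing the input record.
    Map<String, String> inputHeaders = new HashMap<>();
    inputHeaders.put(TRACE_HEADER, "t1");

    // Step 3: the second processor records the trace ID it finds on the record.
    String[] observed = new String[1];
    Consumer<Map<String, String>> second = headers -> observed[0] = headers.get(TRACE_HEADER);

    // The forward call, with or without header injection from the current trace.
    String currentTraceId = inputHeaders.get(TRACE_HEADER); // extracted when consuming
    Consumer<Map<String, String>> forward = headers -> {
      if (instrumentedForward) headers.put(TRACE_HEADER, currentTraceId);
      second.accept(headers);
    };

    // Step 2: the first processor forwards a freshly built record (new, empty headers).
    forward.accept(new HashMap<>());
    return observed[0];
  }
}
```

With `instrumentedForward` set to false the second processor sees no trace ID, which corresponds to the failing case; with true it sees the ID started in step 4, which corresponds to step 5.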

I checked the code and the fix looks good to me. Thanks a lot for your support.

Let me know if you need more help or a concrete implementation of that test.

@frosiere
Contributor Author

Are you waiting for me to take action or help you move the issue forward?

@codefromthecrypt
Member

@frosiere if you have time to verify #1409 (not just eyeball it, but check that it works) and make the tests, I would appreciate it. I've been swamped with all the repos in this org and was deferring feature work in brave until the rocketmq work finishes (as either way it would be a minor bump). Help is definitely preferred over waiting for me, though. I have only used kafka once, in one job several years ago, so you're in a better position than me (as are others, though I guess many are not in a position to help either).

@codefromthecrypt
Member

you can take the code from my PR and make your own if you like, if that helps I mean.

@codefromthecrypt
Member

have some time this morning before market, taking a look to finish it!

@codefromthecrypt
Member

@frosiere sorry, I didn't remember earlier that this had no API impact, so it could have gone out in a patch with no problem. Apologies for accidentally sitting on it while waiting for another change.

Can you take a quick look at #1409 to see if I got your sketch correct (I think I did)? If so, I'll add the missing fixedkey dimension and get this out.

@frosiere
Contributor Author

Feature is working as expected. Thanks a lot for the implementation. Looking forward to receiving a release containing that fix.


2 participants