Kafka streams processor tracing support on context.forward #1382
Comments
The problem has been reproduced and solved locally by copying the tracing context from the current trace into the record headers just before forwarding the record. A transparent solution would be to create a TracingProcessorContext that performs this copy when a record is transmitted. This tracing context would be created in the init method of the tracing processor.
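For illustration, here is a minimal sketch of that decorator idea. It does not use the real Kafka Streams or Brave types: `SketchRecord`, `SketchContext`, `SketchTraceContext`, `RecordingContext` and `TracingSketchContext` are all hypothetical stand-ins, and writing a single `b3` header plus one header per baggage field is an assumption about how the context would be serialized. A real implementation would delegate the header writing to the configured propagation instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Kafka Streams record: a value plus mutable headers. */
final class SketchRecord {
  final String value;
  final Map<String, String> headers = new LinkedHashMap<>();

  SketchRecord(String value) {
    this.value = value;
  }
}

/** Hypothetical stand-in for the part of a processor context that forwards records. */
interface SketchContext {
  void forward(SketchRecord record);
}

/** Hypothetical stand-in for the current trace state, including baggage fields. */
final class SketchTraceContext {
  final String traceId;
  final String spanId;
  final Map<String, String> baggage = new LinkedHashMap<>();

  SketchTraceContext(String traceId, String spanId) {
    this.traceId = traceId;
    this.spanId = spanId;
  }
}

/** Test double that records whatever is forwarded to it. */
final class RecordingContext implements SketchContext {
  final List<SketchRecord> forwarded = new ArrayList<>();

  @Override
  public void forward(SketchRecord record) {
    forwarded.add(record);
  }
}

/** Decorator that copies the current trace context into the headers before forwarding. */
final class TracingSketchContext implements SketchContext {
  private final SketchContext delegate;
  private final Supplier<SketchTraceContext> currentTrace;

  TracingSketchContext(SketchContext delegate, Supplier<SketchTraceContext> currentTrace) {
    this.delegate = delegate;
    this.currentTrace = currentTrace;
  }

  @Override
  public void forward(SketchRecord record) {
    SketchTraceContext trace = currentTrace.get();
    if (trace != null) {
      // Assumption: trace identifiers travel in one "b3" header as traceId-spanId.
      record.headers.put("b3", trace.traceId + "-" + trace.spanId);
      // Assumption: each baggage field travels as a header named after the field.
      record.headers.putAll(trace.baggage);
    }
    delegate.forward(record);
  }
}
```

Because the copy happens inside `forward`, baggage values updated by the processor just before forwarding are the ones written to the headers, which is the behaviour missing today.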
If we agree on the proposal, I may contribute by dropping a PR.
@frosiere In cases where a library updates its pattern or adds a new function, we need to support it for the reasons you mentioned. There seems to be a TracingV2Processor added recently. Can you verify whether this works, or clarify what is missing?
TracingV2Processor is not useful in this case, as the forward method is defined on the ProcessorContext, and that is what needs to be traced.
@frosiere thanks for the help. Brave has dropped all deprecated functions in the last release, so it should be easier to navigate now.
Thank you for your quick reply. The forward method is defined on the ProcessorContext, so the latest release suffers from the same issue: tracing details are lost when using ProcessorContext#forward. Having a TracingProcessorContext and a FixedKeyProcessorContext would solve that issue. See https://docs.confluent.io/platform/current/streams/developer-guide/processor-api.html#defining-a-stream-processor for more details about that forward method.
I have to mark this "help wanted", as I don't do anything like tracing at my day job and am about 100 hours over budget lately. Hope you can move this forward, or someone else can take over with the tips you mention!
Nobody took this, so I will!
#1408 begins consolidation, so we only need to instrument forward in one place.
@frosiere don't worry about suggesting the test, as I can figure one out if the branch works for you. I just don't want to merge unless we have instructions and confirmation that it actually does what's necessary.
Sorry for the long delay, I missed the first post. The test should be defined as follows:
I checked the code and the fix looks good to me. Thanks a lot for your support. Let me know if you need more help or a concrete implementation of that test.
Are you waiting for me to take action or help you move the issue forward?
@frosiere if you have time to verify #1409 (not just eyeball it, but check that it works) and write the tests, I would appreciate it. I've been swamped with all the repos in this org and was deferring feature work in brave until the rocketmq work finishes (as either way it would be a minor bump). Help is definitely preferred over waiting for me, though. I have only used kafka once, in one job several years ago, so you're in a better position than me (as are others, but I guess many of them are not in a position to help either).
You can take the code from my PR and make your own if you like, if that helps.
Have some time this morning before market, taking a look to finish it!
@frosiere sorry, I didn't remember earlier that this had no api impact, so it could have gone out in a patch with no problemo. I apologize for accidentally sitting on it while waiting for another change. Can you take a quick look at #1409 to see if I got your sketch correct (I think I did)? If so, I'll add the missing fixedkey dimension and get this out.
The feature is working as expected. Thanks a lot for the implementation. Looking forward to receiving a release containing that fix.
In previous versions of Kafka Streams, it was possible to use a transformer to convert an input record into an output record.
Since Kafka Streams 3.4, these transformers are deprecated in favor of the new processor API. So, basically, a record can be forwarded to child processors using context.forward.
It's working fine in terms of Kafka flow but the tracing support seems incomplete (baggage values updated in a processor are not propagated) when using this method.
So, shouldn't we review the tracing to also take care of baggage propagation when using the forward method?
Maybe I missed something, but I was not able to propagate any baggage.
Remark:
Maybe KafkaStreamsTracing should provide a forward method that copies the tracing context into the record headers?
Any help or feedback is more than welcome.