Skip to content

Commit

Permalink
Adds RocketMQ plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
CodePrometheus committed Feb 7, 2025
1 parent 06b47b1 commit 8cbd9dc
Show file tree
Hide file tree
Showing 18 changed files with 1,095 additions and 0 deletions.
12 changes: 12 additions & 0 deletions instrumentation/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Tracing;
import brave.kafka.clients.TracingProducerBenchmarks;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;

import static org.apache.rocketmq.client.producer.SendStatus.SEND_OK;

@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 10, time = 1)
@Fork(3)
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class RocketMQProducerBenchmarks {
Message message;
DefaultMQProducer producer, tracingProducer;

@Setup(Level.Trial) public void init() {
message = new Message("zipkin", "zipkin".getBytes());
Tracing tracing = Tracing.newBuilder().build();
producer = new FakeProducer();
tracingProducer = new FakeProducer();
tracingProducer.getDefaultMQProducerImpl().registerSendMessageHook(
new TracingSendMessage(RocketMQTracing.newBuilder(tracing).build())
);
}

@TearDown(Level.Trial) public void close() {
Tracing.current().close();
}

@Benchmark public SendResult send_baseCase() throws Exception {
return producer.send(message);
}

@Benchmark public void send_traced() throws Exception {
tracingProducer.send(message);
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.addProfiler("gc")
.include(".*" + TracingProducerBenchmarks.class.getSimpleName())
.build();

new Runner(opt).run();
}

static final class FakeProducer extends DefaultMQProducer {
@Override public SendResult send(Message msg) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("zipkin");
sendResult.setSendStatus(SEND_OK);
return sendResult;
}
}
}
1 change: 1 addition & 0 deletions instrumentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
<module>kafka-streams</module>
<module>netty-codec-http</module>
<module>vertx-web</module>
<module>rocketmq-client</module>
</modules>

<!-- ${project.groupId}:brave version is set in the root pom.
Expand Down
98 changes: 98 additions & 0 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# brave-instrumentation-rocketmq-client

## Tracing for RocketMQ Client

This module provides instrumentation for RocketMQ based services.

## example

### producer

The key is to register our hook to the producer, use `registerSendMessageHook(new TracingSendMessage())`.

```java
package brave.rocketmq.client;

import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ProducerExample {
public static void main(String[] args) throws Exception {
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testSend";
Message message = new Message(topic, "zipkin", "zipkin".getBytes());
DefaultMQProducer producer = new DefaultMQProducer("testSend");
producer.getDefaultMQProducerImpl()
.registerSendMessageHook(new TracingSendMessage());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.send(message);

producer.shutdown();
}
}
```

### consumer

wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.messageListenerOrderly(org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.messageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.messageListenerConcurrently(org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
package brave.rocketmq.client;

import java.util.List;
import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.rocketmq.client.RocketMQTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;

public class ProducerExample {
public static void main(String[] args) throws Exception {
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing rocketMQTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testPushConsumer";
String nameserverAddr = "127.0.0.1:9876";

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr(nameserverAddr);
consumer.subscribe(topic, "*");
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.messageListenerConcurrently(
new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
}
}
```
6 changes: 6 additions & 0 deletions instrumentation/rocketmq-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# We use brave.internal.Nullable, but it is not used at runtime.
Import-Package: \
!brave.internal*,\
*
Export-Package: \
brave.rocketmq.client
51 changes: 51 additions & 0 deletions instrumentation/rocketmq-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?xml version="1.0"?>
<!--
Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-parent</artifactId>
<version>6.0.4-SNAPSHOT</version>
</parent>

<artifactId>brave-instrumentation-rocketmq-client</artifactId>
<name>Brave Instrumentation: RocketMQ Client</name>

<properties>
<!-- Matches Export-Package in bnd.bnd -->
<module.name>brave.rocketmq.client</module.name>

<main.basedir>${project.basedir}/../..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-messaging</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-tests</artifactId>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.internal.Nullable;
import brave.messaging.MessagingRequest;
import brave.propagation.TraceContext;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import static brave.Span.Kind.CONSUMER;
import static brave.internal.Throwables.propagateIfFatal;
import static brave.rocketmq.client.RocketMQTracing.ROCKETMQ_TOPIC;

/**
* Read records headers to create and complete a child of the incoming
* producers span if possible.
* The spans are modeled as a duration 1 {@link Span.Kind#CONSUMER} span to represent consuming the
* message from the rocketmq broker with a child span representing the processing of the message.
*/
abstract class AbstractMessageListener {
final RocketMQTracing rocketMQTracing;
final Tracing tracing;
final Tracer tracer;
final TraceContext.Extractor<MessageConsumerRequest> extractor;
final SamplerFunction<MessagingRequest> sampler;
@Nullable final String remoteServiceName;

AbstractMessageListener(RocketMQTracing rocketMQTracing) {
this.rocketMQTracing = rocketMQTracing;
this.tracing = rocketMQTracing.messagingTracing.tracing();
this.tracer = tracing.tracer();
this.extractor = rocketMQTracing.consumerExtractor;
this.sampler = rocketMQTracing.consumerSampler;
this.remoteServiceName = rocketMQTracing.remoteServiceName;
}

<T> T processConsumeMessage(
List<MessageExt> msgs,
Function<List<MessageExt>, T> consumerFunc,
BiFunction<T, T, Boolean> successFunc,
T successStatus
) {
for (MessageExt message : msgs) {
MessageConsumerRequest request = new MessageConsumerRequest(message);
TraceContextOrSamplingFlags extracted =
rocketMQTracing.extractAndClearTraceIdHeaders(extractor, request, message.getProperties());
Span consumerSpan = rocketMQTracing.nextMessagingSpan(sampler, request, extracted);
Span listenerSpan = tracer.newChild(consumerSpan.context());

if (!consumerSpan.isNoop()) {
setConsumerSpan(consumerSpan, message.getTopic());
// incur timestamp overhead only once
long timestamp = tracing.clock(consumerSpan.context()).currentTimeMicroseconds();
consumerSpan.start(timestamp);
long consumerFinish = timestamp + 1L; // save a clock reading
consumerSpan.finish(consumerFinish);
// not using scoped span as we want to start with a pre-configured time
listenerSpan.name("on-message").start(consumerFinish);
}

Tracer.SpanInScope scope = tracer.withSpanInScope(listenerSpan);
Throwable error = null;
T result;

try {
result = consumerFunc.apply(msgs);
} catch (Throwable t) {
propagateIfFatal(t);
error = t;
throw t;
} finally {
if (error != null) listenerSpan.error(error);
listenerSpan.finish();
scope.close();
}

if (!successFunc.apply(result, successStatus)) {
return result;
}
}
return successStatus;
}

void setConsumerSpan(Span span, String topic) {
span.name("receive").kind(CONSUMER);
span.tag(ROCKETMQ_TOPIC, topic);
if (remoteServiceName != null) span.remoteServiceName(remoteServiceName);
}
}
Loading

0 comments on commit 8cbd9dc

Please sign in to comment.