Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rocketmq #1412

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions instrumentation/rocketmq-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,39 @@ public class ProducerExample {

### consumer

Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
with `brave.rocketmq.client.TracingMessageListenerConcurrently`
or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
with `brave.rocketmq.client.TracingMessageListenerOrderly`;
wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently`
using `brave.rocketmq.client.RocketMQTracing.wrap(long, org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly)`,
or alternatively, wrap `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly`
using `brave.rocketmq.client.RocketMQTracing.wrap(int, org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently)`;

```java
package brave.rocketmq.client;

import java.util.List;
import java.util.Optional;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.messaging.MessagingRequest;
import brave.messaging.MessagingTracing;
import brave.rocketmq.client.RocketMQTracing;
import brave.sampler.SamplerFunction;
import brave.sampler.SamplerFunctions;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Optional;

public class ProducerExample {

public static void main(String[] args) throws Exception {
// todo Replaced with actual tracing construct
Tracing tracing = Tracing.newBuilder().build();
SamplerFunction<MessagingRequest> producerSampler = SamplerFunctions.deferDecision();
RocketMQTracing producerTracing = RocketMQTracing.create(
RocketMQTracing rocketMQTracing = RocketMQTracing.create(
MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build());

String topic = "testPushConsumer";
Expand All @@ -84,16 +90,20 @@ public class ProducerExample {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer");
consumer.setNamesrvAddr(nameserverAddr);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) {
MessageListenerConcurrently messageListenerConcurrently = rocketMQTracing.wrap(new MessageListenerConcurrently() {
@Override
protected void handleMessage(MessageExt messageExt) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
// do something
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);

consumer.start();
}

}

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
import brave.propagation.TraceContext.Injector;
import brave.propagation.TraceContextOrSamplingFlags;
import brave.sampler.SamplerFunction;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

import java.util.Map;

public class RocketMQTracing {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class RocketMQTracing {
/**
* Use this class to decorate your RocketMQ consumer / producer and enable Tracing.
*
* @since 6.1
*/
public final class RocketMQTracing {


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note this won't be public, and this is like jms, rabbit, etc.

Suggested change
static final String ROCKETMQ_TOPIC = "rocketmq.topic";

private static final long defaultSuspendCurrentQueueTimeMillis = 1000;
private static final int defaultDelayLevelWhenNextConsume = 0;

public static RocketMQTracing create(Tracing tracing) {
return new RocketMQTracing(MessagingTracing.create(tracing), RocketMQTags.ROCKETMQ_SERVICE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't pass around this constant.. you'll see why, but it is simpler and less parameters

}
Expand Down Expand Up @@ -106,4 +113,35 @@ public Tracing tracing() {
public Tracer tracer() {
return tracer;
}

public MessageListenerOrderly wrap(MessageListenerOrderly messageListenerOrderly) {
return new TracingMessageListenerOrderly(defaultSuspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerOrderly wrap(long suspendCurrentQueueTimeMillis, MessageListenerOrderly messageListenerOrderly) {
return new TracingMessageListenerOrderly(suspendCurrentQueueTimeMillis, this, messageListenerOrderly);
}

public MessageListenerConcurrently wrap(MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(defaultDelayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerConcurrently wrap(int delayLevelWhenNextConsume, MessageListenerConcurrently messageListenerConcurrently) {
return new TracingMessageListenerConcurrently(delayLevelWhenNextConsume, this, messageListenerConcurrently);
}

public MessageListenerOrderly unwrap(MessageListenerOrderly messageListenerOrderly) {
if (messageListenerOrderly instanceof TracingMessageListenerOrderly) {
return ((TracingMessageListenerOrderly)messageListenerOrderly).messageListenerOrderly;
}
return null;
}

public MessageListenerConcurrently unwrap(MessageListenerConcurrently messageListenerConcurrently) {
if (messageListenerConcurrently instanceof TracingMessageListenerConcurrently) {
return ((TracingMessageListenerConcurrently)messageListenerConcurrently).messageListenerConcurrently;
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,28 @@
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;
import java.util.Collections;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can
// do custom tagging via their own MessageListenerConcurrently.
// Maybe expose RocketMQTracing.messageListenerConcurrently() to wrap theirs or make spans default
// and not expose this.
public abstract class TracingMessageListenerConcurrently implements MessageListenerConcurrently {
import brave.Span;
import brave.Tracer.SpanInScope;

class TracingMessageListenerConcurrently implements MessageListenerConcurrently {

private final int delayLevelWhenNextConsume;

private final RocketMQTracing tracing;
final MessageListenerConcurrently messageListenerConcurrently;

public TracingMessageListenerConcurrently(int delayLevelWhenNextConsume,
RocketMQTracing tracing) {
TracingMessageListenerConcurrently(int delayLevelWhenNextConsume,
RocketMQTracing tracing, MessageListenerConcurrently messageListenerConcurrently) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
this.tracing = tracing;
this.messageListenerConcurrently = messageListenerConcurrently;
}

@Override
Expand All @@ -50,7 +49,7 @@ public final ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeConcurrentlyStatus result;
try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) {
result = handleMessage(msg, context);
result = messageListenerConcurrently.consumeMessage(Collections.singletonList(msg), context);
} catch (Exception e) {
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
Expand All @@ -66,7 +65,4 @@ public final ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

protected abstract ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
ConsumeConcurrentlyContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@
*/
package brave.rocketmq.client;

import brave.Span;
import brave.Tracer;
import brave.Tracer.SpanInScope;
import java.util.Collections;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;

// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can
// do custom tagging via their own MessageListenerOrderly.
// Maybe expose RocketMQTracing.messageListenerOrderly() to wrap theirs or make spans default
// and not expose this.
public abstract class TracingMessageListenerOrderly implements MessageListenerOrderly {
import brave.Span;
import brave.Tracer.SpanInScope;

class TracingMessageListenerOrderly implements MessageListenerOrderly {
private final long suspendCurrentQueueTimeMillis;
private final RocketMQTracing tracing;
final MessageListenerOrderly messageListenerOrderly;

public TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis,
RocketMQTracing tracing) {
TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis,
RocketMQTracing tracing, MessageListenerOrderly messageListenerOrderly) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
this.tracing = tracing;
this.messageListenerOrderly = messageListenerOrderly;
}

@Override
Expand All @@ -48,7 +48,7 @@ public final ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

ConsumeOrderlyStatus result;
try (SpanInScope scope = tracing.tracer().withSpanInScope(span)) {
result = handleMessage(msg, context);
result = messageListenerOrderly.consumeMessage(Collections.singletonList(msg), context);
} catch (Exception e) {
span.error(e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
Expand All @@ -65,7 +65,4 @@ public final ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,

return ConsumeOrderlyStatus.SUCCESS;
}

protected abstract ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public void sendMessageBefore(SendMessageContext context) {
request,
msg.getProperties());
span.name(RocketMQTags.TO_PREFIX + msg.getTopic());
span.tag(RocketMQTags.ROCKETMQ_TAGS, Util.getOrEmpty(msg.getTags()));
if (msg.getTags() != null && !msg.getTags().isEmpty()) {
span.tag(RocketMQTags.ROCKETMQ_TAGS, msg.getTags());
}
context.setMqTraceContext(span);
tracing.producerInjector.inject(span.context(), request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,4 @@ static <T extends MessagingRequest> Span createAndStartSpan(RocketMQTracing trac
return span;
}

// TODO: we shouldn't add tags with empty values!
static String getOrEmpty(String obj) {
if (obj == null) {
return "";
} else {
return obj;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import brave.sampler.SamplerFunctions;
import brave.test.ITRemote;
import brave.test.IntegrationTestSpanHandler;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,6 +35,8 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
Expand Down Expand Up @@ -145,17 +149,17 @@ class ITRocketMQTracingTest extends ITRemote {
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerConcurrently(0, consumerTracing) {
MessageListenerConcurrently messageListenerConcurrently = consumerTracing.wrap(new MessageListenerConcurrently() {
@Override
protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
ConsumeConcurrentlyContext context) {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.registerMessageListener(messageListenerConcurrently);
producer.send(message);
consumer.start();

Expand Down Expand Up @@ -185,17 +189,17 @@ protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt,
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) {
MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() {
@Override
protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.registerMessageListener(messageListenerOrderly);
producer.send(message);
consumer.start();

Expand Down Expand Up @@ -226,17 +230,17 @@ protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
consumer.subscribe(topic, "*");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Span> reference = new AtomicReference<>();
consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) {
MessageListenerOrderly messageListenerOrderly = consumerTracing.wrap(new MessageListenerOrderly() {
@Override
protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt,
ConsumeOrderlyContext context) {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
Span span =
Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null);
reference.set(span);
latch.countDown();
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.registerMessageListener(messageListenerOrderly);

producer.send(message);
consumer.start();
Expand Down
Loading