diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 1afb9113eb4..e593a17c98e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume */ private MessageListener messageListener; + /** + * Listener to call if message queue assignment is changed. + */ + private MessageQueueListener messageQueueListener; + /** * Offset Storage */ @@ -987,4 +992,12 @@ public boolean isClientRebalance() { public void setClientRebalance(boolean clientRebalance) { this.clientRebalance = clientRebalance; } + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + public void setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java index f4a8eda23a4..81e06ee4176 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java @@ -29,20 +29,22 @@ */ public interface MQConsumer extends MQAdmin { /** - * If consuming failure,message will be send back to the brokers,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ @Deprecated void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * If consuming failure,message will be send back to the broker,and delay consuming some time + * If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after + * interval specified in delay level. */ void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; /** - * Fetch message queues from consumer cache according to the topic + * Fetch message queues from consumer cache pertaining to the given topic. * * @param topic message topic * @return queue set diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java index 63795a6eeb0..74510f4c3ea 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java @@ -26,8 +26,7 @@ public interface MessageQueueListener { /** * @param topic message topic * @param mqAll all queues in this message topic - * @param mqDivided collection of queues,assigned to the current consumer + * @param mqAssigned collection of queues, assigned to the current consumer */ - void messageQueueChanged(final String topic, final Set mqAll, - final Set mqDivided); + void messageQueueChanged(final String topic, final Set mqAll, final Set mqAssigned); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index e57579321cb..cfb89b5c887 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -35,6 +35,7 @@ import org.apache.rocketmq.client.consumer.AckResult; import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PopCallback; import org.apache.rocketmq.client.consumer.PopResult; @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long queueMaxSpanFlowControlTimes = 0; //10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h - private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; + private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200}; private static final int MAX_POP_INVISIBLE_TIME = 300000; private static final int MIN_POP_INVISIBLE_TIME = 5000; @@ -1553,4 +1554,11 @@ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenExcept int[] getPopDelayLevel() { return popDelayLevel; } + + public MessageQueueListener getMessageQueueListener() { + if (null == defaultMQPushConsumer) { + return null; + } + return defaultMQPushConsumer.getMessageQueueListener(); + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index df509f37161..f9cf429c69f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; +import org.apache.rocketmq.client.consumer.MessageQueueListener; import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.consumer.store.ReadOffsetType; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,7 +53,7 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel, @Override public void messageQueueChanged(String topic, Set mqAll, Set mqDivided) { - /** + /* * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ @@ -82,6 +83,11 @@ public void messageQueueChanged(String topic, Set mqAll, Set