Skip to content

Commit

Permalink
let consumer be aware of message queue assignment change
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Nov 10, 2023
1 parent 27759f3 commit 3dfe731
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageListener messageListener;

/**
* Listener to call if message queue assignment is changed.
*/
private MessageQueueListener messageQueueListener;

/**
* Offset Storage
*/
Expand Down Expand Up @@ -987,4 +992,12 @@ public boolean isClientRebalance() {
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}

public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}

public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@
*/
public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the brokers,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;

/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

/**
* Fetch message queues from consumer cache according to the topic
* Fetch message queues from consumer cache pertaining to the given topic.
*
* @param topic message topic
* @return queue set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public interface MessageQueueListener {
/**
* @param topic message topic
* @param mqAll all queues in this message topic
* @param mqDivided collection of queues,assigned to the current consumer
* @param mqAssigned collection of queues, assigned to the current consumer
*/
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
Expand Down Expand Up @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long queueMaxSpanFlowControlTimes = 0;

//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};

private static final int MAX_POP_INVISIBLE_TIME = 300000;
private static final int MIN_POP_INVISIBLE_TIME = 5000;
Expand Down Expand Up @@ -1553,4 +1554,11 @@ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenExcept
int[] getPopDelayLevel() {
return popDelayLevel;
}

public MessageQueueListener getMessageQueueListener() {
if (null == defaultMQPushConsumer) {
return null;
}
return defaultMQPushConsumer.getMessageQueueListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel,

@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
/**
/*
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
Expand Down Expand Up @@ -82,6 +83,11 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa

// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);

MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener();
if (null != messageQueueListener) {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
}
}

@Override
Expand Down

0 comments on commit 3dfe731

Please sign in to comment.