diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index fb371dce05f..647e3d6ff7f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -496,10 +496,13 @@ public long revive(long currentTime, int maxCount) { if (record.getAttemptTimes() < brokerConfig.getPopReviveMaxAttemptTimes()) { long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[ Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())]; - record.setInvisibleTime(record.getInvisibleTime() + backoffInterval); - record.setAttemptTimes(record.getAttemptTimes() + 1); - failureList.add(record); - log.warn("PopConsumerService revive backoff retry, record={}", record); + long nextInvisibleTime = record.getInvisibleTime() + backoffInterval; + PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(), + record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime, + record.getOffset(), record.getAttemptId()); + retryRecord.setAttemptTimes(record.getAttemptTimes() + 1); + failureList.add(retryRecord); + log.warn("PopConsumerService revive backoff retry, record={}", retryRecord); } else { log.error("PopConsumerService drop record, message may be lost, record={}", record); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java index 5e73adb1ea1..b77c170c8c6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java @@ -385,6 +385,42 @@ public void reviveRetryTest() { consumerService.shutdown(); } + @Test + public void reviveBackoffRetryTest() { + Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class)); + PopConsumerService consumerServiceSpy = Mockito.spy(consumerService); + + consumerService.getPopConsumerStore().start(); + + long popTime = 1000000000L; + long invisibleTime = 60 * 1000L; + PopConsumerRecord record = new PopConsumerRecord(); + record.setPopTime(popTime); + record.setInvisibleTime(invisibleTime); + record.setTopicId("topic"); + record.setGroupId("group"); + record.setQueueId(0); + record.setOffset(0); + consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record)); + + Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class), "", false))) + .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class)); + Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn( + new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)) + ); + + long visibleTimestamp = popTime + invisibleTime; + + // revive fails + Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1)); + // should be invisible now + Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 1).size()); + // will be visible again in 10 seconds + Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10 * 1000, 1).size()); + + consumerService.shutdown(); + } + @Test public void transferToFsStoreTest() { Assert.assertNotNull(consumerService.getServiceName());