Skip to content

Commit

Permalink
[ISSUE #9149]Assign offset in offsetTable even if the subscription ke…
Browse files Browse the repository at this point in the history
…y not exist. (#9150)

* Assign offset in offsetTable even if the subscription key not exist.
  • Loading branch information
dingshuangxi888 authored Feb 6, 2025
1 parent 8086fc5 commit 0176ef4
Showing 1 changed file with 4 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -417,27 +418,14 @@ public void assignResetOffset(String topic, String group, int queueId, long offs
}

String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>();
ConcurrentMap<Integer, Long> previous = resetOffsetTable.putIfAbsent(key, map);
if (null != previous) {
map = previous;
}
}

map.put(queueId, offset);
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}",
topic, group, queueId, offset);
resetOffsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
LOG.debug("Reset offset OK. Topic={}, group={}, queueId={}, resetOffset={}", topic, group, queueId, offset);

// Two things are important here:
// 1, currentOffsetMap might be null if there is no previous records;
// 2, Our overriding here may get overridden by the client instantly in concurrent cases; But it still makes
// sense in cases like clients are offline.
ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
if (null != currentOffsetMap) {
currentOffsetMap.put(queueId, offset);
}
offsetTable.computeIfAbsent(key, k -> Maps.newConcurrentMap()).put(queueId, offset);
}

public boolean hasOffsetReset(String topic, String group, int queueId) {
Expand Down

0 comments on commit 0176ef4

Please sign in to comment.