Skip to content

Commit

Permalink
[ISSUE #423]🐛Fix after sending the message, the queueOffset did not i…
Browse files Browse the repository at this point in the history
…ncrease (#424)

* [ISSUE #423]🐛Fix after sending the message, the queueOffset did not increase

* fix test case error
  • Loading branch information
mxsm authored Jun 4, 2024
1 parent 774e0c4 commit 08aa6e0
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion rocketmq-store/src/queue/local_file_consume_queue_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl ConsumeQueueStoreTrait for ConsumeQueueStore {
for (topic, consume_queue_table) in self.inner.consume_queue_table.lock().iter_mut() {
for (queue_id, consume_queue) in consume_queue_table.iter_mut() {
let guard = consume_queue.lock();
let key = format!("{}_{}", guard.get_topic(), guard.get_queue_id());
let key = format!("{}-{}", guard.get_topic(), guard.get_queue_id());
let max_offset_in_queue = guard.get_max_offset_in_queue();
if guard.get_cq_type() == CQType::SimpleCQ {
cq_offset_table.insert(key, max_offset_in_queue);
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-store/src/queue/queue_offset_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl QueueOffsetOperator {
}

pub fn remove(&self, topic: &str, queue_id: i32) {
let topic_queue_key = format!("{}_{}", topic, queue_id);
let topic_queue_key = format!("{}-{}", topic, queue_id);
// Beware of thread-safety
let mut topic_queue_table = self.topic_queue_table.lock();
topic_queue_table.remove(&topic_queue_key);
Expand Down Expand Up @@ -192,9 +192,9 @@ mod tests {

operator.remove("topic", 1);

assert_eq!(operator.get_queue_offset("topic-1"), 5);
assert_eq!(operator.get_batch_queue_offset("topic-1"), 5);
assert_eq!(operator.get_lmq_offset("topic-1"), 5);
assert_eq!(operator.get_queue_offset("topic-1"), 0);
assert_eq!(operator.get_batch_queue_offset("topic-1"), 0);
assert_eq!(operator.get_lmq_offset("topic-1"), 0);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-store/src/queue/single_consume_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ impl ConsumeQueueTrait for ConsumeQueue {
msg: &mut MessageExtBrokerInner,
) {
let queue_offset = queue_offset_operator
.get_queue_offset(format!("{}_{}", msg.topic(), msg.queue_id()).as_str());
.get_queue_offset(format!("{}-{}", msg.topic(), msg.queue_id()).as_str());
msg.message_ext_inner.queue_offset = queue_offset;
}

Expand Down

0 comments on commit 08aa6e0

Please sign in to comment.