Skip to content

Commit

Permalink
[ISSUE #9097]Add new command to check async task status in broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewAden committed Jan 27, 2025
1 parent de4e48d commit c869383
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker;

import org.apache.rocketmq.common.AsyncTask;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class AdminAsyncTaskManager {

private static final Map<String, AsyncTask> ASYNC_TASK_MAP = new ConcurrentHashMap<>();

private static final Map<String, List<String>> TASK_NAME_TO_IDS_MAP = new ConcurrentHashMap<>();

public static String createTask(String taskName) {
String taskId = UUID.randomUUID().toString();
ASYNC_TASK_MAP.put(taskId, new AsyncTask(taskName, taskId));
TASK_NAME_TO_IDS_MAP.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId);
return taskId;
}

public static List<String> getTaskIdsByName(String taskName) {
return TASK_NAME_TO_IDS_MAP.getOrDefault(taskName, Collections.emptyList());
}

public static AsyncTask getTaskStatus(String taskId) {
return ASYNC_TASK_MAP.get(taskId);
}

public static void updateTaskStatus(String taskId, int status, String result) {
AsyncTask task = ASYNC_TASK_MAP.get(taskId);
if (task != null) {
task.setStatus(status);
task.setResult(result);
}
}

public static void removeTask(String taskId) {
AsyncTask task = ASYNC_TASK_MAP.remove(taskId);
if (task != null) {
TASK_NAME_TO_IDS_MAP.computeIfPresent(task.getTaskName(), (k, v) -> {
v.remove(taskId);
return v.isEmpty() ? null : v;
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.rocketmq.auth.authorization.exception.AuthorizationException;
import org.apache.rocketmq.auth.authorization.model.Acl;
import org.apache.rocketmq.auth.authorization.model.Resource;
import org.apache.rocketmq.broker.AdminAsyncTaskManager;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.auth.converter.AclConverter;
import org.apache.rocketmq.broker.auth.converter.UserConverter;
Expand All @@ -72,6 +73,7 @@
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AsyncTask;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
Expand All @@ -81,6 +83,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TaskStatus;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
Expand Down Expand Up @@ -149,6 +152,8 @@
import org.apache.rocketmq.remoting.protocol.body.TopicList;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
Expand Down Expand Up @@ -415,6 +420,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.listAcl(ctx, request);
case RequestCode.POP_ROLLBACK:
return this.transferPopToFsStore(ctx, request);
case RequestCode.CHECK_ASYNC_TASK_STATUS:
return this.checkAsyncTaskStatus(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
Expand Down Expand Up @@ -487,11 +494,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re
private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) {
CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult();
result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue());
String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress");
Runnable runnable = () -> {
try {
CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request);
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.SUCCESS.getValue(), JSON.toJSONString(checkResult));
LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult));
} catch (Exception e) {
AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.ERROR.getValue(), e.getMessage());
LOGGER.error("checkRocksdbCqWriteProgress error", e);
}
};
Expand Down Expand Up @@ -3597,4 +3607,32 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting
}
return response;
}

private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class);
try {
List<String> taskIds = AdminAsyncTaskManager.getTaskIdsByName(requestHeader.getTaskName());
if (taskIds == null || taskIds.isEmpty()) {
throw new RemotingCommandException("taskId not found");
}

List<AsyncTask> result = new ArrayList<>();
for (String taskId : taskIds) {
AsyncTask taskStatus = AdminAsyncTaskManager.getTaskStatus(taskId);
result.add(taskStatus);

if (taskStatus.getStatus() == TaskStatus.SUCCESS.getValue()) {
AdminAsyncTaskManager.removeTask(taskId);
}
}

RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setBody(JSON.toJSONBytes(result));
return response;
} catch (Exception e) {
LOGGER.error("checkAsyncTaskStatus error", e);
return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.rpchook.NamespaceRpcHook;
import org.apache.rocketmq.common.AsyncTask;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MQVersion;
Expand Down Expand Up @@ -149,6 +150,7 @@
import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
Expand Down Expand Up @@ -3601,4 +3603,17 @@ public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConn
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public List<AsyncTask> checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader,
long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS,
requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis);
assert response != null;
if (response.getCode() == SUCCESS) {
return RemotingSerializable.decodeList(response.getBody(), AsyncTask.class);
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
81 changes: 81 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/AsyncTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common;

import java.util.Date;

public class AsyncTask {

private String taskName;

private String taskId;

private int status;

private Date createTime;

private String result;

public AsyncTask(String taskName, String taskId) {
this.taskName = taskName;
this.taskId = taskId;
this.status = TaskStatus.INIT.getValue();
this.createTime = new Date();
this.result = null;
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public int getStatus() {
return status;
}

public void setStatus(int status) {
this.status = status;
}

public String getResult() {
return result;
}

public void setResult(String result) {
this.result = result;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public String getTaskId() {
return taskId;
}

public void setTaskId(String taskId) {
this.taskId = taskId;
}
}
46 changes: 46 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/TaskStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.common;

public enum TaskStatus {

INIT(0, "Initialized"),

IN_PROGRESS(1, "In Progress"),

ERROR(2, "Error"),

SUCCESS(3, "Success");

private final int value;

private final String desc;

TaskStatus(int value, String desc) {
this.value = value;
this.desc = desc;
}

public int getValue() {
return value;
}

public String getDesc() {
return desc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public class RequestCode {
public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353;
public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354;
public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355;
public static final int CHECK_ASYNC_TASK_STATUS = 356;

public static final int LITE_PULL_MESSAGE = 361;
public static final int RECALL_MESSAGE = 370;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.remoting.protocol.header;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;

@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET)
public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader {

private String taskName;

@Override
public void checkFields() throws RemotingCommandException {
if (StringUtils.isBlank(taskName)) {
throw new RemotingCommandException("taskName cannot be null or blank");
}
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskId) {
this.taskName = taskId;
}
}
Loading

0 comments on commit c869383

Please sign in to comment.