diff --git a/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java new file mode 100644 index 00000000000..859311fe9f1 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/AdminAsyncTaskManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker; + +import org.apache.rocketmq.common.AsyncTask; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +public class AdminAsyncTaskManager { + + private static final Map ASYNC_TASK_MAP = new ConcurrentHashMap<>(); + + private static final Map> TASK_NAME_TO_IDS_MAP = new ConcurrentHashMap<>(); + + public static String createTask(String taskName) { + String taskId = UUID.randomUUID().toString(); + ASYNC_TASK_MAP.put(taskId, new AsyncTask(taskName, taskId)); + TASK_NAME_TO_IDS_MAP.computeIfAbsent(taskName, k -> new ArrayList<>()).add(taskId); + return taskId; + } + + public static List getTaskIdsByName(String taskName) { + return TASK_NAME_TO_IDS_MAP.getOrDefault(taskName, Collections.emptyList()); + } + + public static AsyncTask getTaskStatus(String taskId) { + return ASYNC_TASK_MAP.get(taskId); + } + + public static void updateTaskStatus(String taskId, int status, String result) { + AsyncTask task = ASYNC_TASK_MAP.get(taskId); + if (task != null) { + task.setStatus(status); + task.setResult(result); + } + } + + public static void removeTask(String taskId) { + AsyncTask task = ASYNC_TASK_MAP.remove(taskId); + if (task != null) { + TASK_NAME_TO_IDS_MAP.computeIfPresent(task.getTaskName(), (k, v) -> { + v.remove(taskId); + return v.isEmpty() ? null : v; + }); + } + } +} \ No newline at end of file diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a9b913192fa..4ad565f79da 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -56,6 +56,7 @@ import org.apache.rocketmq.auth.authorization.exception.AuthorizationException; import org.apache.rocketmq.auth.authorization.model.Acl; import org.apache.rocketmq.auth.authorization.model.Resource; +import org.apache.rocketmq.broker.AdminAsyncTaskManager; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.auth.converter.AclConverter; import org.apache.rocketmq.broker.auth.converter.UserConverter; @@ -72,6 +73,7 @@ import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; @@ -81,6 +83,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TaskStatus; import org.apache.rocketmq.common.TopicAttributes; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UnlockCallback; @@ -149,6 +152,8 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusResponseHeader; import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; @@ -415,6 +420,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.listAcl(ctx, request); case RequestCode.POP_ROLLBACK: return this.transferPopToFsStore(ctx, request); + case RequestCode.CHECK_ASYNC_TASK_STATUS: + return this.checkAsyncTaskStatus(ctx, request); default: return getUnknownCmdResponse(ctx, request); } @@ -487,11 +494,14 @@ private RemotingCommand updateAndGetGroupForbidden(ChannelHandlerContext ctx, Re private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) { CheckRocksdbCqWriteResult result = new CheckRocksdbCqWriteResult(); result.setCheckStatus(CheckRocksdbCqWriteResult.CheckStatus.CHECK_IN_PROGRESS.getValue()); + String taskId = AdminAsyncTaskManager.createTask("checkRocksdbCqWriteProgress"); Runnable runnable = () -> { try { CheckRocksdbCqWriteResult checkResult = doCheckRocksdbCqWriteProgress(ctx, request); + AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.SUCCESS.getValue(), JSON.toJSONString(checkResult)); LOGGER.info("checkRocksdbCqWriteProgress result: {}", JSON.toJSONString(checkResult)); } catch (Exception e) { + AdminAsyncTaskManager.updateTaskStatus(taskId, TaskStatus.ERROR.getValue(), e.getMessage()); LOGGER.error("checkRocksdbCqWriteProgress error", e); } }; @@ -3597,4 +3607,32 @@ private RemotingCommand transferPopToFsStore(ChannelHandlerContext ctx, Remoting } return response; } + + private RemotingCommand checkAsyncTaskStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + final CheckAsyncTaskStatusRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckAsyncTaskStatusRequestHeader.class); + try { + List taskIds = AdminAsyncTaskManager.getTaskIdsByName(requestHeader.getTaskName()); + if (taskIds == null || taskIds.isEmpty()) { + throw new RemotingCommandException("taskId not found"); + } + + List result = new ArrayList<>(); + for (String taskId : taskIds) { + AsyncTask taskStatus = AdminAsyncTaskManager.getTaskStatus(taskId); + result.add(taskStatus); + + if (taskStatus.getStatus() == TaskStatus.SUCCESS.getValue()) { + AdminAsyncTaskManager.removeTask(taskId); + } + } + + RemotingCommand response = RemotingCommand.createResponseCommand(CheckAsyncTaskStatusResponseHeader.class); + response.setCode(ResponseCode.SUCCESS); + response.setBody(JSON.toJSONBytes(result)); + return response; + } catch (Exception e) { + LOGGER.error("checkAsyncTaskStatus error", e); + return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR, e.getMessage()); + } + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 114093e3502..547015df5ed 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -55,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.rpchook.NamespaceRpcHook; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.MQVersion; @@ -149,6 +150,7 @@ import org.apache.rocketmq.remoting.protocol.header.AddBrokerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CloneGroupOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; @@ -3601,4 +3603,17 @@ public void exportPopRecord(String brokerAddr, long timeout) throws RemotingConn } throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader, + long timeoutMillis) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ASYNC_TASK_STATUS, + requestHeader); + RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, timeoutMillis); + assert response != null; + if (response.getCode() == SUCCESS) { + return RemotingSerializable.decodeList(response.getBody(), AsyncTask.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java b/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java new file mode 100644 index 00000000000..eca3d346c77 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/AsyncTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +import java.util.Date; + +public class AsyncTask { + + private String taskName; + + private String taskId; + + private int status; + + private Date createTime; + + private String result; + + public AsyncTask(String taskName, String taskId) { + this.taskName = taskName; + this.taskId = taskId; + this.status = TaskStatus.INIT.getValue(); + this.createTime = new Date(); + this.result = null; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java b/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java new file mode 100644 index 00000000000..e2bbe2acee9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/TaskStatus.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.common; + +public enum TaskStatus { + + INIT(0, "Initialized"), + + IN_PROGRESS(1, "In Progress"), + + ERROR(2, "Error"), + + SUCCESS(3, "Success"); + + private final int value; + + private final String desc; + + TaskStatus(int value, String desc) { + this.value = value; + this.desc = desc; + } + + public int getValue() { + return value; + } + + public String getDesc() { + return desc; + } +} \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index e3b180a5379..2d2f8778cd9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -220,6 +220,7 @@ public class RequestCode { public static final int UPDATE_AND_GET_GROUP_FORBIDDEN = 353; public static final int CHECK_ROCKSDB_CQ_WRITE_PROGRESS = 354; public static final int EXPORT_ROCKSDB_CONFIG_TO_JSON = 355; + public static final int CHECK_ASYNC_TASK_STATUS = 356; public static final int LITE_PULL_MESSAGE = 361; public static final int RECALL_MESSAGE = 370; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java new file mode 100644 index 00000000000..04446b234cd --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusRequestHeader.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; + +@RocketMQAction(value = RequestCode.CHECK_ASYNC_TASK_STATUS, action = Action.GET) +public class CheckAsyncTaskStatusRequestHeader implements CommandCustomHeader { + + private String taskName; + + @Override + public void checkFields() throws RemotingCommandException { + if (StringUtils.isBlank(taskName)) { + throw new RemotingCommandException("taskName cannot be null or blank"); + } + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskId) { + this.taskName = taskId; + } +} \ No newline at end of file diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java new file mode 100644 index 00000000000..13f9b8852e9 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/CheckAsyncTaskStatusResponseHeader.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; + + +public class CheckAsyncTaskStatusResponseHeader implements CommandCustomHeader { + + private String taskId; + + private String status; + + private int progress; + + private String result; + + @Override + public void checkFields() throws RemotingCommandException { + + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getProgress() { + return progress; + } + + public void setProgress(int progress) { + this.progress = progress; + } + + public String getResult() { + return result; + } + + public void setResult(String result) { + this.result = result; + } +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index f224f749cbc..30918395ec7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; @@ -66,6 +67,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -1018,4 +1020,9 @@ public void exportPopRecords(String brokerAddr, long timeout) throws RemotingCon RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { defaultMQAdminExtImpl.exportPopRecords(brokerAddr, timeout); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return defaultMQAdminExtImpl.checkAsyncTaskStatus(brokerAddr, requestHeader); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 5be99606dc8..69f36ad42ab 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.MQClientManager; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.KeyBuilder; @@ -104,6 +105,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; @@ -2099,4 +2101,9 @@ public void exportPopRecords(String brokerAddr, long timeout) throws RemotingCon RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { this.mqClientInstance.getMQClientAPIImpl().exportPopRecord(brokerAddr, timeout); } + + public List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader) + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { + return this.mqClientInstance.getMQClientAPIImpl().checkAsyncTaskStatus(brokerAddr, requestHeader, timeoutMillis); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 2f01b6cba81..294c462b070 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.AsyncTask; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PlainAccessConfig; @@ -62,6 +63,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.ExportRocksDBConfigToJsonRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader; import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; @@ -534,4 +536,6 @@ String setCommitLogReadAheadMode(final String brokerAddr, String mode) void exportPopRecords(String brokerAddr, long timeout) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; + + List checkAsyncTaskStatus(String brokerAddr, CheckAsyncTaskStatusRequestHeader requestHeader)throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java new file mode 100644 index 00000000000..c648e861edc --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/CheckAsyncTaskStatusSubCommand.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.tools.command.stats; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.AsyncTask; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckAsyncTaskStatusRequestHeader; +import org.apache.rocketmq.remoting.protocol.route.BrokerData; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.SubCommand; + +import java.util.List; +import java.util.Map; + +public class CheckAsyncTaskStatusSubCommand implements SubCommand { + + @Override + public String commandName() { + return "checkAsyncTaskStatus"; + } + + @Override + public String commandDesc() { + return "Check the status of an asynchronous task by task name."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "cluster", true, "Cluster name"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("n", "nameserverAddr", true, "NameServer address"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "taskName", true, "The name of the asynchronous task"); + opt.setRequired(true); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + defaultMQAdminExt.setNamesrvAddr(StringUtils.trim(commandLine.getOptionValue('n'))); + + String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : ""; + String taskName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : ""; + if (StringUtils.isBlank(taskName)) { + System.out.println("Task name cannot be empty. Please specify a task name with -t."); + return; + } + + try { + defaultMQAdminExt.start(); + + ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map brokerAddrTable = clusterInfo.getBrokerAddrTable(); + + if (clusterAddrTable.get(clusterName) == null) { + System.out.println("Cluster '" + clusterName + "' not found in cluster address table."); + return; + } + + for (Map.Entry entry : brokerAddrTable.entrySet()) { + String brokerName = entry.getKey(); + String brokerAddr = entry.getValue().getBrokerAddrs().get(0L); + + try { + CheckAsyncTaskStatusRequestHeader requestHeader = new CheckAsyncTaskStatusRequestHeader(); + requestHeader.setTaskName(taskName); + + List asyncTaskStatus = defaultMQAdminExt.checkAsyncTaskStatus(brokerAddr, requestHeader); + + if (asyncTaskStatus != null && !asyncTaskStatus.isEmpty()) { + for (AsyncTask taskStatus : asyncTaskStatus) { + System.out.printf("Found a task for task name '%s' on broker %s: Task ID: %s, Status: %s, Result: %s, CreateTime: %s%n", + taskName, brokerName, taskStatus.getTaskId(), taskStatus.getStatus(), + taskStatus.getResult(), taskStatus.getCreateTime().toString()); + } + } else { + System.out.printf("No tasks found for task name '%s' on broker %s.%n", taskName, brokerName); + } + } catch (Exception e) { + System.out.printf("Failed to query task status for task name '%s' on broker %s: %s%n", + taskName, brokerName, e.getMessage()); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to execute " + this.getClass().getSimpleName() + " command", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +}