Skip to content

Commit

Permalink
* executor: tweak shutdown handling, print all tasks not complete
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Feb 6, 2024
1 parent 224a6ee commit e7793b9
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### 9.0.6 (1/29/2024 - )

* executor: tweak shutdown handling, print all tasks not complete

### 9.0.5 (1/10/2024 - 1/29/2024)

* json: update jackson to 2.16.1
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ apply(plugin = "project")

subprojects {
group = "core.framework"
version = "9.0.5"
version = "9.0.6"

repositories {
maven {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import core.framework.async.Executor;
import core.framework.async.Task;
import core.framework.internal.log.ActionLog;
import core.framework.internal.log.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
Expand All @@ -24,10 +27,12 @@
*/
public final class ExecutorImpl implements Executor {
private final Logger logger = LoggerFactory.getLogger(ExecutorImpl.class);
private final ReentrantLock lock = new ReentrantLock();
private final ExecutorService executor;
private final LogManager logManager;
private final long maxProcessTimeInNano;
private final ReentrantLock lock = new ReentrantLock();
private final Set<String> runningTasks = ConcurrentHashMap.newKeySet(); // track running tasks, used to print tasks failed to complete on shutdown

volatile ScheduledExecutorService scheduler;

public ExecutorImpl(ExecutorService executor, LogManager logManager, long maxProcessTimeInNano) {
Expand Down Expand Up @@ -55,8 +60,8 @@ public void shutdown() {
public void awaitTermination(long timeoutInMs) throws InterruptedException {
boolean success = executor.awaitTermination(timeoutInMs, TimeUnit.MILLISECONDS);
if (!success) {
List<Runnable> canceledTasks = executor.shutdownNow(); // only return tasks not started yet
logger.warn(errorCode("FAILED_TO_STOP"), "failed to terminate executor, canceledTasks={}", canceledTasks);
executor.shutdownNow();
logger.warn(errorCode("FAILED_TO_STOP"), "failed to terminate executor, canceledTasks={}", runningTasks);
} else {
logger.info("executor stopped");
}
Expand Down Expand Up @@ -111,22 +116,21 @@ boolean scheduleDelayedTask(String action, Task task, Duration delay) {
}

private <T> Future<T> submitTask(ExecutorTask<T> execution) {
String task = execution.toString(); // task is action:actionId which is unique
try {
runningTasks.add(task);
return executor.submit(execution);
} catch (RejectedExecutionException e) { // with current executor impl, rejection only happens when shutdown
logger.warn(errorCode("TASK_REJECTED"), "reject task due to server is shutting down, action={}", execution.action(), e);
runningTasks.remove(task);
return new CancelledFuture<>();
}
}

private <T> ExecutorTask<T> execution(String actionId, String action, Instant startTime, Callable<T> task) {
var context = new ExecutorTask.TaskContext();
context.actionId = actionId;
context.action = action;
context.startTime = startTime;
context.parentActionLog = LogManager.CURRENT_ACTION_LOG.get();
context.maxProcessTimeInNano = maxProcessTimeInNano;
return new ExecutorTask<>(task, logManager, context);
var context = new ExecutorTask.TaskContext(actionId, action, startTime, maxProcessTimeInNano, runningTasks);
ActionLog parentActionLog = LogManager.CURRENT_ACTION_LOG.get();
return new ExecutorTask<>(task, logManager, context, parentActionLog);
}

class DelayedTask implements Callable<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,36 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;

/**
* @author neo
*/
public class ExecutorTask<T> implements Callable<T> {
class ExecutorTask<T> implements Callable<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorTask.class);
final String actionId;
private final String action;
private final LogManager logManager;

private final Callable<T> task;
private final Instant startTime;
private final long maxProcessTimeInNano;
private final LogManager logManager;
private final TaskContext context;

private final String rootAction;
private final String refId;
private final String correlationId;
private final String refId;
private final Trace trace;
@Nullable
private final PerformanceWarning[] warnings;

ExecutorTask(Callable<T> task, LogManager logManager, TaskContext context) {
ExecutorTask(Callable<T> task, LogManager logManager, TaskContext context, ActionLog parentActionLog) {
this.task = task;
this.logManager = logManager;
actionId = context.actionId;
action = context.action;
startTime = context.startTime;
maxProcessTimeInNano = context.maxProcessTimeInNano;
ActionLog parentActionLog = context.parentActionLog;
this.context = context;
if (parentActionLog != null) { // only keep info needed by call(), so parentActionLog can be GCed sooner
List<String> parentActionContext = parentActionLog.context.get("root_action");
rootAction = parentActionContext != null ? parentActionContext.get(0) : parentActionLog.action;
rootAction = parentActionContext != null ? parentActionContext.getFirst() : parentActionLog.action;
correlationId = parentActionLog.correlationId();
refId = parentActionLog.id;
trace = parentActionLog.trace;
trace = parentActionLog.trace == Trace.CASCADE ? Trace.CASCADE : null; // trace only with parent.cascade
warnings = parentActionLog.warnings();
} else {
rootAction = null;
Expand All @@ -59,50 +55,46 @@ public class ExecutorTask<T> implements Callable<T> {
@Override
public T call() throws Exception {
VirtualThread.COUNT.increase();
ActionLog actionLog = logManager.begin("=== task execution begin ===", actionId);
ActionLog actionLog = logManager.begin("=== task execution begin ===", context.actionId);
try {
actionLog.action(action());
actionLog.warningContext.maxProcessTimeInNano(maxProcessTimeInNano);
// here it doesn't log task class, is due to task usually is lambda or method reference, it's expensive to inspect, refer to ControllerInspector
actionLog.warningContext.maxProcessTimeInNano(context.maxProcessTimeInNano);
// here doesn't log task class, due to task usually is lambda or method reference, it's expensive to inspect, refer to ControllerInspector
if (rootAction != null) { // if rootAction != null, then all parent info are available
actionLog.context("root_action", rootAction);
LOGGER.debug("correlationId={}", correlationId);
actionLog.correlationIds = List.of(correlationId);
LOGGER.debug("refId={}", refId);
actionLog.refIds = List.of(refId);
if (trace == Trace.CASCADE) actionLog.trace = Trace.CASCADE;
if (trace != null) actionLog.trace = trace;
if (warnings != null) actionLog.initializeWarnings(warnings);
}
LOGGER.debug("taskClass={}", CallableTask.taskClass(task).getName());
Duration delay = Duration.between(startTime, actionLog.date);
Duration delay = Duration.between(context.startTime, actionLog.date);
LOGGER.debug("taskDelay={}", delay);
actionLog.stats.put("task_delay", (double) delay.toNanos());
actionLog.context.put("thread", List.of(Thread.currentThread().getName()));
return task.call();
} catch (Throwable e) {
logManager.logError(e);
throw new TaskException(Strings.format("task failed, action={}, id={}, error={}", action, actionId, e.getMessage()), e);
throw new TaskException(Strings.format("task failed, action={}, id={}, error={}", context.action, context.actionId, e.getMessage()), e);
} finally {
logManager.end("=== task execution end ===");
context.runningTasks.remove(toString());
VirtualThread.COUNT.decrease();
}
}

String action() {
return rootAction == null ? "task:" + action : rootAction + ":task:" + action;
return rootAction == null ? "task:" + context.action : rootAction + ":task:" + context.action;
}

// used to print all canceled tasks during shutdown
// used to print all canceled tasks during shutdown, used by both ScheduledExecutorService and VirtualThreadExecutor
@Override
public String toString() {
return action() + ":" + actionId;
return action() + ":" + context.actionId;
}

static class TaskContext {
String actionId;
String action;
Instant startTime;
ActionLog parentActionLog;
long maxProcessTimeInNano;
record TaskContext(String actionId, String action, Instant startTime, long maxProcessTimeInNano, Set<String> runningTasks) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.HashSet;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -16,24 +17,24 @@
class ExecutorTaskTest {
@Test
void action() {
assertThat(new ExecutorTask<Void>(() -> null, null, context(null)).action())
assertThat(new ExecutorTask<Void>(() -> null, null, context(), null).action())
.isEqualTo("task:action");

var parentActionLog = new ActionLog(null, null);
parentActionLog.action = "parentAction";
assertThat(new ExecutorTask<Void>(() -> null, null, context(parentActionLog)).action())
assertThat(new ExecutorTask<Void>(() -> null, null, context(), parentActionLog).action())
.isEqualTo("parentAction:task:action");

parentActionLog.context("root_action", "rootAction");
assertThat(new ExecutorTask<Void>(() -> null, null, context(parentActionLog)).action())
assertThat(new ExecutorTask<Void>(() -> null, null, context(), parentActionLog).action())
.isEqualTo("rootAction:task:action");
}

@Test
void callWithException() {
var task = new ExecutorTask<Void>(() -> {
throw new Error("test");
}, new LogManager(), context(null));
}, new LogManager(), context(), null);
assertThatThrownBy(task::call)
.isInstanceOf(TaskException.class)
.hasMessageContaining("task failed")
Expand All @@ -46,23 +47,17 @@ void call() throws Exception {
var task = new ExecutorTask<>(() -> {
assertThat(ActionLogContext.get("thread")).hasSize(1);
return Boolean.TRUE;
}, new LogManager(), context(null));
}, new LogManager(), context(), null);
assertThat(task.call()).isTrue();
}

@Test
void convertToString() {
assertThat(new ExecutorTask<Void>(() -> null, null, context(null)).toString())
assertThat(new ExecutorTask<Void>(() -> null, null, context(), null).toString())
.isEqualTo("task:action:actionId");
}

private ExecutorTask.TaskContext context(ActionLog parentActionLog) {
var context = new ExecutorTask.TaskContext();
context.actionId = "actionId";
context.action = "action";
context.startTime = Instant.now();
context.parentActionLog = parentActionLog;
context.maxProcessTimeInNano = 25_000_000_000L;
return context;
private ExecutorTask.TaskContext context() {
return new ExecutorTask.TaskContext("actionId", "action", Instant.now(), 25_000_000_000L, new HashSet<>());
}
}

0 comments on commit e7793b9

Please sign in to comment.