Skip to content

Commit

Permalink
Allow async commands to suspend on interrupt
Browse files Browse the repository at this point in the history
Incomplete implementation; breaks when a command interrupts a suspendable command of the same priority

Add a standard unit for nanoseconds to support sub-millisecond pause precision
  • Loading branch information
SamCarlberg committed Apr 19, 2024
1 parent 787a863 commit 3661407
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ default int priority() {
return DEFAULT_PRIORITY;
}

enum InterruptBehavior {
/**
* Cancel the command when interrupted. This is the default behavior.
*/
Cancel,

/**
* Suspend the command when interrupted, resuming when no higher-priority commands are still
* running. Useful for commands to automatically pick back up from where they left off; however,
* be careful to ensure the command should still be running when it resumes!
*/
Suspend
}

default InterruptBehavior interruptBehavior() {
return InterruptBehavior.Cancel;
}

/**
* Checks if this command has a lower {@link #priority() priority} than another command.
*
Expand Down Expand Up @@ -158,7 +176,7 @@ static AsyncCommandBuilder requiring(HardwareResource requirement, HardwareResou
* @throws InterruptedException if the command was interrupted by another command while paused
*/
static void pause(Measure<Time> duration) throws InterruptedException {
Thread.sleep((long) duration.in(Milliseconds));
AsyncScheduler.getInstance().pauseCurrentCommand(duration);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class AsyncCommandBuilder {
private ThrowingRunnable impl;
private String name;
private int priority = AsyncCommand.DEFAULT_PRIORITY;
private AsyncCommand.InterruptBehavior interruptBehavior = AsyncCommand.InterruptBehavior.Cancel;

public AsyncCommandBuilder requiring(HardwareResource resource) {
requirements.add(resource);
Expand Down Expand Up @@ -41,6 +42,15 @@ public AsyncCommandBuilder withPriority(int priority) {
return this;
}

public AsyncCommandBuilder suspendOnInterrupt() {
return withInterruptBehavior(AsyncCommand.InterruptBehavior.Suspend);
}

public AsyncCommandBuilder withInterruptBehavior(AsyncCommand.InterruptBehavior behavior) {
this.interruptBehavior = behavior;
return this;
}

public AsyncCommandBuilder executing(ThrowingRunnable impl) {
this.impl = impl;
return this;
Expand Down Expand Up @@ -71,6 +81,11 @@ public int priority() {
return priority;
}

@Override
public InterruptBehavior interruptBehavior() {
return interruptBehavior;
}

@Override
public String toString() {
return name();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.wpi.first.wpilibj3.command.async;

import static edu.wpi.first.units.Units.Milliseconds;
import static edu.wpi.first.units.Units.Nanoseconds;

import edu.wpi.first.units.Measure;
import edu.wpi.first.units.Time;
Expand All @@ -9,7 +10,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -63,20 +66,27 @@ public static AsyncScheduler getInstance() {
}

public AsyncScheduler() {
// Start polling the event loop in 20ms intervals
this(Milliseconds.of(20));
}

public AsyncScheduler(Measure<Time> triggerPollingPeriod) {
// Start polling the event loop
// This intentionally runs on the same carrier thread as commands so the boolean checks
// will be guaranteed to run on the same thread as the commands that manipulate state
service.submit(
() -> {
while (true) {
try {
Thread.sleep(20);
eventLoop.poll();
} catch (Exception e) {
// ignore and keep running
}
}
});
service.submit(() -> pollEventLoop(triggerPollingPeriod));
}

@SuppressWarnings({"InfiniteLoopStatement", "BusyWait"})
private void pollEventLoop(Measure<Time> period) {
var ms = (long) period.in(Milliseconds);
while (true) {
try {
Thread.sleep(ms);
eventLoop.poll();
} catch (Exception e) {
// ignore and keep running
}
}
}

public void registerResource(HardwareResource resource) {
Expand Down Expand Up @@ -164,7 +174,8 @@ public void schedule(AsyncCommand command) {
runningCommands.put(resource, command);
shadowrun.computeIfAbsent(resource, _r -> new HashSet<>()).add(command);

if (runningCommand != null) {
if (runningCommand != null
&& runningCommand.interruptBehavior() == AsyncCommand.InterruptBehavior.Cancel) {
cancel(runningCommand);
}
}
Expand Down Expand Up @@ -216,6 +227,7 @@ public void checkForErrors() {
*/
private Callable<?> createCommandExecutorCallback(AsyncCommand command) {
return () -> {
commandOnCurrentThread.set(command);
try {
command.run();
return null;
Expand Down Expand Up @@ -375,7 +387,8 @@ public Map<HardwareResource, AsyncCommand> getRunningCommands() {
/** Cancels all currently running commands. */
public void cancelAll() {
Util.writing(
lock, () -> runningCommands.forEach((_resource, runningCommand) -> cancel(runningCommand)));
lock,
() -> List.copyOf(runningCommands.values()).forEach(this::cancel));
}

/**
Expand All @@ -397,7 +410,82 @@ public void scheduleDefault(HardwareResource resource) {
* using it
*/
public AsyncCommand getCommandUsing(HardwareResource resource) {
return runningCommands.get(resource);
return Util.reading(lock, () -> runningCommands.get(resource));
}

private final ThreadLocal<AsyncCommand> commandOnCurrentThread = new ThreadLocal<>();

/**
* Pauses the current command for 20 milliseconds. If a higher priority command on
* any of the current command's required resources is scheduled while paused, and if the current
* command is set to {@link AsyncCommand.InterruptBehavior#Suspend}, then the command will
* continue to stay paused as long as any higher priority command is running. Otherwise, a higher
* priority command will interrupt the command while it is paused.
*
* @throws InterruptedException if the current command was canceled, or, if the command's
* {@link AsyncCommand#interruptBehavior() interrupt behavior} is not set to suspend,
* interrupted by a higher priority command
*/
public void pauseCurrentCommand() throws InterruptedException {
pauseCurrentCommand(DEFAULT_UPDATE_PERIOD);
}

/**
* Pauses the current command for the specified period of time. If a higher priority command on
* any of the current command's required resources is scheduled while paused, and if the current
* command is set to {@link AsyncCommand.InterruptBehavior#Suspend}, then the command will
* continue to stay paused as long as any higher priority command is running. Otherwise, a higher
* priority command will interrupt the command while it is paused.
*
* @throws InterruptedException if the current command was canceled, or, if the command's
* {@link AsyncCommand#interruptBehavior() interrupt behavior} is not set to suspend,
* interrupted by a higher priority command
*/
@SuppressWarnings("BusyWait")
public void pauseCurrentCommand(Measure<Time> time) throws InterruptedException {
var command = commandOnCurrentThread.get();
if (command == null) {
throw new IllegalStateException(
"pauseCurrentCommand() may only be called by a running command!");
}

long ms = (long) time.in(Milliseconds);
int ns = (int) (time.in(Nanoseconds) % 1e6);

// Always sleep once
Thread.sleep(ms, ns);

// Then, if the command is configured to suspend while higher priority commands run,
// continue to sleep in a loop. If we're ever interrupted, it means the command was canceled
// outright, and we should respect the cancellation
if (command.interruptBehavior() == AsyncCommand.InterruptBehavior.Suspend) {
// TODO: We allow commands to interrupt running commands with the same priority level.
// However, this check only pauses if a HIGHER priority command is running, and thus can
// allow two commands of the same priority level to run concurrently. THIS IS BAD
for (var highestPriority = getHighestPriorityCommandUsingAnyOf(command.requirements());
highestPriority.isPresent() && highestPriority.getAsInt() >= command.priority();
highestPriority = getHighestPriorityCommandUsingAnyOf(command.requirements())) {
Thread.sleep(ms, ns);
}
}
}

public OptionalInt getHighestPriorityCommandUsingAnyOf(Collection<HardwareResource> resources) {
return Util.reading(lock, () -> {
boolean set = false;
int priority = Integer.MIN_VALUE;
for (HardwareResource resource : resources) {
var command = getCommandUsing(resource);
if (command == null) {
continue;
}
priority = Math.max(priority, command.priority());
set = true;
}

if (set) return OptionalInt.of(priority);
else return OptionalInt.empty();
});
}

public Collection<AsyncCommand> getAllCommandsUsing(HardwareResource resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import static edu.wpi.first.units.Units.Milliseconds;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -187,7 +189,7 @@ void atomicity() throws Exception {
AsyncCommand.noHardware(
() -> {
for (int i = 0; i < iterations; i++) {
Thread.sleep(1);
scheduler.pauseCurrentCommand(Milliseconds.one());
resource.x++;
}
})
Expand Down Expand Up @@ -261,7 +263,7 @@ void runResource() throws Exception {
() -> {
example.x = 0;
for (int i = 0; i < 10; i++) {
AsyncCommand.pause();
scheduler.pauseCurrentCommand();
example.x++;
}
})
Expand All @@ -273,6 +275,93 @@ void runResource() throws Exception {
assertEquals(10, example.x);
}

@RepeatedTest(10)
void suspendOnInterruptContinuesAutomatically() throws Exception {
var scheduler = new AsyncScheduler();
var count = new AtomicInteger(0);

var resource = new HardwareResource("Resource", scheduler);

var suspendable = AsyncCommand.requiring(resource).executing(() -> {
count.set(1);
scheduler.pauseCurrentCommand();
count.set(2);
}).suspendOnInterrupt().withPriority(1).named("Suspender");

var interrupter = AsyncCommand.requiring(resource).executing(() -> {
Thread.sleep(37);
}).withPriority(2).named("Interrupter");

scheduler.schedule(suspendable);
Thread.sleep(5); // wait for command to start up and hit the pause
scheduler.schedule(interrupter);
scheduler.await(interrupter);
assertTrue(scheduler.isRunning(suspendable)); // should still be running
assertTrue(scheduler.shadowrun.get(resource).contains(suspendable)); // and in the shadowrun too
scheduler.await(suspendable);
assertEquals(2, count.get());
}

@RepeatedTest(10)
void cancelOnSuspendedReallyDoesCancel() throws Exception {
var scheduler = new AsyncScheduler();
var count = new AtomicInteger(0);

var resource = new HardwareResource("Resource", scheduler);

var suspendable = AsyncCommand.requiring(resource).executing(() -> {
count.set(1);
scheduler.pauseCurrentCommand();
count.set(2);
}).suspendOnInterrupt().withPriority(1).named("Suspender");

var interrupter = AsyncCommand.requiring(resource).executing(() -> {
Thread.sleep(50);
}).withPriority(2).named("Interrupter");

scheduler.schedule(suspendable);
Thread.sleep(5); // wait for command to start up and hit the pause
scheduler.schedule(interrupter);
scheduler.cancel(suspendable);
assertFalse(scheduler.isRunning(suspendable)); // should not still be running
assertFalse(scheduler.shadowrun.get(resource).contains(suspendable)); // and in the shadowrun too
scheduler.await(suspendable);
assertEquals(1, count.get());
scheduler.cancelAll();
}

@Test
void cancelOnInterruptDoesNotResume() throws Exception {
var scheduler = new AsyncScheduler();

var count = new AtomicInteger(0);

var resource = new HardwareResource("Resource", scheduler);

var interrupter = AsyncCommand.requiring(resource).executing(() -> {
Thread.sleep(50);
}).withPriority(2).named("Interrupter");

var canceledCommand = AsyncCommand.requiring(resource).executing(() -> {
count.set(1);
scheduler.pauseCurrentCommand();
count.set(2);
}).withPriority(1).named("Cancel By Default");

scheduler.schedule(canceledCommand);
Thread.sleep(5); // wait for command to start up and hit the pause

scheduler.schedule(interrupter);
scheduler.await(canceledCommand);
assertEquals(1, count.get()); // the second set should not have run
}

@Test
void pauseOutsideCommand() {
var scheduler = new AsyncScheduler();
assertThrows(IllegalStateException.class, scheduler::pauseCurrentCommand);
}

record PriorityCommand(int priority, HardwareResource... subsystems) implements AsyncCommand {
@Override
public void run() throws Exception {
Expand Down
6 changes: 6 additions & 0 deletions wpiunits/src/main/java/edu/wpi/first/units/Units.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ private Units() {
/** Alias for {@link #Microseconds} to make combined unit definitions read more smoothly. */
public static final Time Microsecond = Microseconds; // singularized alias

/** 1/1,000,000,000 of a {@link #Seconds Second}. */
public static final Time Nanoseconds = derive(Seconds).splitInto(1e9).named("Nanosecond").symbol("ns").make();

/** 1/1,000,000,000 of a {@link #Seconds Second}. */
public static final Time Nanosecond = Nanoseconds; // singularized alias

/** 60 {@link #Seconds}. */
public static final Time Minutes =
derive(Seconds).aggregate(60).named("Minute").symbol("min").make();
Expand Down

0 comments on commit 3661407

Please sign in to comment.