Skip to content

Commit

Permalink
Ensure final stage info is never changed
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jan 8, 2019
1 parent 6259a21 commit 8412a1f
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,14 @@ public void addStateChangeListener(StateChangeListener<StageState> stateChangeLi
}

/**
* Add a listener which is notified when the final stage status is ready. This notification is
* guaranteed to be fired only once.
* Add a listener for the final stage info. This notification is guaranteed to be fired only once.
* The listener is always notified asynchronously using a dedicated notification thread pool, so care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
* possible that notifications are observed out of order due to the asynchronous execution.
*/
public void addFinalStatusListener(StateChangeListener<BasicStageStats> stateChangeListener)
public void addFinalStageInfoListener(StateChangeListener<StageInfo> stateChangeListener)
{
stateMachine.addFinalStatusListener(ignored -> stateChangeListener.stateChanged(getBasicStageStats()));
stateMachine.addFinalStageInfoListener(stateChangeListener);
}

public void addCompletedDriverGroupsChangedListener(Consumer<Set<Lifespan>> newlyCompletedDriverGroupConsumer)
Expand Down Expand Up @@ -523,7 +522,10 @@ private synchronized void updateFinalTaskInfo(TaskInfo finalTaskInfo)
private synchronized void checkAllTaskFinal()
{
if (stateMachine.getState().isDone() && tasksWithFinalInfo.containsAll(allTasks)) {
stateMachine.setAllTasksFinal();
List<TaskInfo> finalTaskInfos = getAllTasks().stream()
.map(RemoteTask::getTaskInfo)
.collect(toImmutableList());
stateMachine.setAllTasksFinal(finalTaskInfos);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -56,6 +57,7 @@
import static com.facebook.presto.execution.StageState.SCHEDULING_SPLITS;
import static com.facebook.presto.execution.StageState.TERMINAL_STAGE_STATES;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.succinctBytes;
import static io.airlift.units.Duration.succinctDuration;
Expand All @@ -78,7 +80,7 @@ public class StageStateMachine
private final SplitSchedulerStats scheduledStats;

private final StateMachine<StageState> stageState;
private final StateMachine<Boolean> finalStatusReady;
private final StateMachine<Optional<StageInfo>> finalStageInfo;
private final AtomicReference<ExecutionFailureInfo> failureCause = new AtomicReference<>();

private final AtomicReference<DateTime> schedulingComplete = new AtomicReference<>();
Expand All @@ -105,7 +107,7 @@ public StageStateMachine(
stageState = new StateMachine<>("stage " + stageId, executor, PLANNED, TERMINAL_STAGE_STATES);
stageState.addStateChangeListener(state -> log.debug("Stage %s is %s", stageId, state));

finalStatusReady = new StateMachine<>("final stage " + stageId, executor, false, ImmutableList.of(true));
finalStageInfo = new StateMachine<>("final stage " + stageId, executor, Optional.empty());
}

public StageId getStageId()
Expand Down Expand Up @@ -195,27 +197,29 @@ public boolean transitionToFailed(Throwable throwable)
}

/**
* Add a listener which is notified when the final stage status is ready. This notification is
* guaranteed to be fired only once.
* Add a listener for the final stage info. This notification is guaranteed to be fired only once.
* The listener is always notified asynchronously using a dedicated notification thread pool, so care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
* possible that notifications are observed out of order due to the asynchronous execution.
*/
public void addFinalStatusListener(StateChangeListener<?> finalStatusListener)
public void addFinalStageInfoListener(StateChangeListener<StageInfo> finalStatusListener)
{
AtomicBoolean done = new AtomicBoolean();
StateChangeListener<Boolean> fireOnceStateChangeListener = isReady -> {
if (isReady && done.compareAndSet(false, true)) {
finalStatusListener.stateChanged(null);
StateChangeListener<Optional<StageInfo>> fireOnceStateChangeListener = finalStageInfo -> {
if (finalStageInfo.isPresent() && done.compareAndSet(false, true)) {
finalStatusListener.stateChanged(finalStageInfo.get());
}
};
finalStatusReady.addStateChangeListener(fireOnceStateChangeListener);
finalStageInfo.addStateChangeListener(fireOnceStateChangeListener);
}

public void setAllTasksFinal()
public void setAllTasksFinal(Iterable<TaskInfo> finalTaskInfos)
{
requireNonNull(finalTaskInfos, "finalTaskInfos is null");
checkState(stageState.get().isDone());
finalStatusReady.set(true);
StageInfo stageInfo = getStageInfo(() -> finalTaskInfos);
checkArgument(stageInfo.isCompleteInfo(), "finalTaskInfos are not all done");
finalStageInfo.compareAndSet(Optional.empty(), Optional.of(stageInfo));
}

public long getUserMemoryReservation()
Expand All @@ -237,6 +241,13 @@ public void updateMemoryUsage(long deltaUserMemoryInBytes, long deltaTotalMemory

public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
{
Optional<StageInfo> finalStageInfo = this.finalStageInfo.get();
if (finalStageInfo.isPresent()) {
return finalStageInfo.get()
.getStageStats()
.toBasicStageStats(finalStageInfo.get().getState());
}

// stage state must be captured first in order to provide a
// consistent view of the stage. For example, building this
// information, the stage could finish, and the task states would
Expand Down Expand Up @@ -324,6 +335,11 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos

public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
{
Optional<StageInfo> finalStageInfo = this.finalStageInfo.get();
if (finalStageInfo.isPresent()) {
return finalStageInfo.get();
}

// stage state must be captured first in order to provide a
// consistent view of the stage. For example, building this
// information, the stage could finish, and the task states would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import javax.annotation.concurrent.Immutable;

import java.util.List;
import java.util.OptionalDouble;
import java.util.Set;

import static com.facebook.presto.execution.StageState.RUNNING;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

@Immutable
Expand Down Expand Up @@ -345,4 +348,31 @@ public List<OperatorStats> getOperatorSummaries()
{
return operatorSummaries;
}

/**
 * Builds the condensed {@link BasicStageStats} view of these stats for a stage in the given state.
 * <p>
 * The stage counts as scheduled when it is {@code RUNNING} or already done. A progress percentage
 * is only reported for a scheduled stage with a non-zero driver count, and it is capped at 100.
 *
 * @param stageState the state of the stage these stats belong to
 * @return the basic stats derived from the fields of this object
 */
public BasicStageStats toBasicStageStats(StageState stageState)
{
    boolean scheduled = (stageState == RUNNING) || stageState.isDone();

    OptionalDouble progress;
    if (!scheduled || totalDrivers == 0) {
        // not scheduled yet, or no drivers to measure progress against
        progress = OptionalDouble.empty();
    }
    else {
        double percentComplete = (completedDrivers * 100.0) / totalDrivers;
        progress = OptionalDouble.of(min(100, percentComplete));
    }

    // cumulativeUserMemory is narrowed to a long for the basic stats
    long cumulativeUserMemoryAsLong = (long) cumulativeUserMemory;

    return new BasicStageStats(
            scheduled,
            totalDrivers,
            queuedDrivers,
            runningDrivers,
            completedDrivers,
            rawInputDataSize,
            rawInputPositions,
            cumulativeUserMemoryAsLong,
            userMemoryReservation,
            totalMemoryReservation,
            totalCpuTime,
            totalScheduledTime,
            fullyBlocked,
            blockedReasons,
            progress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ else if (queryStateMachine.getQueryState() == QueryState.STARTING) {
}
});
for (SqlStageExecution stage : stages.values()) {
stage.addFinalStatusListener(status -> queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo())));
stage.addFinalStageInfoListener(status -> queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo())));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

public class TestSqlStageExecution
Expand Down Expand Up @@ -115,7 +116,7 @@ private void testFinalStageInfoInternal()

// add listener that fetches stage info when the final status is available
SettableFuture<StageInfo> finalStageInfo = SettableFuture.create();
stage.addFinalStatusListener(value -> finalStageInfo.set(stage.getStageInfo()));
stage.addFinalStageInfoListener(finalStageInfo::set);

// in a background thread add a ton of tasks
CountDownLatch latch = new CountDownLatch(1000);
Expand Down Expand Up @@ -150,7 +151,7 @@ private void testFinalStageInfoInternal()
StageInfo stageInfo = finalStageInfo.get(1, MINUTES);
assertFalse(stageInfo.getTasks().isEmpty());
assertTrue(stageInfo.isCompleteInfo());
assertTrue(stage.getStageInfo().isCompleteInfo());
assertSame(stage.getStageInfo(), stageInfo);

// cancel the background thread adding tasks
addTasksTask.cancel(true);
Expand Down

0 comments on commit 8412a1f

Please sign in to comment.