Make task runner logger configurable
lukasbindreiter committed May 10, 2024
1 parent 313b1ac commit 0558ab2
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions workflows/v1/runner.go
@@ -39,13 +39,15 @@ type TaskRunner struct {
Client workflowsv1connect.TaskServiceClient
taskDefinitions map[taskIdentifier]ExecutableTask
tracer trace.Tracer
+ log *slog.Logger
}

func NewTaskRunner(client workflowsv1connect.TaskServiceClient) *TaskRunner {
return &TaskRunner{
Client: client,
taskDefinitions: make(map[taskIdentifier]ExecutableTask),
tracer: otel.Tracer("tilebox.com/observability"),
+ log: slog.Default(),
}
}
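With this change the runner always starts with slog.Default(); making the logger actually configurable from outside the package would still need a hook that is not part of this commit. Below is a minimal sketch of one way to expose it, building on the types in workflows/v1/runner.go — the Option type, WithLogger, and NewTaskRunnerWithOptions are hypothetical and not part of the API shown in this diff:

```go
// Hypothetical sketch, assumed to live in the same package as TaskRunner.

// Option configures a TaskRunner before it starts running.
type Option func(*TaskRunner)

// WithLogger overrides the default slog.Default() logger.
func WithLogger(log *slog.Logger) Option {
	return func(t *TaskRunner) { t.log = log }
}

// NewTaskRunnerWithOptions constructs a runner and applies the given options.
func NewTaskRunnerWithOptions(client workflowsv1connect.TaskServiceClient, opts ...Option) *TaskRunner {
	runner := NewTaskRunner(client) // starts with slog.Default()
	for _, opt := range opts {
		opt(runner)
	}
	return runner
}
```

A caller could then, for example, pass WithLogger(slog.New(slog.NewJSONHandler(os.Stderr, nil))) to route all runner logs through a JSON handler.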

@@ -119,7 +121,7 @@ func (t *TaskRunner) Run(ctx context.Context) {
NextTaskToRun: &workflowsv1.NextTaskToRun{ClusterSlug: DefaultClusterSlug, Identifiers: identifiers},
}))
if err != nil {
slog.ErrorContext(ctx, "failed to work-steal a task", "error", err)
t.log.ErrorContext(ctx, "failed to work-steal a task", "error", err)
// return // should we even try again, or just stop here?
} else {
task = taskResponse.Msg.GetNextTask()
@@ -128,7 +130,7 @@ func (t *TaskRunner) Run(ctx context.Context) {

if task != nil { // we have a task to execute
if isEmpty(task.GetId()) {
slog.ErrorContext(ctx, "got a task without an ID - skipping to the next task")
t.log.ErrorContext(ctx, "got a task without an ID - skipping to the next task")
task = nil
continue
}
@@ -157,33 +159,33 @@ func (t *TaskRunner) Run(ctx context.Context) {
ComputedTask: computedTask, NextTaskToRun: nextTaskToRun,
}))
if err != nil {
slog.ErrorContext(ctx, "failed to mark task as computed, retrying", "error", err)
t.log.ErrorContext(ctx, "failed to mark task as computed, retrying", "error", err)
return nil, err
}
return taskResponse.Msg.GetNextTask(), nil
}, retry.Context(ctxSignal), retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
)
if err != nil {
slog.ErrorContext(ctx, "failed to retry NextTask", "error", err)
t.log.ErrorContext(ctx, "failed to retry NextTask", "error", err)
return // we got a cancellation signal, so let's just stop here
}
} else { // err != nil
slog.ErrorContext(ctx, "task execution failed", "error", err)
t.log.ErrorContext(ctx, "task execution failed", "error", err)
err = retry.Do(
func() error {
_, err := t.Client.TaskFailed(ctx, connect.NewRequest(&workflowsv1.TaskFailedRequest{
TaskId: task.GetId(),
CancelJob: true,
}))
if err != nil {
slog.ErrorContext(ctx, "failed to report task failure", "error", err)
t.log.ErrorContext(ctx, "failed to report task failure", "error", err)
return err
}
return nil
}, retry.Context(ctxSignal), retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
)
if err != nil {
slog.ErrorContext(ctx, "failed to retry TaskFailed", "error", err)
t.log.ErrorContext(ctx, "failed to retry TaskFailed", "error", err)
return // we got a cancellation signal, so let's just stop here
}
task = nil // reported a task failure, let's work-steal again
@@ -193,7 +195,7 @@ func (t *TaskRunner) Run(ctx context.Context) {
}
} else {
// if we didn't get a task, let's wait for a bit and try work-stealing again
slog.DebugContext(ctx, "no task to run")
t.log.DebugContext(ctx, "no task to run")

// instead of time.Sleep we set a timer and select on it, so we can still catch signals like SIGINT
timer := time.NewTimer(pollingInterval + rand.N(jitterInterval))
@@ -210,7 +212,7 @@ func (t *TaskRunner) Run(ctx context.Context) {
func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*taskExecutionContext, error) {
// start a goroutine to extend the lease of the task continuously until the task execution is finished
leaseCtx, stopLeaseExtensions := context.WithCancel(ctx)
- go extendTaskLease(leaseCtx, t.Client, task.GetId(), task.GetLease().GetLease().AsDuration(), task.GetLease().GetRecommendedWaitUntilNextExtension().AsDuration())
+ go t.extendTaskLease(leaseCtx, t.Client, task.GetId(), task.GetLease().GetLease().AsDuration(), task.GetLease().GetRecommendedWaitUntilNextExtension().AsDuration())
defer stopLeaseExtensions()

// actually execute the task
@@ -224,7 +226,7 @@ func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*
}

return observability.StartJobSpan(ctx, t.tracer, fmt.Sprintf("task/%s", identifier.Name()), task.GetJob(), func(ctx context.Context) (*taskExecutionContext, error) {
slog.DebugContext(ctx, "executing task", "task", identifier.Name, "version", identifier.Version)
t.log.DebugContext(ctx, "executing task", "task", identifier.Name, "version", identifier.Version)
taskStruct := reflect.New(reflect.ValueOf(taskPrototype).Elem().Type()).Interface().(ExecutableTask)

_, isProtobuf := taskStruct.(proto.Message)
@@ -257,7 +259,7 @@ func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*

// extendTaskLease is a function designed to be run as a goroutine, extending the lease of a task continuously until the
// context is cancelled, which indicates that the execution of the task is finished.
- func extendTaskLease(ctx context.Context, client workflowsv1connect.TaskServiceClient, taskID *workflowsv1.UUID, initialLease, initialWait time.Duration) {
+ func (t *TaskRunner) extendTaskLease(ctx context.Context, client workflowsv1connect.TaskServiceClient, taskID *workflowsv1.UUID, initialLease, initialWait time.Duration) {
wait := initialWait
lease := initialLease
for {
@@ -268,21 +270,21 @@ func extendTaskLease(ctx context.Context, client workflowsv1connect.TaskServiceC
return
case <-timer.C: // the timer expired, let's try to extend the lease
}
slog.DebugContext(ctx, "extending task lease", "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())), "lease", lease, "wait", wait)
t.log.DebugContext(ctx, "extending task lease", "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())), "lease", lease, "wait", wait)
req := &workflowsv1.TaskLeaseRequest{
TaskId: taskID,
RequestedLease: durationpb.New(2 * lease), // double the current lease duration for the next extension
}
extension, err := client.ExtendTaskLease(ctx, connect.NewRequest(req))
if err != nil {
slog.ErrorContext(ctx, "failed to extend task lease", "error", err, "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())))
t.log.ErrorContext(ctx, "failed to extend task lease", "error", err, "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())))
// The server probably has an internal error, but there is no point in trying to extend the lease again
// because it will be expired then, so let's just return
return
}
if extension.Msg.GetLease() == nil {
// the server did not return a lease extension, which means there is no need to try extending it again
slog.DebugContext(ctx, "task lease extension not granted", "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())))
t.log.DebugContext(ctx, "task lease extension not granted", "task_id", uuid.Must(uuid.FromBytes(taskID.GetUuid())))
return
}
// will probably be double the previous lease (since we requested that) or capped by the server at maxLeaseDuration
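For context on the lease-extension loop above: each iteration waits for the server's recommended interval and then requests double the current lease, so the lease grows geometrically while a long-running task keeps executing (until the server caps it, per the comment, at maxLeaseDuration). Below is a standalone sketch of that schedule, with illustrative durations and the simplifying assumption that the server grants each request exactly as asked:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative starting values; in the runner these come from the task's
	// Lease and RecommendedWaitUntilNextExtension fields.
	lease := 30 * time.Second
	wait := 20 * time.Second

	for i := 1; i <= 4; i++ {
		requested := 2 * lease // the runner always asks for double the current lease
		fmt.Printf("extension %d: wait %v, then request a %v lease\n", i, wait, requested)

		// Assumption for this sketch: the server grants exactly what was requested.
		// The real runner reads the granted lease and the next recommended wait
		// from the ExtendTaskLease response instead.
		lease = requested
		wait = requested / 2
	}
}
```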
