From 8fa709a6c3d42294c2e2565f256ef6912e2710f8 Mon Sep 17 00:00:00 2001 From: Nick Mitchell Date: Thu, 24 Oct 2024 17:36:58 -0400 Subject: [PATCH] feat: calling-convention=stdio Adds support for workers accepting input on stdout and producing output on stdout. This adds add1c and add1d test coverage in pipelines.sh. add1d also adds python test coverage for pipelines. Also adds mix-and-match pipelines, where we mix-and-match calling conventions, and python vs bash. Improved pipeline error handling and error handling test coverage. Separate task failure watching from redirect logic. Signed-off-by: Nick Mitchell --- cmd/options/build.go | 2 + cmd/options/calling-conventions.go | 18 +++++ cmd/subcommands/component/worker/run.go | 10 +-- pkg/boot/failures.go | 66 +++++++++++++++++++ pkg/boot/io.go | 4 +- pkg/boot/up.go | 37 ++++++++--- pkg/build/options.go | 28 ++++---- pkg/fe/transformer/api/workerpool/lower.go | 18 ++++- pkg/ir/hlir/application.go | 1 + pkg/ir/hlir/calling-convention.go | 41 ++++++++++++ pkg/ir/queue/as.go | 13 ++-- pkg/ir/queue/pattern.go | 5 ++ pkg/runtime/builtins/redirect.go | 42 +----------- pkg/runtime/worker/options.go | 2 + pkg/runtime/worker/process-task.go | 45 +++++++++---- tests/bin/pipelines.sh | 77 ++++++++++++++++++++-- 16 files changed, 317 insertions(+), 92 deletions(-) create mode 100644 cmd/options/calling-conventions.go create mode 100644 pkg/boot/failures.go create mode 100644 pkg/ir/hlir/calling-convention.go diff --git a/cmd/options/build.go b/cmd/options/build.go index 5831b312..b4d605bf 100644 --- a/cmd/options/build.go +++ b/cmd/options/build.go @@ -14,6 +14,8 @@ func AddBuildOptions(cmd *cobra.Command) (*build.Options, error) { return nil, err } + AddCallingConventionOptionsTo(cmd, &options) + cmd.Flags().StringVarP(&options.ImagePullSecret, "image-pull-secret", "s", options.ImagePullSecret, "Of the form :@ghcr.io") cmd.Flags().StringVar(&options.Queue, "queue", options.Queue, "Use the queue defined by this Secret (data: accessKeyID, secretAccessKey, endpoint)") cmd.Flags().BoolVar(&options.HasGpuSupport, "gpu", options.HasGpuSupport, "Run with GPUs (if supported by the application)") diff --git a/cmd/options/calling-conventions.go b/cmd/options/calling-conventions.go new file mode 100644 index 00000000..41176ed1 --- /dev/null +++ b/cmd/options/calling-conventions.go @@ -0,0 +1,18 @@ +package options + +import ( + "github.com/spf13/cobra" + + "lunchpail.io/pkg/build" +) + +func AddCallingConventionOptions(cmd *cobra.Command) *build.Options { + opts := &build.Options{} + AddCallingConventionOptionsTo(cmd, opts) + cmd.MarkFlagRequired("calling-convention") + return opts +} + +func AddCallingConventionOptionsTo(cmd *cobra.Command, options *build.Options) { + cmd.Flags().VarP(&options.CallingConvention, "calling-convention", "C", "Task input and output calling convention [files, stdio]") +} diff --git a/cmd/subcommands/component/worker/run.go b/cmd/subcommands/component/worker/run.go index 8536a1ef..6f14dcad 100644 --- a/cmd/subcommands/component/worker/run.go +++ b/cmd/subcommands/component/worker/run.go @@ -33,6 +33,7 @@ func Run() *cobra.Command { var startupDelay int cmd.Flags().IntVar(&startupDelay, "delay", 0, "Delay (in seconds) before engaging in any work") + ccOpts := options.AddCallingConventionOptions(cmd) logOpts := options.AddLogOptions(cmd) cmd.RunE = func(cmd *cobra.Command, args []string) error { @@ -46,10 +47,11 @@ func Run() *cobra.Command { } return worker.Run(context.Background(), args, worker.Options{ - StartupDelay: startupDelay, - PollingInterval: pollingInterval, - LogOptions: *logOpts, - RunContext: run.ForPool(poolName).ForWorker(workerName), + CallingConvention: ccOpts.CallingConvention, + StartupDelay: startupDelay, + PollingInterval: pollingInterval, + LogOptions: *logOpts, + RunContext: run.ForPool(poolName).ForWorker(workerName), }) } diff --git a/pkg/boot/failures.go b/pkg/boot/failures.go new file mode 100644 index 00000000..b0ee20c8 --- /dev/null +++ b/pkg/boot/failures.go @@ -0,0 +1,66 @@ +package boot + +import ( + "context" + "fmt" + "os" + "strings" + + "lunchpail.io/pkg/be" + "lunchpail.io/pkg/build" + "lunchpail.io/pkg/ir/queue" + s3 "lunchpail.io/pkg/runtime/queue" +) + +func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error { + client, err := s3.NewS3ClientForRun(ctx, backend, run.RunName) + if err != nil { + return err + } + defer client.Stop() + + if err := client.Mkdirp(run.Bucket); err != nil { + return err + } + + failures := run.AsFileForAnyWorker(queue.FinishedWithFailed) // we want to be notified if a task fails in *any* worker + objc, errc := client.Listen(run.Bucket, failures, "", false) + + done := false + for !done { + select { + case err := <-errc: + if err == nil || strings.Contains(err.Error(), "EOF") { + done = true + } else { + fmt.Fprintln(os.Stderr, err) + } + case object := <-objc: + // Oops, a task failed. Fetch the stderr and show it. + if opts.Verbose { + fmt.Fprintf(os.Stderr, "Got indication of task failure %s\n", object) + } + + // We need to find the FinishedWithStderr file + // that corresponds to the given object, which + // is an AssignedAndFinished file. To do so, + // we can parse the object to extract the task + // instance (`ForObjectTask`) and then use + // that `fortask` to templatize the + // FinishedWithCode + forobject, err := run.ForObject(queue.FinishedWithFailed, object) + if err != nil { + return err + } + + errorContent, err := client.Get(run.Bucket, forobject.AsFile(queue.FinishedWithStderr)) + if err != nil { + return err + } + + return fmt.Errorf("\033[0;31m" + errorContent + "\033[0m\n") + } + } + + return nil +} diff --git a/pkg/boot/io.go b/pkg/boot/io.go index 05b1afb9..eac0a049 100644 --- a/pkg/boot/io.go +++ b/pkg/boot/io.go @@ -7,6 +7,8 @@ import ( "path/filepath" "slices" + "github.com/dustin/go-humanize/english" + "lunchpail.io/pkg/be" "lunchpail.io/pkg/build" "lunchpail.io/pkg/ir/llir" @@ -29,7 +31,7 @@ func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir if len(inputs) > 0 { // "cat" the inputs into the queue if opts.Verbose { - fmt.Fprintf(os.Stderr, "up is using 'cat' to inject %d input files\n", len(inputs)) + fmt.Fprintf(os.Stderr, "Using 'cat' to inject %s\n", english.Plural(len(inputs), "input file", "")) } if err := builtins.Cat(ctx, client.S3Client, ir.Context.Run, inputs, opts); err != nil { return err diff --git a/pkg/boot/up.go b/pkg/boot/up.go index 34a6d20a..ac6632c6 100644 --- a/pkg/boot/up.go +++ b/pkg/boot/up.go @@ -107,50 +107,61 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption // We need to chain the isRunning channel to our 0-2 consumers // below. This is because golang channels are not multicast. - isRunning3 := make(chan struct{}) + isRunning4 := make(chan struct{}) needsCatAndRedirect := len(opts.Inputs) > 0 || ir.Context.Run.Step > 0 go func() { <-isRunning - isRunning3 <- struct{}{} + isRunning4 <- struct{}{} + isRunning4 <- struct{}{} if needsCatAndRedirect { - isRunning3 <- struct{}{} + isRunning4 <- struct{}{} } if opts.Watch { - isRunning3 <- struct{}{} + isRunning4 <- struct{}{} } }() + var errorFromIo error redirectDone := make(chan struct{}) if needsCatAndRedirect { // Behave like `cat inputs | ... > outputs` go func() { // wait for the run to be ready for us to enqueue - <-isRunning3 + <-isRunning4 defer func() { redirectDone <- struct{}{} }() if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, *opts.BuildOptions.Log); err != nil { - fmt.Fprintln(os.Stderr, err) + errorFromIo = err cancel() } }() } else if opts.Watch { verbose := opts.BuildOptions.Log.Verbose go func() { - <-isRunning3 + <-isRunning4 go watchLogs(cancellable, backend, ir, WatchOptions{Verbose: verbose}) go watchUtilization(cancellable, backend, ir, WatchOptions{Verbose: verbose}) }() } go func() { - <-isRunning3 + <-isRunning4 if err := handlePipelineStdout(ir.Context); err != nil { fmt.Fprintln(os.Stderr, err) } }() + var errorFromTask error + go func() { + <-isRunning4 + if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil { + errorFromTask = err + // fail fast? cancel() + } + }() + defer cancel() - err := backend.Up(cancellable, ir, opts.BuildOptions, isRunning) + errorFromUp := backend.Up(cancellable, ir, opts.BuildOptions, isRunning) if needsCatAndRedirect { <-redirectDone @@ -162,5 +173,11 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption default: } - return err + if errorFromTask != nil { + return errorFromTask + } + if errorFromIo != nil { + return errorFromIo + } + return errorFromUp } diff --git a/pkg/build/options.go b/pkg/build/options.go index de869259..b8040653 100644 --- a/pkg/build/options.go +++ b/pkg/build/options.go @@ -7,6 +7,7 @@ import ( "path/filepath" "lunchpail.io/pkg/be/target" + "lunchpail.io/pkg/ir/hlir" ) type TargetOptions struct { @@ -23,19 +24,20 @@ type Options struct { Target *TargetOptions Log *LogOptions - ImagePullSecret string `yaml:"imagePullSecret,omitempty"` - OverrideValues []string `yaml:"overrideValues,omitempty"` - OverrideFileValues []string `yaml:"overrideFileValues,omitempty"` - Queue string `yaml:",omitempty"` - HasGpuSupport bool `yaml:"hasGpuSupport,omitempty"` - ApiKey string `yaml:"apiKey,omitempty"` - ResourceGroupID string `yaml:"resourceGroupID,omitempty"` - SSHKeyType string `yaml:"SSHKeyType,omitempty"` - PublicSSHKey string `yaml:"publicSSHKey,omitempty"` - Zone string `yaml:"zone,omitempty"` - Profile string `yaml:"profile,omitempty"` - ImageID string `yaml:"imageID,omitempty"` - CreateNamespace bool `yaml:"createNamespace,omitempty"` + hlir.CallingConvention `yaml:"callingConvention,omitempty"` + ImagePullSecret string `yaml:"imagePullSecret,omitempty"` + OverrideValues []string `yaml:"overrideValues,omitempty"` + OverrideFileValues []string `yaml:"overrideFileValues,omitempty"` + Queue string `yaml:",omitempty"` + HasGpuSupport bool `yaml:"hasGpuSupport,omitempty"` + ApiKey string `yaml:"apiKey,omitempty"` + ResourceGroupID string `yaml:"resourceGroupID,omitempty"` + SSHKeyType string `yaml:"SSHKeyType,omitempty"` + PublicSSHKey string `yaml:"publicSSHKey,omitempty"` + Zone string `yaml:"zone,omitempty"` + Profile string `yaml:"profile,omitempty"` + ImageID string `yaml:"imageID,omitempty"` + CreateNamespace bool `yaml:"createNamespace,omitempty"` } //go:embed buildOptions.json diff --git a/pkg/fe/transformer/api/workerpool/lower.go b/pkg/fe/transformer/api/workerpool/lower.go index 79b66e6d..1093afc5 100644 --- a/pkg/fe/transformer/api/workerpool/lower.go +++ b/pkg/fe/transformer/api/workerpool/lower.go @@ -27,11 +27,25 @@ func Lower(buildName string, ctx llir.Context, app hlir.Application, pool hlir.W app.Spec.Env = make(map[string]string) } - queueArgs := fmt.Sprintf("--pool %s --worker $LUNCHPAIL_POD_NAME --verbose=%v --debug=%v ", pool.Metadata.Name, opts.Log.Verbose, opts.Log.Debug) + queueArgs := fmt.Sprintf("--pool %s --worker $LUNCHPAIL_POD_NAME --verbose=%v --debug=%v ", + pool.Metadata.Name, + opts.Log.Verbose, + opts.Log.Debug, + ) + + callingConvention := opts.CallingConvention + if callingConvention == "" { + callingConvention = app.Spec.CallingConvention + } + if callingConvention == "" { + callingConvention = hlir.CallingConventionFiles + } + app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop %s" EXIT -$LUNCHPAIL_EXE component worker run --delay %d %s -- %s`, +$LUNCHPAIL_EXE component worker run --delay %d --calling-convention %v %s -- %s`, queueArgs, startupDelay, + callingConvention, queueArgs, app.Spec.Command, ) diff --git a/pkg/ir/hlir/application.go b/pkg/ir/hlir/application.go index 161188b0..b4e69f48 100644 --- a/pkg/ir/hlir/application.go +++ b/pkg/ir/hlir/application.go @@ -32,6 +32,7 @@ type Application struct { SecurityContext SecurityContext `yaml:"securityContext,omitempty"` ContainerSecurityContext ContainerSecurityContext `yaml:"containerSecurityContext,omitempty"` Needs []Needs `yaml:"needs,omitempty"` + CallingConvention `yaml:"callingConvention,omitempty"` } } diff --git a/pkg/ir/hlir/calling-convention.go b/pkg/ir/hlir/calling-convention.go new file mode 100644 index 00000000..6e1409a8 --- /dev/null +++ b/pkg/ir/hlir/calling-convention.go @@ -0,0 +1,41 @@ +package hlir + +import "fmt" + +type CallingConvention string + +const ( + CallingConventionFiles CallingConvention = "files" + CallingConventionStdio = "stdio" +) + +func lookup(maybe string) (CallingConvention, error) { + switch maybe { + case string(CallingConventionFiles): + return CallingConventionFiles, nil + case string(CallingConventionStdio): + return CallingConventionStdio, nil + } + + return "", fmt.Errorf("Unsupported calling convention %s\n", maybe) +} + +// String is used both by fmt.Print and by Cobra in help text +func (cc *CallingConvention) String() string { + return string(*cc) +} + +// Set must have pointer receiver so it doesn't change the value of a copy +func (cc *CallingConvention) Set(v string) error { + p, err := lookup(v) + if err != nil { + return err + } + *cc = p + return nil +} + +// Type is only used in help text +func (cc *CallingConvention) Type() string { + return "CallingConvention" +} diff --git a/pkg/ir/queue/as.go b/pkg/ir/queue/as.go index f410b629..1e2d1c50 100644 --- a/pkg/ir/queue/as.go +++ b/pkg/ir/queue/as.go @@ -28,11 +28,14 @@ func (run RunContext) AsFile(path Path) string { if err != nil { return "" } - return s + return anyPoolP.ReplaceAllString( + anyWorkerP.ReplaceAllString( + anyTaskP.ReplaceAllString(s, ""), + ""), + "") } -// As with AsFile() but returning the enclosing directory (i.e. not -// specific to a pool, a worker, or a task) -func (run RunContext) AsFileForAnyWorker(path Path) string { - return filepath.Dir(filepath.Dir(run.ForPool("").ForWorker("").ForTask("").AsFile(path))) +// As with AsFile(), but independent of any particular worker +func (ctx RunContext) AsFileForAnyWorker(path Path) string { + return ctx.ForPool(any).ForWorker(any).ForTask(any).AsFile(path) } diff --git a/pkg/ir/queue/pattern.go b/pkg/ir/queue/pattern.go index a34707f9..d1294a4f 100644 --- a/pkg/ir/queue/pattern.go +++ b/pkg/ir/queue/pattern.go @@ -2,6 +2,11 @@ package queue import "regexp" +var any = "*" +var anyPoolP = regexp.MustCompile("/pool/\\" + any) +var anyWorkerP = regexp.MustCompile("/worker/\\" + any) +var anyTaskP = regexp.MustCompile("\\" + any + "$") // task comes at the end + var placeholder = "xxxxxxxxxxxxxx" var placeholderR = regexp.MustCompile(placeholder) diff --git a/pkg/runtime/builtins/redirect.go b/pkg/runtime/builtins/redirect.go index 58e5db89..349dd5be 100644 --- a/pkg/runtime/builtins/redirect.go +++ b/pkg/runtime/builtins/redirect.go @@ -7,7 +7,6 @@ import ( "path/filepath" "strings" - "github.com/dustin/go-humanize/english" "golang.org/x/sync/errgroup" "lunchpail.io/pkg/build" @@ -17,10 +16,7 @@ import ( func RedirectTo(ctx context.Context, client s3.S3Client, run queue.RunContext, folderFor func(object string) string, opts build.LogOptions) error { outbox := run.AsFile(queue.AssignedAndFinished) - failures := run.AsFileForAnyWorker(queue.FinishedWithFailed) // we want to be notified if a task fails in *any* worker - outboxObjects, outboxErrs := client.Listen(client.Paths.Bucket, outbox, "", false) - failuresObjects, failuresErrs := client.Listen(client.Paths.Bucket, failures, "", false) group, _ := errgroup.WithContext(ctx) done := false @@ -37,7 +33,7 @@ func RedirectTo(ctx context.Context, client s3.S3Client, run queue.RunContext, f } if err := client.Download(client.Paths.Bucket, object, dst); err != nil { if opts.Verbose { - fmt.Fprintf(os.Stderr, "Error Downloading output %s\n%v\n", object, err) + fmt.Fprintf(os.Stderr, "Error downloading output %s\n%v\n", object, err) } return err } @@ -54,46 +50,18 @@ func RedirectTo(ctx context.Context, client s3.S3Client, run queue.RunContext, f }) } - nFailures := 0 for !done { select { + case <-ctx.Done(): + done = true case err := <-outboxErrs: if err == nil || strings.Contains(err.Error(), "EOF") { done = true } else { fmt.Fprintln(os.Stderr, err) } - case err := <-failuresErrs: - if err == nil || strings.Contains(err.Error(), "EOF") { - done = true - } else { - fmt.Fprintln(os.Stderr, err) - } case object := <-outboxObjects: downloadNow(object) - case object := <-failuresObjects: - // Oops, a task failed. Fetch the stderr and show it. - if opts.Verbose { - fmt.Fprintf(os.Stderr, "Got indication of task failure %s\n", object) - } - - // We need to find the FinishedWithStderr file - // that corresponds to the given object, which - // is an AssignedAndFinished file. To do so, - // we can parse the object to extract the task - // instance (`ForObjectTask`) and then use - // that `fortask` to templatize the - // FinishedWithCode - forobject, err := run.ForObject(queue.FinishedWithFailed, object) - if err != nil { - return err - } - errorContent, err := client.Get(run.Bucket, forobject.AsFile(queue.FinishedWithStderr)) - if err != nil { - return err - } - fmt.Fprintf(os.Stderr, "\033[0;31m"+errorContent+"\033[0m") - nFailures++ } } @@ -101,9 +69,5 @@ func RedirectTo(ctx context.Context, client s3.S3Client, run queue.RunContext, f return err } - if nFailures > 0 { - return fmt.Errorf("Error: %s failed", english.PluralWord(nFailures, "task", "")) - } - return nil } diff --git a/pkg/runtime/worker/options.go b/pkg/runtime/worker/options.go index c9dfc02c..bd0afd17 100644 --- a/pkg/runtime/worker/options.go +++ b/pkg/runtime/worker/options.go @@ -2,10 +2,12 @@ package worker import ( "lunchpail.io/pkg/build" + "lunchpail.io/pkg/ir/hlir" "lunchpail.io/pkg/ir/queue" ) type Options struct { + hlir.CallingConvention queue.RunContext StartupDelay int PollingInterval int diff --git a/pkg/runtime/worker/process-task.go b/pkg/runtime/worker/process-task.go index ed0db11c..dfa7fda1 100644 --- a/pkg/runtime/worker/process-task.go +++ b/pkg/runtime/worker/process-task.go @@ -77,36 +77,58 @@ func (p taskProcessor) process(task string) error { }() // Open stdout/err streams - stdoutWriter, stderrWriter := p.streamStdout(taskContext) + stdoutWriter, stderrWriter, stdoutReader := p.streamStdout(taskContext) defer stdoutWriter.Close() defer stderrWriter.Close() // Here is where we invoke the underlying task handler handlercmd := exec.CommandContext(p.ctx, p.handler[0], slices.Concat(p.handler[1:], []string{localprocessing, localoutbox})...) - handlercmd.Stdout = io.MultiWriter(os.Stdout, stdoutWriter) handlercmd.Stderr = io.MultiWriter(os.Stderr, stderrWriter) + handlercmd.Stdout = io.MultiWriter(os.Stdout, stdoutWriter) + switch p.opts.CallingConvention { + case "stdio": + if stdin, err := os.Open(localprocessing); err != nil { + fmt.Fprintf(os.Stderr, "Internal Error setting up stdin: %v\n", err) + return nil + } else { + handlercmd.Stdin = stdin + } + p.backgroundS3Tasks.Go(func() error { + defer stdoutReader.Close() + return p.client.StreamingUpload(taskContext.Bucket, taskContext.AsFile(queue.AssignedAndFinished), stdoutReader) + }) + defer func() { + p.backgroundS3Tasks.Go(func() error { + <-doneMovingToProcessing + return p.client.Rm(taskContext.Bucket, inprogress) + }) + }() + default: + defer func() { p.handleOutbox(taskContext, inprogress, localoutbox, doneMovingToProcessing) }() + } err = handlercmd.Run() if err != nil { - fmt.Fprintln(os.Stderr, "Internal Error running the handler:", err) + fmt.Fprintln(os.Stderr, "Handler launch failed:", err) } // Clean things up p.handleExitCode(taskContext, handlercmd.ProcessState.ExitCode()) - p.handleOutbox(taskContext, inprogress, localoutbox, doneMovingToProcessing) if p.opts.LogOptions.Verbose { - fmt.Fprintf(os.Stderr, "Worker done with task %s\n", task) + fmt.Fprintf(os.Stderr, "Worker done with task %s exitCode=%d\n", task, handlercmd.ProcessState.ExitCode()) } return nil } // Set up pipes to stream output of the subprocess directly to S3 -func (p taskProcessor) streamStdout(taskContext queue.RunContext) (*io.PipeWriter, *io.PipeWriter) { +func (p taskProcessor) streamStdout(taskContext queue.RunContext) (*io.PipeWriter, *io.PipeWriter, *io.PipeReader) { stdoutReader, stdoutWriter := io.Pipe() - p.backgroundS3Tasks.Go(func() error { - defer stdoutReader.Close() - return p.client.StreamingUpload(taskContext.Bucket, taskContext.AsFile(queue.FinishedWithStdout), stdoutReader) - }) + if p.opts.CallingConvention == "files" { + p.backgroundS3Tasks.Go(func() error { + defer stdoutReader.Close() + return p.client.StreamingUpload(taskContext.Bucket, taskContext.AsFile(queue.FinishedWithStdout), stdoutReader) + }) + } stderrReader, stderrWriter := io.Pipe() p.backgroundS3Tasks.Go(func() error { @@ -114,7 +136,7 @@ func (p taskProcessor) streamStdout(taskContext queue.RunContext) (*io.PipeWrite return p.client.StreamingUpload(taskContext.Bucket, taskContext.AsFile(queue.FinishedWithStderr), stderrReader) }) - return stdoutWriter, stderrWriter + return stdoutWriter, stderrWriter, stdoutReader } // Report and upload exit code @@ -130,7 +152,6 @@ func (p taskProcessor) handleExitCode(taskContext queue.RunContext, exitCode int return p.client.Touch(taskContext.Bucket, taskContext.AsFile(queue.FinishedWithSucceeded)) }) } else { - fmt.Fprintln(os.Stderr, "Error with exit code "+strconv.Itoa(exitCode)+" while processing "+taskContext.Task) p.backgroundS3Tasks.Go(func() error { return p.client.Touch(taskContext.Bucket, taskContext.AsFile(queue.FinishedWithFailed)) }) } } diff --git a/tests/bin/pipelines.sh b/tests/bin/pipelines.sh index 3cb7e69f..447ba0bb 100755 --- a/tests/bin/pipelines.sh +++ b/tests/bin/pipelines.sh @@ -10,7 +10,7 @@ lp=/tmp/lunchpail IN1=$(mktemp) echo "1" > $IN1 -trap "rm -f $IN1 $add1b" EXIT +trap "rm -f $IN1 $fail $add1b $add1c $add1d" EXIT export LUNCHPAIL_NAME="pipeline-test" export LUNCHPAIL_TARGET=${LUNCHPAIL_TARGET:-local} @@ -80,8 +80,11 @@ function validate { noLoitering 'minio server' noLoitering 'worker run' + # if we get here, then the non-zero exit code was expected, hence + # the return 0 (we don't need to validate the output files, + # i.e. the validations just after this) if [[ $expected_ec != 0 ]] - then return 1 + then return 0 fi if [[ -e "$actual" ]] @@ -93,19 +96,35 @@ function validate { expected_sha256=$(cat "$expected" | sha256sum) if [[ "$actual_sha256" = "$expected_sha256" ]] then echo "✅ PASS the output file is valid file=$actual" - else echo "❌ FAIL mismatched sha256 on output file file=$actual actual_sha256=$actual_sha256 expected_sha256=$expected_sha256" && return 1 + else echo "❌ FAIL mismatched sha256 on output file file=$actual actual=$(cat $actual) expected=$(cat $expected) actual_file=$actual expected_file=$expected" && return 1 fi rm -f "$actual" } -# build an add1 using `build -e/--eval` +# build a fail app +fail=$(mktemp) +/tmp/lunchpail build -e 'exit 1' -o $fail & +failpid=$! + +# build an add1 using `build -e/--eval`; printf because `echo -n` is not universally supported add1b=$(mktemp) -/tmp/lunchpail build -e 'printf "%d" $((1+$(cat $1))) > $2' -o $add1b +/tmp/lunchpail build -e 'printf "%d" $((1+$(cat $1))) > $2' -o $add1b & + +# ibid, for stdio calling convention; we need the extra 'read v' because dash does not support