From e6f9295013dd788e9dab8d34d345dfca7f3e6bd6 Mon Sep 17 00:00:00 2001 From: Oliver Eikemeier Date: Sun, 18 Feb 2024 18:14:48 +0100 Subject: [PATCH] Rewrite to use a structure instead of a channel --- .buildkite/pipeline.yaml | 4 + .codeclimate.yml | 4 + .github/codecov.yml | 2 +- .github/workflows/test.yml | 13 +- .gitignore | 1 + README.md | 48 ++----- async_test.go | 112 --------------- combine.go | 156 +++++++++----------- combine_all_test.go | 122 ++++++++++++++++ combine_seq_test.go | 61 -------- combine_test.go | 224 +++++++++++++++++------------ future.go | 101 ++++++------- memoizer_test.go => future_test.go | 136 +++++++++--------- go.mod | 8 +- go.sum | 14 +- iterator.go | 93 ++++++++++++ memoizer.go | 130 ----------------- combine_seq.go => nocopy.go | 17 +-- promise.go | 44 +++--- result.go => result/result.go | 44 +++++- result/result_test.go | 96 +++++++++++++ result_test.go | 52 ------- then.go | 36 +++-- then_test.go | 136 +++++++++--------- value.go | 48 +++++++ 25 files changed, 874 insertions(+), 828 deletions(-) delete mode 100644 async_test.go create mode 100644 combine_all_test.go delete mode 100644 combine_seq_test.go rename memoizer_test.go => future_test.go (59%) create mode 100644 iterator.go delete mode 100644 memoizer.go rename combine_seq.go => nocopy.go (63%) rename result.go => result/result.go (57%) create mode 100644 result/result_test.go delete mode 100644 result_test.go create mode 100644 value.go diff --git a/.buildkite/pipeline.yaml b/.buildkite/pipeline.yaml index 8987757..5b26119 100644 --- a/.buildkite/pipeline.yaml +++ b/.buildkite/pipeline.yaml @@ -11,8 +11,12 @@ steps: - test-collector#v1.10.0: files: test.xml format: junit + env: + GOEXPERIMENT: rangefunc - label: ':codecov: + :codeclimate: Coverage' commands: - go test -race -coverprofile=cover.out ./... - sh .buildkite/upload_coverage.sh cover.out + env: + GOEXPERIMENT: rangefunc diff --git a/.codeclimate.yml b/.codeclimate.yml index 9791151..052c19b 100644 --- a/.codeclimate.yml +++ b/.codeclimate.yml @@ -12,3 +12,7 @@ exclude_patterns: - "go.mod" - "go.sum" - "LICENSE" + - "nocopy.go" +engines: + golangci: + enabled: true diff --git a/.github/codecov.yml b/.github/codecov.yml index 4931c82..9028113 100644 --- a/.github/codecov.yml +++ b/.github/codecov.yml @@ -4,4 +4,4 @@ coverage: project: false patch: false ignore: - - internal/mocks + - nocopy.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 53b421b..a34bbe2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,8 +10,11 @@ name: Test jobs: test: runs-on: ubuntu-latest - env: - GOEXPERIMENT: rangefunc + permissions: + checks: write + contents: read + pull-requests: read + statuses: write steps: - name: ✔ Check out uses: actions/checkout@v4 @@ -20,5 +23,11 @@ jobs: with: go-version: "1.22" check-latest: true + - name: 🧸 golangci-lint + uses: golangci/golangci-lint-action@v4 + with: + version: v1.56.2 - name: 🔨 Test run: go test -race ./... + env: + GOEXPERIMENT: rangefunc diff --git a/.gitignore b/.gitignore index 2d240c3..fca0085 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ /bin/ /cover.out /test.xml +/trace.out diff --git a/README.md b/README.md index 5793620..631c760 100644 --- a/README.md +++ b/README.md @@ -7,27 +7,14 @@ [![Maintainability](https://api.codeclimate.com/v1/badges/72fe9626fb821fc70251/maintainability)](https://codeclimate.com/github/fillmore-labs/async-exp/maintainability) [![Go Report Card](https://goreportcard.com/badge/fillmore-labs.com/exp/async)](https://goreportcard.com/report/fillmore-labs.com/exp/async) [![License](https://img.shields.io/github/license/fillmore-labs/exp-async)](https://www.apache.org/licenses/LICENSE-2.0) +[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Ffillmore-labs%2Fasync-exp.svg?type=shield&issueType=license)](https://app.fossa.com/projects/git%2Bgithub.com%2Ffillmore-labs%2Fasync-exp) The `async` package provides interfaces and utilities for writing asynchronous code in Go. ## Motivation -Futures and promises are constructs used for asynchronous and concurrent programming, allowing developers to work with -values that may not be immediately available and can be evaluated in a different execution context. - -Go is known for its built-in concurrency features like goroutines and channels. -The select statement further allows for efficient multiplexing and synchronization of multiple channels, thereby -enabling developers to coordinate and orchestrate asynchronous operations effectively. -Additionally, the context package offers a standardized way to manage cancellation, deadlines, and timeouts within -concurrent and asynchronous code. - -On the other hand, Go's error handling mechanism, based on explicit error values returned from functions, provides a -clear and concise way to handle errors. - -The purpose of this package is to provide a thin layer over channels which simplifies the integration of concurrent -code while providing a cohesive strategy for handling asynchronous errors. -By adhering to Go's standard conventions for asynchronous and concurrent code, as well as error propagation, this -package aims to enhance developer productivity and code reliability in scenarios requiring asynchronous operations. +This is an experimental package which has a similar API as +[fillmore-labs.com/promise](https://pkg.go.dev/fillmore-labs.com/promise), but is implemented with a structure instead. ## Usage @@ -37,18 +24,19 @@ address (see [GetMyIP](#getmyip) for an example). Now you can do ```go - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - future := async.NewFutureAsync(func() (string, error) { - return getMyIP(ctx) - }) + query := func() (string, error) { + return getMyIP(ctx) // Capture context with timeout + } + future := async.NewAsync(query) ``` and elsewhere in your program, even in a different goroutine ```go - if ip, err := future.Wait(ctx); err == nil { + if ip, err := future.Await(ctx); err == nil { slog.Info("Found IP", "ip", ip) } else { slog.Error("Failed to fetch IP", "error", err) @@ -77,27 +65,15 @@ func getMyIP(ctx context.Context) (string, error) { } defer func() { _ = resp.Body.Close() }() - ipResponse := &struct { + ipResponse := struct { Origin string `json:"origin"` }{} + err = json.NewDecoder(resp.Body).Decode(&ipResponse) - if err := json.NewDecoder(resp.Body).Decode(ipResponse); err != nil { - return "", err - } - - return ipResponse.Origin, nil + return ipResponse.Origin, err } ``` -## Concurrency Correctness - -When utilizing plain Go channels for concurrency, reasoning over the correctness of concurrent code becomes simpler -compared to some other implementations of futures and promises. -Channels provide a clear and explicit means of communication and synchronization between concurrent goroutines, making -it easier to understand and reason about the flow of data and control in a concurrent program. - -Therefore, this library provides a straightforward and idiomatic approach to achieving concurrency correctness. - ## Links - [Futures and Promises](https://en.wikipedia.org/wiki/Futures_and_promises) in the English Wikipedia diff --git a/async_test.go b/async_test.go deleted file mode 100644 index 22df2b1..0000000 --- a/async_test.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 - -package async_test - -import ( - "context" - "errors" - "testing" - - "fillmore-labs.com/exp/async" - "github.com/stretchr/testify/suite" -) - -type AsyncTestSuite struct { - suite.Suite - promise async.Promise[int] - future async.Future[int] -} - -func TestAsyncTestSuite(t *testing.T) { - t.Parallel() - suite.Run(t, new(AsyncTestSuite)) -} - -func (s *AsyncTestSuite) SetupTest() { - s.future, s.promise = async.NewFuture[int]() -} - -func (s *AsyncTestSuite) TestValue() { - // given - s.promise.Do(func() (int, error) { return 1, nil }) - - // when - value, err := s.future.Wait(context.Background()) - - // then - if s.NoError(err) { - s.Equal(1, value) - } -} - -var errTest = errors.New("test error") - -func (s *AsyncTestSuite) TestError() { - // given - ctx := context.Background() - s.promise.Do(func() (int, error) { return 0, errTest }) - - // when - _, err := s.future.Wait(ctx) - - // then - s.ErrorIs(err, errTest) -} - -func (s *AsyncTestSuite) TestCancellation() { - // given - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - // when - _, err := s.future.Wait(ctx) - - // then - s.ErrorIs(err, context.Canceled) -} - -func (s *AsyncTestSuite) TestNoResult() { - // given - ctx := context.Background() - s.promise.Fulfill(1) - _, _ = s.future.Wait(ctx) - - // when - _, err := s.future.Wait(ctx) - - // then - s.ErrorIs(err, async.ErrNoResult) -} - -func (s *AsyncTestSuite) TestTryWait() { - // given - - // when - _, err1 := s.future.TryWait() - - s.promise.Fulfill(1) - - v2, err2 := s.future.TryWait() - _, err3 := s.future.TryWait() - - // then - s.ErrorIs(err1, async.ErrNotReady) - if s.NoError(err2) { - s.Equal(1, v2) - } - s.ErrorIs(err3, async.ErrNoResult) -} diff --git a/combine.go b/combine.go index 745adfd..bef5aa6 100644 --- a/combine.go +++ b/combine.go @@ -18,129 +18,109 @@ package async import ( "context" + "errors" "fmt" - "reflect" - "runtime/trace" -) - -func release[R any](futures []Awaitable[R], released []bool) { - for i, done := range released { - if !done { - futures[i].releaseRunning() - } - } -} -func YieldAll[R any](ctx context.Context, yield func(int, Result[R]) bool, futures ...Awaitable[R]) error { - numFutures := len(futures) - selectCases := make([]reflect.SelectCase, numFutures+1) - - for i, future := range futures { - future.addRunning() - selectCases[i] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(future.channel()), - } - } - selectCases[numFutures] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(ctx.Done()), - } - - released := make([]bool, numFutures) - - for i := 0; i < numFutures; i++ { - chosen, rcv, ok := reflect.Select(selectCases) - - if chosen == numFutures { // context channel - release(futures, released) + "fillmore-labs.com/exp/async/result" +) - return fmt.Errorf("async wait canceled: %w", ctx.Err()) - } +// AwaitAll returns a function that yields the results of all futures. +// If the context is canceled, it returns an error for the remaining futures. +func AwaitAll[R any](ctx context.Context, futures ...Future[R]) func(yield func(int, result.Result[R]) bool) { + i := newIterator(ctx, func(f Future[R]) result.Result[R] { return f.v }, futures) - selectCases[chosen].Chan = reflect.Value{} + return i.yieldTo +} - r, _ := rcv.Interface().(Result[R]) - v := futures[chosen].processResult(r, ok) - released[chosen] = true +// AwaitAllAny returns a function that yields the results of all futures. +// If the context is canceled, it returns an error for the remaining futures. +func AwaitAllAny(ctx context.Context, futures ...AnyFuture) func(yield func(int, result.Result[any]) bool) { + i := newIterator(ctx, func(f AnyFuture) result.Result[any] { return f.any() }, futures) - if !yield(chosen, v) { - release(futures, released) + return i.yieldTo +} - return nil - } - } +// AwaitAllResults waits for all futures to complete and returns the results. +// If the context is canceled, it returns early with errors for the remaining futures. +func AwaitAllResults[R any](ctx context.Context, futures ...Future[R]) []result.Result[R] { + return awaitAllResults(len(futures), AwaitAll(ctx, futures...)) +} - return nil +// AwaitAllResultsAny waits for all futures to complete and returns the results. +// If the context is canceled, it returns early with errors for the remaining futures. +func AwaitAllResultsAny(ctx context.Context, futures ...AnyFuture) []result.Result[any] { + return awaitAllResults(len(futures), AwaitAllAny(ctx, futures...)) } -// WaitAll returns the results of all completed futures. If the context is canceled, it returns early with an error. -func WaitAll[R any](ctx context.Context, futures ...Awaitable[R]) ([]Result[R], error) { - defer trace.StartRegion(ctx, "asyncWaitAll").End() - numFutures := len(futures) +func awaitAllResults[R any](n int, iter func(yield func(int, result.Result[R]) bool)) []result.Result[R] { + results := make([]result.Result[R], n) - results := make([]Result[R], numFutures) - yield := func(i int, r Result[R]) bool { + iter(func(i int, r result.Result[R]) bool { results[i] = r return true - } + }) - err := YieldAll(ctx, yield, futures...) - if err != nil { - return nil, err - } + return results +} - return results, nil +// AwaitAllValues returns the values of completed futures. +// If any future fails or the context is canceled, it returns early with an error. +func AwaitAllValues[R any](ctx context.Context, futures ...Future[R]) ([]R, error) { + return awaitAllValues(len(futures), AwaitAll(ctx, futures...)) } -// WaitAllValues returns the values of all completed futures. +// AwaitAllValuesAny returns the values of completed futures. // If any future fails or the context is canceled, it returns early with an error. -func WaitAllValues[R any](ctx context.Context, futures ...Awaitable[R]) ([]R, error) { - defer trace.StartRegion(ctx, "asyncWaitAllValues").End() - numFutures := len(futures) +func AwaitAllValuesAny(ctx context.Context, futures ...AnyFuture) ([]any, error) { + return awaitAllValues(len(futures), AwaitAllAny(ctx, futures...)) +} - results := make([]R, numFutures) +func awaitAllValues[R any](n int, iter func(yield func(int, result.Result[R]) bool)) ([]R, error) { + results := make([]R, n) var yieldErr error - yield := func(i int, r Result[R]) bool { - v, err := r.V() - if err != nil { - yieldErr = fmt.Errorf("async WaitAllValues result %d: %w", i, err) + + iter(func(i int, r result.Result[R]) bool { + if r.Err() != nil { + yieldErr = fmt.Errorf("list AwaitAllValues result %d: %w", i, r.Err()) return false } - results[i] = v + results[i] = r.Value() return true - } + }) - err := YieldAll(ctx, yield, futures...) - if yieldErr != nil { - return nil, yieldErr - } - if err != nil { - return nil, err - } + return results, yieldErr +} + +// ErrNoResult is returned when [AwaitFirst] is called on an empty list. +var ErrNoResult = errors.New("no result") - return results, nil +// AwaitFirst returns the result of the first completed future. +// If the context is canceled, it returns early with an error. +func AwaitFirst[R any](ctx context.Context, futures ...Future[R]) (R, error) { + return awaitFirst(AwaitAll(ctx, futures...)) } -// WaitFirst returns the result of the first completed future. +// AwaitFirstAny returns the result of the first completed future. // If the context is canceled, it returns early with an error. -func WaitFirst[R any](ctx context.Context, futures ...Awaitable[R]) (R, error) { - defer trace.StartRegion(ctx, "asyncWaitFirst").End() +func AwaitFirstAny(ctx context.Context, futures ...AnyFuture) (any, error) { + return awaitFirst(AwaitAllAny(ctx, futures...)) +} - var result Result[R] - yield := func(i int, r Result[R]) bool { - result = r +func awaitFirst[R any](iter func(yield func(int, result.Result[R]) bool)) (R, error) { + var v result.Result[R] + + iter(func(_ int, r result.Result[R]) bool { + v = r return false - } + }) - err := YieldAll(ctx, yield, futures...) - if err != nil { - return *new(R), err + if v == nil { + return *new(R), ErrNoResult } - return result.V() + return v.V() } diff --git a/combine_all_test.go b/combine_all_test.go new file mode 100644 index 0000000..c822c58 --- /dev/null +++ b/combine_all_test.go @@ -0,0 +1,122 @@ +// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +//go:build goexperiment.rangefunc + +package async_test + +import ( + "context" + "testing" + + "fillmore-labs.com/exp/async" + "fillmore-labs.com/exp/async/result" + "github.com/stretchr/testify/assert" +) + +func TestAll(t *testing.T) { + t.Parallel() + + // given + promises, futures := makePromisesAndFutures[int]() + values := []struct { + value int + err error + }{ + {1, nil}, + {0, errTest}, + {2, nil}, + } + + for i, v := range values { + promises[i].Do(func() (int, error) { return v.value, v.err }) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // when + results := make([]result.Result[int], len(futures)) + for i, r := range async.AwaitAll(ctx, futures...) { //nolint:typecheck + results[i] = r + } + + // then + if assert.NoError(t, results[0].Err()) { + assert.Equal(t, 1, results[0].Value()) + } + if assert.ErrorIs(t, results[1].Err(), errTest) { + _ = results[1].Value() // Should not panic + } + if assert.NoError(t, results[2].Err()) { + assert.Equal(t, 2, results[2].Value()) + } +} + +func TestAllEmpty(t *testing.T) { + t.Parallel() + + // given + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // when + allFutures := async.AwaitAllResults[int](ctx) + + // then + assert.Zero(t, len(allFutures)) + for _, v := range allFutures { //nolint:typecheck + t.Errorf("Invalid value %v", v) + } +} + +func TestAnyAll(t *testing.T) { + t.Parallel() + + // given + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1, f1 := async.New[int]() + p2, f2 := async.New[string]() + p3, f3 := async.New[struct{}]() + + p1.Resolve(1) + p2.Resolve("test") + p3.Resolve(struct{}{}) + + // when + results := make([]result.Result[any], 3) + for i, r := range async.AwaitAllAny(ctx, f1, f2, f3) { //nolint:typecheck + results[i] = r + } + + // then + for i, r := range results { + if assert.NoError(t, r.Err()) { + switch i { + case 0: + assert.Equal(t, 1, r.Value()) + case 1: + assert.Equal(t, "test", r.Value()) + case 2: + assert.Equal(t, struct{}{}, r.Value()) + default: + assert.Fail(t, "unexpected index") + } + } + } +} diff --git a/combine_seq_test.go b/combine_seq_test.go deleted file mode 100644 index 89d7e05..0000000 --- a/combine_seq_test.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 - -//go:build goexperiment.rangefunc - -package async_test - -import ( - "context" - "testing" - - "fillmore-labs.com/exp/async" - "github.com/stretchr/testify/assert" -) - -// All returns the results of all completed futures. If the context is canceled, it returns early with an error. -func TestAll(t *testing.T) { - t.Parallel() - - // given - promises, futures := makePromisesAndFutures[int]() - promises[0].Fulfill(1) - promises[1].Reject(errTest) - close(promises[2]) - - memoizers := make([]async.Awaitable[int], 0, len(futures)) - for _, f := range futures { - memoizers = append(memoizers, f.Memoize()) - } - - // when - var results [3]async.Result[int] - ctx := context.Background() - for i, r := range async.All(ctx, memoizers...) { //nolint:typecheck - results[i] = r - } - - // then - v0, err0 := results[0].V() - _, err1 := results[1].V() - _, err2 := results[2].V() - - if assert.NoError(t, err0) { - assert.Equal(t, 1, v0) - } - assert.ErrorIs(t, err1, errTest) - assert.ErrorIs(t, err2, async.ErrNoResult) -} diff --git a/combine_test.go b/combine_test.go index 9b4e0f5..f64f535 100644 --- a/combine_test.go +++ b/combine_test.go @@ -21,17 +21,18 @@ import ( "testing" "fillmore-labs.com/exp/async" + "fillmore-labs.com/exp/async/result" "github.com/stretchr/testify/assert" ) const iterations = 3 -func makePromisesAndFutures[R any]() ([]async.Promise[R], []async.Awaitable[R]) { +func makePromisesAndFutures[R any]() ([]async.Promise[R], []async.Future[R]) { var promises [iterations]async.Promise[R] - var futures [iterations]async.Awaitable[R] + var futures [iterations]async.Future[R] for i := 0; i < iterations; i++ { - futures[i], promises[i] = async.NewFuture[R]() + promises[i], futures[i] = async.New[R]() } return promises[:], futures[:] @@ -42,30 +43,27 @@ func TestWaitAll(t *testing.T) { // given promises, futures := makePromisesAndFutures[int]() - promises[0].Fulfill(1) - promises[1].Reject(errTest) - close(promises[2]) - memoizers := make([]async.Awaitable[int], 0, len(futures)) - for _, f := range futures { - memoizers = append(memoizers, f.Memoize()) - } + promises[0].Resolve(1) + promises[1].Reject(errTest) + promises[2].Resolve(2) // when ctx := context.Background() - results, err := async.WaitAll(ctx, memoizers...) + results := async.AwaitAllResults(ctx, futures...) // then - if assert.NoError(t, err) { - v0, err0 := results[0].V() - _, err1 := results[1].V() - _, err2 := results[2].V() + assert.Len(t, results, len(futures)) + v0, err0 := results[0].V() + err1 := results[1].Err() + v2, err2 := results[2].V() - if assert.NoError(t, err0) { - assert.Equal(t, 1, v0) - } - assert.ErrorIs(t, err1, errTest) - assert.ErrorIs(t, err2, async.ErrNoResult) + if assert.NoError(t, err0) { + assert.Equal(t, 1, v0) + } + assert.ErrorIs(t, err1, errTest) + if assert.NoError(t, err2) { + assert.Equal(t, 2, v2) } } @@ -75,15 +73,16 @@ func TestAllValues(t *testing.T) { // given promises, futures := makePromisesAndFutures[int]() for i := 0; i < iterations; i++ { - promises[i].Fulfill(i + 1) + promises[i].Resolve(i + 1) } // when ctx := context.Background() - results, err := async.WaitAllValues(ctx, futures...) + results, err := async.AwaitAllValues(ctx, futures...) // then if assert.NoError(t, err) { + assert.Len(t, results, iterations) for i := 0; i < iterations; i++ { assert.Equal(t, i+1, results[i]) } @@ -99,7 +98,7 @@ func TestAllValuesError(t *testing.T) { // when ctx := context.Background() - _, err := async.WaitAllValues(ctx, futures...) + _, err := async.AwaitAllValues(ctx, futures...) // then assert.ErrorIs(t, err, errTest) @@ -110,15 +109,15 @@ func TestFirst(t *testing.T) { // given promises, futures := makePromisesAndFutures[int]() - promises[1].Fulfill(2) + promises[1].Resolve(2) // when ctx := context.Background() - result, err := async.WaitFirst(ctx, futures...) + v, err := async.AwaitFirst(ctx, futures...) // then if assert.NoError(t, err) { - assert.Equal(t, 2, result) + assert.Equal(t, 2, v) } } @@ -127,22 +126,29 @@ func TestCombineCancellation(t *testing.T) { subTests := []struct { name string - combine func(context.Context, ...async.Awaitable[int]) (any, error) + combine func([]async.Future[int], context.Context) error }{ - {name: "First", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitFirst(ctx, futures...) + {name: "First", combine: func(futures []async.Future[int], ctx context.Context) error { + _, err := async.AwaitFirst(ctx, futures...) + + return err }}, - {name: "All", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitAll(ctx, futures...) + {name: "All", combine: func(futures []async.Future[int], ctx context.Context) error { + r := async.AwaitAllResults(ctx, futures...) + + return r[0].Err() }}, - {name: "AllValues", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitAllValues(ctx, futures...) + {name: "AllValues", combine: func(futures []async.Future[int], ctx context.Context) error { + _, err := async.AwaitAllValues(ctx, futures...) + + return err }}, } for _, tc := range subTests { combine := tc.combine - t.Run(tc.name, func(t *testing.T) { + test := func(t *testing.T) { + t.Helper() t.Parallel() // given @@ -152,30 +158,32 @@ func TestCombineCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - _, err := combine(ctx, futures...) + err := combine(futures, ctx) // then assert.ErrorIs(t, err, context.Canceled) - }) + } + + _ = t.Run(tc.name, test) } } -func TestCombineMemoized(t *testing.T) { //nolint:funlen +func TestCombineMemoized(t *testing.T) { t.Parallel() subTests := []struct { name string - combine func(context.Context, ...async.Awaitable[int]) (any, error) + combine func(context.Context, []async.Future[int]) (any, error) expect func(t *testing.T, actual any) }{ - {name: "First", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitFirst(ctx, futures...) + {name: "First", combine: func(ctx context.Context, futures []async.Future[int]) (any, error) { + return async.AwaitFirst(ctx, futures...) }, expect: func(t *testing.T, actual any) { t.Helper(); assert.Equal(t, 3, actual) }}, - {name: "All", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitAll(ctx, futures...) + {name: "All", combine: func(ctx context.Context, futures []async.Future[int]) (any, error) { + return async.AwaitAllResults(ctx, futures...), nil }, expect: func(t *testing.T, actual any) { t.Helper() - vv, ok := actual.([]async.Result[int]) + vv, ok := actual.([]result.Result[int]) if !ok { assert.Fail(t, "Unexpected result type") @@ -189,91 +197,117 @@ func TestCombineMemoized(t *testing.T) { //nolint:funlen } } }}, - {name: "AllValues", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitAllValues(ctx, futures...) + {name: "AllValues", combine: func(ctx context.Context, futures []async.Future[int]) (any, error) { + return async.AwaitAllValues(ctx, futures...) }, expect: func(t *testing.T, actual any) { t.Helper(); assert.Equal(t, []int{3, 3, 3}, actual) }}, } for _, tc := range subTests { combine := tc.combine expect := tc.expect - t.Run(tc.name, func(t *testing.T) { + _ = t.Run(tc.name, func(t *testing.T) { + t.Helper() t.Parallel() // given promises, futures := makePromisesAndFutures[int]() - for _, promise := range promises { - promise.Fulfill(3) - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - memoizers := make([]async.Awaitable[int], 0, len(futures)) - for _, f := range futures { - memoizer := f.Memoize() - memoizers = append(memoizers, memoizer) - _, _ = memoizer.TryWait() + for _, p := range promises { + p.Resolve(3) } // when - ctx := context.Background() - - result, err := combine(ctx, memoizers...) + v, err := combine(ctx, futures) // then if assert.NoError(t, err) { - expect(t, result) + expect(t, v) } }) } } -func TestCombineAfterMemoized(t *testing.T) { +func TestAwaitAllEmpty(t *testing.T) { t.Parallel() - subTests := []struct { - name string - combine func(context.Context, ...async.Awaitable[int]) (any, error) - expect func(t *testing.T, actual any) - }{ - {name: "First", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitFirst(ctx, futures...) - }}, - {name: "AllValues", combine: func(ctx context.Context, futures ...async.Awaitable[int]) (any, error) { - return async.WaitAllValues(ctx, futures...) - }}, + // given + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // when + results := async.AwaitAllResultsAny(ctx) + + // then + assert.Empty(t, results) +} + +func TestAwaitAllValuesEmpty(t *testing.T) { + t.Parallel() + + // given + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // when + results, err := async.AwaitAllValuesAny(ctx) + + // then + if assert.NoError(t, err) { + assert.Empty(t, results) } +} - for _, tc := range subTests { - combine := tc.combine - t.Run(tc.name, func(t *testing.T) { - t.Parallel() +func TestAwaitFirstEmpty(t *testing.T) { + t.Parallel() - // given - promises, futures := makePromisesAndFutures[int]() + // given + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - promises[1].Reject(errTest) + // when + _, err := async.AwaitFirstAny(ctx) - memoizers := make([]async.Awaitable[int], 0, len(futures)) - for _, f := range futures { - memoizer := f.Memoize() - memoizers = append(memoizers, memoizer) - } + // then + assert.ErrorIs(t, err, async.ErrNoResult) +} - // when - ctx := context.Background() - _, err := combine(ctx, memoizers...) +func TestAllAny(t *testing.T) { + // given + t.Parallel() + ctx := context.Background() - close(promises[0]) + p1, f1 := async.New[int]() + p2, f2 := async.New[string]() + p3, f3 := async.New[struct{}]() - _, err0 := memoizers[0].TryWait() - _, err1 := memoizers[1].TryWait() - _, err2 := memoizers[2].TryWait() + p1.Resolve(1) + p2.Resolve("test") + p3.Resolve(struct{}{}) - // then - assert.ErrorIs(t, err, errTest) - assert.ErrorIs(t, err0, async.ErrNoResult) - assert.ErrorIs(t, err1, errTest) - assert.ErrorIs(t, err2, async.ErrNotReady) - }) + // when + results := make([]result.Result[any], 3) + async.AwaitAllAny(ctx, f1, f2, f3)(func(i int, r result.Result[any]) bool { + results[i] = r + + return true + }) + + // then + for i, r := range results { + if assert.NoError(t, r.Err()) { + switch i { + case 0: + assert.Equal(t, 1, r.Value()) + case 1: + assert.Equal(t, "test", r.Value()) + case 2: + assert.Equal(t, struct{}{}, r.Value()) + default: + assert.Fail(t, "unexpected index") + } + } } } diff --git a/future.go b/future.go index d7ed6b4..816c4fe 100644 --- a/future.go +++ b/future.go @@ -20,92 +20,77 @@ import ( "context" "errors" "fmt" + + "fillmore-labs.com/exp/async/result" ) -// Future represents an asynchronous operation that will complete sometime in the future. -// -// It is a read-only channel that can be used to retrieve the final result of a [Promise] with [Future.Wait]. -type Future[R any] <-chan Result[R] +// ErrNotReady is returned when a future is not complete. +var ErrNotReady = errors.New("future not ready") -// NewFuture provides a simple way to create a Future for synchronous operations. -// This allows synchronous and asynchronous code to be composed seamlessly and separating initiation from running. -// -// The returned [Future] can be used to retrieve the eventual result of the [Promise]. -func NewFuture[R any]() (Future[R], Promise[R]) { - ch := make(chan Result[R], 1) +// Future represents a read-only view of the result of an asynchronous operation. +type Future[R any] struct { + *value[R] +} - return ch, ch +type AnyFuture interface { + Done() <-chan struct{} + any() result.Result[any] } -// NewFutureAsync runs f asynchronously, immediately returning a [Future] that can be used to retrieve the eventual -// result. This allows separating evaluating the result from computation. -func NewFutureAsync[R any](fn func() (R, error)) Future[R] { - f, p := NewFuture[R]() +// NewAsync runs fn asynchronously, immediately returning a [Future] that can be used to retrieve the +// eventual result. This allows separating evaluating the result from computation. +func NewAsync[R any](fn func() (R, error)) Future[R] { + p, f := New[R]() go p.Do(fn) return f } -// Awaitable is the underlying interface for [Future] and [Memoizer]. -// Plain futures can only be queried once, while memoizers can be queried multiple times. -type Awaitable[R any] interface { - Wait(ctx context.Context) (R, error) // Wait returns the final result of the associated [Promise]. - TryWait() (R, error) // TryWait returns the result when ready, [ErrNotReady] otherwise. - Memoize() *Memoizer[R] // Memoizer returns a [Memoizer] which can be queried multiple times. - - channel() <-chan Result[R] - addRunning() - releaseRunning() - processResult(r Result[R], ok bool) Result[R] -} - -// ErrNotReady is returned when a future is not complete. -var ErrNotReady = errors.New("future not ready") - -// ErrNoResult is returned when a future completes but has no defined result value. -var ErrNoResult = errors.New("no result") - -// Wait returns the final result of the associated [Promise]. -// It can only be called once and blocks until a result is received or the context is canceled. -// If you need to read multiple times from a [Future] wrap it with [Future.Memoize]. -func (f Future[R]) Wait(ctx context.Context) (R, error) { - select { - case r, ok := <-f: - return f.processResult(r, ok).V() +// Await returns the cached result or blocks until a result is available or the context is canceled. +func (f Future[R]) Await(ctx context.Context) (R, error) { + select { // wait for future completion or context cancel + case <-f.done: + return f.v.V() case <-ctx.Done(): - return *new(R), fmt.Errorf("future wait: %w", ctx.Err()) + return *new(R), fmt.Errorf("future await: %w", context.Cause(ctx)) } } -// TryWait returns the result when ready, [ErrNotReady] otherwise. -func (f Future[R]) TryWait() (R, error) { +// Try returns the cached result when ready, [ErrNotReady] otherwise. +func (f Future[R]) Try() (R, error) { select { - case r, ok := <-f: - return f.processResult(r, ok).V() + case <-f.done: + return f.v.V() default: return *new(R), ErrNotReady } } -// Memoize creates a new [Memoizer], consuming the [Future]. -func (f Future[R]) Memoize() *Memoizer[R] { - return &Memoizer[R]{done: make(chan struct{}), future: f} +// OnComplete executes fn when the [Future] is fulfilled. +func (f Future[R]) OnComplete(fn func(r result.Result[R])) { + f.onComplete(fn) } -func (f Future[R]) processResult(r Result[R], ok bool) Result[R] { - if ok { - return r +func (f Future[R]) ToChannel() <-chan result.Result[R] { + ch := make(chan result.Result[R], 1) + fn := func(r result.Result[R]) { + ch <- r + close(ch) } - return errorResult[R]{err: ErrNoResult} -} + f.onComplete(fn) -func (f Future[R]) channel() <-chan Result[R] { //nolint:unused - return f + return ch } -func (f Future[R]) addRunning() {} //nolint:unused +// Done returns a channel that is closed when the future is complete. +// It enables the use of future values in select statements. +func (f Future[_]) Done() <-chan struct{} { + return f.done +} -func (f Future[R]) releaseRunning() {} //nolint:unused +func (f Future[_]) any() result.Result[any] { + return f.v.Any() +} diff --git a/memoizer_test.go b/future_test.go similarity index 59% rename from memoizer_test.go rename to future_test.go index 24ae286..f46cd34 100644 --- a/memoizer_test.go +++ b/future_test.go @@ -18,95 +18,99 @@ package async_test import ( "context" + "errors" "sync" "testing" "time" "fillmore-labs.com/exp/async" "github.com/stretchr/testify/assert" + "go.uber.org/goleak" ) -func TestCancellation(t *testing.T) { +var errTest = errors.New("test error") + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestAsyncValue(t *testing.T) { t.Parallel() // given ctx, cancel := context.WithCancel(context.Background()) - cancel() - f, _ := async.NewFuture[int]() + defer cancel() // when - m := f.Memoize() - _, err := m.Wait(ctx) + f := async.NewAsync(func() (int, error) { return 1, nil }) + value, err := f.Await(ctx) // then - assert.ErrorIs(t, err, context.Canceled) + if assert.NoError(t, err) { + assert.Equal(t, 1, value) + } } -func TestMultiple(t *testing.T) { +func TestAsyncError(t *testing.T) { t.Parallel() // given - const iterations = 1_000 - f, p := async.NewFuture[int]() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // when - m := f.Memoize() + f := async.NewAsync(func() (int, error) { return 0, errTest }) + _, err := f.Await(ctx) - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() + // then + assert.ErrorIs(t, err, errTest) +} - var values [iterations]int - var errors [iterations]error +func TestCancellation(t *testing.T) { + t.Parallel() - var wg sync.WaitGroup - wg.Add(iterations) - for i := 0; i < iterations; i++ { - go func(i int) { - defer wg.Done() - values[i], errors[i] = m.Wait(ctx) - }(i) - } - p.Fulfill(1) - wg.Wait() + // given + _, f := async.New[int]() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // when + _, err := f.Await(ctx) // then - for i := 0; i < iterations; i++ { - if assert.NoError(t, errors[i]) { - assert.Equal(t, 1, values[i]) - } - } + assert.ErrorIs(t, err, context.Canceled) } -func TestMultipleClosed(t *testing.T) { +func TestMultiple(t *testing.T) { t.Parallel() // given const iterations = 1_000 - f, p := async.NewFuture[int]() + p, f := async.New[int]() // when - m := f.Memoize() - - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() var values [iterations]int - var errors [iterations]error + var errs [iterations]error var wg sync.WaitGroup wg.Add(iterations) for i := 0; i < iterations; i++ { go func(i int) { defer wg.Done() - values[i], errors[i] = m.Wait(ctx) + values[i], errs[i] = f.Await(ctx) }(i) } - close(p) + p.Resolve(1) wg.Wait() // then for i := 0; i < iterations; i++ { - assert.ErrorIs(t, errors[i], async.ErrNoResult) + if assert.NoError(t, errs[i]) { + assert.Equal(t, 1, values[i]) + } } } @@ -114,15 +118,14 @@ func TestTryWait(t *testing.T) { t.Parallel() // given - f, p := async.NewFuture[int]() + p, f := async.New[int]() // when - m := f.Memoize() - _, err1 := m.TryWait() - p.Fulfill(1) + _, err1 := f.Try() - value2, err2 := m.TryWait() - value3, err3 := m.TryWait() + p.Resolve(1) + value2, err2 := f.Try() + value3, err3 := f.Try() // then assert.ErrorIs(t, err1, async.ErrNotReady) @@ -134,40 +137,24 @@ func TestTryWait(t *testing.T) { } } -func TestMemoize(t *testing.T) { - t.Parallel() - - // given - f, _ := async.NewFuture[int]() - - // when - m := f.Memoize() - mm := m.Memoize() - - // then - assert.Same(t, m, mm) -} - func TestMemoizerAllValues(t *testing.T) { t.Parallel() // given const iterations = 1_000 - f, p := async.NewFuture[int]() + p, f := async.New[int]() // when - m := f.Memoize() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - var memoizers [iterations]async.Awaitable[int] + futures := make([]async.Future[int], iterations) for i := 0; i < iterations; i++ { - memoizers[i] = m + futures[i] = f } - _ = time.AfterFunc(1*time.Millisecond, func() { p.Fulfill(1) }) - values, err := async.WaitAllValues(ctx, memoizers[:]...) + _ = time.AfterFunc(1*time.Millisecond, func() { p.Resolve(1) }) + values, err := async.AwaitAllValues(ctx, futures...) // then if assert.NoError(t, err) { @@ -176,3 +163,22 @@ func TestMemoizerAllValues(t *testing.T) { } } } + +func TestToChannel(t *testing.T) { + t.Parallel() + + // given + p, f := async.New[int]() + p.Resolve(1) + + // when + ch := f.ToChannel() + + // then + v, err := (<-ch).V() + _, ok := <-ch + if assert.NoError(t, err) { + assert.Equal(t, 1, v) + } + assert.False(t, ok) +} diff --git a/go.mod b/go.mod index fc5842f..5a5af0c 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,16 @@ module fillmore-labs.com/exp/async go 1.21 -toolchain go1.22.0 +toolchain go1.22.1 -require github.com/stretchr/testify v1.8.4 +require ( + github.com/stretchr/testify v1.9.0 + go.uber.org/goleak v1.3.0 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index fa4b6e6..7a38846 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,18 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/iterator.go b/iterator.go new file mode 100644 index 0000000..7ca95dd --- /dev/null +++ b/iterator.go @@ -0,0 +1,93 @@ +// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package async + +import ( + "context" + "fmt" + "reflect" + "runtime/trace" + + "fillmore-labs.com/exp/async/result" +) + +// This iterator is used to combine the results of multiple asynchronous operations waiting in parallel. +type iterator[R any, F AnyFuture] struct { + _ noCopy + numFutures int + active []F + cases []reflect.SelectCase + value func(f F) result.Result[R] + ctx context.Context //nolint:containedctx +} + +func newIterator[R any, F AnyFuture]( + ctx context.Context, value func(f F) result.Result[R], l []F, +) *iterator[R, F] { + numFutures := len(l) + active := make([]F, numFutures) + _ = copy(active, l) + + cases := make([]reflect.SelectCase, numFutures+1) + for idx, f := range active { + cases[idx] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(f.Done()), + } + } + cases[numFutures] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + } + + return &iterator[R, F]{ + numFutures: numFutures, + active: active, + cases: cases, + value: value, + ctx: ctx, + } +} + +func (i *iterator[R, F]) yieldTo(yield func(int, result.Result[R]) bool) { + defer trace.StartRegion(i.ctx, "asyncSeq").End() + for run := 0; run < i.numFutures; run++ { + chosen, _, _ := reflect.Select(i.cases) + + if chosen == i.numFutures { // context channel + err := fmt.Errorf("list yield canceled: %w", context.Cause(i.ctx)) + i.yieldErr(yield, err) + + break + } + + i.cases[chosen].Chan = reflect.Value{} // Disable case + v := i.value(i.active[chosen]) + if !yield(chosen, v) { + break + } + } +} + +func (i *iterator[R, F]) yieldErr(yield func(int, result.Result[R]) bool, err error) { + e := result.OfError[R](err) + for idx := 0; idx < i.numFutures; idx++ { + if i.cases[idx].Chan.IsValid() && !yield(idx, e) { + break + } + } +} diff --git a/memoizer.go b/memoizer.go deleted file mode 100644 index 9abeca3..0000000 --- a/memoizer.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 - -package async - -import ( - "context" - "fmt" - "runtime/trace" - "sync/atomic" -) - -// Memoizer caches results from a [Future] to enable multiple queries and avoid unnecessary recomputation. -type Memoizer[R any] struct { - future <-chan Result[R] // future is the [Future] being cached - running atomic.Int32 // number of goroutines at a select - done chan struct{} // done signals when future has completed - value Result[R] // value will hold the cached result -} - -// Wait returns the cached result or blocks until a result is available or the context is canceled. -func (m *Memoizer[R]) Wait(ctx context.Context) (R, error) { - defer trace.StartRegion(ctx, "asyncMemoizerWait").End() - m.addRunning() - select { // wait for future completion or context cancel - case r, ok := <-m.channel(): - return m.processResult(r, ok).V() - - case <-ctx.Done(): - m.releaseRunning() - - return *new(R), fmt.Errorf("memoizer wait: %w", ctx.Err()) - } -} - -// TryWait returns the cached result when ready, [ErrNotReady] otherwise. -func (m *Memoizer[R]) TryWait() (R, error) { - m.addRunning() - select { - case r, ok := <-m.channel(): - return m.processResult(r, ok).V() - - default: - m.releaseRunning() - - return *new(R), ErrNotReady - } -} - -// Memoize returns this [Memoizer]. -func (m *Memoizer[R]) Memoize() *Memoizer[R] { - return m -} - -// processResult handles caching the result when received on the future channel. -// It signals completion on done after updating value. -func (m *Memoizer[R]) processResult(r Result[R], ok bool) Result[R] { - if ok { // We got a result - m.value = r - close(m.done) - m.releaseRunning() // This has to be done after signalling done - - return r - } - - if m.thereAreOthers() { // Wait for other goroutines to resolve the closed channel - <-m.done - - return m.value - } - - // This is the last goroutine and the channel is closed - select { - case <-m.done: // Some other goroutine resolved - - default: // The channel closed without a result - m.value = errorResult[R]{ErrNoResult} - close(m.done) - } - m.releaseRunning() - - return m.value -} - -// channel simply returns the underlying future channel. -func (m *Memoizer[R]) channel() <-chan Result[R] { - return m.future -} - -// addRunning manage the running counter atomically. -func (m *Memoizer[R]) addRunning() { - m.running.Add(1) -} - -// releaseRunning manage the running counter atomically. -func (m *Memoizer[R]) releaseRunning() { - m.running.Add(-1) -} - -// thereAreOthers checks if this goroutine is the only remaining one after the channel is closed. -// -// How does this work? -// -// We use an atomic counter to track the number of goroutines running. We are leaving the running phase by -// decrementing the counter and wait for the others to finish and resolve the value. -// -// If after decrementing the counter is 0, we know that there are no other goroutines running (only waiting), so we have -// to resolve ourselves. -// -// If now another goroutine starts, increasing the counter to 1 again, we can not swap out the 0 count to 1 and leave -// the work to the new goroutine. -// -// If we can swap out the counter, every later started new goroutine sees that there is another running and will leave -// resolving to it. -func (m *Memoizer[R]) thereAreOthers() bool { - return m.running.Add(-1) != 0 || !m.running.CompareAndSwap(0, 1) -} diff --git a/combine_seq.go b/nocopy.go similarity index 63% rename from combine_seq.go rename to nocopy.go index 60acddd..c9f75b3 100644 --- a/combine_seq.go +++ b/nocopy.go @@ -14,20 +14,9 @@ // // SPDX-License-Identifier: Apache-2.0 -//go:build goexperiment.rangefunc - package async -import ( - "context" - - // experimental. - "iter" -) +type noCopy struct{} -// All returns the results of all completed futures as a range function. If the context is canceled, it returns early. -func All[R any](ctx context.Context, futures ...Awaitable[R]) iter.Seq2[int, Result[R]] { - return func(yield func(int, Result[R]) bool) { - _ = YieldAll[R](ctx, yield, futures...) - } -} +func (*noCopy) Lock() {} +func (*noCopy) Unlock() {} diff --git a/promise.go b/promise.go index c296176..191792b 100644 --- a/promise.go +++ b/promise.go @@ -16,29 +16,37 @@ package async -// Promise is used to send the result of an asynchronous operation. -// -// It is a write-only channel. -// Either [Promise.Fulfill] or [Promise.Reject] should be called exactly once. -type Promise[R any] chan<- Result[R] - -// Do runs f synchronously, fulfilling the promise once it completes. -func (p Promise[R]) Do(f func() (R, error)) { - if value, err := f(); err == nil { - p.Fulfill(value) - } else { - p.Reject(err) +import "fillmore-labs.com/exp/async/result" + +// Promise defines the common operations for resolving a [Future] to its final value. +// Implementations allow calling on of the functions from any goroutine once. Any subsequent call will panic. +type Promise[R any] struct { + *value[R] +} + +func New[R any]() (Promise[R], Future[R]) { + r := value[R]{ + done: make(chan struct{}), + queue: make(chan []func(result result.Result[R]), 1), } + r.queue <- nil + + return Promise[R]{value: &r}, Future[R]{value: &r} } -// Fulfill fulfills the promise with a value. -func (p Promise[R]) Fulfill(value R) { - p <- valueResult[R]{value: value} - close(p) +// func (p Promise[R]) Future() Future[R] { return Future[R]{value: p.value} } + +// Resolve resolves the promise with a value. +func (p Promise[R]) Resolve(value R) { + p.complete(result.OfValue(value)) } // Reject breaks the promise with an error. func (p Promise[R]) Reject(err error) { - p <- errorResult[R]{err: err} - close(p) + p.complete(result.OfError[R](err)) +} + +// Do runs fn synchronously, fulfilling the [Promise] once it completes. +func (p Promise[R]) Do(fn func() (R, error)) { + p.complete(result.Of(fn())) } diff --git a/result.go b/result/result.go similarity index 57% rename from result.go rename to result/result.go index da34a57..b265fb4 100644 --- a/result.go +++ b/result/result.go @@ -14,14 +14,34 @@ // // SPDX-License-Identifier: Apache-2.0 -package async +package result // Result defines the interface for returning results from asynchronous operations. // It encapsulates the final value or error from the operation. type Result[R any] interface { - V() (R, error) // The V method returns the final value or an error. - Value() R // The Value method returns the final value. - Err() error // The Err method returns the error. + V() (R, error) // The V method returns the final value or an error. + Value() R // The Value method returns the final value. + Err() error // The Err method returns the error. + Any() Result[any] // The Any method returns a Result[any] that can be used with any type. +} + +// Of creates a new [Result] from a pair of values. +func Of[R any](value R, err error) Result[R] { + if err != nil { + return errorResult[R]{err: err} + } + + return valueResult[R]{value: value} +} + +// OfValue creates a new [Result] from a value. +func OfValue[R any](value R) Result[R] { + return valueResult[R]{value: value} +} + +// OfError creates a new [Result] from an error. +func OfError[R any](err error) Result[R] { + return errorResult[R]{err: err} } // valueResult is an implementation of [Result] that simply holds a value. @@ -40,12 +60,17 @@ func (v valueResult[R]) Value() R { } // The Err method returns nil. -func (v valueResult[R]) Err() error { +func (v valueResult[_]) Err() error { return nil } +// Any returns the valueResult as a Result[any]. +func (v valueResult[_]) Any() Result[any] { + return valueResult[any]{value: v.value} +} + // errorResult handles errors from failed operations. -type errorResult[R any] struct { +type errorResult[_ any] struct { err error } @@ -60,6 +85,11 @@ func (e errorResult[R]) Value() R { } // Err returns the stored error. -func (e errorResult[R]) Err() error { +func (e errorResult[_]) Err() error { return e.err } + +// Any returns the errorResult as a Result[any]. +func (e errorResult[_]) Any() Result[any] { + return errorResult[any](e) +} diff --git a/result/result_test.go b/result/result_test.go new file mode 100644 index 0000000..6e79b10 --- /dev/null +++ b/result/result_test.go @@ -0,0 +1,96 @@ +// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package result_test + +import ( + "errors" + "testing" + + "fillmore-labs.com/exp/async/result" + "github.com/stretchr/testify/assert" +) + +var errTest = errors.New("test error") + +func TestV(t *testing.T) { + t.Parallel() + // given + r := result.OfValue(1) + // when + v, err := r.V() + // then + if assert.NoError(t, err) { + assert.Equal(t, 1, v) + } +} + +func TestVErr(t *testing.T) { + t.Parallel() + // given + r := result.OfError[struct{}](errTest) + // when + _, err := r.V() + // then + assert.ErrorIs(t, err, errTest) +} + +func TestOf(t *testing.T) { + t.Parallel() + // given + r := result.Of(1, nil) + // when + v := r.Value() + err := r.Err() + // then + if assert.NoError(t, err) { + assert.Equal(t, 1, v) + } +} + +func TestOfErr(t *testing.T) { + t.Parallel() + // given + r := result.Of(1, errTest) + // when + _ = r.Value() // doesn't panic + err := r.Err() + // then + assert.ErrorIs(t, err, errTest) +} + +func TestAny(t *testing.T) { + t.Parallel() + // given + r := result.OfValue(1) + // when + r2 := r.Any() + // then + if assert.NoError(t, r2.Err()) { + assert.Equal(t, 1, r2.Value()) + } +} + +func TestAnyErr(t *testing.T) { + t.Parallel() + // given + r := result.OfError[int](errTest) + // when + r2 := r.Any() + // then + assert.ErrorIs(t, r2.Err(), errTest) + _ = r2.Value() +} diff --git a/result_test.go b/result_test.go deleted file mode 100644 index b5bb3b7..0000000 --- a/result_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// SPDX-License-Identifier: Apache-2.0 - -package async_test - -import ( - "testing" - - "fillmore-labs.com/exp/async" - "github.com/stretchr/testify/assert" -) - -func TestErrorResult(t *testing.T) { - // given - f, p := async.NewFuture[int]() - p.Reject(errTest) - r := <-f - - // when - v, err := r.Value(), r.Err() - - // then - assert.ErrorIs(t, err, errTest) - assert.Equal(t, 0, v) -} - -func TestValueResult(t *testing.T) { - // given - f, p := async.NewFuture[int]() - p.Fulfill(1) - r := <-f - - // when - v, err := r.Value(), r.Err() - - // then - assert.NoError(t, err) - assert.Equal(t, 1, v) -} diff --git a/then.go b/then.go index 0b54718..ed47158 100644 --- a/then.go +++ b/then.go @@ -16,21 +16,27 @@ package async -import "context" - -// Then transforms the embedded result from an [Awaitable] using 'then'. -// This allows to easily handle errors embedded in the response. -// It blocks until a result is received or the context is canceled. -func Then[R, S any](ctx context.Context, f Awaitable[R], then func(R) (S, error)) (S, error) { - reply, err := f.Wait(ctx) - if err != nil { - return *new(S), err - } - - return then(reply) +import "fillmore-labs.com/exp/async/result" + +// Transform transforms the value of a successful [Future] synchronously into another, enabling i.e. unwrapping of +// values. +func Transform[R, S any](f Future[R], fn func(R, error) (S, error)) Future[S] { + ps, fs := New[S]() + + f.OnComplete(func(r result.Result[R]) { + ps.Do(func() (S, error) { return fn(r.V()) }) + }) + + return fs } -// ThenAsync asynchronously transforms the embedded result from an [Awaitable] using 'then'. -func ThenAsync[R, S any](ctx context.Context, f Awaitable[R], then func(R) (S, error)) Future[S] { - return NewFutureAsync[S](func() (S, error) { return Then(ctx, f, then) }) +// AndThen executes fn asynchronously when future f completes, enabling chaining of operations. +func AndThen[R, S any](f Future[R], fn func(R, error) (S, error)) Future[S] { + ps, fs := New[S]() + + f.OnComplete(func(r result.Result[R]) { + go ps.Do(func() (S, error) { return fn(r.V()) }) + }) + + return fs } diff --git a/then_test.go b/then_test.go index e4ffa86..735bb33 100644 --- a/then_test.go +++ b/then_test.go @@ -18,104 +18,102 @@ package async_test import ( "context" + "strconv" "testing" "fillmore-labs.com/exp/async" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/assert" ) -type ThenTestSuite struct { - suite.Suite - future async.Future[int] - promise async.Promise[int] -} +func itoa(i int, err error) (string, error) { + if err != nil { + return "", err + } -func TestThenTestSuite(t *testing.T) { - t.Parallel() - suite.Run(t, new(ThenTestSuite)) -} + if i < 0 { + return "", errTest + } -func (s *ThenTestSuite) SetupSubTest() { - s.future, s.promise = async.NewFuture[int]() + return strconv.Itoa(i), nil } -func (s *ThenTestSuite) fromFuture() async.Awaitable[int] { return s.future } -func (s *ThenTestSuite) fromMemoizer() async.Awaitable[int] { return s.future.Memoize() } +func TestTransform1(t *testing.T) { + t.Parallel() -type futureMemoizerTest[R any] struct { - name string - awaitable func() async.Awaitable[R] -} + // given + p, f := async.New[int]() + p.Resolve(42) -func (s *ThenTestSuite) createFutureMemoizerTests() []futureMemoizerTest[int] { - return []futureMemoizerTest[int]{ - {name: "Future", awaitable: s.fromFuture}, - {name: "Memoizer", awaitable: s.fromMemoizer}, - } -} + // when + f1 := async.Transform(f, itoa) -func add1OrError(value int) (int, error) { - if value == 2 { - return 0, errTest + // then + v, err := f1.Try() + if assert.NoError(t, err) { + assert.Equal(t, "42", v) } - - return value + 1, nil } -func (s *ThenTestSuite) TestThen() { - futureMemoizerTests := s.createFutureMemoizerTests() +func TestTransform2(t *testing.T) { + t.Parallel() - for _, tc := range futureMemoizerTests { - _ = s.Run(tc.name, func() { - // given - awaitable := tc.awaitable() - s.promise.Fulfill(1) + // given + p, f := async.New[int]() - // when - value, err := async.Then[int, int](context.Background(), awaitable, add1OrError) + // when + f1 := async.Transform(f, itoa) + p.Resolve(42) - // then - if s.NoError(err) { - s.Equal(2, value) - } - }) + // then + v, err := f1.Try() + if assert.NoError(t, err) { + assert.Equal(t, "42", v) } } -func (s *ThenTestSuite) TestThenError() { - futureMemoizerTests := s.createFutureMemoizerTests() +func TestTransformError1(t *testing.T) { + t.Parallel() - for _, tc := range futureMemoizerTests { - _ = s.Run(tc.name, func() { - // given - awaitable := tc.awaitable() + // given + p, f := async.New[int]() - s.promise.Reject(errTest) + // when + f1 := async.Transform(f, itoa) + p.Reject(errTest) - // when - _, err := async.Then(context.Background(), awaitable, add1OrError) + // then + _, err := f1.Try() + assert.ErrorIs(t, err, errTest) +} - // then - s.ErrorIs(err, errTest) - }) - } +func TestTransformError2(t *testing.T) { + t.Parallel() + + // given + p, f := async.New[int]() + + // when + f1 := async.Transform(f, itoa) + p.Resolve(-1) + + // then + _, err := f1.Try() + assert.ErrorIs(t, err, errTest) } -func (s *ThenTestSuite) TestThenAsync() { - futureMemoizerTests := s.createFutureMemoizerTests() +func TestAndThen(t *testing.T) { + t.Parallel() - for _, tc := range futureMemoizerTests { - _ = s.Run(tc.name, func() { - // given - awaitable := tc.awaitable() - f := async.ThenAsync(context.Background(), awaitable, add1OrError) - s.promise.Fulfill(2) + // given + p, f := async.New[int]() - // when - _, err := f.Wait(context.Background()) + // when + f1 := async.AndThen(f, itoa) + p.Resolve(42) - // then - s.ErrorIs(err, errTest) - }) + // then + v, err := f1.Await(context.Background()) + if assert.NoError(t, err) { + assert.Equal(t, "42", v) } } diff --git a/value.go b/value.go new file mode 100644 index 0000000..16fe45c --- /dev/null +++ b/value.go @@ -0,0 +1,48 @@ +// Copyright 2023-2024 Oliver Eikemeier. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package async + +import "fillmore-labs.com/exp/async/result" + +// value wraps a [Result] to enable multiple queries and avoid unnecessary recomputation. +type value[R any] struct { + _ noCopy + done chan struct{} // signals when future has completed + v result.Result[R] // valid only when done is closed + queue chan []func(result result.Result[R]) // list of functions to execute synchronously when completed +} + +func (r *value[R]) complete(value result.Result[R]) { + r.v = value + close(r.done) + + queue := <-r.queue + close(r.queue) + + for _, fn := range queue { + fn(value) + } +} + +func (r *value[R]) onComplete(fn func(value result.Result[R])) { + if queue, ok := <-r.queue; ok { + queue = append(queue, fn) + r.queue <- queue + } else { + fn(r.v) + } +}