diff --git a/lib/src/aggregate_sample.dart b/lib/src/aggregate_sample.dart index b104030..f2ff8ed 100644 --- a/lib/src/aggregate_sample.dart +++ b/lib/src/aggregate_sample.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'common_callbacks.dart'; + extension AggregateSample on Stream { /// Computes a value based on sequences of events, then emits that value when /// [trigger] emits an event. @@ -136,7 +138,7 @@ extension AggregateSample on Stream { triggerSub!.pause(); } if (cancels.isEmpty) return null; - return cancels.wait.then((_) => null); + return cancels.wait.then(ignoreArgument); }; }; return controller.stream; diff --git a/lib/src/async_expand.dart b/lib/src/async_expand.dart index e0c7672..28d2f40 100644 --- a/lib/src/async_expand.dart +++ b/lib/src/async_expand.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'common_callbacks.dart'; import 'switch.dart'; /// Alternatives to [asyncExpand]. @@ -78,7 +79,9 @@ extension AsyncExpand on Stream { } controller.onCancel = () { if (subscriptions.isEmpty) return null; - return [for (var s in subscriptions) s.cancel()].wait.then((_) => null); + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); }; }; return controller.stream; diff --git a/lib/src/async_map.dart b/lib/src/async_map.dart index 81a176f..094df9c 100644 --- a/lib/src/async_map.dart +++ b/lib/src/async_map.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'aggregate_sample.dart'; +import 'common_callbacks.dart'; import 'from_handlers.dart'; import 'rate_limit.dart'; @@ -72,7 +73,7 @@ extension AsyncMap on Stream { trigger: workFinished.stream, aggregate: _dropPrevious, longPoll: true, - onEmpty: _ignore) + onEmpty: ignoreArgument) ._asyncMapThen(convert, workFinished.add); } @@ -133,4 +134,3 @@ extension AsyncMap on Stream { } T _dropPrevious(T event, _) => event; -void _ignore(Sink sink) {} diff --git a/lib/src/combine_latest.dart b/lib/src/combine_latest.dart index 24eca4a..f02a19e 100644 --- a/lib/src/combine_latest.dart +++ b/lib/src/combine_latest.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'common_callbacks.dart'; + /// Utilities to combine events from multiple streams through a callback or into /// a list. extension CombineLatest on Stream { @@ -131,7 +133,7 @@ extension CombineLatest on Stream { ]; sourceSubscription = null; otherSubscription = null; - return cancels.wait.then((_) => null); + return cancels.wait.then(ignoreArgument); }; }; return controller.stream; @@ -228,7 +230,9 @@ extension CombineLatest on Stream { } controller.onCancel = () { if (subscriptions.isEmpty) return null; - return [for (var s in subscriptions) s.cancel()].wait.then((_) => null); + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); }; }; return controller.stream; diff --git a/lib/src/common_callbacks.dart b/lib/src/common_callbacks.dart new file mode 100644 index 0000000..c239220 --- /dev/null +++ b/lib/src/common_callbacks.dart @@ -0,0 +1,5 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +void ignoreArgument(_) {} diff --git a/lib/src/merge.dart b/lib/src/merge.dart index 9ffeb0a..3bfe06c 100644 --- a/lib/src/merge.dart +++ b/lib/src/merge.dart @@ -4,6 +4,8 @@ import 'dart:async'; +import 'common_callbacks.dart'; + /// Utilities to interleave events from multiple streams. extension Merge on Stream { /// Merges values and errors from this stream and [other] in any order as they @@ -90,7 +92,9 @@ extension Merge on Stream { } controller.onCancel = () { if (subscriptions.isEmpty) return null; - return [for (var s in subscriptions) s.cancel()].wait.then((_) => null); + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); }; }; return controller.stream; diff --git a/lib/src/rate_limit.dart b/lib/src/rate_limit.dart index 347919b..9be0be6 100644 --- a/lib/src/rate_limit.dart +++ b/lib/src/rate_limit.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'aggregate_sample.dart'; +import 'common_callbacks.dart'; import 'from_handlers.dart'; /// Utilities to rate limit events. @@ -305,7 +306,7 @@ extension RateLimit on Stream { trigger: trigger, aggregate: _dropPrevious, longPoll: longPoll, - onEmpty: _ignore); + onEmpty: ignoreArgument); /// Aggregates values until this source stream does not emit for [duration], /// then emits the aggregated values. @@ -353,4 +354,3 @@ extension RateLimit on Stream { T _dropPrevious(T element, _) => element; List _collect(T event, List? soFar) => (soFar ?? [])..add(event); void _empty(Sink> sink) => sink.add([]); -void _ignore(Sink sink) {} diff --git a/lib/src/switch.dart b/lib/src/switch.dart index 2ec3a54..546036e 100644 --- a/lib/src/switch.dart +++ b/lib/src/switch.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'async_expand.dart'; +import 'common_callbacks.dart'; /// A utility to take events from the most recent sub stream returned by a /// callback. @@ -126,12 +127,9 @@ extension SwitchLatest on Stream> { if (sub != null) sub.cancel(), ]; if (cancels.isEmpty) return null; - return cancels.wait.then(_ignore); + return cancels.wait.then(ignoreArgument); }; }; return controller.stream; } } - -/// Helper function to ignore future callback -void _ignore(_, [__]) {}