From ab4e82e67f001c73e4ce1ede075416a4c303ab8b Mon Sep 17 00:00:00 2001 From: Christiaan Rakowski Date: Sun, 21 Aug 2022 13:22:14 +0200 Subject: [PATCH] Added quick implementation on top of Parallel.ForEachAsync, but left it disabled for now as benchmark results are mixed --- .../ParallelAsync.Unordered.cs | 100 ++++++++++++++++++ .../CSRakowski.Parallel.Benchmarks/Program.cs | 5 +- 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/src/CSRakowski.Parallel/ParallelAsync.Unordered.cs b/src/CSRakowski.Parallel/ParallelAsync.Unordered.cs index 626ef35..3eb9259 100644 --- a/src/CSRakowski.Parallel/ParallelAsync.Unordered.cs +++ b/src/CSRakowski.Parallel/ParallelAsync.Unordered.cs @@ -30,6 +30,34 @@ private static async Task> ForEachAsyncImplUnordered(); + + try + { + var options = new System.Threading.Tasks.ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = batchSize + }; + + await System.Threading.Tasks.Parallel.ForEachAsync(collection, options, async (i, ct) => + { + var r = await func(i, ct).ConfigureAwait(false); + concurrentResult.Add(r); + }).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Expected + } + + result.AddRange(concurrentResult); + +#else + using (var enumerator = collection.GetEnumerator()) { var hasNext = true; @@ -81,6 +109,8 @@ private static async Task> ForEachAsyncImplUnordered(IEnumerable collec long runId = ParallelAsyncEventSource.Log.GetRunId(); ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, 0); +#if false && NET6_0_OR_GREATER + + try + { + var options = new System.Threading.Tasks.ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = batchSize + }; + + await System.Threading.Tasks.Parallel.ForEachAsync(collection, options, (i, ct) => new ValueTask(func(i, ct))).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Expected + } + +#else + using (var enumerator = collection.GetEnumerator()) { var hasNext = true; @@ -140,6 +189,8 @@ private static async Task ForEachAsyncImplUnordered(IEnumerable collec } } +#endif + ParallelAsyncEventSource.Log.RunStop(runId); } @@ -154,6 +205,32 @@ private static async Task> ForEachAsyncImplUnordered(); + + try + { + var options = new System.Threading.Tasks.ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = batchSize + }; + + await System.Threading.Tasks.Parallel.ForEachAsync(collection, options, async (i, ct) => + { + var r = await func(i, ct).ConfigureAwait(false); + concurrentResult.Add(r); + }).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Expected + } + + result.AddRange(concurrentResult); +#else + var enumerator = collection.GetAsyncEnumerator(cancellationToken); try { @@ -210,6 +287,8 @@ private static async Task> ForEachAsyncImplUnordered(IAsyncEnumerable c long runId = ParallelAsyncEventSource.Log.GetRunId(); ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, 0); +#if false && NET6_0_OR_GREATER + + try + { + var options = new System.Threading.Tasks.ParallelOptions + { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = batchSize + }; + + await System.Threading.Tasks.Parallel.ForEachAsync(collection, options, (i, ct) => new ValueTask(func(i, ct))).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Expected + } + +#else + var enumerator = collection.GetAsyncEnumerator(cancellationToken); try { @@ -265,6 +363,8 @@ private static async Task ForEachAsyncImplUnordered(IAsyncEnumerable c await enumerator.DisposeAsync().ConfigureAwait(false); } +#endif + ParallelAsyncEventSource.Log.RunStop(runId); } diff --git a/tests/CSRakowski.Parallel.Benchmarks/Program.cs b/tests/CSRakowski.Parallel.Benchmarks/Program.cs index cbed67b..2322c41 100644 --- a/tests/CSRakowski.Parallel.Benchmarks/Program.cs +++ b/tests/CSRakowski.Parallel.Benchmarks/Program.cs @@ -16,7 +16,10 @@ public static class Program { public static void Main(string[] args) { - var summary = BenchmarkRunner.Run(); +#if NET6_0_OR_GREATER + + var summary = BenchmarkRunner.Run(); +#endif } } }