Skip to content

Commit

Permalink
Added quick implementation on top of Parallel.ForEachAsync, but left …
Browse files Browse the repository at this point in the history
…it disabled for now as benchmark results are mixed
  • Loading branch information
csrakowski committed Dec 30, 2023
1 parent 592d807 commit ab4e82e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
100 changes: 100 additions & 0 deletions src/CSRakowski.Parallel/ParallelAsync.Unordered.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,34 @@ private static async Task<IEnumerable<TResult>> ForEachAsyncImplUnordered<TResul
long runId = ParallelAsyncEventSource.Log.GetRunId();
ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, estimatedResultSize);


#if false && NET6_0_OR_GREATER

var concurrentResult = new System.Collections.Concurrent.ConcurrentBag<TResult>();

try
{
var options = new System.Threading.Tasks.ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = batchSize
};

await System.Threading.Tasks.Parallel.ForEachAsync<TIn>(collection, options, async (i, ct) =>
{
var r = await func(i, ct).ConfigureAwait(false);
concurrentResult.Add(r);
}).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Expected
}

result.AddRange(concurrentResult);

#else

using (var enumerator = collection.GetEnumerator())
{
var hasNext = true;
Expand Down Expand Up @@ -81,6 +109,8 @@ private static async Task<IEnumerable<TResult>> ForEachAsyncImplUnordered<TResul
}
}

#endif

ParallelAsyncEventSource.Log.RunStop(runId);

return result;
Expand All @@ -100,6 +130,25 @@ private static async Task ForEachAsyncImplUnordered<TIn>(IEnumerable<TIn> collec
long runId = ParallelAsyncEventSource.Log.GetRunId();
ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, 0);

#if false && NET6_0_OR_GREATER

try
{
var options = new System.Threading.Tasks.ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = batchSize
};

await System.Threading.Tasks.Parallel.ForEachAsync<TIn>(collection, options, (i, ct) => new ValueTask(func(i, ct))).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Expected
}

#else

using (var enumerator = collection.GetEnumerator())
{
var hasNext = true;
Expand Down Expand Up @@ -140,6 +189,8 @@ private static async Task ForEachAsyncImplUnordered<TIn>(IEnumerable<TIn> collec
}
}

#endif

ParallelAsyncEventSource.Log.RunStop(runId);
}

Expand All @@ -154,6 +205,32 @@ private static async Task<IEnumerable<TResult>> ForEachAsyncImplUnordered<TResul
long runId = ParallelAsyncEventSource.Log.GetRunId();
ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, estimatedResultSize);

#if false && NET6_0_OR_GREATER

var concurrentResult = new System.Collections.Concurrent.ConcurrentBag<TResult>();

try
{
var options = new System.Threading.Tasks.ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = batchSize
};

await System.Threading.Tasks.Parallel.ForEachAsync<TIn>(collection, options, async (i, ct) =>
{
var r = await func(i, ct).ConfigureAwait(false);
concurrentResult.Add(r);
}).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Expected
}

result.AddRange(concurrentResult);
#else

var enumerator = collection.GetAsyncEnumerator(cancellationToken);
try
{
Expand Down Expand Up @@ -210,6 +287,8 @@ private static async Task<IEnumerable<TResult>> ForEachAsyncImplUnordered<TResul
await enumerator.DisposeAsync().ConfigureAwait(false);
}

#endif

ParallelAsyncEventSource.Log.RunStop(runId);

return result;
Expand All @@ -220,6 +299,25 @@ private static async Task ForEachAsyncImplUnordered<TIn>(IAsyncEnumerable<TIn> c
long runId = ParallelAsyncEventSource.Log.GetRunId();
ParallelAsyncEventSource.Log.RunStart(runId, batchSize, true, 0);

#if false && NET6_0_OR_GREATER

try
{
var options = new System.Threading.Tasks.ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = batchSize
};

await System.Threading.Tasks.Parallel.ForEachAsync<TIn>(collection, options, (i, ct) => new ValueTask(func(i, ct))).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Expected
}

#else

var enumerator = collection.GetAsyncEnumerator(cancellationToken);
try
{
Expand Down Expand Up @@ -265,6 +363,8 @@ private static async Task ForEachAsyncImplUnordered<TIn>(IAsyncEnumerable<TIn> c
await enumerator.DisposeAsync().ConfigureAwait(false);
}

#endif

ParallelAsyncEventSource.Log.RunStop(runId);
}

Expand Down
5 changes: 4 additions & 1 deletion tests/CSRakowski.Parallel.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ public static class Program
{
public static void Main(string[] args)
{
var summary = BenchmarkRunner.Run<ParallelAsyncBenchmarks_AsyncStreams>();
#if NET6_0_OR_GREATER

var summary = BenchmarkRunner.Run<CompareWith_Parallel_ForEachAsync>();
#endif
}
}
}

0 comments on commit ab4e82e

Please sign in to comment.