Skip to content

Commit

Permalink
Revert "Switch Json's PooledByteBufferWriter to shared ArrayBuffer he…
Browse files Browse the repository at this point in the history
…lper (dotnet#111348)"

This reverts commit d454419.
  • Loading branch information
MihaZupan committed Jan 21, 2025
1 parent 4365cd2 commit 599dbd3
Show file tree
Hide file tree
Showing 25 changed files with 361 additions and 58 deletions.
208 changes: 182 additions & 26 deletions src/libraries/Common/src/System/Text/Json/PooledByteBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -1,93 +1,249 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Net;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.Text.Json
{
internal sealed class PooledByteBufferWriter : PipeWriter, IDisposable
{
// This class allows two possible configurations: if rentedBuffer is not null then
// it can be used as an IBufferWriter and holds a buffer that should eventually be
// returned to the shared pool. If rentedBuffer is null, then the instance is in a
// cleared/disposed state and it must re-rent a buffer before it can be used again.
private byte[]? _rentedBuffer;
private int _index;
private readonly Stream? _stream;

private const int MinimumBufferSize = 256;

private ArrayBuffer _buffer;
private readonly Stream? _stream;
// Value copied from Array.MaxLength in System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Array.cs.
public const int MaximumBufferSize = 0X7FFFFFC7;

private PooledByteBufferWriter()
{
#if NET
// Ensure we are in sync with the Array.MaxLength implementation.
Debug.Assert(MaximumBufferSize == Array.MaxLength);
#endif
}

public PooledByteBufferWriter(int initialCapacity)
public PooledByteBufferWriter(int initialCapacity) : this()
{
_buffer = new ArrayBuffer(initialCapacity, usePool: true);
Debug.Assert(initialCapacity > 0);

_rentedBuffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
}

public PooledByteBufferWriter(int initialCapacity, Stream stream) : this(initialCapacity)
{
_stream = stream;
}

public ReadOnlySpan<byte> WrittenSpan => _buffer.ActiveSpan;
public ReadOnlyMemory<byte> WrittenMemory
{
get
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);
return _rentedBuffer.AsMemory(0, _index);
}
}

public ReadOnlyMemory<byte> WrittenMemory => _buffer.ActiveMemory;
public int WrittenCount
{
get
{
Debug.Assert(_rentedBuffer != null);
return _index;
}
}

public int Capacity => _buffer.Capacity;
public int Capacity
{
get
{
Debug.Assert(_rentedBuffer != null);
return _rentedBuffer.Length;
}
}

public void Clear() => _buffer.Discard(_buffer.ActiveLength);
public int FreeCapacity
{
get
{
Debug.Assert(_rentedBuffer != null);
return _rentedBuffer.Length - _index;
}
}

public void Clear()
{
ClearHelper();
}

public void ClearAndReturnBuffers() => _buffer.ClearAndReturnBuffer();
public void ClearAndReturnBuffers()
{
Debug.Assert(_rentedBuffer != null);

public void Dispose() => _buffer.Dispose();
ClearHelper();
byte[] toReturn = _rentedBuffer;
_rentedBuffer = null;
ArrayPool<byte>.Shared.Return(toReturn);
}

private void ClearHelper()
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(_index <= _rentedBuffer.Length);

_rentedBuffer.AsSpan(0, _index).Clear();
_index = 0;
}

// Returns the rented buffer back to the pool
public void Dispose()
{
if (_rentedBuffer == null)
{
return;
}

ClearHelper();
byte[] toReturn = _rentedBuffer;
_rentedBuffer = null;
ArrayPool<byte>.Shared.Return(toReturn);
}

public void InitializeEmptyInstance(int initialCapacity)
{
Debug.Assert(initialCapacity > 0);
Debug.Assert(_buffer.ActiveLength == 0);
Debug.Assert(_rentedBuffer is null);

_buffer.EnsureAvailableSpace(initialCapacity);
_rentedBuffer = ArrayPool<byte>.Shared.Rent(initialCapacity);
_index = 0;
}

public static PooledByteBufferWriter CreateEmptyInstanceForCaching() => new PooledByteBufferWriter(initialCapacity: 0);
public static PooledByteBufferWriter CreateEmptyInstanceForCaching() => new PooledByteBufferWriter();

public override void Advance(int count) => _buffer.Commit(count);
public override void Advance(int count)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(count >= 0);
Debug.Assert(_index <= _rentedBuffer.Length - count);
_index += count;
}

public override Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
{
Debug.Assert(sizeHint > 0);

_buffer.EnsureAvailableSpace(sizeHint);
return _buffer.AvailableMemory;
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_index);
}

public override Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
{
Debug.Assert(sizeHint > 0);

_buffer.EnsureAvailableSpace(sizeHint);
return _buffer.AvailableSpan;
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_index);
}

#if NET
internal void WriteToStream(Stream destination) => destination.Write(_buffer.ActiveSpan);
internal void WriteToStream(Stream destination)
{
destination.Write(WrittenMemory.Span);
}
#else
internal void WriteToStream(Stream destination) => destination.Write(_buffer.ActiveMemory);
internal void WriteToStream(Stream destination)
{
Debug.Assert(_rentedBuffer != null);
destination.Write(_rentedBuffer, 0, _index);
}
#endif

private void CheckAndResizeBuffer(int sizeHint)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(sizeHint > 0);

int currentLength = _rentedBuffer.Length;
int availableSpace = currentLength - _index;

// If we've reached ~1GB written, grow to the maximum buffer
// length to avoid incessant minimal growths causing perf issues.
if (_index >= MaximumBufferSize / 2)
{
sizeHint = Math.Max(sizeHint, MaximumBufferSize - currentLength);
}

if (sizeHint > availableSpace)
{
int growBy = Math.Max(sizeHint, currentLength);

int newSize = currentLength + growBy;

if ((uint)newSize > MaximumBufferSize)
{
newSize = currentLength + sizeHint;
if ((uint)newSize > MaximumBufferSize)
{
ThrowHelper.ThrowOutOfMemoryException_BufferMaximumSizeExceeded((uint)newSize);
}
}

byte[] oldBuffer = _rentedBuffer;

_rentedBuffer = ArrayPool<byte>.Shared.Rent(newSize);

Debug.Assert(oldBuffer.Length >= _index);
Debug.Assert(_rentedBuffer.Length >= _index);

Span<byte> oldBufferAsSpan = oldBuffer.AsSpan(0, _index);
oldBufferAsSpan.CopyTo(_rentedBuffer);
oldBufferAsSpan.Clear();
ArrayPool<byte>.Shared.Return(oldBuffer);
}

Debug.Assert(_rentedBuffer.Length - _index > 0);
Debug.Assert(_rentedBuffer.Length - _index >= sizeHint);
}

public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(_stream is not null);
#if NET
await _stream.WriteAsync(WrittenMemory, cancellationToken).ConfigureAwait(false);
#else
Debug.Assert(_rentedBuffer != null);
await _stream.WriteAsync(_rentedBuffer, 0, _index, cancellationToken).ConfigureAwait(false);
#endif
Clear();

return new FlushResult(isCanceled: false, isCompleted: false);
}

public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _buffer.ActiveLength;
public override long UnflushedBytes => _index;

// This type is used internally in JsonSerializer to help buffer and flush bytes to the underlying Stream.
// It's only pretending to be a PipeWriter and doesn't need Complete or CancelPendingFlush for the internal usage.
public override void CancelPendingFlush() => throw new NotImplementedException();
public override void Complete(Exception? exception = null) => throw new NotImplementedException();
}

internal static partial class ThrowHelper
{
[DoesNotReturn]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void ThrowOutOfMemoryException_BufferMaximumSizeExceeded(uint capacity)
{
throw new OutOfMemoryException(SR.Format(SR.BufferMaximumSizeExceeded, capacity));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

/// <summary>Extensions to enable the tests to use the span-based Read/Write methods that only exist in netcoreapp.</summary>
internal static class StreamSpanExtensions
{
// These implementations are inefficient and are just for testing purposes.

public static int Read(this Stream stream, Span<byte> destination)
{
byte[] array = new byte[destination.Length];
int bytesRead = stream.Read(array, 0, array.Length);
new Span<byte>(array, 0, bytesRead).CopyTo(destination);
return bytesRead;
}

public static void Write(this Stream stream, ReadOnlySpan<byte> source) =>
stream.Write(source.ToArray(), 0, source.Length);

public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> destination, CancellationToken cancellationToken = default(CancellationToken))
{
byte[] array = new byte[destination.Length];
return new ValueTask<int>(stream.ReadAsync(array, 0, array.Length, cancellationToken).ContinueWith(t =>
{
int bytesRead = t.GetAwaiter().GetResult();
new Span<byte>(array, 0, bytesRead).CopyTo(destination.Span);
return bytesRead;
}));
}

public static Task WriteAsync(this Stream stream, ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken)) =>
stream.WriteAsync(source.ToArray(), 0, source.Length, cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.IO;
using System.Security.Principal;
using System.Threading.Tasks;

Expand Down
35 changes: 35 additions & 0 deletions src/libraries/Common/tests/System/Net/StreamArrayExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace System.Net
{
public static class StreamArrayExtensions
{
public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> memory)
{
bool isArray = MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment);
Assert.True(isArray);

return new ValueTask(stream.WriteAsync(segment.Array, segment.Offset, segment.Count));
}

public static ValueTask WriteAsync(this StreamWriter writer, string text)
{
return new ValueTask(writer.WriteAsync(text.ToCharArray(), 0, text.Length));
}

public static ValueTask<int> ReadAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
bool isArray = MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment);
Assert.True(isArray);

return new ValueTask<int>(stream.ReadAsync(segment.Array, segment.Offset, segment.Count, cancellationToken));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void WriteInternal(IExternalScopeProvider? scopeProvider, TextWriter tex
writer.Flush();
}

var messageBytes = output.WrittenSpan;
var messageBytes = output.WrittenMemory.Span;
var logMessageBuffer = ArrayPool<char>.Shared.Rent(Encoding.UTF8.GetMaxCharCount(messageBytes.Length));
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
<ItemGroup>
<Compile Include="$(CommonPath)Extensions\Logging\NullExternalScopeProvider.cs" Link="Common\src\Extensions\Logging\NullExternalScopeProvider.cs" />
<Compile Include="$(CommonPath)Extensions\Logging\NullScope.cs" Link="Common\src\Extensions\Logging\NullScope.cs" />
<Compile Include="$(CommonPath)System\Net\ArrayBuffer.cs" Link="Common\System\Net\ArrayBuffer.cs" />
<Compile Include="$(CommonPath)System\Text\Json\PooledByteBufferWriter.cs" Link="Common\System\Text\Json\PooledByteBufferWriter.cs" />
<Compile Include="$(CommonPath)System\ThrowHelper.cs" Link="Common\System\ThrowHelper.cs" />
</ItemGroup>
Expand All @@ -30,7 +29,6 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
<Compile Include="$(CommonPath)System\IO\StreamExtensions.netstandard.cs" Link="Common\System\IO\StreamExtensions.netstandard.cs" />
<Compile Include="$(CoreLibSharedDir)System\Diagnostics\CodeAnalysis\DynamicallyAccessedMembersAttribute.cs" />
<Compile Include="$(CoreLibSharedDir)System\Diagnostics\CodeAnalysis\DynamicallyAccessedMemberTypes.cs" />
<Compile Include="$(CoreLibSharedDir)System\Diagnostics\CodeAnalysis\DynamicDependencyAttribute.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
<resheader name="writer">
<value>System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089</value>
</resheader>
<data name="BufferMaximumSizeExceeded" xml:space="preserve">
<value>Cannot allocate a buffer of size {0}.</value>
</data>
<data name="QueueModeNotSupported" xml:space="preserve">
<value>{0} is not a supported queue mode value.</value>
</data>
Expand Down
Loading

0 comments on commit 599dbd3

Please sign in to comment.