From d6843d9c42872af05831dcea9911a7bf0ffdfa38 Mon Sep 17 00:00:00 2001 From: Adam Sitnik Date: Thu, 12 Dec 2024 20:22:36 +0100 Subject: [PATCH] More WriteGather fixes (#109826) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * don't run these tests in parallel, as each test case uses more than 4 GB of RAM and disk! * fix the test: handle incomplete reads that should happen when we hit the max buffer limit * incomplete write fix: - pin the buffers only once - when re-trying, do that only for the actual remainder * Use native memory to get OOM as soon as we run out of memory (hoping to avoid the process getting killed on Linux when OOM happens) * For macOS preadv and pwritev can fail with EINVAL when the total length of all vectors overflows a 32-bit integer. * add an assert that is going to warn us if vector.Count is ever more than Int32.MaxValue --------- Co-authored-by: Michał Petryka <35800402+MichalPetryka@users.noreply.github.com> # Conflicts: # src/libraries/System.IO.FileSystem/tests/RandomAccess/WriteGatherAsync.cs --- .../tests/RandomAccess/WriteGatherAsync.cs | 104 +++++++++++++----- .../src/System/IO/RandomAccess.Unix.cs | 57 +++++----- src/native/libs/System.Native/pal_io.c | 47 ++++++-- 3 files changed, 140 insertions(+), 68 deletions(-) diff --git a/src/libraries/System.IO.FileSystem/tests/RandomAccess/WriteGatherAsync.cs b/src/libraries/System.IO.FileSystem/tests/RandomAccess/WriteGatherAsync.cs index 1beb28088fbfa6..50752f52b5708e 100644 --- a/src/libraries/System.IO.FileSystem/tests/RandomAccess/WriteGatherAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/RandomAccess/WriteGatherAsync.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. 
+using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Security.Cryptography; @@ -13,6 +14,7 @@ namespace System.IO.Tests { [SkipOnPlatform(TestPlatforms.Browser, "async file IO is not supported on browser")] + [Collection(nameof(DisableParallelization))] // don't run in parallel, as some of these tests use a LOT of resources public class RandomAccess_WriteGatherAsync : RandomAccess_Base { protected override ValueTask MethodUnderTest(SafeFileHandle handle, byte[] bytes, long fileOffset) @@ -151,21 +153,6 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod const int BufferSize = int.MaxValue / 1000; const long FileSize = (long)BufferCount * BufferSize; string filePath = GetTestFilePath(); - ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize); - List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList(); - List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount); - - try - { - for (int i = 0; i < BufferCount; i++) - { - readBuffers.Add(new byte[BufferSize]); - } - } - catch (OutOfMemoryException) - { - throw new SkipTestException("Not enough memory."); - } FileOptions options = asyncFile ? 
FileOptions.Asynchronous : FileOptions.None; // we need to test both code paths options |= FileOptions.DeleteOnClose; @@ -180,29 +167,86 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod throw new SkipTestException("Not enough disk space."); } - long fileOffset = 0, bytesRead = 0; - try + using (sfh) { - if (asyncMethod) + ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize); + List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList(); + + List<NativeMemoryManager> memoryManagers = new List<NativeMemoryManager>(BufferCount); + List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount); + + try { - await RandomAccess.WriteAsync(sfh, writeBuffers, fileOffset); - bytesRead = await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset); + try + { + for (int i = 0; i < BufferCount; i++) + { + // We are using native memory here to get OOM as soon as possible. + NativeMemoryManager nativeMemoryManager = new(BufferSize); + memoryManagers.Add(nativeMemoryManager); + readBuffers.Add(nativeMemoryManager.Memory); + } + } + catch (OutOfMemoryException) + { + throw new SkipTestException("Not enough memory."); + } + + await Verify(asyncMethod, FileSize, sfh, writeBuffer, writeBuffers, readBuffers); } - else + finally { - RandomAccess.Write(sfh, writeBuffers, fileOffset); - bytesRead = RandomAccess.Read(sfh, readBuffers, fileOffset); + foreach (IDisposable memoryManager in memoryManagers) + { + memoryManager.Dispose(); + } } } - finally - { - sfh.Dispose(); // delete the file ASAP to avoid running out of resources in CI - } - Assert.Equal(FileSize, bytesRead); - for (int i = 0; i < BufferCount; i++) + static async Task Verify(bool asyncMethod, long FileSize, SafeFileHandle sfh, ReadOnlyMemory<byte> writeBuffer, List<ReadOnlyMemory<byte>> writeBuffers, List<Memory<byte>> readBuffers) { - AssertExtensions.SequenceEqual(writeBuffer.Span, readBuffers[i].Span); + if (asyncMethod) + { + await RandomAccess.WriteAsync(sfh, writeBuffers, 0); + } + else + { + RandomAccess.Write(sfh, writeBuffers, 0); + } + + 
Assert.Equal(FileSize, RandomAccess.GetLength(sfh)); + + long fileOffset = 0; + while (fileOffset < FileSize) + { + long bytesRead = asyncMethod + ? await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset) + : RandomAccess.Read(sfh, readBuffers, fileOffset); + + Assert.InRange(bytesRead, 0, FileSize); + + while (bytesRead > 0) + { + Memory<byte> readBuffer = readBuffers[0]; + if (bytesRead >= readBuffer.Length) + { + AssertExtensions.SequenceEqual(writeBuffer.Span, readBuffer.Span); + + bytesRead -= readBuffer.Length; + fileOffset += readBuffer.Length; + + readBuffers.RemoveAt(0); + } + else + { + // A read has finished somewhere in the middle of one of the read buffers. + // Example: buffer had 30 bytes and only 10 were read. + // We don't read the missing part, but try to read the whole buffer again. + // It's not optimal from a performance perspective, but it keeps the test logic simple. + break; + } + } + } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs index bcddd3f4654fc1..147e3815812d0c 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Unix.cs @@ -168,38 +168,30 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly var handles = new MemoryHandle[buffersCount]; Span<Interop.Sys.IOVector> vectors = buffersCount <= IovStackThreshold ? 
- stackalloc Interop.Sys.IOVector[IovStackThreshold] : + stackalloc Interop.Sys.IOVector[IovStackThreshold].Slice(0, buffersCount) : new Interop.Sys.IOVector[buffersCount]; try { - int buffersOffset = 0, firstBufferOffset = 0; - while (true) + long totalBytesToWrite = 0; + for (int i = 0; i < buffersCount; i++) { - long totalBytesToWrite = 0; - - for (int i = buffersOffset; i < buffersCount; i++) - { - ReadOnlyMemory<byte> buffer = buffers[i]; - totalBytesToWrite += buffer.Length; - - MemoryHandle memoryHandle = buffer.Pin(); - vectors[i] = new Interop.Sys.IOVector { Base = firstBufferOffset + (byte*)memoryHandle.Pointer, Count = (UIntPtr)(buffer.Length - firstBufferOffset) }; - handles[i] = memoryHandle; - - firstBufferOffset = 0; - } + ReadOnlyMemory<byte> buffer = buffers[i]; + totalBytesToWrite += buffer.Length; - if (totalBytesToWrite == 0) - { - break; - } + MemoryHandle memoryHandle = buffer.Pin(); + vectors[i] = new Interop.Sys.IOVector { Base = (byte*)memoryHandle.Pointer, Count = (UIntPtr)buffer.Length }; + handles[i] = memoryHandle; + } + int buffersOffset = 0; + while (totalBytesToWrite > 0) + { long bytesWritten; Span<Interop.Sys.IOVector> left = vectors.Slice(buffersOffset); fixed (Interop.Sys.IOVector* pinnedVectors = &MemoryMarshal.GetReference(left)) { - bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, buffersCount - buffersOffset, fileOffset); + bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, left.Length, fileOffset); } FileStreamHelpers.CheckFileCall(bytesWritten, handle.Path); @@ -211,22 +203,29 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly // The write completed successfully but for fewer bytes than requested. // We need to perform next write where the previous one has finished. fileOffset += bytesWritten; + totalBytesToWrite -= bytesWritten; // We need to try again for the remainder. 
- for (int i = 0; i < buffersCount; i++) + while (buffersOffset < buffersCount && bytesWritten > 0) { - int n = buffers[i].Length; + int n = (int)vectors[buffersOffset].Count; if (n <= bytesWritten) { - buffersOffset++; bytesWritten -= n; - if (bytesWritten == 0) - { - break; - } + buffersOffset++; } else { - firstBufferOffset = (int)(bytesWritten - n); + // A partial write: the vector needs to point to the new offset. + // But that offset needs to be relative to the previous attempt. + // Example: we have a single buffer with 30 bytes and the first write returned 10. + // The next write should try to write the remaining 20 bytes, but in case it also writes just 10, + // the third attempt should write the last 10 bytes (not 20 again). + Interop.Sys.IOVector current = vectors[buffersOffset]; + vectors[buffersOffset] = new Interop.Sys.IOVector + { + Base = current.Base + (int)(bytesWritten), + Count = current.Count - (UIntPtr)(bytesWritten) + }; break; } } diff --git a/src/native/libs/System.Native/pal_io.c b/src/native/libs/System.Native/pal_io.c index 36e1510dcab8fa..35858794dff1af 100644 --- a/src/native/libs/System.Native/pal_io.c +++ b/src/native/libs/System.Native/pal_io.c @@ -1856,19 +1856,48 @@ int32_t SystemNative_PWrite(intptr_t fd, void* buffer, int32_t bufferSize, int64 } #if (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM) -static int GetAllowedVectorCount(int32_t vectorCount) +static int GetAllowedVectorCount(IOVector* vectors, int32_t vectorCount) { +#if defined(IOV_MAX) + const int IovMax = IOV_MAX; +#else + // In theory all the platforms that we support define IOV_MAX, + // but we want to be extra safe and provide a fallback + // in case it turns out to not be true. + // 16 is low, but supported on every platform. + const int IovMax = 16; +#endif + int allowedCount = (int)vectorCount; -#if defined(IOV_MAX) - if (IOV_MAX < allowedCount) + // We need to respect the limit of items that can be passed in iov. 
+ // In case of writes, the managed code is responsible for handling incomplete writes. + // In case of reads, we simply return the number of bytes read and it's up to the users. + if (IovMax < allowedCount) { - // We need to respect the limit of items that can be passed in iov. - // In case of writes, the managed code is reponsible for handling incomplete writes. - // In case of reads, we simply returns the number of bytes read and it's up to the users. - allowedCount = IOV_MAX; + allowedCount = IovMax; } + +#if defined(TARGET_APPLE) + // On macOS, preadv and pwritev can fail with EINVAL when the total length + // of all vectors overflows a 32-bit integer. + size_t totalLength = 0; + for (int i = 0; i < allowedCount; i++) + { + assert(INT_MAX >= vectors[i].Count); + + totalLength += vectors[i].Count; + + if (totalLength > INT_MAX) + { + allowedCount = i; + break; + } + } +#else + (void)vectors; #endif + return allowedCount; } #endif // (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM) @@ -1881,7 +1910,7 @@ int64_t SystemNative_PReadV(intptr_t fd, IOVector* vectors, int32_t vectorCount, int64_t count = 0; int fileDescriptor = ToFileDescriptor(fd); #if HAVE_PREADV && !defined(TARGET_WASM) // preadv is buggy on WASM - int allowedVectorCount = GetAllowedVectorCount(vectorCount); + int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount); while ((count = preadv(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) < 0 && errno == EINTR); #else int64_t current; @@ -1922,7 +1951,7 @@ int64_t SystemNative_PWriteV(intptr_t fd, IOVector* vectors, int32_t vectorCount int64_t count = 0; int fileDescriptor = ToFileDescriptor(fd); #if HAVE_PWRITEV && !defined(TARGET_WASM) // pwritev is buggy on WASM - int allowedVectorCount = GetAllowedVectorCount(vectorCount); + int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount); while ((count = pwritev(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) 
< 0 && errno == EINTR); #else int64_t current;