External sort failing with modest memory limit #15028

Open · ivankelly opened this issue Mar 5, 2025 · 8 comments
Labels: bug (Something isn't working)

@ivankelly

Describe the bug

As discussed on Discord, here's another external sort use case that's failing.

Repro:
https://github.com/ivankelly/df-repro

To run:

$ bash setup.sh # download the source data
$ RUST_LOG=trace cargo run
...
Error: Resources exhausted: Failed to allocate additional 1450451 bytes for ParquetSink(ArrowColumnWriter) with 62770337 bytes already allocated for this reservation - 1107184 bytes remain available for the total pool

The code reads in a bunch of parquet files (889 MB in total) and tries to sort them and output to a single parquet file.
Memory is limited to 100 MB.
Trying different batch sizes and target partitions doesn't help.
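Roughly, the setup looks like this (a paraphrased sketch, not the exact repro code; the helper name and partition count are made up):

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

fn build_context() -> Result<SessionContext> {
    // Limit the memory pool to 100 MB.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()?;

    // The knobs that were varied without success.
    let config = SessionConfig::new()
        .with_batch_size(100)
        .with_target_partitions(4);

    Ok(SessionContext::new_with_config_rt(config, runtime))
}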

@Kontinuation (Member) commented Mar 6, 2025

I have tried the repro. With batch_size = 100 configured, this looks more like a problem with the Parquet writer and is not strongly related to sorting.

I made small tweaks to the repro code to expose the status of memory consumers and the backtrace; the failure I got was:

Error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  ParquetSink(ArrowColumnWriter) consumed 65911731 bytes (62.8 MB),
  ExternalSorter[0] consumed 23587552 bytes (22.4 MB),
  ExternalSorterMerge[0] consumed 14261080 bytes (13.6 MB),
  ParquetSink(SerializedFileWriter) consumed 0 bytes.
Error: Failed to allocate additional 1450451 bytes for ParquetSink(ArrowColumnWriter) with 62770337 bytes already allocated for this reservation - 1097237 bytes remain available for the total pool

I've slightly reformatted the error message to make it more readable. The backtrace is:

backtrace:    0: std::backtrace_rs::backtrace::libunwind::trace
             at /rustc/30f168ef811aec63124eac677e14699baa9395bd/library/std/src/../../backtrace/src/backtrace/libunwind.rs:117:9
   1: std::backtrace_rs::backtrace::trace_unsynchronized
             at /rustc/30f168ef811aec63124eac677e14699baa9395bd/library/std/src/../../backtrace/src/backtrace/mod.rs:66:14
   2: std::backtrace::Backtrace::create
             at /rustc/30f168ef811aec63124eac677e14699baa9395bd/library/std/src/backtrace.rs:331:13
   3: datafusion_common::error::DataFusionError::get_back_trace
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-common-45.0.0/src/error.rs:410:30
   4: datafusion_execution::memory_pool::pool::insufficient_capacity_err
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-execution-45.0.0/src/memory_pool/pool.rs:249:5
   5: <datafusion_execution::memory_pool::pool::FairSpillPool as datafusion_execution::memory_pool::MemoryPool>::try_grow
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-execution-45.0.0/src/memory_pool/pool.rs:220:32
   6: <datafusion_execution::memory_pool::pool::TrackConsumersPool<I> as datafusion_execution::memory_pool::MemoryPool>::try_grow
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-execution-45.0.0/src/memory_pool/pool.rs:362:9
   7: datafusion_execution::memory_pool::MemoryReservation::try_grow
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-execution-45.0.0/src/memory_pool/mod.rs:298:9
   8: datafusion_execution::memory_pool::MemoryReservation::try_resize
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-execution-45.0.0/src/memory_pool/mod.rs:281:34
   9: datafusion::datasource::file_format::parquet::column_serializer_task::{{closure}}
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/datafusion-45.0.0/src/datasource/file_format/parquet.rs:900:9
  10: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /Users/bopeng/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/future/future.rs:124:9
  11: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.43.0/src/runtime/task/core.rs:331:17
  12: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.43.0/src/loom/std/unsafe_cell.rs:16:9
  13: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.43.0/src/runtime/task/core.rs:320:13
  14: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/bopeng/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/tokio-1.43.0/src/runtime/task/harness.rs:532:19
  15: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /Users/bopeng/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/panic/unwind_safe.rs:272:9
  16: std::panicking::try::do_call
             at /Users/bopeng/.rustup/toolchains/nightly-aarch64-apple-darwin/lib/rustlib/src/rust/library/std/src/panicking.rs:587:40
  17: ___rust_try
...

The Parquet writer consumed most of the memory and triggered the allocation failure. The reserved memory is too small for the Parquet writer to hold row groups in memory before flushing them out to disk.

This issue can also be reproduced without sorting. I tried replacing let sorted = df.sort(...) with let sorted = df. The error message is:

Error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  ParquetSink(ArrowColumnWriter) consumed 104209999 bytes (99.4 MB),
  ParquetSink(SerializedFileWriter) consumed 0 bytes.
Error: Failed to allocate additional 1253843 bytes for ParquetSink(ArrowColumnWriter) with 99576954 bytes already allocated for this reservation - 647601 bytes remain available for the total pool

I tried setting a smaller max_row_group_size to reduce the amount of memory required by ParquetSink, and the query then finished successfully:

table_opts.global.max_row_group_size = 1000;
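For context, that tweak plugs into the write path roughly like this (a sketch, not the exact repro code; the helper name is made up):

use datafusion::common::config::TableParquetOptions;
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::error::Result;

// Smaller row groups mean the ParquetSink buffers less encoded data in
// memory before it can flush a completed row group to disk.
async fn write_with_small_row_groups(df: DataFrame, path: &str) -> Result<()> {
    let mut table_opts = TableParquetOptions::default();
    table_opts.global.max_row_group_size = 1000;

    df.write_parquet(path, DataFrameWriteOptions::new(), Some(table_opts))
        .await?;
    Ok(())
}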

@alamb (Contributor) commented Mar 6, 2025

The Parquet writer consumed most of the memory and triggered the allocation failure. The reserved memory is too small for the Parquet writer to hold row groups in memory before flushing them out to disk.

This is a great find / analysis -- thank you @ivankelly and @Kontinuation

We actually found something like this upstream in InfluxDB IOx (kudos to @wiedld) and have a solution

The Parquet writer can report its memory usage via ArrowWriter::memory_size

So what we did was to add code that reported that usage to the memory pool here

https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/parquet_file/src/writer.rs#L28-L99

/// Wraps an [`ArrowWriter`] to track its buffered memory in a
/// DataFusion [`MemoryPool`]
///
/// If the memory used by the `ArrowWriter` exceeds the memory allowed
/// by the `MemoryPool`, subsequent writes will fail.
///
/// Note no attempt is made to cap the memory used by the
/// `ArrowWriter` (for example by flushing earlier), which might be a
/// useful exercise.
#[derive(Debug)]
pub struct TrackedMemoryArrowWriter<W: Write + Send> {
    /// The inner ArrowWriter
    inner: ArrowWriter<W>,
    /// DataFusion memory reservation with
    reservation: MemoryReservation,
}

impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
    /// create a new `TrackedMemoryArrowWriter`
    pub fn try_new(
        sink: W,
        schema: SchemaRef,
        props: WriterProperties,
        pool: Arc<dyn MemoryPool>,
    ) -> Result<Self> {
        let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
        let consumer = MemoryConsumer::new("IOx ParquetWriter (TrackedMemoryArrowWriter)");
        let reservation = consumer.register(&pool);

        Ok(Self { inner, reservation })
    }

    /// Push a `RecordBatch` into the underlying writer, updating the
    /// tracked allocation
    pub fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // writer encodes the batch into its internal buffers
        let result = self.inner.write(&batch);

        // In progress memory, in bytes
        let in_progress_size = self.inner.memory_size();

        // update the allocation with the pool.
        let reservation_result = self
            .reservation
            .try_resize(in_progress_size)
            .map_err(Error::OutOfMemory);

        // Log any errors
        if let Err(e) = &reservation_result {
            warn!(
                %e,
                in_progress_size,
                in_progress_rows = self.inner.in_progress_rows(),
                existing_allocation = self.reservation.size(),
                "Could not allocate sufficient buffer memory for writing parquet data"
            );
        }

        reservation_result?;
        result?;
        Ok(())
    }

    /// closes the writer, flushing any remaining data and returning
    /// the written [`FileMetaData`]
    ///
    /// [`FileMetaData`]: parquet::format::FileMetaData
    pub fn close(self) -> Result<parquet::format::FileMetaData> {
        // reservation is returned on drop
        Ok(self.inner.close()?)
    }
}
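
Usage is straightforward (sketch; file, schema, props, pool, and batches are assumed to be in scope):

// Write batches through the wrapper; a write fails if the pool cannot
// cover the writer's buffered memory.
let mut writer = TrackedMemoryArrowWriter::try_new(file, schema, props, pool)?;
for batch in batches {
    writer.write(batch)?;
}
let _metadata = writer.close()?;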

Maybe we could use the same code / approach when DataFusion makes its arrow writer here:

async fn create_async_arrow_writer(
    &self,
    location: &Path,
    object_store: Arc<dyn ObjectStore>,
    parquet_props: WriterProperties,
) -> Result<AsyncArrowWriter<BufWriter>> {
    let buf_writer = BufWriter::new(object_store, location.clone());
    let options = ArrowWriterOptions::new()
        .with_properties(parquet_props)
        .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);
    let writer = AsyncArrowWriter::try_new_with_options(
        buf_writer,
        get_writer_schema(&self.config),
        options,
    )?;
    Ok(writer)
}

/// Parquet options
pub fn parquet_options(&self) -> &TableParquetOptions {
    &self.parquet_options
}

@alamb (Contributor) commented Mar 6, 2025

Some other random thoughts:

  1. We didn't upstream this originally as our parquet writer pre-dates the ability of DataFusion to write parquet files -- though we would be happy to have someone do so
  2. Our implementation will error if it can't expand the memory reservation, but you could imagine forcing the ArrowWriter to flush its currently buffered data when under memory pressure

@alamb (Contributor) commented Mar 6, 2025

Actually, with some more digging I see DataFusion already does report the usage here:
https://github.com/apache/datafusion/blob/da4293323032e2408c9e3b9b28e644a96aea0f13/datafusion/datasource-parquet/src/file_format.rs#L993-L992

Maybe we could add some way for the parquet writer to flush when under memory pressure

The downside of doing so is that the written parquet files will likely be larger / less efficiently encoded
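
A rough sketch of what that could look like, reusing the same ArrowWriter APIs as the wrapper above (the flush-and-retry policy is just an idea, not existing DataFusion code):

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryReservation;
use datafusion::parquet::arrow::ArrowWriter;
use std::fs::File;

// If the reservation cannot grow to cover the writer's buffered memory,
// flush the in-progress row group to free its buffers and retry.
fn write_with_backpressure(
    writer: &mut ArrowWriter<File>,
    reservation: &mut MemoryReservation,
    batch: &RecordBatch,
) -> Result<()> {
    writer.write(batch)?;

    if reservation.try_resize(writer.memory_size()).is_err() {
        writer.flush()?;
        reservation.try_resize(writer.memory_size())?;
    }
    Ok(())
}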

@alamb (Contributor) commented Mar 6, 2025

BTW, this amazing error message that shows what is actually consuming the memory is thanks to @wiedld's efforts:

Error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  ParquetSink(ArrowColumnWriter) consumed 65911731 bytes (62.8 MB),
  ExternalSorter[0] consumed 23587552 bytes (22.4 MB),
  ExternalSorterMerge[0] consumed 14261080 bytes (13.6 MB),
  ParquetSink(SerializedFileWriter) consumed 0 bytes.
Error: Failed to allocate additional 1450451 bytes for ParquetSink(ArrowColumnWriter) with 62770337 bytes already allocated for this reservation - 1097237 bytes remain available for the total pool

@wiedld (Contributor) commented Mar 6, 2025

Catching up. You're working on sort spilling and have encountered a scenario where the memory limits are being hit due to the writer, not the sorter (error msg here). Your reproducer uses very low total memory limits. Your goal is to unblock the spill-sorting efforts by reducing the memory required for parquet encoding (which is configurable). Is this correct?

Your reproducer uses mostly the default parquet writing options, except that you remove some statistics and disable compression. With the default settings, the parallel parquet encoding will use one row group writer, and you will be buffering uncompressed data pages for that row group. Have you tried decreasing the page size?
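
For example, via the table options (field name per DataFusion's parquet options; the value is just illustrative):

// Shrink the limit on (uncompressed) data page size, in bytes; the default is 1 MB.
table_opts.global.data_pagesize_limit = 64 * 1024;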

@alamb (Contributor) commented Mar 6, 2025

With the default settings, the parallel parquet encoding will use one row group writer, and you will be buffering uncompressed data pages for that row group. Have you tried decreasing the page size?

I am not sure whether reducing just the page size is enough to bring the memory down -- you might also have to reduce the max row group size:

https://docs.rs/parquet/latest/parquet/file/properties/struct.WriterPropertiesBuilder.html#method.set_max_row_group_size
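
At the parquet-rs level that would look something like this (values are illustrative):

use parquet::file::properties::WriterProperties;

let props = WriterProperties::builder()
    .set_data_page_size_limit(64 * 1024) // best-effort cap on data page size, in bytes
    .set_max_row_group_size(10_000)      // cap on rows per row group
    .build();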

@wiedld (Contributor) commented Mar 6, 2025

If you turn compression back on and set table_opts.global.max_row_group_size = 5_000 (this is the max row count used here), then you can avoid the OOM on that reproducer. If you keep compression off, then you have to make the max row count much smaller (so it can all be buffered in memory).
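
In terms of the reproducer's table_opts, that combination is roughly (the compression string follows DataFusion's parquet option syntax, e.g. "zstd(3)"):

// Re-enable compression and cap the row group size.
table_opts.global.compression = Some("zstd(3)".to_string());
table_opts.global.max_row_group_size = 5_000;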
