External sort failing with modest memory limit #15028
I have tried the repro. After making small tweaks to the repro code to expose the status of memory consumers and the backtrace, the failure I got was:
I've slightly reformatted the error message to make it more readable. The backtrace is:
The Parquet writer consumed most of the memory and triggered the allocation failure. The memory reserved is too small for the Parquet writer to hold row groups in memory before flushing them out to disk. This issue can also be reproduced without sorting. I tried replacing …
I've tried setting a smaller row group size, `table_opts.global.max_row_group_size = 1000;`
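For context, a minimal sketch of what that tweak looks like, assuming the repro's `table_opts` is DataFusion's `TableParquetOptions`:

```rust
// A sketch, assuming the repro's `table_opts` is DataFusion's
// `TableParquetOptions` (the field path matches the snippet above).
use datafusion::config::TableParquetOptions;

let mut table_opts = TableParquetOptions::default();
// The default is 1048576 rows; smaller row groups are flushed (and their
// buffers released) sooner, at the cost of more, smaller row groups.
table_opts.global.max_row_group_size = 1000;
```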
This is a great find / analysis -- thank you @ivankelly and @Kontinuation. We actually found something like this upstream in InfluxDB IOx (kudos to @wiedld) and have a solution.

The Parquet writer can report its memory usage via `ArrowWriter::memory_size`, so what we did was add code that reported that usage to the memory pool, like this:

```rust
// Imports below are assumptions based on the types used; `Result` and
// `Error` (with its `OutOfMemory` variant) are IOx-local types.
use std::io::Write;
use std::sync::Arc;

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
use tracing::warn;

/// Wraps an [`ArrowWriter`] to track its buffered memory in a
/// DataFusion [`MemoryPool`]
///
/// If the memory used by the `ArrowWriter` exceeds the memory allowed
/// by the `MemoryPool`, subsequent writes will fail.
///
/// Note no attempt is made to cap the memory used by the
/// `ArrowWriter` (for example by flushing earlier), which might be a
/// useful exercise.
#[derive(Debug)]
pub struct TrackedMemoryArrowWriter<W: Write + Send> {
    /// The inner ArrowWriter
    inner: ArrowWriter<W>,
    /// DataFusion memory reservation with the pool
    reservation: MemoryReservation,
}

impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
    /// create a new `TrackedMemoryArrowWriter`
    pub fn try_new(
        sink: W,
        schema: SchemaRef,
        props: WriterProperties,
        pool: Arc<dyn MemoryPool>,
    ) -> Result<Self> {
        let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
        let consumer = MemoryConsumer::new("IOx ParquetWriter (TrackedMemoryArrowWriter)");
        let reservation = consumer.register(&pool);
        Ok(Self { inner, reservation })
    }

    /// Push a `RecordBatch` into the underlying writer, updating the
    /// tracked allocation
    pub fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // writer encodes the batch into its internal buffers
        let result = self.inner.write(&batch);

        // In progress memory, in bytes
        let in_progress_size = self.inner.memory_size();

        // update the allocation with the pool.
        let reservation_result = self
            .reservation
            .try_resize(in_progress_size)
            .map_err(Error::OutOfMemory);

        // Log any errors
        if let Err(e) = &reservation_result {
            warn!(
                %e,
                in_progress_size,
                in_progress_rows = self.inner.in_progress_rows(),
                existing_allocation = self.reservation.size(),
                "Could not allocate sufficient buffer memory for writing parquet data"
            );
        }
        reservation_result?;
        result?;
        Ok(())
    }

    /// closes the writer, flushing any remaining data and returning
    /// the written [`FileMetaData`]
    ///
    /// [`FileMetaData`]: parquet::format::FileMetaData
    pub fn close(self) -> Result<parquet::format::FileMetaData> {
        // reservation is returned on drop
        Ok(self.inner.close()?)
    }
}
```

Maybe we could use the same code / approach when DataFusion makes its arrow writer here: datafusion/datafusion/datasource-parquet/src/file_format.rs, lines 922 to 944 in da42933.
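For reference, a minimal usage sketch of the wrapper above (the pool construction and file name are illustrative, and `Result`/`Error` are the IOx-local types from the snippet):

```rust
use std::{fs::File, sync::Arc};

use arrow::record_batch::RecordBatch;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool};
use parquet::file::properties::WriterProperties;

// Hypothetical driver: write one batch through the tracked writer so the
// pool sees the parquet buffering alongside other memory consumers.
fn write_tracked(batch: RecordBatch) -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100 * 1024 * 1024));
    let file = File::create("out.parquet").expect("create output file");
    let mut writer = TrackedMemoryArrowWriter::try_new(
        file,
        batch.schema(),
        WriterProperties::builder().build(),
        pool,
    )?;
    // Fails with OutOfMemory if the pool cannot grow the reservation
    writer.write(batch)?;
    writer.close()?;
    Ok(())
}
```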
Some other random thoughts:

- Actually, with some more digging I see DataFusion already does report the usage here:
- Maybe we could add some way for the parquet writer to flush when under memory pressure (see the sketch below). The downside of doing so is that the written parquet files will likely be larger / less efficiently encoded.
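A rough sketch of that flush-under-pressure idea; the helper name and `budget_bytes` threshold are hypothetical, and `ArrowWriter::flush` cuts the in-progress row group:

```rust
use arrow::record_batch::RecordBatch;
use parquet::{arrow::ArrowWriter, errors::Result};

// Hypothetical helper: flush the in-progress row group early whenever the
// writer's buffered memory crosses `budget_bytes`.
fn write_with_budget<W: std::io::Write + Send>(
    writer: &mut ArrowWriter<W>,
    batch: &RecordBatch,
    budget_bytes: usize,
) -> Result<()> {
    writer.write(batch)?;
    if writer.memory_size() > budget_bytes {
        // Cutting the row group here trades memory for larger / less
        // efficiently encoded files, as noted above.
        writer.flush()?;
    }
    Ok(())
}
```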
BTW this amazing error message that shows what is actually consuming the memory is thanks to @wiedld's efforts.
Catching up. You're working on the sort spilling, and have encountered a scenario where the memory limits are being hit due to the writer, not the sorter (error msg here). Your reproducer uses very low total memory limits. Your goal is to unblock the spill sorting efforts by reducing the memory required for parquet encoding (which is configurable). Is this correct?

Your reproducer uses mostly the default parquet writing options, except that you remove some statistics and remove the compression. With the default settings, the parallel parquet encoding will use one row group writer, and you will be buffering uncompressed data pages for that row group. Have you tried decreasing the page size?
I am not sure if reducing just the page size is enough to reduce the memory needed -- you might also have to reduce the max row group size:
If I turn compression back on, and set …
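For illustration, writer properties along those lines might look like this (the specific values are made up, not tested against the repro):

```rust
use parquet::{
    basic::{Compression, ZstdLevel},
    file::properties::WriterProperties,
};

// Illustrative values only: re-enable compression and shrink both the
// data page and row group limits so less uncompressed data is buffered.
let props = WriterProperties::builder()
    .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
    .set_data_page_size_limit(64 * 1024) // default is 1024 * 1024 bytes
    .set_max_row_group_size(10_000)      // default is 1024 * 1024 rows
    .build();
```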
Describe the bug
As discussed on Discord, here's another external sort use case that's failing.
Repro:
https://github.com/ivankelly/df-repro
To run:
The code reads in a bunch of parquet files (889 MB in total) and tries to sort them and output to a single parquet file.
Memory is limited to 100 MB.
Different batch sizes and target partitions don't help.
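For context, a 100 MB cap is the kind of limit set via DataFusion's runtime builder; a sketch assuming a recent API (the repro's actual wiring may differ):

```rust
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

// Build a context whose memory pool is capped at 100 MB;
// 1.0 lets the pool use all of that cap.
fn context_with_100mb_limit() -> Result<SessionContext> {
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(100 * 1024 * 1024, 1.0)
        .build_arc()?;
    Ok(SessionContext::new_with_config_rt(SessionConfig::new(), runtime))
}
```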