feat: Improve datafusion-cli memory usage and considering reserve memory for the result batches #14766
Test result for the case that previously peaked at about 10G of memory; peak memory is now about 5G:
/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 10 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
Building [=======================> ] 541/542: datafusion-cli(bin)
Finished `release` profile [optimized] target(s) in 6m 11s
Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 10 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.007 seconds.
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1 | 1551894 | 76910 | 1 | 17.00 | 33078.94 | 0.04 | 0.02 | 1996-03-13 | 1996-02-12 | 1996-03-22 |
| 1 | 673091 | 73092 | 2 | 36.00 | 38306.16 | 0.09 | 0.06 | 1996-04-12 | 1996-02-28 | 1996-04-20 |
| 1 | 636998 | 36999 | 3 | 8.00 | 15479.68 | 0.10 | 0.02 | 1996-01-29 | 1996-03-05 | 1996-01-31 |
| 1 | 21315 | 46316 | 4 | 28.00 | 34616.68 | 0.09 | 0.06 | 1996-04-21 | 1996-03-30 | 1996-05-16 |
| 1 | 240267 | 15274 | 5 | 24.00 | 28974.00 | 0.10 | 0.04 | 1996-03-30 | 1996-03-14 | 1996-04-01 |
| 1 | 156345 | 6348 | 6 | 32.00 | 44842.88 | 0.07 | 0.02 | 1996-01-30 | 1996-02-07 | 1996-02-03 |
| 2 | 1061698 | 11719 | 1 | 38.00 | 63066.32 | 0.00 | 0.05 | 1997-01-28 | 1997-01-14 | 1997-02-02 |
| 3 | 42970 | 17971 | 1 | 45.00 | 86083.65 | 0.06 | 0.00 | 1994-02-02 | 1994-01-04 | 1994-02-23 |
| 3 | 190355 | 65359 | 2 | 49.00 | 70822.15 | 0.10 | 0.00 | 1993-11-09 | 1993-12-20 | 1993-11-24 |
| 3 | 1284483 | 34508 | 3 | 27.00 | 39620.34 | 0.06 | 0.07 | 1994-01-16 | 1993-11-22 | 1994-01-23 |
| . |
| . |
| . |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
81920 row(s) fetched. (First 10 displayed. Use --maxrows to adjust)
Elapsed 2.044 seconds.
378.04 real 8.92 user 5.89 sys
5026254848 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
1281540 page reclaims
0 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
2049 voluntary context switches
79635 involuntary context switches
191394982554 instructions retired
54836127363 cycles elapsed
5024748368 peak memory footprint
Before this PR, peak memory was roughly double:
/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 10 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
Finished `release` profile [optimized] target(s) in 6m 05s
Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 10 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.007 seconds.
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1 | 1551894 | 76910 | 1 | 17.00 | 33078.94 | 0.04 | 0.02 | 1996-03-13 | 1996-02-12 | 1996-03-22 |
| 1 | 673091 | 73092 | 2 | 36.00 | 38306.16 | 0.09 | 0.06 | 1996-04-12 | 1996-02-28 | 1996-04-20 |
| 1 | 636998 | 36999 | 3 | 8.00 | 15479.68 | 0.10 | 0.02 | 1996-01-29 | 1996-03-05 | 1996-01-31 |
| 1 | 21315 | 46316 | 4 | 28.00 | 34616.68 | 0.09 | 0.06 | 1996-04-21 | 1996-03-30 | 1996-05-16 |
| 1 | 240267 | 15274 | 5 | 24.00 | 28974.00 | 0.10 | 0.04 | 1996-03-30 | 1996-03-14 | 1996-04-01 |
| 1 | 156345 | 6348 | 6 | 32.00 | 44842.88 | 0.07 | 0.02 | 1996-01-30 | 1996-02-07 | 1996-02-03 |
| 2 | 1061698 | 11719 | 1 | 38.00 | 63066.32 | 0.00 | 0.05 | 1997-01-28 | 1997-01-14 | 1997-02-02 |
| 3 | 42970 | 17971 | 1 | 45.00 | 86083.65 | 0.06 | 0.00 | 1994-02-02 | 1994-01-04 | 1994-02-23 |
| 3 | 190355 | 65359 | 2 | 49.00 | 70822.15 | 0.10 | 0.00 | 1993-11-09 | 1993-12-20 | 1993-11-24 |
| 3 | 1284483 | 34508 | 3 | 27.00 | 39620.34 | 0.06 | 0.07 | 1994-01-16 | 1993-11-22 | 1994-01-23 |
| . |
| . |
| . |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
59986052 row(s) fetched. (First 10 displayed. Use --maxrows to adjust)
Elapsed 5.044 seconds.
375.80 real 13.14 user 8.05 sys
9792942080 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
2445044 page reclaims
205 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
2537 voluntary context switches
99881 involuntary context switches
313190538941 instructions retired
79238532090 cycles elapsed
9802777552 peak memory footprint
Thank you for the help. So I think we need two extra configurations for this purpose: Now I'm not sure if it's okay to register result sink for a |
Thank you @2010YOUY01 for the review, good suggestion. If I understand correctly, we will control two extra things:
if results.len() >= max_rows && stop-after-max-rows{
break;
}
Maybe we can be smarter about the memory used to keep the results for printing? For example, we could add an option such as peek-memory-for-batch-result, and also register that amount with the memory pool. Then we would not need to configure a higher memory limit for datafusion-cli when the query is fully materialized but only part of the result is printed.
I think printing very large results to the terminal is too rare for this approach to be useful 🤔
@2010YOUY01 It makes sense to me, thanks! |
Addressed the comments in the latest PR, @2010YOUY01, thanks!
I think it will also benefit #14510.
Another thing that I remember |
Thank you for working on this @zhuqi-lucas -- I had some comments. Let me know what you think
datafusion-cli/src/main.rs
Outdated
#[clap(
    short,
    long,
    help = "Whether to stop early when max rows is reached, this will help reduce the memory usage when the result is large"
I think this option is basically the same as adding a LIMIT max_rows to the query and thus is somewhat redundant
Very good point, removed it now.
The option appears to still be present
datafusion-cli/src/exec.rs
Outdated
results.push(batch);
if let MaxRows::Limited(max_rows) = print_options.maxrows {
    // Stop collecting results if the number of rows exceeds the limit
    if results.len() >= max_rows && print_options.stop_after_max_rows {
If you break here it will stop the plan early -- then you need to add stop_after_max_rows to permit the plan to run to completion, however, if that option is set then the data is buffered again (which from a memory perspective is the same as increasing the max_rows)
Rather than breaking here, I recommend just ignoring the batches after max_rows has been reached (aka don't push them into results) -- I think that would be the most intuitive behavior:
- Queries would still always run to completion
- Max rows would control how much memory was used by datafusion-cli buffering
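The recommendation above can be sketched roughly as follows. This is a minimal, standard-library-only illustration, not the code from this PR: `Batch` is a hypothetical stand-in for an Arrow `RecordBatch`, `collect_capped` is an invented helper name, and the sketch counts buffered rows (rather than batches) against `max_rows`, which is an assumption about the intended behavior.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: a batch is just a vector of rows.
type Batch = Vec<i64>;

/// Drains every batch from `stream`, but buffers only as many batches as are
/// needed to display `max_rows` rows. Returns the buffered batches together
/// with the total number of rows the query produced.
fn collect_capped<I>(stream: I, max_rows: usize) -> (Vec<Batch>, usize)
where
    I: IntoIterator<Item = Batch>,
{
    let mut results: Vec<Batch> = Vec::new();
    let mut buffered_rows = 0usize;
    let mut row_count = 0usize;

    for batch in stream {
        // Always count rows so the summary can report the full result size.
        row_count += batch.len();

        // Keep the batch only while fewer than `max_rows` rows are buffered.
        // Later batches are dropped at the end of this iteration, which
        // releases their memory.
        if buffered_rows < max_rows {
            buffered_rows += batch.len();
            results.push(batch);
        }
        // No `break`: the stream (and hence the plan) still runs to completion.
    }

    (results, row_count)
}
```

With five batches of four rows each and `max_rows = 5`, this keeps the first two batches (eight rows, enough to display five) and reports a total of 20 rows, so buffering is bounded by `max_rows` plus at most one batch.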
Thank you @alamb for review and the great idea! It makes sense to me, addressed in latest PR.
Great idea, thank you @alamb, addressed in latest PR.
Testing result:
/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 1 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
Finished `release` profile [optimized] target(s) in 0.26s
Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 1 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.006 seconds.
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1 | 1551894 | 76910 | 1 | 17.00 | 33078.94 | 0.04 | 0.02 | 1996-03-13 | 1996-02-12 | 1996-03-22 |
| . |
| . |
| . |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
8192 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 4.129 seconds.
8.16 real 12.72 user 7.25 sys
5292523520 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
1334371 page reclaims
0 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
1830 voluntary context switches
107473 involuntary context switches
296497285174 instructions retired
74662056546 cycles elapsed
5291681480 peak memory footprint
Not breaking early makes sense to me. I believe that is what users intend in most cases, so let's keep it simple. If the query is selecting the whole table, this result does not look correct:
It should reflect the result size, so it should be the row count of the whole table.
Good catch, @2010YOUY01. It happens because we don't pass the row count to the code that prints the details. I will try to fix it.
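To make the row-count issue concrete, here is a small sketch of a summary line that takes the total row count as an explicit argument instead of deriving it from the buffered batches. `format_footer` is a hypothetical helper, not a function from datafusion-cli; only the wording of the message is taken from the terminal output in this thread.

```rust
/// Hypothetical helper: builds the summary line printed after a result table.
/// `row_count` is the total number of rows the query produced, which may be
/// far larger than the number of rows that were buffered for display.
fn format_footer(row_count: usize, max_rows: Option<usize>) -> String {
    match max_rows {
        // More rows were produced than are displayed: mention the cut-off.
        Some(max) if row_count > max => format!(
            "{} row(s) fetched. (First {} displayed. Use --maxrows to adjust)",
            row_count, max
        ),
        // Everything was displayed, or no limit was set.
        _ => format!("{} row(s) fetched.", row_count),
    }
}
```

If the count were instead taken from the buffered batches, a run with `--maxrows 1` would report only the rows of the batches that were kept, which appears to be what the `8192 row(s) fetched` line in the earlier test run shows.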
Fixed the row count in the latest PR, thanks @2010YOUY01! It is now the same count as before this PR.
/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 1 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
Building [=======================> ] 541/542: datafusion-cli(bin)
Finished `release` profile [optimized] target(s) in 6m 03s
Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 1 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.006 seconds.
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1 | 1551894 | 76910 | 1 | 17.00 | 33078.94 | 0.04 | 0.02 | 1996-03-13 | 1996-02-12 | 1996-03-22 |
| . |
| . |
| . |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
59986052 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 4.080 seconds.
371.52 real 12.41 user 6.23 sys
5065715712 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
1290233 page reclaims
0 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
1766 voluntary context switches
84133 involuntary context switches
291219115157 instructions retired
69961052638 cycles elapsed
5064254312 peak memory footprint
Great! Thank you.
Thanks @zhuqi-lucas and @2010YOUY01
I think it would be nice to remove the unused stop_after_max_rows option now, but we could also do it as a follow on PR too if you prefer
@@ -102,14 +102,14 @@ impl PrintOptions {
    schema: SchemaRef,
    batches: &[RecordBatch],
    query_start_time: Instant,
    row_count: usize,
this is a great idea
Thank you @alamb @2010YOUY01 for the review, also removed the unused stop_after_max_rows option in latest PR!
}
adjusted
    .into_inner()
    .print_batches(schema, &results, now, row_count)?;
Would be nice if it even could print based on the stream instead of collecting.
Thanks @Dandandan for the review. It's a good point for the case where maxrows is set to a huge number, because the results still keep up to maxrows rows' worth of batches in memory.
Created a follow-up ticket: #14810
Nice
apache#14766) * feat: Improve datafusion-cli memory usage and considering reserve memory for the result batches * Address new comments * Address new comments * fix test * fix test * Address comments * Fix doc * Fix row count showing * Fix fmt * fix corner case * remove unused code
Which issue does this PR close?
Rationale for this change
This is the follow-up for the discussion #14644 (comment)
Problem
I tried one query and this PR is not working as expected, I specified one query to run under 5GB memory (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.
It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.
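The reservation idea in the paragraph above can be illustrated with a small standard-library-only sketch. `Pool` and `buffer_results` are hypothetical stand-ins invented for this example; they are not DataFusion's memory pool types, and batch sizes are approximated by byte-vector lengths.

```rust
/// Hypothetical bounded memory pool (a stand-in, not DataFusion's `MemoryPool`).
struct Pool {
    limit: usize,
    used: usize,
}

impl Pool {
    fn new(limit: usize) -> Self {
        Pool { limit, used: 0 }
    }

    /// Reserves `bytes`, or returns an error and leaves the pool unchanged
    /// if the reservation would exceed the limit.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        let new_used = self
            .used
            .checked_add(bytes)
            .ok_or_else(|| "reservation overflow".to_string())?;
        if new_used > self.limit {
            return Err(format!(
                "cannot reserve {} bytes: {} of {} already used",
                bytes, self.used, self.limit
            ));
        }
        self.used = new_used;
        Ok(())
    }
}

/// Buffers result batches for display, reserving memory for each one first,
/// so that held results are accounted for against the same limit as the query.
fn buffer_results(pool: &mut Pool, batches: Vec<Vec<u8>>) -> Result<Vec<Vec<u8>>, String> {
    let mut held = Vec::new();
    for batch in batches {
        // Fail instead of silently exceeding the configured limit.
        pool.try_grow(batch.len())?;
        held.push(batch);
    }
    Ok(held)
}
```

The design point is that a consumer which holds results accounts for them explicitly, so a configured limit such as `-m 5G` covers the buffered output as well as the operators.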
datafusion-cli calls collect to hold the entire result set in memory before displaying it; this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size has reduced to 4.8 GB:
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?
It fixes the high memory usage of datafusion-cli when only part of a huge result is printed.