Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve datafusion-cli memory usage and considering reserve mem… #14766

Merged
merged 13 commits into from
Feb 21, 2025

Conversation

zhuqi-lucas
Copy link
Contributor

…ory for the result batches

Which issue does this PR close?

Rationale for this change

This is the follow-up for the discussion #14644 (comment)

Problem
I tried one query and this PR is not working as expected, I specified one query to run under 5GB memory (select * without order requires 7GB) but it's still consuming around 10GB, could you double check? I suspect we missed some details.

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

datafusion-cli calls collect to hold the entire result set in memory before displaying it, this is unnecessary when maxrows is not unlimited. I've tried the following code to replace the collect call, and the maximum resident set size has reduced to 4.8 GB:

What changes are included in this PR?

  1. Support memory consumer for datafusion-cli itself and reserve memory for the result batches.
  2. Support hold the streaming the result set in memory and displaying it when maxrows is not unlimited.

Are these changes tested?

Are there any user-facing changes?

It fixed the datafusion-cli memory high usage when we only print part of the huge result.

@zhuqi-lucas
Copy link
Contributor Author

cc @alamb @2010YOUY01

@zhuqi-lucas
Copy link
Contributor Author

zhuqi-lucas commented Feb 19, 2025

  1. The memory usage now is accurate, it will not collect all result to memory.
  2. We now register datafusion-cli result batch to memory pool also.

The testing result for the 10G memory case, now it's 5G peak memory:

/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 10 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
   Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
    Building [=======================> ] 541/542: datafusion-cli(bin)

    Finished `release` profile [optimized] target(s) in 6m 11s
     Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 10 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.007 seconds.

+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1          | 1551894   | 76910     | 1            | 17.00      | 33078.94        | 0.04       | 0.02  | 1996-03-13 | 1996-02-12   | 1996-03-22    |
| 1          | 673091    | 73092     | 2            | 36.00      | 38306.16        | 0.09       | 0.06  | 1996-04-12 | 1996-02-28   | 1996-04-20    |
| 1          | 636998    | 36999     | 3            | 8.00       | 15479.68        | 0.10       | 0.02  | 1996-01-29 | 1996-03-05   | 1996-01-31    |
| 1          | 21315     | 46316     | 4            | 28.00      | 34616.68        | 0.09       | 0.06  | 1996-04-21 | 1996-03-30   | 1996-05-16    |
| 1          | 240267    | 15274     | 5            | 24.00      | 28974.00        | 0.10       | 0.04  | 1996-03-30 | 1996-03-14   | 1996-04-01    |
| 1          | 156345    | 6348      | 6            | 32.00      | 44842.88        | 0.07       | 0.02  | 1996-01-30 | 1996-02-07   | 1996-02-03    |
| 2          | 1061698   | 11719     | 1            | 38.00      | 63066.32        | 0.00       | 0.05  | 1997-01-28 | 1997-01-14   | 1997-02-02    |
| 3          | 42970     | 17971     | 1            | 45.00      | 86083.65        | 0.06       | 0.00  | 1994-02-02 | 1994-01-04   | 1994-02-23    |
| 3          | 190355    | 65359     | 2            | 49.00      | 70822.15        | 0.10       | 0.00  | 1993-11-09 | 1993-12-20   | 1993-11-24    |
| 3          | 1284483   | 34508     | 3            | 27.00      | 39620.34        | 0.06       | 0.07  | 1994-01-16 | 1993-11-22   | 1994-01-23    |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
81920 row(s) fetched. (First 10 displayed. Use --maxrows to adjust)
Elapsed 2.044 seconds.

      378.04 real         8.92 user         5.89 sys
          5026254848  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             1281540  page reclaims
                   0  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                2049  voluntary context switches
               79635  involuntary context switches
        191394982554  instructions retired
         54836127363  cycles elapsed
          5024748368  peak memory footprint

@zhuqi-lucas
Copy link
Contributor Author

Before this PR, the result is double:

/usr/bin/time -l cargo run --release -- --mem-pool-type fair -m 5G --maxrows 10 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'

   Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
    Finished `release` profile [optimized] target(s) in 6m 05s
     Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 10 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.007 seconds.

+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1          | 1551894   | 76910     | 1            | 17.00      | 33078.94        | 0.04       | 0.02  | 1996-03-13 | 1996-02-12   | 1996-03-22    |
| 1          | 673091    | 73092     | 2            | 36.00      | 38306.16        | 0.09       | 0.06  | 1996-04-12 | 1996-02-28   | 1996-04-20    |
| 1          | 636998    | 36999     | 3            | 8.00       | 15479.68        | 0.10       | 0.02  | 1996-01-29 | 1996-03-05   | 1996-01-31    |
| 1          | 21315     | 46316     | 4            | 28.00      | 34616.68        | 0.09       | 0.06  | 1996-04-21 | 1996-03-30   | 1996-05-16    |
| 1          | 240267    | 15274     | 5            | 24.00      | 28974.00        | 0.10       | 0.04  | 1996-03-30 | 1996-03-14   | 1996-04-01    |
| 1          | 156345    | 6348      | 6            | 32.00      | 44842.88        | 0.07       | 0.02  | 1996-01-30 | 1996-02-07   | 1996-02-03    |
| 2          | 1061698   | 11719     | 1            | 38.00      | 63066.32        | 0.00       | 0.05  | 1997-01-28 | 1997-01-14   | 1997-02-02    |
| 3          | 42970     | 17971     | 1            | 45.00      | 86083.65        | 0.06       | 0.00  | 1994-02-02 | 1994-01-04   | 1994-02-23    |
| 3          | 190355    | 65359     | 2            | 49.00      | 70822.15        | 0.10       | 0.00  | 1993-11-09 | 1993-12-20   | 1993-11-24    |
| 3          | 1284483   | 34508     | 3            | 27.00      | 39620.34        | 0.06       | 0.07  | 1994-01-16 | 1993-11-22   | 1994-01-23    |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
59986052 row(s) fetched. (First 10 displayed. Use --maxrows to adjust)
Elapsed 5.044 seconds.

      375.80 real        13.14 user         8.05 sys
          9792942080  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             2445044  page reclaims
                 205  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                2537  voluntary context switches
               99881  involuntary context switches
        313190538941  instructions retired
         79238532090  cycles elapsed
          9802777552  peak memory footprint

@2010YOUY01
Copy link
Contributor

Thank you for the help.
This change will stop execution once maxrow is reached. I think this is the optimal behavior for application developers using datafusion-cli for quick experiments.
However, datafusion internal developers might use it for timing certain queries, so perhaps they want queries to run till the end. Also, I think the current behavior can also be useful, if it is intended to measure the maximum memory footprint with the result fully materialized. (Though it's a rarer case)

So I think we need two extra configurations for this purpose:
--stop-after-max-rows, default: false -- Controls whether to stop early when maxrows is reached. I guess there is less people using datafusion-cli for application purposes? So default to false for accurate timing.
--retain-full-results, default: false -- Controls whether to throw away or accumulate result batches after maxrows is reached

Now I'm not sure if it's okay to register result sink for a MemoryReservation, I'll think about it in the background for a while.

@zhuqi-lucas
Copy link
Contributor Author

zhuqi-lucas commented Feb 19, 2025

Thank you @2010YOUY01 for review, good suggestion, if i make sense right, so we will control two extra things:

  1. --stop-after-max-rows, default: false , we can use this to control the end of the streaming.
if results.len() >= max_rows && stop-after-max-rows{
      break;
}
  1. --retain-full-results, default: false , we can use this to control the memory usage:

Maybe we can do more smart things for memory usage for keeping the result and print? For example, we add a option that peek-memory-for-batch-result.

Also we can register the peek-memory-for-batch-result to memory-pool? So we will not register higher memory for datafusion-cli, if we want to fully materialized but we only want to print it?

current_total_usage

we only register current_total_usage to memory pool, but not fully materialized result

add new batch to current_total_usage

if current_total_usage + batch >= memory-for-batch-result {
     1. // we should print it 
     2. // we should free the current_total_usage 
}

@2010YOUY01
Copy link
Contributor

Maybe we can do more smart things for memory usage for keeping the result and print? For example, we add a option that peek-memory-for-batch-result.

Also we can register the peek-memory-for-batch-result to memory-pool? So we will not register higher memory for datafusion-cli, if we want to fully materialized but we only want to print it?

current_total_usage

we only register current_total_usage to memory pool, but not fully materialized result

add new batch to current_total_usage

if current_total_usage + batch >= memory-for-batch-result {
     1. // we should print it 
     2. // we should free the current_total_usage 
}

I think it's very rare to print super large results to the terminal, to make this approach useful 🤔
I've reconsidered --retain-full-results, this is only useful for some very rare profiling use cases, perhaps we can ignore this at the moment for simplicity.
How about only adding --stop-after-max-rows, and by default free the memory of evaluated batches after maxrows reached?

@zhuqi-lucas
Copy link
Contributor Author

Maybe we can do more smart things for memory usage for keeping the result and print? For example, we add a option that peek-memory-for-batch-result.
Also we can register the peek-memory-for-batch-result to memory-pool? So we will not register higher memory for datafusion-cli, if we want to fully materialized but we only want to print it?

current_total_usage

we only register current_total_usage to memory pool, but not fully materialized result

add new batch to current_total_usage

if current_total_usage + batch >= memory-for-batch-result {
     1. // we should print it 
     2. // we should free the current_total_usage 
}

I think it's very rare to print super large results to the terminal, to make this approach useful 🤔 I've reconsidered --retain-full-results, this is only useful for some very rare profiling use cases, perhaps we can ignore this at the moment for simplicity. How about only adding --stop-after-max-rows, and by default free the memory of evaluated batches after maxrows reached?

@2010YOUY01 It makes sense to me, thanks!

@github-actions github-actions bot added the documentation Improvements or additions to documentation label Feb 19, 2025
@zhuqi-lucas
Copy link
Contributor Author

Addressed comments in latest PR @2010YOUY01 ,thanks!

@zhuqi-lucas
Copy link
Contributor Author

I think, it will also benefit the #14510

@alamb alamb mentioned this pull request Feb 19, 2025
@alamb
Copy link
Contributor

alamb commented Feb 19, 2025

It is a problem of datafusion-cli. If datafusion-cli decides to hold all the result batches in memory, it should create a memory consumer for itself and reserve memory for the result batches.

Another thing that I remember psql did was to only buffer the first like 1000 lines or so to figure out column spacing -- right now datafusion-cli buffers the entire thing to ensure it can format all the rows reasonably which is likely unecessary for the majority of the time

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for working on this @zhuqi-lucas -- I had some comments. Let me know what you think

#[clap(
short,
long,
help = "Whether to stop early when max rows is reached, this will help reduce the memory usage when the result is large"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this option is basically the same as adding a LIMIT max_rows to the query and thus is somewhat redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point, remove it now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option appears to still be present

results.push(batch);
if let MaxRows::Limited(max_rows) = print_options.maxrows {
// Stop collecting results if the number of rows exceeds the limit
if results.len() >= max_rows && print_options.stop_after_max_rows {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you break here it will stop the plan early -- then you need to add stop_after_max_rows to permit the plan to run to completion, however, if that option is set then the data is buffered again (which from a memory perspective is the same as increasing the max_rows)

Rather than breaking here, I recommend just ignoring the batches after max_rows has been reached (aka don't push them into results) -- I think that would be the most intuitive behavior:

  1. Queries would still always run to completion
  2. Max rows would control how much memory was used by datafusion-cli buffering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @alamb for review and the great idea! It makes sense to me, addressed in latest PR.

@zhuqi-lucas
Copy link
Contributor Author

Thank you for working on this @zhuqi-lucas -- I had some comments. Let me know what you think

Great idea, thank you @alamb , addressed in latest PR.

@zhuqi-lucas zhuqi-lucas requested a review from alamb February 19, 2025 14:58
@zhuqi-lucas
Copy link
Contributor Author

Testing result:

/usr/bin/time -l cargo run --release -- --mem-pool-type fair  -m 5G  --maxrows 1 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
    Finished `release` profile [optimized] target(s) in 0.26s
     Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 1 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.006 seconds.

+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1          | 1551894   | 76910     | 1            | 17.00      | 33078.94        | 0.04       | 0.02  | 1996-03-13 | 1996-02-12   | 1996-03-22    |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
8192 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 4.129 seconds.

        8.16 real        12.72 user         7.25 sys
          5292523520  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             1334371  page reclaims
                   0  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                1830  voluntary context switches
              107473  involuntary context switches
        296497285174  instructions retired
         74662056546  cycles elapsed
          5291681480  peak memory footprint

@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Feb 19, 2025
@2010YOUY01
Copy link
Contributor

Don't break early makes sense to me, I believe it's intended in most cases, let's keep it simple.

If the query is selecting the whole table, this result does not look correct:

8192 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)

It should reflect the result size, so it should be the row count of the whole table.

@zhuqi-lucas
Copy link
Contributor Author

Don't break early makes sense to me, I believe it's intended in most cases, let's keep it simple.

If the query is selecting the whole table, this result does not look correct:

8192 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)

It should reflect the result size, so it should be the row count of the whole table.

Good catch, @2010YOUY01 , because we don't pass the row-count to print details, i will try to fix it.

@zhuqi-lucas
Copy link
Contributor Author

zhuqi-lucas commented Feb 20, 2025

Fixed the row count in latest PR, thanks @2010YOUY01 ! It's the same count before this PR.

/usr/bin/time -l cargo run --release -- --mem-pool-type fair  -m 5G  --maxrows 1 -f '/Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql'
   Compiling datafusion-cli v45.0.0 (/Users/zhuqi/arrow-datafusion/datafusion-cli)
    Building [=======================> ] 541/542: datafusion-cli(bin)
    Finished `release` profile [optimized] target(s) in 6m 03s
     Running `/Users/zhuqi/arrow-datafusion/target/release/datafusion-cli --mem-pool-type fair -m 5G --maxrows 1 -f /Users/zhuqi/arrow-datafusion/benchmarks/data/external_sort.sql`
DataFusion CLI v45.0.0
0 row(s) fetched.
Elapsed 0.006 seconds.

+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_shipdate | l_commitdate | l_receiptdate |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
| 1          | 1551894   | 76910     | 1            | 17.00      | 33078.94        | 0.04       | 0.02  | 1996-03-13 | 1996-02-12   | 1996-03-22    |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
| .                                                                                                                                                 |
+------------+-----------+-----------+--------------+------------+-----------------+------------+-------+------------+--------------+---------------+
59986052 row(s) fetched. (First 1 displayed. Use --maxrows to adjust)
Elapsed 4.080 seconds.

      371.52 real        12.41 user         6.23 sys
          5065715712  maximum resident set size
                   0  average shared memory size
                   0  average unshared data size
                   0  average unshared stack size
             1290233  page reclaims
                   0  page faults
                   0  swaps
                   0  block input operations
                   0  block output operations
                   0  messages sent
                   0  messages received
                   0  signals received
                1766  voluntary context switches
               84133  involuntary context switches
        291219115157  instructions retired
         69961052638  cycles elapsed
          5064254312  peak memory footprint

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thank you.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zhuqi-lucas and @2010YOUY01

I think it would be nice to remove the unused stop_after_max_rows option now, but we could also do it as a follow on PR too if you prefer

#[clap(
short,
long,
help = "Whether to stop early when max rows is reached, this will help reduce the memory usage when the result is large"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option appears to still be present

@@ -102,14 +102,14 @@ impl PrintOptions {
schema: SchemaRef,
batches: &[RecordBatch],
query_start_time: Instant,
row_count: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a great idea

@zhuqi-lucas
Copy link
Contributor Author

Thanks @zhuqi-lucas and @2010YOUY01

I think it would be nice to remove the unused stop_after_max_rows option now, but we could also do it as a follow on PR too if you prefer

Thank you @alamb @2010YOUY01 for review, also removed the unused stop_after_max_rows option in latest PR!

@2010YOUY01 2010YOUY01 merged commit 83487e3 into apache:main Feb 21, 2025
24 checks passed
}
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice if it even could print based on the stream instead of collecting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Dandandan review , it's a good point if we setting the maxrows huge number because the results only keep the maxrows count batch size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a follow-up ticket:
#14810

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

ozankabak pushed a commit to synnada-ai/datafusion-upstream that referenced this pull request Feb 25, 2025
apache#14766)

* feat: Improve datafusion-cli memory usage and considering reserve memory for the result batches

* Address new comments

* Address new comments

* fix test

* fix test

* Address comments

* Fix doc

* Fix row count showing

* Fix fmt

* fix corner case

* remove unused code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants