Merge remote-tracking branch 'apache/main' into wiedld/refactor-sort-pushdown
alamb committed Feb 25, 2025
2 parents d5bb721 + d0ab003 commit bcdebc1
Showing 54 changed files with 3,594 additions and 390 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/extended.yml
@@ -23,16 +23,13 @@ concurrency:

# https://docs.github.com/en/actions/writing-workflows/choosing-when-your-workflow-runs/events-that-trigger-workflows#running-your-pull_request-workflow-when-a-pull-request-merges
#
# These jobs only run on the `main` branch as they are time consuming
# These jobs are not run as part of PR checks as they are time-consuming
# and should not fail often.
#
# We still run them as they provide important coverage to ensure correctness
# in the (very rare) event of a hash failure or sqlite library query failure.
on:
# Run on all commits to main
push:
branches:
- main

jobs:
# Check crate compiles and base cargo check passes
3 changes: 3 additions & 0 deletions .gitignore
@@ -71,3 +71,6 @@ datafusion/core/benches/data/*
# rat
filtered_rat.txt
rat.txt

# data generated by examples
datafusion-examples/examples/datafusion-examples/
7 changes: 2 additions & 5 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -66,7 +66,9 @@ homepage = "https://datafusion.apache.org"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/datafusion"
# Define Minimum Supported Rust Version (MSRV)
rust-version = "1.82.0"
# Define DataFusion version
version = "45.0.0"

[workspace.dependencies]
@@ -144,7 +146,6 @@ pbjson = { version = "0.7.0" }
pbjson-types = "0.7"
# Should match arrow-flight's version of prost.
prost = "0.13.1"
prost-derive = "0.13.1"
rand = "0.8.5"
recursive = "0.1.1"
regex = "1.8"
104 changes: 83 additions & 21 deletions benchmarks/README.md
@@ -85,7 +85,7 @@ git checkout main
# Gather baseline data for tpch benchmark
./benchmarks/bench.sh run tpch

# Switch to the branch the branch name is mybranch and gather data
# Switch to the branch named mybranch and gather data
git checkout mybranch
./benchmarks/bench.sh run tpch

@@ -157,22 +157,19 @@ Benchmark tpch_mem.json
└──────────────┴──────────────┴──────────────┴───────────────┘
```

Note that you can also execute an automatic comparison of the changes in a given PR against the base
just by including the trigger `/benchmark` in any comment.

### Running Benchmarks Manually

Assuming data in the `data` directory, the `tpch` benchmark can be run with a command like this
Assuming data is in the `data` directory, the `tpch` benchmark can be run with a command like this:

```bash
cargo run --release --bin dfbench -- tpch --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
```

See the help for more details
See the help for more details.

### Different features

You can enable `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`. For example
You can enable `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`. For example:

```shell
cargo run --release --features "mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
@@ -184,6 +181,7 @@ The benchmark program also supports CSV and Parquet input file formats and a uti
```bash
cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
```

Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.

### Comparing results between runs
Expand All @@ -206,7 +204,7 @@ $ cargo run --release --bin tpch -- benchmark datafusion --iterations 5 --path .
./compare.py /tmp/output_main/tpch-summary--1679330119.json /tmp/output_branch/tpch-summary--1679328405.json
```

This will produce output like
This will produce output like:

```
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
@@ -243,28 +241,92 @@ The `dfbench` program contains subcommands to run the various
benchmarks. When benchmarking, it should always be built in release
mode using `--release`.

Full help for each benchmark can be found in the relevant sub
command. For example to get help for tpch, run
Full help for each benchmark can be found in the relevant
subcommand. For example, to get help for tpch, run:

```shell
cargo run --release --bin dfbench --help
cargo run --release --bin dfbench -- tpch --help
...
datafusion-benchmarks 27.0.0
benchmark command
dfbench-tpch 45.0.0
Run the tpch benchmark.

This benchmarks is derived from the [TPC-H][1] version
[2.17.1]. The data and answers are generated using `tpch-gen` from
[2].

[1]: http://www.tpc.org/tpch/
[2]: https://github.com/databricks/tpch-dbgen.git,
[2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf

USAGE:
dfbench <SUBCOMMAND>
dfbench tpch [FLAGS] [OPTIONS] --path <path>

FLAGS:
-d, --debug
Activate debug mode to see more details

SUBCOMMANDS:
clickbench Run the clickbench benchmark
help Prints this message or the help of the given subcommand(s)
parquet-filter Test performance of parquet filter pushdown
sort Test performance of parquet filter pushdown
tpch Run the tpch benchmark.
tpch-convert Convert tpch .slt files to .parquet or .csv files
-S, --disable-statistics
Whether to disable collection of statistics (and cost based optimizations) or not

-h, --help
Prints help information
...
```

# Writing a new benchmark

## Creating or downloading data outside of the benchmark

If you want to create or download the data with Rust as part of running the benchmark, see the next
section on adding a benchmark subcommand and add code to create or download data as part of its
`run` function.

If you want to create or download the data with shell commands, in `benchmarks/bench.sh`, define a
new function named `data_[your benchmark name]` and call that function in the `data` command case
as a subcommand case named for your benchmark. Also call the new function in the `data all` case.

## Adding the benchmark subcommand

In `benchmarks/bench.sh`, define a new function named `run_[your benchmark name]` following the
example of existing `run_*` functions. Call that function in the `run` command case as a subcommand
case named for your benchmark. Also call the new function in the
`run all` case. Add documentation for your benchmark to the text in the `usage` function.

In `benchmarks/src/bin/dfbench.rs`, add a `dfbench` subcommand for your benchmark (a sketch follows the list below) by:

- Adding a new variant to the `Options` enum
- Adding corresponding code to handle the new variant in the `main` function, similar to the other
variants
- Adding a module to the `use datafusion_benchmarks::{}` statement
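
For orientation, here is a minimal sketch of what those three changes could look like for a hypothetical benchmark module named `mybench` (the module name and the pared-down set of existing variants are placeholders, not the actual file contents):

```rust
// Hypothetical excerpt of benchmarks/src/bin/dfbench.rs with a `mybench`
// subcommand added; the existing subcommands are elided to `Tpch` here.
use datafusion::error::Result;
use structopt::StructOpt;

use datafusion_benchmarks::{mybench, tpch /* ...other modules... */};

#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
enum Options {
    Tpch(tpch::RunOpt),
    // New variant: `dfbench mybench ...` parses into mybench::RunOpt
    Mybench(mybench::RunOpt),
}

#[tokio::main]
async fn main() -> Result<()> {
    match Options::from_args() {
        Options::Tpch(opt) => opt.run().await,
        // Dispatch the new subcommand to the module's run method
        Options::Mybench(opt) => opt.run().await,
    }
}
```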

In `benchmarks/src/lib.rs`, declare the new module you imported in `dfbench.rs` and create the
corresponding file(s) for the module's code.

In the module, following the pattern of other existing benchmarks, define a `RunOpt` struct (sketched after this list) with:

- A doc comment that will become the `--help` output for the subcommand
- A `run` method that the `dfbench` `main` function will call.
- A `--path` structopt field that the `bench.sh` script should use with `${DATA_DIR}` to define
where the input data should be stored.
- An `--output` structopt field that the `bench.sh` script should use with `"${RESULTS_FILE}"` to
define where the benchmark's results should be stored.

### Creating or downloading data as part of the benchmark

Use the `--path` structopt field defined on the `RunOpt` struct to know where to store or look for
the data. Generate the data using whatever Rust code you'd like, before the code that will be
measuring an operation.

### Collecting data

Your benchmark should create and use an instance of `BenchmarkRun` defined in `benchmarks/src/util/run.rs` as follows (a short sketch follows this list):

- Call its `start_new_case` method with a string that will appear in the "Query" column of the
compare output.
- Use `write_iter` to record elapsed times for the behavior you're benchmarking.
- When all cases are done, call the `BenchmarkRun`'s `maybe_write_json` method, giving it the value
of the `--output` structopt field on `RunOpt`.
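
A rough sketch of that flow, assuming an `iterations` field on `RunOpt` and a hypothetical `execute_query` helper; the `BenchmarkRun` method signatures are inferred from the existing benchmarks, so check `benchmarks/src/util/run.rs` for the exact API:

```rust
use std::time::Instant;

use crate::util::BenchmarkRun; // defined in benchmarks/src/util/run.rs
use datafusion::error::Result;

async fn run_cases(opt: &RunOpt) -> Result<()> {
    let mut benchmark_run = BenchmarkRun::new();
    for query_id in 1..=3 {
        // This string appears in the "Query" column of the compare.py output
        benchmark_run.start_new_case(&format!("Query {query_id}"));
        for _ in 0..opt.iterations {
            let start = Instant::now();
            // Hypothetical helper that runs the query and returns the row count
            let row_count = execute_query(query_id).await?;
            // Record the elapsed time (and row count) for this iteration
            benchmark_run.write_iter(start.elapsed(), row_count);
        }
    }
    // Persist the collected timings to the file passed via --output, if any
    benchmark_run.maybe_write_json(opt.output_path.as_ref())?;
    Ok(())
}
```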

# Benchmarks

The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience
18 changes: 10 additions & 8 deletions datafusion-cli/src/exec.rs
@@ -258,17 +258,19 @@ pub(super) async fn exec_and_print(
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
let mut row_count = 0_usize;
let max_rows = match print_options.maxrows {
MaxRows::Unlimited => usize::MAX,
MaxRows::Limited(n) => n,
};
while let Some(batch) = stream.next().await {
let batch = batch?;
let curr_num_rows = batch.num_rows();
if let MaxRows::Limited(max_rows) = print_options.maxrows {
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
// Stop collecting results if the number of rows exceeds the limit
// results batch should include the last batch that exceeds the limit
if row_count < max_rows + curr_num_rows {
// Try to grow the reservation to accommodate the batch in memory
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
}
row_count += curr_num_rows;
}
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_json_opener.rs
@@ -71,7 +71,7 @@ async fn csv_opener() -> Result<()> {
.with_batch_size(8192)
.with_projection(&scan_config);

let opener = config.create_file_opener(Ok(object_store), &scan_config, 0)?;
let opener = config.create_file_opener(object_store, &scan_config, 0);

let mut result = vec![];
let mut stream =
2 changes: 1 addition & 1 deletion datafusion/common/src/spans.rs
@@ -140,7 +140,7 @@ impl Span {
/// the column a that comes from SELECT 1 AS a UNION ALL SELECT 2 AS a you'll
/// need two spans.
#[derive(Debug, Clone)]
// Store teh first [`Span`] on the stack because that is by far the most common
// Store the first [`Span`] on the stack because that is by far the most common
// case. More will spill onto the heap.
pub struct Spans(pub Vec<Span>);

4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/data_source.rs
@@ -40,10 +40,10 @@ pub trait FileSource: Send + Sync {
/// Creates a `dyn FileOpener` based on given parameters
fn create_file_opener(
&self,
object_store: datafusion_common::Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
) -> datafusion_common::Result<Arc<dyn FileOpener>>;
) -> Arc<dyn FileOpener>;
/// Any
fn as_any(&self) -> &dyn Any;
/// Initialize new type with batch size configuration
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -211,14 +211,14 @@ pub struct ArrowSource {
impl FileSource for ArrowSource {
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(ArrowOpener {
object_store: object_store?,
) -> Arc<dyn FileOpener> {
Arc::new(ArrowOpener {
object_store,
projection: base_config.file_column_projection_indices(),
}))
})
}

fn as_any(&self) -> &dyn Any {
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -194,23 +194,23 @@ impl FileSource for AvroSource {
#[cfg(feature = "avro")]
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(private::AvroOpener {
) -> Arc<dyn FileOpener> {
Arc::new(private::AvroOpener {
config: Arc::new(self.clone()),
object_store: object_store?,
}))
object_store,
})
}

#[cfg(not(feature = "avro"))]
fn create_file_opener(
&self,
_object_store: Result<Arc<dyn ObjectStore>>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
) -> Arc<dyn FileOpener> {
panic!("Avro feature is not enabled in this build")
}

10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -564,15 +564,15 @@ impl CsvOpener {
impl FileSource for CsvSource {
fn create_file_opener(
&self,
object_store: Result<Arc<dyn ObjectStore>>,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(CsvOpener {
) -> Arc<dyn FileOpener> {
Arc::new(CsvOpener {
config: Arc::new(self.clone()),
file_compression_type: base_config.file_compression_type,
object_store: object_store?,
}))
object_store,
})
}

fn as_any(&self) -> &dyn Any {
@@ -162,15 +162,15 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context.runtime_env().object_store(&self.object_store_url);
let object_store = context.runtime_env().object_store(&self.object_store_url)?;

let source = self
.source
.with_batch_size(context.session_config().batch_size())
.with_schema(Arc::clone(&self.file_schema))
.with_projection(self);

let opener = source.create_file_opener(object_store, self, partition)?;
let opener = source.create_file_opener(object_store, self, partition);

let stream = FileStream::new(self, partition, opener, source.metrics())?;
Ok(Box::pin(stream))