remove cyclical dev-dependency on core
AdamGS committed Feb 24, 2025
1 parent 33f7710 commit d571cba
Showing 6 changed files with 83 additions and 57 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion datafusion/datasource/Cargo.toml
@@ -71,7 +71,6 @@ xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
datafusion = { workspace = true, default-features = true }
tempfile = { workspace = true }

[lints]
40 changes: 34 additions & 6 deletions datafusion/datasource/src/file_scan_config.rs
@@ -1017,15 +1017,43 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {

#[cfg(test)]
mod tests {
use crate::test_util::MockSource;
use crate::{test_util::MockSource, tests::aggr_test_schema};

use super::*;
use arrow::array::{Int32Array, RecordBatch};
use datafusion::physical_planner::create_physical_sort_expr;
use datafusion::{test::columns, test_util::aggr_test_schema};
use datafusion_common::assert_batches_eq;
use arrow::{
array::{Int32Array, RecordBatch},
compute::SortOptions,
};

use datafusion_common::{assert_batches_eq, DFSchema};
use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
use datafusion_physical_expr::create_physical_expr;
use std::collections::HashMap;

fn create_physical_sort_expr(
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
let SortExpr {
expr,
asc,
nulls_first,
} = e;
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
}

/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}

#[test]
fn physical_plan_config_no_projection() {
let file_schema = aggr_test_schema();
@@ -1375,7 +1403,7 @@ mod tests {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<datafusion_expr::SortExpr>,
sort: Vec<SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}

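For orientation, here is a minimal sketch of how the new local create_physical_sort_expr helper could be exercised in this test module; the schema and the imports come from the hunk above, while the column "c1", the sort flags, and the assertions are illustrative assumptions rather than code from this commit:

#[test]
fn sort_expr_helper_round_trip() -> Result<()> {
    // Logical sort over column "c1" of the shared aggregate test schema.
    let schema = aggr_test_schema();
    let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
    let sort = SortExpr {
        expr: datafusion_expr::col("c1"),
        asc: true,
        nulls_first: false,
    };

    // The local helper stands in for datafusion::physical_planner::create_physical_sort_expr.
    let physical = create_physical_sort_expr(&sort, &df_schema, &ExecutionProps::new())?;
    assert!(!physical.options.descending);
    assert!(!physical.options.nulls_first);
    Ok(())
}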
18 changes: 7 additions & 11 deletions datafusion/datasource/src/file_stream.rs
@@ -523,18 +523,16 @@ impl FileStreamMetrics {
#[cfg(test)]
mod tests {
use crate::file_scan_config::FileScanConfig;
use crate::tests::make_partition;
use crate::PartitionedFile;
use arrow::error::ArrowError;
use datafusion_common::error::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{FutureExt as _, StreamExt as _};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::prelude::SessionContext;
use datafusion::test::{make_partition, object_store::register_test_store};

use crate::file_meta::FileMeta;
use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
use crate::test_util::MockSource;
@@ -641,17 +639,15 @@ mod tests {
.map(|batch| batch.schema())
.unwrap_or_else(|| Arc::new(Schema::empty()));

let ctx = SessionContext::new();
// let ctx = SessionContext::new();
let mock_files: Vec<(String, u64)> = (0..self.num_files)
.map(|idx| (format!("mock_file{idx}"), 10_u64))
.collect();

let mock_files_ref: Vec<(&str, u64)> = mock_files
.iter()
.map(|(name, size)| (name.as_str(), *size))
.collect();

register_test_store(&ctx, &mock_files_ref);
// let mock_files_ref: Vec<(&str, u64)> = mock_files
// .iter()
// .map(|(name, size)| (name.as_str(), *size))
// .collect();

let file_group = mock_files
.into_iter()
40 changes: 3 additions & 37 deletions datafusion/datasource/src/memory.rs
@@ -765,28 +765,17 @@ mod memory_source_tests {

#[cfg(test)]
mod tests {
use crate::tests::{aggr_test_schema, make_partition};

use super::*;
use arrow::array::{ArrayRef, Int32Array};

use datafusion_physical_plan::expressions::lit;
use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};
use datafusion_common::assert_batches_eq;
use datafusion_common::stats::{ColumnStatistics, Precision};
use futures::StreamExt;

// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
pub fn make_partition(sz: i32) -> RecordBatch {
let seq_start = 0;
let seq_end = sz;
let values = (seq_start..seq_end).collect::<Vec<_>>();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let arr = Arc::new(Int32Array::from(values));
let arr = arr as ArrayRef;

RecordBatch::try_new(schema, vec![arr]).unwrap()
}

#[tokio::test]
async fn exec_with_limit() -> Result<()> {
let task_ctx = Arc::new(TaskContext::default());
@@ -813,29 +802,6 @@
Ok(())
}

/// Get the schema for the aggregate_test_* csv files
pub fn aggr_test_schema() -> SchemaRef {
let mut f1 = Field::new("c1", DataType::Utf8, false);
f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
let schema = Schema::new(vec![
f1,
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]);

Arc::new(schema)
}

#[tokio::test]
async fn values_empty_case() -> Result<()> {
let schema = aggr_test_schema();
40 changes: 39 additions & 1 deletion datafusion/datasource/src/mod.rs
@@ -192,13 +192,51 @@ impl From<ObjectMeta> for PartitionedFile {
#[cfg(test)]
mod tests {
use super::ListingTableUrl;
use arrow::{
array::{ArrayRef, Int32Array, RecordBatch},
datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_execution::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
use object_store::{local::LocalFileSystem, path::Path};
use std::{ops::Not, sync::Arc};
use std::{collections::HashMap, ops::Not, sync::Arc};
use url::Url;

/// Return a RecordBatch with a single Int32 array with values (0..sz) in a field named "i"
pub fn make_partition(sz: i32) -> RecordBatch {
let seq_start = 0;
let seq_end = sz;
let values = (seq_start..seq_end).collect::<Vec<_>>();
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
let arr = Arc::new(Int32Array::from(values));

RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
}

/// Get the schema for the aggregate_test_* csv files
pub fn aggr_test_schema() -> SchemaRef {
let mut f1 = Field::new("c1", DataType::Utf8, false);
f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
let schema = Schema::new(vec![
f1,
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
Field::new("c4", DataType::Int16, false),
Field::new("c5", DataType::Int32, false),
Field::new("c6", DataType::Int64, false),
Field::new("c7", DataType::UInt8, false),
Field::new("c8", DataType::UInt16, false),
Field::new("c9", DataType::UInt32, false),
Field::new("c10", DataType::UInt64, false),
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]);

Arc::new(schema)
}

#[test]
fn test_object_store_listing_url() {
let listing = ListingTableUrl::parse("file:///").unwrap();
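As a usage note, these helpers now live in the crate's #[cfg(test)] root test module, so the other test modules touched above reach them through crate::tests instead of the datafusion core crate; a condensed sketch of such a consumer (module name and assertions are illustrative):

#[cfg(test)]
mod consumer_tests {
    use crate::tests::{aggr_test_schema, make_partition};

    #[test]
    fn shared_helpers_work() {
        // make_partition(5) builds a single-column Int32 batch "i" with rows 0..5.
        let batch = make_partition(5);
        assert_eq!(batch.num_rows(), 5);

        // aggr_test_schema() describes the aggregate_test_* CSV files: columns c1..c13.
        let schema = aggr_test_schema();
        assert_eq!(schema.fields().len(), 13);
    }
}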
