Skip to content

Commit

Permalink
Add range table function (#14830)
Browse files Browse the repository at this point in the history
* extract name

* extract inclusive

* range table function
  • Loading branch information
simonvandel authored Feb 24, 2025
1 parent e799097 commit a235276
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 21 deletions.
97 changes: 83 additions & 14 deletions datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
enum GenSeriesArgs {
/// ContainsNull signifies that at least one argument(start, end, step) was null, thus no series will be generated.
ContainsNull,
ContainsNull {
include_end: bool,
name: &'static str,
},
/// AllNotNullArgs holds the start, end, and step values for generating the series when all arguments are not null.
AllNotNullArgs { start: i64, end: i64, step: i64 },
AllNotNullArgs {
start: i64,
end: i64,
step: i64,
/// Indicates whether the end value should be included in the series.
include_end: bool,
name: &'static str,
},
}

/// Table that generates a series of integers from `start` (inclusive) to `end` (inclusive for `generate_series`, exclusive for `range`), incrementing by `step`
Expand All @@ -57,15 +67,26 @@ struct GenerateSeriesState {

/// Tracks current position when generating table
current: i64,
/// Indicates whether the end value should be included in the series.
include_end: bool,
name: &'static str,
}

impl GenerateSeriesState {
fn reach_end(&self, val: i64) -> bool {
if self.step > 0 {
return val > self.end;
if self.include_end {
return val > self.end;
} else {
return val >= self.end;
}
}

val < self.end
if self.include_end {
val < self.end
} else {
val <= self.end
}
}
}

Expand All @@ -74,8 +95,8 @@ impl fmt::Display for GenerateSeriesState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"generate_series: start={}, end={}, batch_size={}",
self.start, self.end, self.batch_size
"{}: start={}, end={}, batch_size={}",
self.name, self.start, self.end, self.batch_size
)
}
}
Expand Down Expand Up @@ -124,21 +145,31 @@ impl TableProvider for GenerateSeriesTable {

let state = match self.args {
// if args have null, then return 0 row
GenSeriesArgs::ContainsNull => GenerateSeriesState {
GenSeriesArgs::ContainsNull { include_end, name } => GenerateSeriesState {
schema: self.schema.clone(),
start: 0,
end: 0,
step: 1,
current: 1,
batch_size,
include_end,
name,
},
GenSeriesArgs::AllNotNullArgs { start, end, step } => GenerateSeriesState {
GenSeriesArgs::AllNotNullArgs {
start,
end,
step,
include_end,
name,
} => GenerateSeriesState {
schema: self.schema.clone(),
start,
end,
step,
current: start,
batch_size,
include_end,
name,
},
};

Expand All @@ -150,12 +181,15 @@ impl TableProvider for GenerateSeriesTable {
}

#[derive(Debug)]
pub struct GenerateSeriesFunc {}
struct GenerateSeriesFuncImpl {
name: &'static str,
include_end: bool,
}

impl TableFunctionImpl for GenerateSeriesFunc {
impl TableFunctionImpl for GenerateSeriesFuncImpl {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if exprs.is_empty() || exprs.len() > 3 {
return plan_err!("generate_series function requires 1 to 3 arguments");
return plan_err!("{} function requires 1 to 3 arguments", self.name);
}

let mut normalize_args = Vec::new();
Expand All @@ -177,7 +211,10 @@ impl TableFunctionImpl for GenerateSeriesFunc {
// contain null
return Ok(Arc::new(GenerateSeriesTable {
schema,
args: GenSeriesArgs::ContainsNull,
args: GenSeriesArgs::ContainsNull {
include_end: self.include_end,
name: self.name,
},
}));
}

Expand All @@ -186,7 +223,7 @@ impl TableFunctionImpl for GenerateSeriesFunc {
[start, end] => (*start, *end, 1),
[start, end, step] => (*start, *end, *step),
_ => {
return plan_err!("generate_series function requires 1 to 3 arguments");
return plan_err!("{} function requires 1 to 3 arguments", self.name);
}
};

Expand All @@ -204,7 +241,39 @@ impl TableFunctionImpl for GenerateSeriesFunc {

Ok(Arc::new(GenerateSeriesTable {
schema,
args: GenSeriesArgs::AllNotNullArgs { start, end, step },
args: GenSeriesArgs::AllNotNullArgs {
start,
end,
step,
include_end: self.include_end,
name: self.name,
},
}))
}
}

/// Table function registered under the name `generate_series`.
///
/// It is a thin front for `GenerateSeriesFuncImpl`, configured so that the
/// `end` value is included in the produced series.
#[derive(Debug)]
pub struct GenerateSeriesFunc {}

impl TableFunctionImpl for GenerateSeriesFunc {
    /// Forwards `exprs` unchanged to a `GenerateSeriesFuncImpl` configured with
    /// the name `generate_series` and an inclusive end, returning whatever
    /// that call returns (table provider or error).
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // `name` is what the delegate uses in its messages and display output;
        // `include_end: true` selects the inclusive-end behaviour.
        GenerateSeriesFuncImpl {
            include_end: true,
            name: "generate_series",
        }
        .call(exprs)
    }
}

/// Table function registered under the name `range`.
///
/// It is a thin front for `GenerateSeriesFuncImpl`, configured so that the
/// `end` value is excluded from the produced series.
#[derive(Debug)]
pub struct RangeFunc {}

impl TableFunctionImpl for RangeFunc {
    /// Forwards `exprs` unchanged to a `GenerateSeriesFuncImpl` configured with
    /// the name `range` and an exclusive end, returning whatever that call
    /// returns (table provider or error).
    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        // `name` is what the delegate uses in its messages and display output;
        // `include_end: false` selects the exclusive-end behaviour.
        GenerateSeriesFuncImpl {
            include_end: false,
            name: "range",
        }
        .call(exprs)
    }
}
3 changes: 2 additions & 1 deletion datafusion/functions-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::sync::Arc;

/// Returns all default table functions
pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
vec![generate_series()]
vec![generate_series(), range()]
}

/// Creates a singleton instance of a table function
Expand All @@ -55,3 +55,4 @@ macro_rules! create_udtf_function {
}

create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series");
create_udtf_function!(generate_series::RangeFunc, "range");
128 changes: 122 additions & 6 deletions datafusion/sqllogictest/test_files/table_functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
query I
SELECT * FROM generate_series(6, -1, -2)
----
6
4
2
0
6
4
2
0

query I
SELECT * FROM generate_series(6, 66, 666)
----
6
6



Expand Down Expand Up @@ -120,7 +120,7 @@ SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))

# Test generate_series with JOIN
query II rowsort
SELECT a.v1, b.v1
SELECT a.v1, b.v1
FROM generate_series(1, 3) a(v1)
JOIN generate_series(2, 4) b(v1)
ON a.v1 = b.v1 - 1
Expand Down Expand Up @@ -187,3 +187,119 @@ SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end)
[1, 2, 3, 4, 5]
[1, 2, 3, 4]
[1, 2, 3]

# Test range table function
query I
SELECT * FROM range(6)
----
0
1
2
3
4
5



query I rowsort
SELECT * FROM range(1, 5)
----
1
2
3
4

query I rowsort
SELECT * FROM range(1, 1)
----

query I rowsort
SELECT * FROM range(3, 6)
----
3
4
5

# #generated_data > batch_size
query I
SELECT count(v1) FROM range(-66666,66666) t1(v1)
----
133332

query I rowsort
SELECT SUM(v1) FROM range(1, 5) t1(v1)
----
10

query I
SELECT * FROM range(6, -1, -2)
----
6
4
2
0

query I
SELECT * FROM range(6, 66, 666)
----
6



#
# Test range with null arguments
#

query I
SELECT * FROM range(NULL, 5)
----

query I
SELECT * FROM range(1, NULL)
----

query I
SELECT * FROM range(NULL, NULL)
----

query I
SELECT * FROM range(1, 5, NULL)
----


query TT
EXPLAIN SELECT * FROM range(1, 5)
----
logical_plan TableScan: tmp_table projection=[value]
physical_plan LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]

#
# Test range with invalid arguments
#

query error DataFusion error: Error during planning: start is bigger than end, but increment is positive: cannot generate infinite series
SELECT * FROM range(5, 1)

query error DataFusion error: Error during planning: start is smaller than end, but increment is negative: cannot generate infinite series
SELECT * FROM range(-6, 6, -1)

query error DataFusion error: Error during planning: step cannot be zero
SELECT * FROM range(-6, 6, 0)

query error DataFusion error: Error during planning: start is bigger than end, but increment is positive: cannot generate infinite series
SELECT * FROM range(6, -6, 1)


statement error DataFusion error: Error during planning: range function requires 1 to 3 arguments
SELECT * FROM range(1, 2, 3, 4)


statement error DataFusion error: Error during planning: First argument must be an integer literal
SELECT * FROM range('foo', 'bar')

# UDF and UDTF `range` can be used simultaneously
query ? rowsort
SELECT range(1, t1.end) FROM range(3, 5) as t1(end)
----
[1, 2, 3]
[1, 2]

0 comments on commit a235276

Please sign in to comment.