Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: metadata columns #14057

Open
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

chenkovsky
Copy link

Which issue does this PR close?

Closes #13975.

Rationale for this change

many databases support pseudo columns, for example, file_path, file_name, file_size, rowid.
for pseudo columns, we don't want to get them by default, but we want to be able to use them explicitly.

for the database that supports rowid. select * from tb won't return rowid. but we can get rowid by select rowid, * from tb. spark has already supported metadata columns. this PR want to support it in datafusion.

What changes are included in this PR?

  • add an API in table provider that will return metadata column schema.
  • change DFSchema add metadata column.
  • change logical plan e.g. TableScan to support it.

Are these changes tested?

Unit test is added

Are there any user-facing changes?

No

For FFI table provider API, one function that returns metadata column is added.

@github-actions github-actions bot added logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate catalog Related to the catalog crate common Related to common crate labels Jan 9, 2025
return metadata.qualified_field(i - self.inner.len());
}
}
self.inner.qualified_field(i)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better not to mix inner field and meta field?

maybe we need another method meta_field(&self, i: usize)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually implementing another method was my first attempt. but I found that I need to change a lot of code, because column index is used everywhere. that's why in currently implementation metadata column has index + len(fields).

Copy link
Contributor

@jayzhan211 jayzhan211 Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't only where you need meta columns you need to change the code with meta_field? Others code that call with field remain the same.

The downside of the current approach is that whenever the schema is changed, the index of meta columns need to adjust too. I think this is error prone. Minimize the dependency of meta schema and schema is better

Copy link
Author

@chenkovsky chenkovsky Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. it's error prone. Can we change the offsets of metadata columns, e.g. (-1 as usize) (-2 as usize) then there's no such problem. I see some databases use this trick.

Isn't only where you need meta columns you need to change the code with meta_field? Others code that call with field remain the same.

yes, we can. but many apis use Vec to represent columns. I have to change many structs and method defnitions to pass extra parameters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(-1 as usize) how does this large offset work? We have vector instead of map

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @jayzhan211 I pushed a commit, could you please review it again?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay this approach looks good to me.

.collect()
let mut fields: Vec<&Field> = self.inner.fields_with_unqualified_name(name);
if let Some(schema) = self.metadata_schema() {
fields.append(&mut schema.fields_with_unqualified_name(name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fields.append(&mut schema.fields_with_unqualified_name(name));
fields.append(schema.fields_with_unqualified_name(name));

let mut fields: Vec<(Option<&TableReference>, &Field)> =
self.inner.qualified_fields_with_unqualified_name(name);
if let Some(schema) = self.metadata_schema() {
fields.append(&mut schema.qualified_fields_with_unqualified_name(name));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fields.append(&mut schema.qualified_fields_with_unqualified_name(name));
fields.append(schema.qualified_fields_with_unqualified_name(name));

return (
Some(table_name.clone()),
Arc::new(
metadata.field(*i - METADATA_OFFSET).clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle where i < METADATA_OFFSET

Copy link
Contributor

@jayzhan211 jayzhan211 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, wait others to review this

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @chenkovsky and @jayzhan211 -- this is a neat feature and I think has also been asked for before 💯

Also, I think the code is well structured and tested.

Before we merge this PR I think we need

  1. a test for more than one metadata column
  2. ensure this doesn't slow down planning (I will run benchmarks and report back)

I would strongly recommend we do in this PR (but could do as a follow on)

  1. More documentation (to help others and our future selves use it)
  2. Change the test to use assert_batches_eq

&self.inner.schema
}

pub fn with_metadata_schema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please document these APIs

@@ -55,6 +55,11 @@ pub trait TableProvider: Debug + Sync + Send {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;

/// Get metadata columns of this table.
fn metadata_columns(&self) -> Option<SchemaRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document this better -- specifically:

  1. A link to the prior art (spark metadata columns)
  2. A brief summary of what metadata columns are used for and an example (you can copy the content from the spark docs)

metadata: Option<QualifiedSchema>,
}

pub const METADATA_OFFSET: usize = usize::MAX >> 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please document what this is and how it relates to DFSchema::inner

inner: QualifiedSchema,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
/// metadata columns
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide more documentation here to document what these are (perhaps adding a link to the higher level description you write on TableProvider::metadata_columns)

pub const METADATA_OFFSET: usize = usize::MAX >> 1;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QualifiedSchema {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document what this struct is used for

}
}

pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation -- imagine you are someone using this API and are not familar with metadata_schema or the content of this API. I think you would want a short summary of what this is and then a link to the full details

use datafusion_common::METADATA_OFFSET;
use itertools::Itertools;

/// A User, with an id and a bank account
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is is actually quite a cool example of using metadata index

Eventually I think it would be great to add an example in https://github.com/apache/datafusion/tree/main/datafusion-examples

.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
assert_eq!(batch.num_rows(), 2);
let serializer = CsvSerializer::new().with_header(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To check the results, can you please use assert_batches_eq instead of converting to CSV?

That is

  1. more consistent with the rest of the codebase
  2. easier to read
  3. easier to update

For example:

let expected = vec![
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| 1 | 1 |",
"| 1 | 2 |",
"| 1 | 3 |",
"| 1 | 4 |",
"| 1 | 5 |",
"| 1 | 6 |",
"| 1 | 7 |",
"| 1 | 8 |",
"| 1 | 9 |",
"| 1 | 10 |",
"| 2 | 1 |",
"| 2 | 2 |",
"| 2 | 3 |",
"| 2 | 4 |",
"| 2 | 5 |",
"| 2 | 6 |",
"| 2 | 7 |",
"| 2 | 8 |",
"| 2 | 9 |",
"| 2 | 10 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &results);

let all_batchs = df5.collect().await.unwrap();
let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap();
let bytes = serializer.serialize(batch, true).unwrap();
assert_eq!(bytes, "1,2\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please also add a test for more than one metadata column?

@alamb
Copy link
Contributor

alamb commented Jan 12, 2025

Something other people have asked for in the past (whihc I can't find now) is the ability to know what file a particular row came from in a listing table that combines multiple files

Update: I found it at #8906

To be clear I think this PR would enable selecting a subset of files, as described on #8906 (comment)

@adriangb
Copy link
Contributor

We want this as well to hide "special" internal columns we create to speed up JSON columns. +1 for the feature!

@adriangb
Copy link
Contributor

My only question is if "metadata" is the right name for these columns? Could it be "system" columns or something like that?

@Omega359
Copy link
Contributor

Metadata column is the name I'm familiar with in other systems. For example, spark/databricks

@adriangb
Copy link
Contributor

I guess the naming doesn't really hurt our use case so okay let's go with that if it means something in the domain in general 👍🏻

@alamb
Copy link
Contributor

alamb commented Jan 13, 2025

FWIW I ran the planning benchmarks on this branch and see no measurable difference. ✅

++ critcmp main feature_metadata_columns
group                            feature_metadata_columns               main
-----                            ------------------------               ----
logical_select_all_from_1000     1.00      5.2±0.03ms        ? ?/sec    1.01      5.3±0.04ms        ? ?/sec
physical_plan_clickbench_all     1.00    226.7±1.82ms        ? ?/sec    1.00    227.1±1.40ms        ? ?/sec
physical_plan_tpcds_all          1.00   1380.0±3.29ms        ? ?/sec    1.00   1378.8±5.32ms        ? ?/sec
physical_plan_tpch_all           1.00     90.2±0.70ms        ? ?/sec    1.01     91.1±1.32ms        ? ?/sec
physical_select_all_from_1000    1.02     42.4±0.30ms        ? ?/sec    1.00     41.5±0.16ms        ? ?/sec

@berkaysynnada
Copy link
Contributor

Can these metadata columns utilize normal column properties, like ordering equivalences, constantness, distinctness etc.? For example, AFAIU rowid is an ordered column, and if I sort the table by rowid, the SortExec would be removed? (it seems to me not yet at this point) Can we iterate over the design to support those capabilities, too?

@alamb
Copy link
Contributor

alamb commented Jan 15, 2025

Can these metadata columns utilize normal column properties, like ordering equivalences, constantness, distinctness etc.? For example, AFAIU rowid is an ordered column, and if I sort the table by rowid, the SortExec would be removed? (it seems to me not yet at this point) Can we iterate over the design to support those capabilities, too?

I with this PR a custom table provider that was ordered by row_id could communicate that information to avoid a SortExec

From what I can tell, the metadata columns is only a notion in the LogicalPlan

Specifically, the ExecutionPlan returned by the provider is no different than any other ExecutionPlan so it can communicate sortedness via ExecutionPlan::properties as normal

@berkaysynnada
Copy link
Contributor

Can these metadata columns utilize normal column properties, like ordering equivalences, constantness, distinctness etc.? For example, AFAIU rowid is an ordered column, and if I sort the table by rowid, the SortExec would be removed? (it seems to me not yet at this point) Can we iterate over the design to support those capabilities, too?

I with this PR a custom table provider that was ordered by row_id could communicate that information to avoid a SortExec

From what I can tell, the metadata columns is only a notion in the LogicalPlan

Specifically, the ExecutionPlan returned by the provider is no different than any other ExecutionPlan so it can communicate sortedness via ExecutionPlan::properties as normal

What I mean is:

https://github.com/chenkovsky/datafusion/blob/5c4b5c4c7aee47b6287e5fcf32d87485ee1c9e37/datafusion/core/tests/sql/metadata_columns.rs#L389

When I print this query, there exists a SortExec for _rowid. But what I understand is _rowid should be a one-by-one increasing column?

@chenkovsky
Copy link
Author

Can these metadata columns utilize normal column properties, like ordering equivalences, constantness, distinctness etc.? For example, AFAIU rowid is an ordered column, and if I sort the table by rowid, the SortExec would be removed? (it seems to me not yet at this point) Can we iterate over the design to support those capabilities, too?

I with this PR a custom table provider that was ordered by row_id could communicate that information to avoid a SortExec
From what I can tell, the metadata columns is only a notion in the LogicalPlan
Specifically, the ExecutionPlan returned by the provider is no different than any other ExecutionPlan so it can communicate sortedness via ExecutionPlan::properties as normal

What I mean is:

https://github.com/chenkovsky/datafusion/blob/5c4b5c4c7aee47b6287e5fcf32d87485ee1c9e37/datafusion/core/tests/sql/metadata_columns.rs#L389

When I print this query, there exists a SortExec for _rowid. But what I understand is _rowid should be a one-by-one increasing column?

Maybe not, I use vec to store values in test, but if the inner datastructure is btree, the scan order is not always increasing.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 7, 2025

@chenkovsky the main difference between the two approaches is how to transmit the information on which columns are system columns and which aren't. The approach in this PR does it explicitly by modifying DFSchema, TableProvider and a couple other spots and also manipulating the meaning of field indexes in DFSchema. The approach in #14362 does it by adding metadata to Field. They both work but each have pros and cons.

How about we check with @alamb, @Omega359 and @jayzhan211 what they think sounds best?

Either way I still think we should name these system columns not metadata columns to avoid confusion with DFScheama::metadata_schema and DFSchema::metadata meaning two very different things, etc.

@adriangb

of course, I want to listen other's opinions. and I also think name is a small thing. changing to system column is also ok.

besides _rowid save and load problem. before compare pros and cons, would you mind to add some tests about stopping system column propagation? I haven't seen them on your branch?

@adriangb
Copy link
Contributor

adriangb commented Feb 7, 2025

would you mind to add some tests about stopping system column propagation? I haven't seen them on your branch?

have you seen these?

#[tokio::test]
async fn test_system_column_with_cte() {
let ctx = setup_test_context().await;
// System columns not available after CTE
let select = r"
WITH cte AS (SELECT * FROM test)
SELECT _rowid FROM cte
";
assert!(ctx.sql(select).await.is_err());
// Explicitly selected system columns become regular columns
let select = r"
WITH cte AS (SELECT id, _rowid FROM test)
SELECT * FROM cte
";
let df = ctx.sql(select).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+--------+",
"| id | _rowid |",
"+----+--------+",
"| 1 | 0 |",
"| 2 | 1 |",
"| 3 | 2 |",
"+----+--------+",
];
assert_batches_sorted_eq!(expected, &batches);
}
#[tokio::test]
async fn test_system_column_in_subquery() {
let ctx = setup_test_context().await;
// System columns not available in subquery
let select = r"
SELECT _rowid FROM (SELECT * FROM test)
";
assert!(ctx.sql(select).await.is_err());
// Explicitly selected system columns become regular columns
let select = r"
SELECT * FROM (SELECT id, _rowid FROM test)
";
let df = ctx.sql(select).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+--------+",
"| id | _rowid |",
"+----+--------+",
"| 1 | 0 |",
"| 2 | 1 |",
"| 3 | 2 |",
"+----+--------+",
];
assert_batches_sorted_eq!(expected, &batches);
}

@chenkovsky
Copy link
Author

chenkovsky commented Feb 7, 2025

would you mind to add some tests about stopping system column propagation? I haven't seen them on your branch?

have you seen these?

#[tokio::test]
async fn test_system_column_with_cte() {
let ctx = setup_test_context().await;
// System columns not available after CTE
let select = r"
WITH cte AS (SELECT * FROM test)
SELECT _rowid FROM cte
";
assert!(ctx.sql(select).await.is_err());
// Explicitly selected system columns become regular columns
let select = r"
WITH cte AS (SELECT id, _rowid FROM test)
SELECT * FROM cte
";
let df = ctx.sql(select).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+--------+",
"| id | _rowid |",
"+----+--------+",
"| 1 | 0 |",
"| 2 | 1 |",
"| 3 | 2 |",
"+----+--------+",
];
assert_batches_sorted_eq!(expected, &batches);
}
#[tokio::test]
async fn test_system_column_in_subquery() {
let ctx = setup_test_context().await;
// System columns not available in subquery
let select = r"
SELECT _rowid FROM (SELECT * FROM test)
";
assert!(ctx.sql(select).await.is_err());
// Explicitly selected system columns become regular columns
let select = r"
SELECT * FROM (SELECT id, _rowid FROM test)
";
let df = ctx.sql(select).await.unwrap();
let batches = df.collect().await.unwrap();
#[rustfmt::skip]
let expected = [
"+----+--------+",
"| id | _rowid |",
"+----+--------+",
"| 1 | 0 |",
"| 2 | 1 |",
"| 3 | 2 |",
"+----+--------+",
];
assert_batches_sorted_eq!(expected, &batches);
}

so for _row_id save load problem. so in your implementation "a system column stops being a system column once it's projected" ?

for stopping system column propagation, have you tested other logical plans e.g. union intersect?

@jayzhan211
Copy link
Contributor

@chenkovsky You mentioned that the issues of #14362 are 1) duplicated field issues 2) HashMap

I think overall datafusion only care about the system columns that are generated by datafusion, other system columns from other engine should be considered normal columns, but since this is just based on my guess not from any practical experience, is there any concern of this assumption?

For HashMap, I don't think it has performance issue since we only check boolean from it and we don't need to access it frequently given the field should be fixed once created.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 7, 2025

@chenkovsky You mentioned that the issues of #14362 are 1) duplicated field issues 2) HashMap

I think overall datafusion only care about the system columns that are generated by datafusion, other system columns from other engine should be considered normal columns, but since this is just based on my guess not from any practical experience, is there any concern of this assumption?

For HashMap, I don't think it has performance issue since we only check boolean from it and we don't need to access it frequently given the field should be fixed once created.

@jayzhan211

datafusion supports loading files, without such feature, of course, we don't need to take care about this. but with this feature we have to take care about this.

and could you please also see the discussion in #14362. we have more different opinions.

for example, system/metadata column propagation problem.

for _rowid save load problem. currently, data engineers have to write a with clause in #14362 . when using dataframe api, data engineers also have to take more care about metadata dict in #14362. I haven't seen such behavior in other systems. It adds a lot of burden to data engineers.

anyway, after #14362 adds more ut about stopping propagation, let's pros and cons.

@adriangb
Copy link
Contributor

adriangb commented Feb 7, 2025

for stopping system column propagation, have you tested other logical plans e.g. union intersect?

I have not tried constructing logical plans directly. My thought was that even if you do some sort of union you'll still have a projection. For example:

select *, _rowid
from t1
union all
select *, _rowid
from t2

In this case _rowid would come out as a "regular" column. If you did:

select *
from t1
union all
select *
from t2

You get no _rowid column in the output at all.

I'm sure you can create funky stuff by constructing a plan without a projection, but I don't think that happens in the real world even with the optimizer: there is always a projection in the first pass and only after that is evaluated would it get pushed down to a tablescan.

> SET datafusion.optimizer.max_passes = 0;
0 row(s) fetched.
Elapsed 0.001 seconds.

> create table t (x int, y int);
0 row(s) fetched.
Elapsed 0.002 seconds.

> explain
select *
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x, t.y                          |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+
2 row(s) fetched.
Elapsed 0.006 seconds.

> explain
select x
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x                               |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+

So I don't think we have to worry about a plan without a projection as long as a plan with a projection correctly evaluates wildcards.

@adriangb
Copy link
Contributor

adriangb commented Feb 7, 2025

for _rowid save load problem. currently, data engineers have to write a with clause in #14362

why do they have to write a with clause? It's not clear to me what operations we are talking about but if it's copying data from one table to another:

INSERT INTO t1
SELECT * FROM t2;

Will not include system columns from t2. If they do:

INSERT INTO t1
SELECT *, _rowid FROM t2;

Then yes this will attempt to insert _rowid into t1 which is maybe meaningless but if the user explicitly tried to do that... well we shouldn't stop them.

@chenkovsky
Copy link
Author

for _rowid save load problem. currently, data engineers have to write a with clause in #14362

why do they have to write a with clause? It's not clear to me what operations we are talking about but if it's copying data from one table to another:

INSERT INTO t1
SELECT * FROM t2;

Will not include system columns from t2. If they do:

INSERT INTO t1
SELECT *, _rowid FROM t2;

Then yes this will attempt to insert _rowid into t1 which is maybe meaningless but if the user explicitly tried to do that... well we shouldn't stop them.

I must say the premise again. just the scenario when data engineers use datafusion as compute engine and want to read and write parquet file. I know there's no problem when we do insert.

@chenkovsky
Copy link
Author

for stopping system column propagation, have you tested other logical plans e.g. union intersect?

I have not tried constructing logical plans directly. My thought was that even if you do some sort of union you'll still have a projection. For example:

select *, _rowid
from t1
union all
select *, _rowid
from t2

In this case _rowid would come out as a "regular" column. If you did:

select *
from t1
union all
select *
from t2

You get no _rowid column in the output at all.

I'm sure you can create funky stuff by constructing a plan without a projection, but I don't think that happens in the real world even with the optimizer: there is always a projection in the first pass and only after that is evaluated would it get pushed down to a tablescan.

> SET datafusion.optimizer.max_passes = 0;
0 row(s) fetched.
Elapsed 0.001 seconds.

> create table t (x int, y int);
0 row(s) fetched.
Elapsed 0.002 seconds.

> explain
select *
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x, t.y                          |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+
2 row(s) fetched.
Elapsed 0.006 seconds.

> explain
select x
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x                               |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+

So I don't think we have to worry about a plan without a projection as long as a plan with a projection correctly evaluates wildcards.

as I previously asked, in your implementation "a system column stops being a system column once it's projected" ?
If this is correct, then as you said there's no need to add more UTs.

I have to call out it seems that this behavior is incompatible with Spark. I know whether follow Spark's standard is another problem. but community should be aware of this.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 8, 2025

as I previously asked, in your implementation "a system column stops being a system column once it's projected" ? If this is correct, then as you said there's no need to add more UTs.

I have to call out it seems that this behavior is incompatible with Spark. I know whether follow Spark's standard is another problem. but community should be aware of this.

I have to revoke my judgements for #14362 from metadata/system propagation side, because previously judgements are based on the assumption that difference between two approaches is just how to transmit the information, the goal is same. but it seems that it's not true. #14362 has own propagation rules. It's really hard for me to talk about a totally different thing. let's look pros and cons directly.

pros of this approach:

  1. dataframe api friendly. There's no chance to hurt themself for dataframe api users.
  2. Spark compatible. Spark has already been battle tested in many areas, it's design to be compatible with many different data sources and data sinks. So there's fewer unknown problems.

@jayzhan211
Copy link
Contributor

for stopping system column propagation, have you tested other logical plans e.g. union intersect?

I have not tried constructing logical plans directly. My thought was that even if you do some sort of union you'll still have a projection. For example:

select *, _rowid
from t1
union all
select *, _rowid
from t2

In this case _rowid would come out as a "regular" column. If you did:

select *
from t1
union all
select *
from t2

You get no _rowid column in the output at all.

I'm sure you can create funky stuff by constructing a plan without a projection, but I don't think that happens in the real world even with the optimizer: there is always a projection in the first pass and only after that is evaluated would it get pushed down to a tablescan.

> SET datafusion.optimizer.max_passes = 0;
0 row(s) fetched.
Elapsed 0.001 seconds.

> create table t (x int, y int);
0 row(s) fetched.
Elapsed 0.002 seconds.

> explain
select *
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x, t.y                          |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+
2 row(s) fetched.
Elapsed 0.006 seconds.

> explain
select x
from t;
+---------------+-----------------------------------------------+
| plan_type     | plan                                          |
+---------------+-----------------------------------------------+
| logical_plan  | Projection: t.x                               |
|               |   TableScan: t                                |
| physical_plan | MemoryExec: partitions=1, partition_sizes=[0] |
|               |                                               |
+---------------+-----------------------------------------------+

So I don't think we have to worry about a plan without a projection as long as a plan with a projection correctly evaluates wildcards.

as I previously asked, in your implementation "a system column stops being a system column once it's projected" ?
If this is correct, then as you said there's no need to add more UTs.

I have to call out it seems that this behavior is incompatible with Spark. I know whether follow Spark's standard is another problem. but community should be aware of this.

Do you know the rationale for spark that why it doesn't consider projected columns as normal column?

@chenkovsky
Copy link
Author

chenkovsky commented Feb 8, 2025

Do you know the rationale for spark that why it doesn't consider projected columns as normal column?

If my memory is correct, it's also designed for dataframe api user. for withColumn clause, it's a project.
dataframe users often chain withColumn operators.

df.withColumn(...).withColumn(...)

If project cannot propagate metadata/system columns, it's very hard to use this chain.

I cannot tell are there any other differences between spark standard and #14362 's standard because most sqls will have at least one project and implementations are quite different.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 8, 2025

Do you know the rationale for spark that why it doesn't consider projected columns as normal column?

I dont know why it doesn't consider metadata/system column as normal column.

from my opnion, normal column can be propagated automatically. but metadata/system column should have another propagate strategy, so putting them in different places is a correct way.

I don't know whether I have answered your question. make my statement more clear. I hope that I haven't confused you. @jayzhan211

if a system/metadata column is selected in project, after project, it's normal column now. every design should behavior similarly.
if a system/metadata column is not selected in project, after project, in spark system/metadata column's still a valid system/metadata column, but it's not that in #14362 . #14362 assumes projection always in the first pass to remove system/metadata columns. the use case of this design in spark is dataframe withColumn chain

@chenkovsky
Copy link
Author

for #14362 ,if we want to make it feasible, I think at least it should check metadata dict loaded from file for every format. (e.g. parquet), otherwise the whole system may be broken if it reads a file that contains magic meta. Is it right?

@jayzhan211
Copy link
Contributor

if a system/metadata column is not selected in project, after project, in spark system/metadata column's still a valid system/metadata column, but it's not that in #14362

Is this specific to Spark, or do most systems work this way? We should offer customizable options so users can choose their preference—unless this behavior is standard across most systems.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 10, 2025

if a system/metadata column is not selected in project, after project, in spark system/metadata column's still a valid system/metadata column, but it's not that in #14362

Is this specific to Spark, or do most systems work this way? We should offer customizable options so users can choose their preference—unless this behavior is standard across most systems.

I agree with you, this should be customizable, although this feature makes dataframe api much easier to use. I think both in this PR and #14362 are customizable, but it's harder in #14362, because many behaviors depend on the assumption "there is always a projection in the first pass, projection will erase system/metadata column". @jayzhan211 could you please also see the UT I put in #14362. you can see that this assumption is not correct in dataframe api. and you can also see the rowid save load problem. when I save a table to csv, it will also save rowid into csv. no system will do like this.

@adriangb
Copy link
Contributor

adriangb commented Feb 10, 2025

when I save a table to csv, it will also save rowid into csv. no system will do like this.

My problem is with this statement. I don't think there's a universal definition and use case for "system columns". Spark has one. Postgres has another. Our system has another.

You use _rowid as an example. Is that the _rowid within a single file? Or is that the _rowid of the entire table (similar to Postgres' ctid)? I think it's reasonable for both to exist and for both to be considered system columns. The former does somewhat "loose" it's meaning when copied through a query from one file to another and it only really makes sense to generate it dynamically when reading a file. The latter could be copied from one file to another without issues.

In our case we use system columns to speed up access to JSON: we take a row with json data such as json_col: text = [{"a": 1, "b": "lorem"}, {"a": 2}] and split it into _lf__json_col__a: int = [1, 2] and __lf__json_col__b: text = ["lorem", null]. This is well known technique, it's basically what ClickHouse does. We write these to files (they are not dynamically generated) and want them to be treated as normal columns when reading/writing. We just don't want them to show up when a user does select *. Is this not a valid use case for system columns?

In Postgres:

ff=# create table test (x int);
CREATE TABLE
ff=# insert into test values (1);
INSERT 0 1
ff=# \copy (select ctid, * from test) TO 'out.csv' WITH (FORMAT CSV, HEADER);
COPY 1
ctid,x
"(0,1)",1

It's perfectly happy to copy a system column to a file, another table, etc.

My thought is to establish a piece of metadata marking a column as a system column with the implementation doing nothing beyond excluding them from select * unless they are explicitly included. That seems to me like a universally agreed upon thing to do with system columns. Anything else that is not part of a universal definition of a system column is IMO something that should be implemented system by system by rewriting logical plans, customizing reading and writing, etc. Having it as field metadata means this information should be accessible from most hook points in DataFusion.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 10, 2025

when I save a table to csv, it will also save rowid into csv. no system will do like this.

My problem is with this statement. I don't think there's a universal definition and use case for "system columns". Spark has one. Postgres has another. Our system has another.

first, the assumption of projection is not correct in dataframe, right?

I agree with you. there's no universal definition.

But If I'm a logical plan implementer.

in this PR, if I want to support system column, I can create another logical plan. If I don't want to support system column. I even don't need to take care of what is system column.

But in #14362 , If I want to support system column, it's easy, because it's propagated automatically, but If I don't want to support system column, I have to take care to erase it, e.g. CopyTo. at least this is counterintuitive for me. Why do I have to take care of something I don't support?

for save files, maybe we can implmenent different logical plan for postgres and spark, or just set a switch flag. but if the assumption is not correct, the affected logical plan is not only CopyTo. maybe not only in logical plan, other places where schema is used will be impacted. it's leaked.

BTW, maybe CopyTo is much complex than a switch flag. But it's another story.

there are two types of system/metadata columns. feel free to correct me.

  1. still meaningful outside the system. e.g. _rowid, whehter export these columns to file are both ok.
    2.not meaningful outside the system. e.g. _file, especially in data lake, it will be changed easily(e.g. after compact operation), and only used for inner optimization. I think most data lakes will erase these columns when writing to files by default.

@chenkovsky
Copy link
Author

chenkovsky commented Feb 10, 2025

when I save a table to csv, it will also save rowid into csv. no system will do like this.

My problem is with this statement. I don't think there's a universal definition and use case for "system columns". Spark has one. Postgres has another. Our system has another.

sorry for my arbitrary conclusion,
I have to add a qualifier to it, "from my dataframe api experience". I learned a lot about postgres from you.

@alamb
Copy link
Contributor

alamb commented Feb 13, 2025

I am sorry I have not been following along this PR closely -- is it ready for a final review @chenkovsky and @adriangb ?

@chenkovsky
Copy link
Author

I am sorry I have not been following along this PR closely -- is it ready for a final review @chenkovsky and @adriangb ?

@alamb The discussion was very lively.

from my opinion, the pros of this PR are: dataframe api friendly, spark compatible, no need to worry about metadata/system column leakage. for #14362, it makes an assumption on project plan which is not correct for dataframe api, and it should check schema of user input file, otherwise the magic metadata dict will be loaded from file. and it should pay more attention to prevent metadata/system column leakage. in this PR, there are no such problems. In part I agree with @adriangb, If I have another choice, I won't use offset. But I think there is no better option.

we also want to hear your opinions.

@adriangb
Copy link
Contributor

adriangb commented Feb 13, 2025

Yep I agree with your summary @chenkovsky

@alamb , TLDR from my perspective is:

  1. There are two approaches, feat: metadata columns #14057 (this PR) and Support marking columns as system columns via Field's metadata #14362.
  2. Fundamentally it is not super clear how to define the feature set of a "system column". This PR approaches it from the view point of making it behave as similar as possible to Spark. Support marking columns as system columns via Field's metadata #14362 specifies that if a column has a certain key set in it's Field's metadata it will not be expanded in SELECT * (it currently does some other stuff with that metadata, e.g. disambiguating column name clashes, I'm ambivalent about that part). These approaches end up differing in several places. For example as @chenkovsky pointed out, in Support marking columns as system columns via Field's metadata #14362 if you add the metadata to a field and then write it to a file that metadata is preserved and that column will act as a system column if you do select * from 'file.parquet'. That's not necessarily a bad thing but as @chenkovsky has pointed out that differs from how Spark handles it's system columns.
  3. This PR requires somewhat invasive changes (my opinion) into DFSchema, including changing how field indexes work, which seemed a bit scary to me hence why I wanted to experiment an alternative approach in Support marking columns as system columns via Field's metadata #14362.

@alamb alamb mentioned this pull request Feb 16, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
catalog Related to the catalog crate common Related to common crate core Core DataFusion crate logical-expr Logical plan and expressions optimizer Optimizer rules sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

metadata column support
6 participants