Support Push down expression evaluation in TableProviders #14993

Open
adriangb opened this issue Mar 3, 2025 · 17 comments

Comments

adriangb (Contributor) commented Mar 3, 2025

For the scenario of select expensive_thing(col1) from data I would like to pre-process (speed up) expensive_thing(col1).
The easiest way I can think of doing this is by pre-computing the expression and saving it as a column with a specific name like _expensive_thing__col1.
But then I have to hardcode this column into my schema, and the hardcoded expressions need to be the same for every file, etc.

To me an ideal solution would be to push down the expression to the file reading level so that I can then check "does _some_other_expensive_expr__col38 exist in the file? if so read that, otherwise read col38 and compute the expression".

The tricky thing is I'd want to do this on a per-file level: depending on the data different expression/column combinations would be pre-computed; it's prohibitive to put them all in the schema that is shared amongst all files.

adriangb (Contributor, Author) commented Mar 3, 2025

Argh I've asked this before and it's been answered: #7845 (comment)

alamb (Contributor) commented Mar 5, 2025

I am reopening this ticket as I think it covers several important use cases (all of which are subsets of @adriangb's expensive_thing(col1) example above):

  • EXTRACT (minute from "EventDate"). For example, @gatesn mentions that the Vortex format may be able to evaluate this more quickly on the compressed representation than by decompressing the full column and evaluating the expression afterwards
  • struct_column["field_name"]: For example, extracting one field from a struct column -- in this case we could potentially update the JSON or Parquet decoders to avoid materializing other fields (we would likely need more arrow-rs support too)

So a query might be

select EXTRACT (minute from "EventDate"),  SUM(something) 
FROM hits 
GROUP BY EXTRACT (minute from "EventDate");

Being able to evaluate the EXTRACT (minute from "EventDate") expression during the scan would be super helpful

One possibility here might be to add an API to TableProvider similar to TableProvider::supports_filters_pushdown

maybe something like

/// For each `Expr` in `exprs`, returns whether the TableProvider can
/// evaluate it directly during the scan
fn supports_expr_pushdown(
    &self,
    exprs: &[&Expr],
) -> Result<Vec<bool>, DataFusionError>

This information would have to be threaded through to TableProvider::scan as well

(maybe it would be time to make TableProvider::scan_with_args 🤔 )
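To make that concrete, here is a rough sketch of what a scan_with_args-style call might carry (a hypothetical struct for illustration only, not something DataFusion defines in this form): the expressions the provider accepted via supports_expr_pushdown travel to the scan alongside the usual projection, filters, and limit.

use datafusion::logical_expr::Expr;

/// Hypothetical argument bundle for a `TableProvider::scan_with_args` method.
struct ScanArgs {
    /// Column indices to read, as `TableProvider::scan` receives today.
    projection: Option<Vec<usize>>,
    /// Filters the provider agreed to evaluate (via supports_filters_pushdown).
    filters: Vec<Expr>,
    /// Expressions the provider agreed to evaluate during the scan,
    /// e.g. EXTRACT(minute FROM "EventDate"), reported via supports_expr_pushdown above.
    pushed_down_exprs: Vec<Expr>,
    /// Optional row limit.
    limit: Option<usize>,
}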

alamb changed the title from "Push down expression to files" to "Support Push down expression evaluation in TableProviders" on Mar 5, 2025
alamb (Contributor) commented Mar 5, 2025

Related blog from @gatesn

https://blog.spiraldb.com/what-if-we-just-didnt-decompress-it/

adriangb (Contributor, Author) commented Mar 5, 2025

Being able to evaluate the EXTRACT (minute from "EventDate") expression during the scan would be super helpful

Maybe this is what you meant, but to my mind it's possible to do even better: instead of evaluating it during the scan, the file might even contain a pre-evaluated version of it. You can imagine something like a column called _computed_<hash of expr's SQL> so that you can read directly from (and use statistics of) a column called _computed_8has07fas98a (made up hash) instead of reading EventDate and computing EXTRACT (minute from "EventDate") on it. This becomes even more useful if you can use the statistics of the pre-computed expressions with a filter (e.g. where extract(minute from ts) = 1).

One note about this use case though: you can mostly achieve it today with some rewrite rules (you rewrite select EXTRACT (minute from "EventDate") from hits to select _computed_8has07fas98a as "EXTRACT (minute from \"EventDate\")" from hits). It's error prone and annoying though.

On top of these issues, for a Variant type there is the problem that these columns can't vary file by file.

While this is crucial for a Variant type, I think it can still be very helpful for other scenarios: if you are adding pre-computed columns as an optimization it might make sense to only compute them during an optimization / compaction pass. So you end up with some files that have the pre-computed column and some that don't. Blindly rewriting the expression to select _computed_8has07fas98a would by default produce incorrect results because SchemaAdapter would fill in nulls instead of computing the expression in real time. We worked around this by essentially forking SchemaAdapter and giving it the ability to generate columns from other columns instead of always filling them in with nulls, but that's very error prone and wonky.
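A rough sketch of that idea (hypothetical types, not the actual SchemaAdapter API): when a requested column is missing from a file, the adapter either fills it with nulls, as it does today, or derives it from columns the file does have.

use std::collections::HashMap;

/// What to do when a requested column is absent from a particular file.
enum MissingColumn {
    /// Today's SchemaAdapter behavior: produce an all-null column.
    FillNull,
    /// Compute the column on the fly from a source column and an expression,
    /// e.g. EXTRACT(minute FROM "EventDate") when `_computed_<hash>` is absent.
    Compute { source_column: String, expr_sql: String },
}

/// `derivations` maps a pre-computed column name to (source column, expression SQL).
fn handle_missing(column: &str, derivations: &HashMap<String, (String, String)>) -> MissingColumn {
    match derivations.get(column) {
        Some((source, expr_sql)) => MissingColumn::Compute {
            source_column: source.clone(),
            expr_sql: expr_sql.clone(),
        },
        // No derivation registered; fall back to nulls as the stock adapter does.
        None => MissingColumn::FillNull,
    }
}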

One possibility here might be to add an API to TableProvider similar to TableProvider::supports_filters_pushdown
maybe it would be time to make TableProvider::scan_with_args

Something like that sounds great to me!

A couple of things to think about, taking the example of select variant_get(json_col, 'key')::int from data:

Who is responsible for evaluating these expressions if they can vary on a per-file basis?
If the TableProvider says "yes, I can evaluate that expression" it is then responsible for doing the compute to evaluate it for every single file. If that data comes from a shredded column that makes sense: it's cheap. But if it has to start deserializing the Variant value column it's going to get expensive. Maybe that's not an issue, but I did want to point out that it blurs the lines of where IO happens and where compute happens. If this is a problem I think it would complicate the API substantially.

What expression does the TableProvider get passed? In this example it could be variant_get(json_col, 'key') or variant_get(json_col, 'key')::int. I'm guessing it's the former, and the same rules as TableProvider::supports_filters_pushdown apply.

That said I think the best path forward is likely to prototype something in a PR and go from there 😄

alamb (Contributor) commented Mar 5, 2025

Maybe this is what you meant, but to my mind it's possible to do even better: instead of evaluating it during the scan, the file might even contain a pre-evaluated version of it.

Yes indeed! Great point.

Who is responsible for evaluating these expressions if they can vary on a per-file basis?
If the TableProvider says "yes, I can evaluate that expression" it is then responsible for doing the compute to evaluate it for every single file.

Yes, this is how I would expect it to work.

The table provider would have to figure out the best way to evaluate the projection depending on its actual layout.

For variants this would mean:

  1. Files that had a field extracted to a shredded column would use that
  2. Files that didn't have the field extracted would need to read the entire variant and pull out the field of interest

Maybe that's not an issue but I did want to point out that it blurs the lines of where IO happens and where compute happens. If this is a problem I think it would complicate the API substantially.

I agree -- implementing this optimally in a TableProvider will be complex

I note that the IO/CPU work is already intertwined when implementing something like filter pushdown in Parquet, so I am not sure also pushing down expressions makes the problem worse (or better)

adriangb (Contributor, Author) commented Mar 6, 2025

I note that the IO/CPU work is already intertwined when implementing something like filter pushdown in Parquet, so I am not sure also pushing down expressions makes the problem worse (or better)

I agree here - it's likely to not be a problem in practice.

So it sounds like the main complexity is going to be the TableProvider having to take ownership of applying the expression. It would be interesting to see if there's a way for it to dynamically decide to fall back to DataFusion; I can imagine situations where it wants to handle certain branches / conditions but would rather not re-implement others.

adriangb (Contributor, Author) commented Mar 6, 2025

@cetra3 suggested that maybe this can be done with a rewrite of the PhysicalPlan? I guess the main issue would be that you don't know anything about the file except the file path at this point. You'd need to do at least some IO to get the Parquet metadata to know the file's actual schema.

cetra3 (Contributor) commented Mar 6, 2025

I guess what I was getting at is that maybe we could use a PhysicalOptimizerRule to do this sort of thing. However, currently all the OptimizerRule traits are non-async, which makes this a bit hard to do. I don't see why they can't be async.

gatesn (Contributor) commented Mar 6, 2025

I can imagine situations where it wants to handle certain branches / conditions but would rather not re-implement others.

Given these are DataFusion scalar expressions (rather than any relational algebra), can the implementation not just invoke the expression as a fallback?

--

With Vortex, we've gone one step further and the scan accepts projection: Expr, filter: Option<Expr> where projection can arbitrarily select columns and apply scalar expressions to them. While this may be a little too general for DataFusion, it works well provided the system has good support for struct types and expressions for manipulating them. We have select (to select/exclude fields from a struct), pack (to assemble fields into a new struct), getitem (to extract a field from a struct), and merge (to union multiple structs, although this can be implemented with getitem+pack).

However this does have the downside of pushing disproportionate complexity onto the TableProvider for the simple case of projecting out a few columns.
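For readers unfamiliar with that style of API, a very reduced model of what such a projection expression could look like (this is an illustration of the general shape only, not the actual Vortex API; merge is omitted since it can be built from getitem plus pack):

/// A reduced model of a struct-aware projection expression.
enum ProjExpr {
    /// Refer to a top-level column.
    Column(String),
    /// Keep only the named fields of a struct (select).
    Select(Box<ProjExpr>, Vec<String>),
    /// Extract a single field from a struct (getitem).
    GetItem(Box<ProjExpr>, String),
    /// Assemble named expressions into a new struct (pack).
    Pack(Vec<(String, ProjExpr)>),
}

/// Shape of a scan that takes a projection expression and an optional filter.
fn scan(_projection: ProjExpr, _filter: Option<ProjExpr>) {
    // An advanced provider would interpret these against its own layout;
    // a simple one can fall back to reading the columns and evaluating them itself.
}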

alamb (Contributor) commented Mar 6, 2025

So it sounds like the main complexity is going to be the TableProvider having to take ownership of applying the expression.

Given these are DataFusion scalar expressions (rather than any relational algebra), can the implementation not just invoke the expression as a fallback?

I agree with @gatesn -- I don't think adding new columns based on expressions has to be all that complex (you can already do it via a SchemaAdapter / SchemaMapper)

(there is a similar use case for filling in new columns with default values rather than NULL)

With Vortex, we've gone one step further and the scan accepts projection: Expr, filter: Option<Expr> where projection can arbitrarily select columns and apply scalar expressions to them. While this may be a little too general for DataFusion, it works well provided the system has good support for struct types and expressions for manipulating them.

The normal DataFusion filter pushdown API allows table providers to report which expressions they can handle, which means most providers can ignore filters unless they have code to handle it.
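For reference, the filter pushdown hook being referred to has roughly this shape (paraphrased from memory here as a free function; see the TableProvider docs for the exact trait signature), and an expression pushdown hook could mirror it:

use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// For each filter, the provider says whether it can evaluate it exactly,
// inexactly, or not at all. Providers that don't care simply report
// Unsupported for everything and DataFusion applies the filters itself.
fn supports_filters_pushdown(filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|_| TableProviderFilterPushDown::Unsupported)
        .collect())
}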

However this does have the downside of pushing disproportionate complexity onto the TableProvider for the simple case of projecting out a few columns.

Again, I think simple TableProviders can use something like SchemaAdapter for this use case.

adriangb (Contributor, Author) commented Mar 6, 2025

Would SchemaAdapter be a good place to implement this functionality? It already has knowledge of the required columns and file schema. We'd need piping around it (changes to TableProvider, Execs?) but at least implementing this as a user of DataFusion could be relatively self-contained.

gatesn (Contributor) commented Mar 6, 2025

Is plumbing this through the schema overly specific to the pre-computed expression use-case? Or are you suggesting this is the mechanism by which all expression push-down occurs, by faking additional schema columns?

alamb (Contributor) commented Mar 6, 2025

Or are you suggesting this is the mechanism by which all expression push-down occurs, by faking additional schema columns?

I think I was suggesting this (though I haven't thought about the API too carefully)

Basically the TableSource would somehow have to say "I can evaluate this expression" and then tell DataFusion what column corresponds to that expression.

Maybe a good first step would be to try and code up an example showing how to "push down" a field access

like input is a column user with documents like

{ id: 124, name: 'foo' },
{ id: 567, name: 'bar' }

And the table provider also has a shredded column like

foo
bar

And the idea is to show how a query like

select user['name'] from table

Could be evaluated by the table provider using the separate shredded column

alamb (Contributor) commented Mar 6, 2025

BTW there is a related issue for parquet itself (where we don't support pushdown for sub fields):

gatesn (Contributor) commented Mar 6, 2025

I think I was suggesting this (though I haven't thought about the API it too carefully)

That feels like quite a roundabout way to do it, assuming I'm understanding correctly. DataFusion would ask which expressions can be pushed down, the provider would reply with some (typically) randomly generated column name string corresponding to the expression, augment its schema to include these expressions, and then DataFusion would ask for that column as part of the projection?

Is there a specific objection to the general case? Similarly, I haven't thought through this fully in the context of DataFusion.

But roughly, DataFusion asks the table provider which expressions it can push-down, and the node is configured with both a projection expression and a filter expression. Exact same mechanism as filter expressions.

In your example, the expression would be $.getitem("user").getitem("name")

An advanced table provider could do with that as it pleases.

A simple one can construct the old (existing) projection mask with expr.accessed_fields() -> ["user"] or similar, project out the fields, and then invoke the projection expression:

expr.evaluate(self.project(expr.accessed_fields()))
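Spelled out as a self-contained sketch (all types below are placeholders, not DataFusion or Vortex APIs), the fallback is simply: project the columns the expression touches, then evaluate the expression over them.

/// Placeholder for a pushed-down scalar expression.
struct ScalarExpr;
/// Placeholder for a batch of rows.
struct Batch;

impl ScalarExpr {
    /// Column names the expression reads, e.g. ["user"] for user['name'].
    fn accessed_fields(&self) -> Vec<String> {
        vec!["user".to_string()]
    }
    /// Evaluate the expression over a batch containing those columns.
    fn evaluate(&self, batch: Batch) -> Batch {
        batch
    }
}

struct SimpleProvider;

impl SimpleProvider {
    /// Read only the requested columns from storage (the old projection mask).
    fn project(&self, _columns: Vec<String>) -> Batch {
        Batch
    }

    /// The fallback path: project the accessed columns, then evaluate the expression.
    fn scan_expr(&self, expr: &ScalarExpr) -> Batch {
        expr.evaluate(self.project(expr.accessed_fields()))
    }
}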

alamb (Contributor) commented Mar 6, 2025

Is there a specific objection to general case? Similarly, I haven't thought through this fully in the context of DataFusion.

I am not sure I understand what the general case is you are referring to.

The only thing DataFusion needs is to somehow know what output column corresponds to the expression that was pushed down (so it can match it up with the rest of the plan).

DataFusion would ask which expressions can be pushed down, the provider would reply with some (typically) randomly generated column name string corresponding to the expression, augment its schema to include these expressions, and then DataFusion would ask for that column as part of the projection?

I agree this sounds complicated and not a great idea.

It sounds like we are basically saying the same thing (which I view as a good thing)

adriangb (Contributor, Author) commented Mar 6, 2025

But roughly, DataFusion asks the table provider which expressions it can push-down, and the node is configured with both a projection expression and a filter expression. Exact same mechanism as filter expressions.

@gatesn I'm in agreement with you: what you are proposing makes total sense, especially the bit about sharing code paths with filter expressions.

An advanced table provider could do with that as it pleases.

👍🏻 agreed, but we should have at least some reasonable examples of how to use this that don't require tremendous complexity. E.g. hopefully we can use SchemaAdapter or something similar so that I can write <100 LOC and inject a custom implementation of struct unpacking / shredding into an existing table provider like ListingTableProvider.
