Support Push down expression evaluation in TableProviders
#14993
Argh, I've asked this before and it's been answered: #7845 (comment)
I am reopening this ticket as I think it covers several important use cases (that are all subsets of @adriangb's example of pre-computing expensive expressions, quoted below).
So a query might be

```sql
select EXTRACT (minute from "EventDate"), SUM(something)
FROM hits
GROUP BY EXTRACT (minute from "EventDate");
```

Being able to evaluate the `EXTRACT (minute from "EventDate")` expression during the scan is the goal. One possibility here might be to add an API to TableProvider similar to `supports_filters_pushdown` -- maybe something like

```rust
/// Returns true for all `Expr` in `expr` that can be directly
/// evaluated by the TableProvider
fn supports_expr_pushdown(
    &self,
    expr: &[&Expr],
) -> Result<Vec<bool>, DataFusionError>
```

This information would have to be threaded through to the scan (maybe it would be time to make TableProviders …).
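To make the shape of that API concrete, here is a minimal sketch of how a provider might answer such a call. The method is hypothetical (it does not exist in DataFusion today), and it is shown as a free function rather than a trait method:

```rust
use datafusion::error::DataFusionError;
use datafusion::logical_expr::Expr;

/// Sketch: report `true` only for expressions this provider has a fast
/// path for; anything reported `false` is evaluated by DataFusion as usual.
fn supports_expr_pushdown(exprs: &[&Expr]) -> Result<Vec<bool>, DataFusionError> {
    Ok(exprs
        .iter()
        .map(|e| match e {
            // Bare column references are trivially supported.
            Expr::Column(_) => true,
            // Toy check for the sketch: a real provider would inspect the
            // `Expr` tree rather than compare display strings.
            other => other.to_string() == "EXTRACT (minute from EventDate)",
        })
        .collect())
}
```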
Related blog post from @gatesn: https://blog.spiraldb.com/what-if-we-just-didnt-decompress-it/
Maybe this is what you meant, but to my mind it's possible to do even better: instead of evaluating it during the scan, the file might even contain a pre-evaluated version of it. You can imagine something like a column called `_expensive_thing__col1` that already stores the result. One note about this use case though: you can mostly achieve it today with some rewrite rules (you rewrite `expensive_thing(col1)` into a reference to the pre-computed column). On top of these issues, for a Variant type there is the problem that these columns can't vary file by file.
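That rewrite-rule approach can be sketched with DataFusion's `TreeNode` expression transform. A minimal sketch, assuming the `_expensive_thing__col1` naming convention from the example quoted at the end of this issue (exact `TreeNode` signatures vary by DataFusion version):

```rust
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::logical_expr::{col, Expr};

/// Rewrite `expensive_thing(col1)` into a reference to the pre-computed
/// `_expensive_thing__col1` column, leaving all other nodes untouched.
fn use_precomputed_column(expr: Expr) -> Result<Expr> {
    let transformed = expr.transform(|e| {
        // Toy match on the display form; a real rule would inspect the
        // `Expr::ScalarFunction` node directly.
        if e.to_string() == "expensive_thing(col1)" {
            Ok(Transformed::yes(col("_expensive_thing__col1")))
        } else {
            Ok(Transformed::no(e))
        }
    })?;
    Ok(transformed.data)
}
```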
Something like that sounds great to me! A couple of things to think about, taking the example of `_expensive_thing__col1`:

- Who is responsible for evaluating these expressions if they can vary on a per-file basis?
- What expression does the …?

That said, I think the best path forward is likely to prototype something in a PR and go from there 😄
Yes indeed! Great point.
Yes, this is how I would expect it to work. The table provider would have to figure out the best way to evaluate the projection depending on its actual layout; for variants this would mean: …
I agree -- implementing this optimally in a TableProvider will be complex. I note that IO/CPU is already intertwined when implementing something like filter pushdown in Parquet, so I am not sure also pushing down expressions makes the problem worse (or better).
I agree here - it's likely not to be a problem in practice. So it sounds like the main complexity is going to be the ….
@cetra3 suggested that maybe this can be done with a rewrite of the PhysicalPlan? I guess the main issue would be that you don't know anything about the file except the file path at this point. You'd need to do at least some IO to get the Parquet metadata to know the file's actual schema.
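For reference, that IO is just a footer read; a minimal sketch with the `parquet` crate (error handling simplified):

```rust
use std::fs::File;

use arrow_schema::SchemaRef;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::errors::{ParquetError, Result};

/// Read just enough of a Parquet file (the footer metadata) to learn its
/// actual schema, e.g. to check whether a pre-computed column exists.
fn file_schema(path: &str) -> Result<SchemaRef> {
    let file = File::open(path).map_err(|e| ParquetError::General(e.to_string()))?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    Ok(builder.schema().clone())
}
```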
I guess what I was getting at is that maybe we could use a ….
Given these are DataFusion scalar expressions (rather than any relational algebra), can the implementation not just invoke the expression as a fallback?

With Vortex, we've gone one step further and the scan accepts …. However, this does have the downside of pushing disproportionate complexity onto the TableProvider for the simple case of projecting out a few columns.
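The fallback described here is straightforward to sketch with DataFusion's expression compiler. A minimal sketch (import paths vary somewhat across DataFusion versions): compile the pushed-down logical `Expr` against the batch schema, then evaluate it over the decoded RecordBatch.

```rust
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::logical_expr::execution_props::ExecutionProps;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;

/// Fallback path: evaluate a pushed-down scalar `Expr` against a batch
/// the provider has already decoded, when no pre-computed column exists.
fn evaluate_fallback(expr: &Expr, batch: &RecordBatch) -> Result<ArrayRef> {
    let df_schema = DFSchema::try_from(batch.schema().as_ref().clone())?;
    let physical = create_physical_expr(expr, &df_schema, &ExecutionProps::new())?;
    physical.evaluate(batch)?.into_array(batch.num_rows())
}
```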
I agree with @gatesn -- I don't think adding new columns based on expressions has to be all that complex (you can already do it via a SchemaAdapter / SchemaMapper) (there is a similar use case for filling in new columns with default values rather than NULL).

The normal DataFusion filter pushdown API allows table providers to report which expressions they can handle, which means most providers can ignore filters unless they have code to handle them.

Again, I think simple TableProviders can use something like SchemaAdapter for this use case.
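For reference, the existing filter pushdown hook looks like this (shown as a free function; on the real `TableProvider` trait it takes `&self`). The proposal in this issue is essentially its projection-side analogue:

```rust
use datafusion::common::Result;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

/// Default behaviour: claim no filter support, so DataFusion keeps
/// evaluating every filter itself. Providers override this to return
/// `Inexact` or `Exact` for filters they can handle.
fn supports_filters_pushdown(
    filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()])
}
```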
Would SchemaAdapter be a good place to implement this functionality? It already has knowledge of the required columns and the file schema. We'd need plumbing around it (changes to TableProvider, Execs?) but at least implementing this as a user of DataFusion could be relatively self-contained.
Is plumbing this through the schema overly specific to the pre-computed expression use-case? Or are you suggesting this is the mechanism by which all expression push-down occurs, by faking additional schema columns? |
I think I was suggesting this (though I haven't thought about the API too carefully). Basically the TableSource would somehow have to say "I can evaluate this expression" and then have to tell DataFusion somehow what column corresponded to that expression.

Maybe a good first step would be to try and code up an example showing how to "push down" a field access like `user['name']`. The input is a `user` column whose values look like

```
{ id: 124, name: 'foo' },
{ id: 567, name: 'bar' }
```

And the table provider also has a separate, shredded column holding just the `name` values. And the idea is to show how a query like

```sql
select user['name'] from table
```

could be evaluated by the table provider using the separate shredded column.
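In other words, when the shredded column is present the provider can answer the field access with a plain column read. A small illustration (the `user.name` shredded-column name is hypothetical):

```sql
-- what the query asks for
select user['name'] from table;

-- what the provider actually reads when a shredded column exists
select "user.name" from table;
```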
BTW, there is a related issue for Parquet itself (where we don't support pushdown for sub-fields): ….
That feels like quite a roundabout way to do it, assuming I'm understanding correctly. DataFusion would ask which expressions can be pushed down, the provider would reply with some (typically randomly generated) column name string corresponding to each expression, augment its schema to include these expressions, and then DataFusion would ask for that column as part of the projection? Is there a specific objection to the general case?

Similarly, I haven't thought this through fully in the context of DataFusion. But roughly: DataFusion asks the table provider which expressions it can push down, and the node is configured with both a projection expression and a filter expression. Exact same mechanism as filter expressions. In your example, the expression would be …. An advanced table provider could do with that as it pleases. A simple one can construct the old (existing) projection mask with ….
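A rough sketch of what a scan node configured this way might carry; the names here are hypothetical, not an existing DataFusion API:

```rust
use datafusion::logical_expr::Expr;

/// Hypothetical scan request in the spirit of this proposal: the node
/// carries full expressions for both sides, exactly as filter pushdown
/// already carries predicates today.
struct ScanRequest {
    /// Expressions the provider agreed to evaluate as the scan's output,
    /// e.g. a bare column plus a field access it can serve from a
    /// shredded column.
    projection: Vec<Expr>,
    /// Conjunctive predicates, as in today's filter pushdown.
    filters: Vec<Expr>,
}
```

A simple provider can recover the old projection mask by requiring every `projection` entry to be a bare `Expr::Column`.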
I am not sure I understand what general case you are referring to. The only thing DataFusion needs is to somehow know what output column corresponds to the expression that was pushed down (so it can match it up with the rest of the plan).
I agree this sounds complicated and not a great idea. It sounds like we are basically saying the same thing (which I view as a good thing).
@gatesn I'm in agreement with you; what you are proposing makes total sense, especially the bit about sharing code paths with filter expressions.
👍🏻 agreed, but we should have at least some reasonable examples of how to use this that don't require tremendous complexity. E.g. hopefully we can use ….
For the scenario of

```sql
select expensive_thing(col1) from data
```

I would like to pre-process (speed up) `expensive_thing(col1)`. The easiest way I can think of doing this is by pre-computing the expression and saving it as a column with a specific name like `_expensive_thing__col1`. But then I have to hardcode this column into my schema, the hardcoded expressions need to be the same for every file, etc.

To me an ideal solution would be to push down the expression to the file-reading level so that I can then check "does `_some_other_expensive_expr__col38` exist in the file? If so, read that; otherwise read `col38` and compute the expression". The tricky thing is I'd want to do this on a per-file level: depending on the data, different expression/column combinations would be pre-computed; it's prohibitive to put them all in the schema that is shared amongst all files.
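A minimal sketch of that per-file check in Rust, using arrow-rs's `Schema::column_with_name`; the column names follow the example above:

```rust
use arrow_schema::Schema;

/// Outcome of the per-file decision described above.
enum FileScanChoice<'a> {
    /// This file contains the pre-computed column; read it directly.
    ReadPrecomputed(&'a str),
    /// Fall back to reading the raw column and evaluating the expression.
    ReadAndCompute(&'a str),
}

/// Prefer the pre-computed column when this particular file has it.
fn plan_for_file(file_schema: &Schema) -> FileScanChoice<'static> {
    if file_schema.column_with_name("_expensive_thing__col1").is_some() {
        FileScanChoice::ReadPrecomputed("_expensive_thing__col1")
    } else {
        FileScanChoice::ReadAndCompute("col1")
    }
}
```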