
Return the "position" of rows in parquet files after performing a query. #13261

Open
adamfaulkner-at opened this issue Nov 5, 2024 · 7 comments
Labels
enhancement New feature or request

Comments

@adamfaulkner-at

Is your feature request related to a problem or challenge?

Hello! I'm working on a database, using the Delta Lake format with DataFusion as the query engine. I'd like to implement support for writing deletion vectors in Delta Lake when a row is deleted from my database. There's a very similar feature in Iceberg that seems to work in exactly the same way.

The general idea is that a deletion vector encodes a bitmap for which rows in a parquet file are no longer valid and should be filtered out of any query results. That is, if the bit in position P is set, then the P'th row in the corresponding parquet file should be filtered out of query results.
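
As a minimal illustration of those semantics (independent of the actual on-disk encodings, which use compressed bitmaps such as roaring bitmaps):

```rust
// Sketch of deletion-vector semantics only; the real Delta Lake / Iceberg
// formats serialize the bitmap differently (e.g. as roaring bitmaps).
use std::collections::HashSet;

/// Returns the 0-based positions of rows that remain visible, given the
/// total row count of a parquet file and the set of deleted positions
/// (the "set bits" of the deletion vector).
fn live_positions(total_rows: u64, deleted: &HashSet<u64>) -> Vec<u64> {
    (0..total_rows).filter(|p| !deleted.contains(p)).collect()
}

fn main() {
    // Rows 1 and 3 are marked deleted, so only 0, 2 and 4 survive.
    let deleted: HashSet<u64> = [1, 3].into_iter().collect();
    assert_eq!(live_positions(5, &deleted), vec![0, 2, 4]);
}
```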

AFAICT, the APIs already exist to enable this on the read side (see spiceai for example), but it's challenging to implement this on the write side because there's no obvious way to get the position of a row in a parquet file. The best idea I've come up with is to always sort my parquet files prior to writing them, and use a function like ROW_NUMBER to figure out the positions of rows. It would be great if the parquet reader machinery could expose this information directly instead.

Describe the solution you'd like

I'm not sure what a good API would look like here, but one idea is that the parquet reader could expose some new option that enables row position information to be returned as some special column, e.g.:

let ctx = SessionContext::new_with_config(SessionConfig::default().set_bool("datafusion.execution.parquet.include_row_position", true))
let record_batches = ctx.read_parquet("foo.parquet").filter(filters).select(PARQUET_ROW_POSITION).collect();
// record batches now contains the indexes of rows in "foo.parquet" that match the provided filters.

Another potential API could be to provide an alternative table provider which augments a parquet file with row numbers, without breaking when predicates are pushed down.
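
For reference, the augmentation itself is easy at the Arrow level; the hard part is keeping file-relative offsets correct in the presence of predicate pushdown, row-group pruning, and parallel reads. A rough sketch, where the `__row_position` column name and the offset bookkeeping are made up for illustration and are not an existing DataFusion API:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Append a `__row_position` column to a batch, given the file-relative
/// position of the batch's first row. The caller would have to track
/// `first_row` across batches (and across pruned row groups).
fn with_row_positions(batch: &RecordBatch, first_row: u64) -> Result<RecordBatch, ArrowError> {
    let positions: ArrayRef = Arc::new(UInt64Array::from_iter_values(
        first_row..first_row + batch.num_rows() as u64,
    ));

    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new("__row_position", DataType::UInt64, false));

    let mut columns = batch.columns().to_vec();
    columns.push(positions);

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
```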

Describe alternatives you've considered

I'm considering doing the equivalent of this SQL:

SELECT row_number FROM
  (SELECT ROW_NUMBER() OVER (ORDER BY pk ASC) as row_number, c1, c2 (... all columns relevant for filtering) FROM table)
WHERE filters;

I assume this means that indexes and pruning will not happen, and this will likely not perform very well.

This requires that every file that I write be ordered by some pk. This is probably OK.

Additional context

I'm sure that the delta-rs and iceberg-rust projects will eventually want a feature like this. Neither project seems to be implementing deletion vector writes quite yet, but something like this will be highly useful.

@adamfaulkner-at added the enhancement label Nov 5, 2024
@findepi
Member

findepi commented Nov 5, 2024

use a function like ROW_NUMBER to figure out the positions of rows. It would be great if the parquet reader machinery could expose this information directly instead.

The SQL-level approach would work only if the source file isn't filtered: no predicates, no pre-existing deletion vectors, etc.
I agree with the assessment that the information must be coming from the file reader itself.

Describe the solution you'd like

I'm not sure what a good API would look like here, but one idea is that the parquet reader could expose some new option that enables row position information to be returned as some special column, e.g.:

let ctx = SessionContext::new_with_config(SessionConfig::default().set_bool("datafusion.execution.parquet.include_row_position", true))
let record_batches = ctx.read_parquet("foo.parquet").filter(filters).select(PARQUET_ROW_POSITION).collect();
// record batches now contains the indexes of rows in "foo.parquet" that match the provided filters.

I like the syntax.
@alamb can this be handled with some form of a hidden column?

@alamb
Contributor

alamb commented Nov 6, 2024

I agree with the assessment that the information must be coming from the file reader itself.

I also agree with this assessment

I am not sure a SQL-level solution will work well in general. Some challenges:

  • ctx.read_parquet("foo.parquet") may read the file in parallel, interleaving the rows
  • ctx.read_parquet("<directory>") can read more than one file, and the row offset / position is per file

However, the DataFrame API you sketch out above seems reasonable, and is a relatively small part of the overall work.

The other systems I know of that support delete vectors (e.g. Vertica) basically have:

  1. A special flag on the scan node (ParquetExec in DataFusion) that says to emit positions (in addition to potentially adding filters, etc.)
  2. A special operator that knows how to take a stream of positions and encode them in whatever delete vector format the table uses.

So in DataFusion this might look more like adding a method to TableProvider like TableProvider::delete_from, similar to https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html#method.insert_into

And then each table provider would implement whatever API it needs (which would likely involve positions, as you describe).

This would allow DataFusion to handle the planning of DELETE.
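
For concreteness, a hypothetical shape for such a hook, loosely mirroring insert_into (none of these names or signatures exist in DataFusion today; the exact inputs would need real design work):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical extension trait sketch, not part of DataFusion.
#[async_trait]
pub trait DeleteFrom {
    /// Given the (optional) predicate of a DELETE statement, return an
    /// ExecutionPlan whose execution records the deletions, e.g. by
    /// writing deletion vectors keyed by file path and row position.
    async fn delete_from(
        &self,
        predicate: Option<&Expr>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}
```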

@findepi
Member

findepi commented Nov 6, 2024

The positions (numbers) themselves are not enough. We need to return file paths as well.
The deletions will go back into the TableProvider, right? So the TableProvider itself could determine what data it needs to execute deletions.
I think looking at how Trino used to implement deletes could be enlightening.
It starts here: https://github.com/trinodb/trino/blob/400/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java#L618 by asking the data source what data is needed for deletion tracking.
I wrote "used to" because the current implementation is all MERGE-based and the code is much harder to follow, and thus less inspiring :)

@alamb
Contributor

alamb commented Nov 6, 2024

I think that API makes a lot of sense.

What does MERGE-based mean? Does it mean that the row_ids come in order?

@findepi
Member

findepi commented Nov 8, 2024

Nothing to do with merge sort, sorry for not being clear.
Supporting SQL MERGE is more difficult than supporting SQL DELETE, but once MERGE is supported, it can be used to implement DELETE.

@adamfaulkner-at
Author

We need to return file paths as well.

I think I could get around this in the current API by adding a fake partition column that includes the filename inside the table provider. Look at how delta-rs does this in its table provider here.
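
Not exactly how delta-rs does it, but a minimal sketch of the same idea, assuming one scan per file (the `__file_path` column name is made up):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Sketch: scan a single parquet file and tag every row with its path so
/// a downstream deletion-vector writer can group row positions by file.
async fn scan_with_file_tag(ctx: &SessionContext, path: &str) -> Result<DataFrame> {
    let df = ctx
        .read_parquet(path, ParquetReadOptions::default())
        .await?;
    // Attach the file path as a constant column on every row.
    df.with_column("__file_path", lit(path))
}
```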

Having a more explicit API would be nice, though.

@alamb
Contributor

alamb commented Feb 12, 2025

BTW I think this can be achieved when we merge the metadata columns PR
