
[POC] feat: Add datafusion-storage #15018

Open · wants to merge 1 commit into main
Conversation

@Xuanwo (Member) commented Mar 5, 2025

Which issue does this PR close?

Rationale for this change

This PR is basically a PoC for datafusion-storage; all APIs are provided for discussion only.

Design

The DataFusion storage is designed based on current DataFusion usage, where we only use read, write, stat, and list for now.

  • We have a Storage trait that implementations must follow, allowing each implementer to optimize within StorageFileRead, StorageFileWrite, and other related components.
    • Storage is designed in an io-uring friendly way.
  • We introduced a StorageExt trait primarily for DataFusion users, providing helper functions to optimize storage usage. These functions help users avoid unnecessary memory copies or clones, ensuring more efficient performance.
  • For every operation, we will pass down a StorageContext to carry context such as metrics, tracing spans, HTTP clients, or runtime. DataFusion users can implement or inject them as needed.
  • For each returned struct, such as StorageFileReader, we implement adapters for futures::AsyncWrite and futures::Stream to optimize our API and avoid unnecessary costs.
  • We intentionally hide storage-specific features like multipart uploads, leaving them as implementation details for the datafusion-storage implementer.
    • DataFusion users don't need to care about what happens during a write call.
    • But if they do, they can dive into StorageFileWrite and do whatever they want.
    • For example, object_store can spawn a PutUpload task onto StorageContext::runtime when needed.
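The operations described above can be sketched as a trait. This is a hypothetical, simplified mirror of what the proposed Storage trait might look like; the real PoC would use async methods and richer reader/writer types, and the StorageContext field shown here (tenant) is a placeholder, not part of the proposal:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the proposed StorageContext; in the PoC it
/// would carry metrics, tracing spans, HTTP clients, or a runtime handle.
pub struct StorageContext {
    pub tenant: String, // placeholder field for illustration only
}

pub struct FileStat {
    pub size: u64,
}

/// Simplified, synchronous sketch of the four operations the design
/// mentions: read, write, stat, and list.
pub trait Storage {
    fn read(&self, ctx: &StorageContext, path: &str) -> Option<Vec<u8>>;
    fn write(&mut self, ctx: &StorageContext, path: &str, data: Vec<u8>);
    fn stat(&self, ctx: &StorageContext, path: &str) -> Option<FileStat>;
    fn list(&self, ctx: &StorageContext, prefix: &str) -> Vec<String>;
}

/// In-memory implementation for demonstration only.
pub struct MemoryStorage {
    files: HashMap<String, Vec<u8>>,
}

impl MemoryStorage {
    pub fn new() -> Self {
        Self { files: HashMap::new() }
    }
}

impl Storage for MemoryStorage {
    fn read(&self, _ctx: &StorageContext, path: &str) -> Option<Vec<u8>> {
        self.files.get(path).cloned()
    }
    fn write(&mut self, _ctx: &StorageContext, path: &str, data: Vec<u8>) {
        // write takes ownership of the buffer, so no copy happens here
        self.files.insert(path.to_string(), data);
    }
    fn stat(&self, _ctx: &StorageContext, path: &str) -> Option<FileStat> {
        self.files.get(path).map(|d| FileStat { size: d.len() as u64 })
    }
    fn list(&self, _ctx: &StorageContext, prefix: &str) -> Vec<String> {
        let mut paths: Vec<String> = self
            .files
            .keys()
            .filter(|k| k.starts_with(prefix))
            .cloned()
            .collect();
        paths.sort();
        paths
    }
}
```

Note that write takes a Vec<u8> by value rather than a &[u8]; passing ownership down is what lets an implementer avoid the extra copy discussed later in this thread.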

More details

datafusion-storage can serve as the default storage entry point for our execution, but we also provide traits like the Parquet AsyncFileReader, allowing users to integrate their own storage solutions.

What changes are included in this PR?

Added a new crate called datafusion-storage, which serves as the storage abstraction for the entire DataFusion ecosystem.

Are these changes tested?

Not yet.

Are there any user-facing changes?

Yes, we will provide more details later as we make progress with the actual work.

Signed-off-by: Xuanwo <github@xuanwo.io>
@alamb (Contributor) commented Mar 6, 2025

I plan to check this out more carefully tomorrow

@alamb (Contributor) commented Mar 7, 2025

I ran out of time today -- will check it soon

@alamb (Contributor) left a comment:

Thank you very much for this code @Xuanwo -- I took a look at it this morning.

I took a high-level look at the APIs and they seem quite reasonable (I left some comments). My biggest questions revolve around whether we could extend this API to avoid copying.

I think a good next step would be to try to create an example of plumbing this API through and seeing how the integration looks. For example, maybe enough to read a CSV or JSON file.

If we can make the API work for the parquet reader I think it could be made to work for anything else in DataFusion. However, the parquet reader is likely the most complicated task so likely not a great one to start with.

If we move forward with this API, we would also have to come up with a migration plan (some APIs are expressed directly in terms of object store, such as FileScanConfig, which has an object store URL embedded directly). I am sure this migration is doable, but it would take some thought and care.

/// Adapter to allow using `futures::io::Sink` with `StorageFileWriter`
pub struct StorageFileBytesSink {}

/// Adapter to allow using `tokio::io::AsyncWrite` with `StorageFileWriter`
@alamb (Contributor) commented:

One of the concerns I have heard about the AsyncWrite API is that it takes a &[u8], pretty much requiring an extra copy.

I wonder if you have ideas about having the API take ownership of data -- for example https://docs.rs/object_store/latest/object_store/struct.PutPayload.html (which uses Bytes to potentially avoid copying)


@Xuanwo (Member, Author) replied:

Hi, @alamb, this is a reasonable concern and the most important thing I learned from object_store.

My current design is as follows:

  • StorageFileWriter itself implements StorageFileWrite, which is zero-cost and involves no data copying.
  • StorageFileWriter::into_sink converts itself into a futures::io::Sink, which incurs a slight runtime overhead because we need to maintain the future state, but it does not introduce any data copying costs.
  • StorageFileWriter::into_futures_write and StorageFileWriter::into_tokio_write convert themselves into an AsyncWrite, which does require data copying. However, from an end-to-end perspective of the total data flow, this is unavoidable because the final use of this API requires AsyncWrite. This data copying is necessary at some point, regardless of where we introduce it. For example, the csv crate only accepts AsyncWrite.

Apart from the zero-cost calls on StorageFileWrite, all other call styles require an explicit into_xxx call. The caller should be very aware of what they are doing and the cost they are incurring.
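As a rough illustration of the distinction described above (not the actual PoC code), a synchronous stand-in can show the difference between the ownership-taking path, which is copy-free, and an explicit adapter whose &[u8] interface forces a copy. FileWriter, write_owned, and into_io_write are hypothetical names chosen to mirror the proposed into_xxx convention:

```rust
use std::io::Write;

/// Hypothetical stand-in for StorageFileWriter: the native write path
/// takes ownership of the buffer, so no copy is needed.
pub struct FileWriter {
    chunks: Vec<Vec<u8>>,
}

impl FileWriter {
    pub fn new() -> Self {
        Self { chunks: Vec::new() }
    }

    /// Zero-copy path: the caller hands over ownership of the buffer.
    pub fn write_owned(&mut self, data: Vec<u8>) {
        self.chunks.push(data);
    }

    /// Explicit opt-in to the copying adapter, mirroring the proposed
    /// into_futures_write / into_tokio_write naming.
    pub fn into_io_write(self) -> FileWriterIoAdapter {
        FileWriterIoAdapter { inner: self }
    }

    pub fn total_len(&self) -> usize {
        self.chunks.iter().map(|c| c.len()).sum()
    }
}

/// std::io::Write stand-in for the AsyncWrite adapters.
pub struct FileWriterIoAdapter {
    pub inner: FileWriter,
}

impl Write for FileWriterIoAdapter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // The &[u8] interface forces a copy into an owned buffer here;
        // this is the cost the caller opts into by calling into_io_write.
        self.inner.write_owned(buf.to_vec());
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
```

The point of the explicit conversion is that the copy is visible at the call site, rather than hidden inside a generic write path.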

}
}

/// Adapter to allow using `futures::io::AsyncRead` with `StorageFileReader`
@alamb (Contributor) commented:

I have the same question about read as about write -- do all these APIs require copying the data (i.e., take the data as &[u8])?

One of the neat things we do with the object store API and parquet reader when reading strings in Utf8View is that the same Bytes buffer that is returned from object_store is converted into a parquet Buffer and then eventually forms the underlying Arrow Buffer of the returned array

In other words, there is no copying of data once it comes back from object store.

@Xuanwo (Member, Author) replied:

Most of this is similar to #15018 (comment); we will have:

  • zero cost StorageFileRead
  • StorageFileReader::into_stream
  • StorageFileReader::into_futures_read and StorageFileReader::into_tokio_read
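A hedged sketch of the read side, using std types in place of futures and bytes::Bytes; FileReader and read_range are hypothetical names. The idea is that the zero-cost path hands out a refcounted buffer that downstream consumers (like the parquet reader mentioned above) can keep without copying, while the AsyncRead-style adapter path inherently copies into the caller's &mut [u8]:

```rust
use std::sync::Arc;

/// Hypothetical reader: holds a refcounted buffer (playing the role of
/// bytes::Bytes) so data can be shared downstream without copying.
pub struct FileReader {
    data: Arc<Vec<u8>>,
    pos: usize,
}

impl FileReader {
    pub fn new(data: Vec<u8>) -> Self {
        Self { data: Arc::new(data), pos: 0 }
    }

    /// Zero-copy "range read": hands out a clone of the refcounted
    /// buffer plus a byte range, instead of copying the bytes out.
    pub fn read_range(&self, start: usize, end: usize) -> (Arc<Vec<u8>>, std::ops::Range<usize>) {
        (Arc::clone(&self.data), start..end)
    }
}

impl std::io::Read for FileReader {
    /// Stand-in for the into_futures_read / into_tokio_read adapter
    /// path: the &mut [u8] interface forces a copy.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let remaining = &self.data[self.pos..];
        let n = remaining.len().min(buf.len());
        buf[..n].copy_from_slice(&remaining[..n]);
        self.pos += n;
        Ok(n)
    }
}
```

As with the write side, only the adapter path pays for a copy, and the caller must explicitly convert into it.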

}

pub struct StorageFileWriter {
inner: Box<dyn StorageFileWrite>,
@alamb (Contributor) commented:

I wonder if this was meant to be a trait (rather than a self-recursive structure).

@Xuanwo (Member, Author) replied on Mar 9, 2025:

StorageFileWrite is the trait and StorageFileWriter is the struct we build upon it.

@alamb (Contributor) commented Mar 9, 2025

@tustvold (Contributor) commented Mar 9, 2025

So I've not had time to look in huge detail, and would echo Andrew's concerns around AsyncWrite and friends; ObjectStore is intentionally not formulated in terms of them.

However, taking a step back, the proposed abstraction doesn't appear to operate at a higher level than ObjectStore or OpenDAL; it is still focused on shuffling lists of objects/files and passing opaque bytes around. It is unclear to me that introducing a third abstraction at this same level really unlocks many new use-cases / optimisation opportunities. That seems a tough sell to me, given the potential disruption.

The original ticket stated

> The benefit is that users can implement innovative features like datafusion-storage-cudf or datafusion-storage-io_uring without being constrained by the current I/O abstraction of object-store or OpenDAL.

However, it is unclear to me why either of these would not fit the ObjectStore interface, and therefore how datafusion-storage would need to differ in order to accommodate them? Ultimately both of these are going to need some bridge through to DF, as I doubt a !Send DataFusion is on the cards anytime soon, and this could easily be mapped to ObjectStore.

> This would allow them to maintain useful features such as context management and add additional requirements to the trait while letting datafusion-storage-object-store and datafusion-storage-opendal handle the extra work.

Is there perhaps some functionality that OpenDAL provides that can't be exposed via object_store_opendal, and that a new datafusion-storage abstraction would allow exposing? Could we add this to the ObjectStore interface?

> With the growth of DF, we have to continuously add more features to object_store, making it increasingly difficult to compose, as described in apache/arrow-rs#7171.

The challenge here, and what I was getting at in the ticket, is that much of the functionality was being implemented at the ObjectStore boundary, where it might be better served being implemented either higher or lower in the stack. We've addressed the latter by introducing HttpClient in object_store. I see datafusion_storage as an opportunity to address the former.

To put this concretely: say I want to implement caching of parquet files. I don't want to be caching raw bytes and byte ranges; instead I want to be able to cache the metadata separately, and then perhaps have some internal data structure for quickly identifying row groups, etc.

Similarly for CSV files, I might want the ability to cache file schemas, or meta information about number of rows, etc...
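A minimal sketch of that idea, assuming a hypothetical format-aware cache keyed by file path (none of these types exist in DataFusion; they only illustrate caching parsed artifacts rather than raw byte ranges):

```rust
use std::collections::HashMap;

/// Hypothetical format-aware artifacts: parsed, format-level results
/// (parquet metadata, a CSV schema) rather than opaque byte ranges.
#[derive(Clone, PartialEq, Debug)]
pub enum CachedArtifact {
    ParquetMetadata { num_row_groups: usize },
    CsvSchema { columns: Vec<String>, num_rows: u64 },
}

/// Hypothetical cache sitting above the storage layer: it never sees
/// raw bytes, only the artifacts a format reader produced from them.
#[derive(Default)]
pub struct FormatCache {
    entries: HashMap<String, CachedArtifact>,
}

impl FormatCache {
    pub fn insert(&mut self, path: &str, artifact: CachedArtifact) {
        self.entries.insert(path.to_string(), artifact);
    }

    pub fn get(&self, path: &str) -> Option<&CachedArtifact> {
        self.entries.get(path)
    }
}
```

The design question raised here is where such a cache lives: it cannot be expressed at the bytes-in/bytes-out level that both ObjectStore and the proposed datafusion-storage operate at.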

This is the level that historically hasn't really existed in a coherent form in DF. There have existed things like AsyncFileReaderFactory, etc., but they're kind of ad hoc, relatively hard to use, and not part of a coherent global design. It is possible/probable that the recent DataSource work already is this abstraction, but I haven't followed it closely enough to weigh in with any authority.
