[POC] feat: Add datafusion-storage #15018
Conversation
Signed-off-by: Xuanwo <github@xuanwo.io>
I plan to check this out more carefully tomorrow
I ran out of time today -- will check it soon
Thank you very much for this code @Xuanwo -- I took a look at it this morning.
I took a high-level look at the APIs and it seems quite reasonable (I left some comments). My biggest questions revolve around whether we could extend this API to avoid copying.
I think a good next step would be to try to create an example of plumbing this API through and seeing how the integration looks. For example, maybe enough to read a CSV or JSON file.
If we can make the API work for the parquet reader, I think it could be made to work for anything else in DataFusion. However, the parquet reader is likely the most complicated case, so it is probably not a great one to start with.
If we move on with this API, we would also have to come up with a migration plan (aka some APIs are defined directly in terms of object store, such as FileScanConfig, which has an object store URL embedded directly). I am sure this migration is doable, but it would take some thought and care.
/// Adapter to allow using `futures::io::Sink` with `StorageFileWriter`
pub struct StorageFileBytesSink {}

/// Adapter to allow using `tokio::io::AsyncWrite` with `StorageFileWriter`
One of the concerns I have heard about the `AsyncWrite` API is that it takes a `&[u8]`, pretty much requiring an extra copy.
I wonder if you have ideas about having the API take ownership of the data -- for example https://docs.rs/object_store/latest/object_store/struct.PutPayload.html (which uses `Bytes` to potentially avoid copying).
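To make the contrast concrete, here is a small, hedged sketch (plain functions using the `bytes` crate, not the PR's actual API): a `&[u8]` entry point forces the callee to copy if it wants to retain the data, while an ownership-taking entry point, in the spirit of `PutPayload`, can keep the buffer as-is.

```rust
use bytes::Bytes;

/// `AsyncWrite`-style signature: only a borrowed slice is visible, so any
/// data the callee retains must be copied.
fn write_borrowed(sink: &mut Vec<Bytes>, buf: &[u8]) {
    sink.push(Bytes::copy_from_slice(buf)); // the extra copy
}

/// Ownership-taking signature: the reference-counted `Bytes` is moved in,
/// so the callee keeps it without touching the payload.
fn write_owned(sink: &mut Vec<Bytes>, data: Bytes) {
    sink.push(data); // no copy
}

fn main() {
    let mut sink = Vec::new();
    let data = Bytes::from_static(b"example payload");
    write_borrowed(&mut sink, &data);
    write_owned(&mut sink, data);
}
```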
Hi, @alamb, this is a reasonable concern and the most important thing I learned from `object_store`.
My current design is as follows:
- `StorageFileWriter` itself implements `StorageFileWrite`, which is zero-cost and involves no data copying.
- `StorageFileWriter::into_sink` converts it into a `futures::io::Sink`, which incurs a slight runtime overhead because we need to maintain the future state, but it does not introduce any data copying costs.
- `StorageFileWriter::into_futures_write` and `StorageFileWriter::into_tokio_write` convert it into an `AsyncWrite`, which does require data copying. However, from an end-to-end perspective of the total data flow, this is unavoidable because the final consumers of this API require `AsyncWrite`; this copy is necessary at some point, regardless of where we introduce it. For example, the `csv` crate only accepts `AsyncWrite`.
Apart from the zero-cost call on `StorageFileWrite`, all other call styles require an explicit call like `into_xxx`. The caller should be very careful about what they are doing and the cost they are incurring. (A small sketch of these shapes follows below.)
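As a hedged illustration of these tiers (the names mirror the discussion above, but this is a toy sketch, not the actual crate): the zero-cost path takes an owned `Bytes`, while an `AsyncWrite`-style `&[u8]` entry point is where the single unavoidable copy shows up.

```rust
use bytes::Bytes;

/// Hypothetical zero-copy write trait: the owned `Bytes` is handed over.
trait StorageFileWrite {
    fn write(&mut self, data: Bytes);
}

/// Wrapper over a trait object, mirroring `StorageFileWriter` in the PR.
struct StorageFileWriter {
    inner: Box<dyn StorageFileWrite>,
}

impl StorageFileWriter {
    /// Zero-cost tier: pass the owned buffer straight through, no copying.
    fn write(&mut self, data: Bytes) {
        self.inner.write(data);
    }

    /// Stand-in for the `into_futures_write` / `into_tokio_write` tier:
    /// an `AsyncWrite`-style `&[u8]` entry point. `Bytes::copy_from_slice`
    /// is the one unavoidable copy; the real adapters would implement the
    /// `futures` / `tokio` `AsyncWrite` traits instead of this method.
    fn write_slice(&mut self, buf: &[u8]) {
        self.inner.write(Bytes::copy_from_slice(buf));
    }
}

/// Toy implementation that simply retains the chunks it is given.
struct Collect(Vec<Bytes>);

impl StorageFileWrite for Collect {
    fn write(&mut self, data: Bytes) {
        self.0.push(data);
    }
}

fn main() {
    let mut writer = StorageFileWriter {
        inner: Box::new(Collect(Vec::new())),
    };
    writer.write(Bytes::from_static(b"owned buffer, handed over"));
    writer.write_slice(b"borrowed buffer, copied once");
}
```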
/// Adapter to allow using `futures::io::AsyncRead` with `StorageFileReader`
I have the same question about read as about write -- do all these APIs require copying the data (aka take the data as `&[u8]`)?
One of the neat things we do with the object store API and the parquet reader when reading strings into `Utf8View` is that the same `Bytes` buffer that is returned from `object_store` is converted into a parquet Buffer and then eventually forms the underlying Arrow Buffer of the returned array.
In other words, there is no copying of the data once it comes back from the object store.
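For background (not part of the PR), this no-copy flow is possible because `bytes::Bytes` is a reference-counted view over a shared allocation: slicing and cloning adjust offsets and counts rather than copying the payload. A tiny self-contained demo:

```rust
use bytes::Bytes;

fn main() {
    let whole = Bytes::from(vec![42u8; 64]);

    // Slicing produces a new `Bytes` that points into the same allocation.
    let window = whole.slice(16..32);
    assert_eq!(window.as_ptr(), whole[16..].as_ptr());

    // Cloning only bumps a reference count; the underlying data is shared.
    let alias = whole.clone();
    assert_eq!(alias.as_ptr(), whole.as_ptr());

    println!("no payload bytes were copied");
}
```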
Most of this is similar to #15018 (comment); we will have:
- a zero-cost `StorageFileRead`
- `StorageFileReader::into_stream`
- `StorageFileReader::into_futures_read` and `StorageFileReader::into_tokio_read`
(A small sketch of the stream-based shape follows below.)
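A minimal, hedged illustration of the `into_stream`-style shape (toy functions, not the PR's API): exposing already-owned chunks as a `futures::Stream` adds some future bookkeeping but introduces no payload copies, unlike an `AsyncRead` that fills a caller-provided `&mut [u8]`.

```rust
use bytes::Bytes;
use futures::executor::block_on;
use futures::stream::{self, Stream, StreamExt};

/// Stand-in for data an implementation already holds as owned chunks.
fn demo_chunks() -> Vec<Bytes> {
    vec![Bytes::from_static(b"col_a,col_b\n"), Bytes::from_static(b"1,2\n")]
}

/// Rough analogue of `StorageFileReader::into_stream`: the chunks are
/// yielded as-is, so downstream consumers keep the `Bytes` without copying
/// them into an intermediate buffer.
fn into_stream(chunks: Vec<Bytes>) -> impl Stream<Item = Bytes> + Unpin {
    stream::iter(chunks)
}

fn main() {
    block_on(async {
        let mut total = 0;
        let mut chunks = into_stream(demo_chunks());
        while let Some(chunk) = chunks.next().await {
            total += chunk.len();
        }
        println!("streamed {total} bytes with zero payload copies");
    });
}
```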
pub struct StorageFileWriter {
    inner: Box<dyn StorageFileWrite>,
}
I wonder if this was meant to be a trait (rather than a self-recursive structure).
`StorageFileWrite` is the trait, and `StorageFileWriter` is the struct we build upon it.
So I've not had time to look in huge detail, and I would echo Andrew's concerns around AsyncWrite and friends; ObjectStore intentionally is not formulated in terms of them. However, taking a step back, the proposed abstraction doesn't appear to operate at a higher level than ObjectStore or OpenDAL; it is still focused on shuffling lists of objects/files and passing opaque bytes around. It is unclear to me that introducing a third abstraction at this same level really unlocks many new use-cases / optimisation opportunities, and that seems a tough sell to me given the potential disruption. The original ticket stated:
However, it is unclear to me why either of these would not fit the ObjectStore interface, and therefore how datafusion-storage would need to differ in order to accommodate them? Ultimately both of these are going to need some bridge through to DF, as I doubt a
Is there perhaps some functionality that OpenDAL provides that can't be exposed
The challenge here, and what I was getting at in the ticket, is that much of the functionality was being implemented at the ObjectStore boundary, where it might be better served being implemented either higher or lower in the stack. We've addressed the latter by introducing HttpClient in object_store. I see datafusion_storage as an opportunity to address the former.
To put this concretely, say I want to implement caching of parquet files: I don't want to be caching raw bytes and byte ranges. Instead I want to be able to cache the metadata separately, and then perhaps have some internal data structure for quickly identifying row groups, etc. Similarly for CSV files, I might want the ability to cache file schemas, or meta information about the number of rows, etc.
This is the level that historically hasn't really existed in a coherent form in DF. There have existed things like AsyncFileReaderFactory, etc., but they're kind of ad-hoc, relatively hard to use, and not part of a coherent global design. It is possible/probable that the recent DataSource work is this abstraction already, but I haven't followed it closely enough to weigh in on this with any authority.
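To illustrate the kind of higher-level caching being described, here is a hedged, self-contained sketch (all names are hypothetical and not from the PR): a cache keyed by file path that stores a parsed metadata structure rather than raw bytes or byte ranges.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for parsed, format-aware metadata
/// (e.g. a parquet footer / row-group index, or a CSV schema).
#[derive(Clone)]
struct ParsedMetadata {
    num_rows: u64,
}

/// Hypothetical cache keyed by file path that stores *parsed* metadata --
/// the level of caching argued for above, rather than byte-range caching
/// at the object-store boundary.
#[derive(Default)]
struct MetadataCache {
    entries: Mutex<HashMap<String, Arc<ParsedMetadata>>>,
}

impl MetadataCache {
    /// Return the cached entry for `path`, parsing it via `load` on a miss.
    fn get_or_insert_with(
        &self,
        path: &str,
        load: impl FnOnce() -> ParsedMetadata,
    ) -> Arc<ParsedMetadata> {
        let mut entries = self.entries.lock().unwrap();
        entries
            .entry(path.to_string())
            .or_insert_with(|| Arc::new(load()))
            .clone()
    }
}

fn main() {
    let cache = MetadataCache::default();
    // First access parses (here: fabricates) the metadata; later accesses
    // reuse the parsed structure instead of re-fetching footer bytes.
    let meta = cache.get_or_insert_with("s3://bucket/file.parquet", || ParsedMetadata {
        num_rows: 1_000,
    });
    assert_eq!(meta.num_rows, 1_000);
}
```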
Which issue does this PR close?
`datafusion-storage` as datafusion's own storage interface #14854
Rationale for this change
This PR is basically a PoC for `datafusion-storage`, and all APIs are available just for discussion.
Design
The DataFusion storage is designed based on current DataFusion usage, where we only use `read`, `write`, `stat`, and `list` for now.
- A `Storage` trait that implementations must follow, allowing each implementer to optimize within `StorageFileRead`, `StorageFileWrite`, and other related components. `Storage` is designed in an io-uring-friendly way.
- A `StorageExt` trait primarily for DataFusion users, providing helper functions to optimize storage usage. These functions help users avoid unnecessary memory copies or clones, ensuring more efficient performance.
- A `StorageContext` to carry context such as metrics, tracing spans, HTTP clients, or the runtime. DataFusion users can implement or inject them as needed.
- For `StorageFileReader`, we implement adapters for `futures::AsyncWrite` and `futures::Stream` to optimize our API and avoid unnecessary costs for the `datafusion-storage` implementer.
- For `write`, implementers provide a `StorageFileWrite` and do whatever they want, for example spawning a `PutUpload` task to `StorageContext::runtime` as needed.
A signature-level sketch of this surface follows below.
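The sketch below is purely illustrative and not the PR's actual API: a hedged, signature-level rendering of a four-operation storage trait, using `BoxFuture`/`BoxStream` and a made-up `FileMeta` type as stand-ins for the real `StorageFileReader`/`StorageFileWriter` handles, metadata types, and `StorageContext`.

```rust
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

/// Minimal per-file metadata, roughly what `stat` and `list` would yield.
pub struct FileMeta {
    pub path: String,
    pub size: u64,
}

/// Illustrative sketch of a four-operation storage trait covering `read`,
/// `write`, `stat`, and `list` -- the usage DataFusion needs today.
pub trait Storage: Send + Sync {
    /// Read a file (the real API would hand back a reader supporting ranges).
    fn read(&self, path: &str) -> BoxFuture<'_, std::io::Result<Bytes>>;

    /// Write an owned buffer to a file (the real API returns a writer handle).
    fn write(&self, path: &str, data: Bytes) -> BoxFuture<'_, std::io::Result<()>>;

    /// Fetch metadata for a single file.
    fn stat(&self, path: &str) -> BoxFuture<'_, std::io::Result<FileMeta>>;

    /// Stream the entries under a path prefix.
    fn list(&self, prefix: &str) -> BoxStream<'_, std::io::Result<FileMeta>>;
}
```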
More details
`datafusion-storage` can serve as the default storage entry point for our execution, but we also provide traits like the parquet `AsyncFileReader`, allowing users to integrate their own storage solutions.
What changes are included in this PR?
Added a new crate called `datafusion-storage`, which serves as the storage abstraction for the entire DataFusion ecosystem.
Are these changes tested?
Not yet.
Are there any user-facing changes?
Yes, we will provide more details later as we make progress with the actual work.