diff --git a/Cargo.lock b/Cargo.lock
index 7fb752574695..a03addc2208e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1752,6 +1752,7 @@ dependencies = [
  "datafusion-physical-optimizer",
  "datafusion-physical-plan",
  "datafusion-sql",
+ "datafusion-storage",
  "doc-comment",
  "env_logger",
  "flate2",
@@ -1835,6 +1836,7 @@ dependencies = [
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
+ "datafusion-storage",
  "futures",
  "log",
  "object_store",
@@ -1921,6 +1923,7 @@ dependencies = [
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
+ "datafusion-storage",
  "flate2",
  "futures",
  "glob",
@@ -1976,9 +1979,9 @@ dependencies = [
  "dashmap",
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-storage",
  "futures",
  "log",
- "object_store",
  "parking_lot",
  "rand 0.8.5",
  "tempfile",
@@ -2368,6 +2371,19 @@ dependencies = [
  "tokio-postgres",
 ]

+[[package]]
+name = "datafusion-storage"
+version = "45.0.0"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "chrono",
+ "dashmap",
+ "datafusion-common",
+ "futures",
+ "url",
+]
+
 [[package]]
 name = "datafusion-substrait"
 version = "45.0.0"
diff --git a/Cargo.toml b/Cargo.toml
index f7d39aeb3003..c682fc6c7ef2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ members = [
     "datafusion/proto-common/gen",
     "datafusion/sql",
     "datafusion/sqllogictest",
+    "datafusion/storage",
     "datafusion/substrait",
     "datafusion/wasmtest",
     "datafusion-cli",
@@ -127,6 +128,7 @@ datafusion-physical-plan = { path = "datafusion/physical-plan", version = "45.0.0" }
 datafusion-proto = { path = "datafusion/proto", version = "45.0.0" }
 datafusion-proto-common = { path = "datafusion/proto-common", version = "45.0.0" }
 datafusion-sql = { path = "datafusion/sql", version = "45.0.0" }
+datafusion-storage = { path = "datafusion/storage", version = "45.0.0" }
 doc-comment = "0.3"
 env_logger = "0.11"
 futures = "0.3"
@@ -144,6 +146,7 @@ parquet = { version = "54.2.0", default-features = false, features = [
 ] }
 pbjson = { version = "0.7.0" }
 pbjson-types = "0.7"
+percent-encoding = "2.1.0"
 # Should match arrow-flight's version of prost.
 prost = "0.13.1"
 rand = "0.8.5"
diff --git a/datafusion/storage/Cargo.toml b/datafusion/storage/Cargo.toml
new file mode 100644
index 000000000000..8701e847187e
--- /dev/null
+++ b/datafusion/storage/Cargo.toml
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+authors = { workspace = true }
+description = "DataFusion Storage"
+edition = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+name = "datafusion-storage"
+readme = "README.md"
+repository = { workspace = true }
+rust-version = { workspace = true }
+version = { workspace = true }
+
+[package.metadata.docs.rs]
+all-features = true
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_storage"
+
+[dependencies]
+datafusion-common = { workspace = true, default-features = true }
+
+async-trait = { workspace = true }
+futures = { workspace = true }
+url = { workspace = true }
+chrono = { workspace = true }
+bytes = { workspace = true }
+dashmap = { workspace = true }
diff --git a/datafusion/storage/LICENSE.txt b/datafusion/storage/LICENSE.txt
new file mode 120000
index 000000000000..1ef648f64b34
--- /dev/null
+++ b/datafusion/storage/LICENSE.txt
@@ -0,0 +1 @@
+../../LICENSE.txt
\ No newline at end of file
diff --git a/datafusion/storage/NOTICE.txt b/datafusion/storage/NOTICE.txt
new file mode 120000
index 000000000000..fb051c92b10b
--- /dev/null
+++ b/datafusion/storage/NOTICE.txt
@@ -0,0 +1 @@
+../../NOTICE.txt
\ No newline at end of file
diff --git a/datafusion/storage/README.md b/datafusion/storage/README.md
new file mode 100644
index 000000000000..dec73f324668
--- /dev/null
+++ b/datafusion/storage/README.md
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Storage
+
+This crate provides a general-purpose storage abstraction that can interact with various storage services, such as S3 and the local filesystem. Although it is used by the [DataFusion][df] query engine, it is designed to be easily integrated into any project requiring a storage interface.
+
+[df]: https://crates.io/crates/datafusion
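Most of the crate is still a skeleton (many method bodies are `todo!()`), so the following is a usage sketch only, not part of the PR: the intended flow appears to be to obtain a `StorageFileReader` from `Storage::read_opts` and then pick an adapter. The snippet assumes `StorageContext` is re-exported from the crate root (in this PR it lives in a private module) and uses a made-up object path.

```rust
use datafusion_common::Result;
use datafusion_storage::{ReadRange, Storage, StorageReadOptions};
// Assumption: `StorageContext` is re-exported from the crate root; in this PR
// it is only reachable from inside the crate (`mod context` is private).
use datafusion_storage::StorageContext;

async fn read_parquet_footer(store: &dyn Storage, ctx: &StorageContext) -> Result<()> {
    let options = StorageReadOptions {
        // Fetch only the last 8 bytes (Parquet footer length + magic).
        range: Some(ReadRange::Suffix(8)),
        ..Default::default()
    };
    let reader = store
        .read_opts(ctx, "my_table/part-0.parquet", options)
        .await?;
    println!("object size: {} bytes", reader.metadata().size);
    // The byte-level adapters (`into_stream`, `into_tokio_reader`, ...) are
    // still `todo!()` in this PR, so the sketch stops here.
    Ok(())
}
```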
diff --git a/datafusion/storage/src/api.rs b/datafusion/storage/src/api.rs
new file mode 100644
index 000000000000..e5f2b0f7f92f
--- /dev/null
+++ b/datafusion/storage/src/api.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::context::StorageContext;
+use crate::file_metadata::StorageFileMetadata;
+use crate::list::{StorageFileLister, StorageListOptions};
+use crate::read::StorageFileReader;
+use crate::stat::StorageStatOptions;
+use crate::StorageFileWriter;
+use crate::{StorageReadOptions, StorageWriteOptions, StorageWriteResult};
+use async_trait::async_trait;
+use bytes::Bytes;
+use datafusion_common::Result;
+use futures::Stream;
+use std::fmt::Debug;
+use std::ops::Range;
+
+#[async_trait]
+pub trait Storage: std::fmt::Display + Send + Sync + Debug + 'static {
+    async fn write_opts(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        options: StorageWriteOptions,
+    ) -> Result<StorageFileWriter>;
+
+    async fn read_opts(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        options: StorageReadOptions,
+    ) -> Result<StorageFileReader>;
+
+    async fn stat_opts(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        options: StorageStatOptions,
+    ) -> Result<StorageFileMetadata>;
+
+    async fn list_opts(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        options: StorageListOptions,
+    ) -> Result<StorageFileLister>;
+}
+
+#[async_trait]
+pub trait StorageExt: Storage {
+    async fn write(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        payload: Bytes,
+    ) -> Result<StorageWriteResult> {
+        todo!()
+    }
+
+    async fn write_iter(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        payload: impl Iterator<Item = Result<Bytes>> + Send,
+    ) -> Result<StorageWriteResult> {
+        todo!()
+    }
+
+    async fn write_stream(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        stream: impl Stream<Item = Result<Bytes>> + Send,
+    ) -> Result<StorageWriteResult> {
+        todo!()
+    }
+
+    async fn read(&self, ctx: &StorageContext, location: &str) -> Result<Bytes> {
+        todo!()
+    }
+
+    async fn read_range(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        range: Range<u64>,
+    ) -> Result<Bytes> {
+        todo!()
+    }
+
+    async fn read_ranges(
+        &self,
+        ctx: &StorageContext,
+        location: &str,
+        ranges: &[Range<u64>],
+    ) -> Result<Vec<Bytes>> {
+        todo!()
+    }
+}
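Since the `StorageExt` convenience defaults above are all `todo!()`, here is a hedged sketch (framed as crate-internal code, not part of the PR) of the kind of helper they are presumably meant to become: translate a plain byte range into `StorageReadOptions` and delegate to `Storage::read_opts`.

```rust
use std::ops::Range;

use datafusion_common::Result;

use crate::context::StorageContext;
use crate::read::StorageFileReader;
use crate::{ReadRange, Storage, StorageReadOptions};

/// Sketch: open a reader for one byte range of `location`, the building block
/// that `StorageExt::read_range` could wrap once it is implemented.
pub async fn open_range(
    storage: &dyn Storage,
    ctx: &StorageContext,
    location: &str,
    range: Range<u64>,
) -> Result<StorageFileReader> {
    let options = StorageReadOptions {
        // Only the requested bytes will be transferred by the backend.
        range: Some(ReadRange::Bounded(range)),
        ..Default::default()
    };
    storage.read_opts(ctx, location, options).await
}
```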
diff --git a/datafusion/storage/src/context.rs b/datafusion/storage/src/context.rs
new file mode 100644
index 000000000000..8d26817173d9
--- /dev/null
+++ b/datafusion/storage/src/context.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Context for storage operations
+pub struct StorageContext {
+    // TODO: add more datafusion context here
+
+    // metrics: metrics for the storage operations
+    // tracing_span: tracing span for the storage operations
+    // http_client: http client for all the storage operations
+    // runtime: async runtime for the storage operations
+}
diff --git a/datafusion/storage/src/error.rs b/datafusion/storage/src/error.rs
new file mode 100644
index 000000000000..3408988e375e
--- /dev/null
+++ b/datafusion/storage/src/error.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub enum StorageError {
+    Generic {
+        /// The store this error originated from
+        store: &'static str,
+        /// The wrapped error
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+}
diff --git a/datafusion/storage/src/file_metadata.rs b/datafusion/storage/src/file_metadata.rs
new file mode 100644
index 000000000000..b3220ed4d03f
--- /dev/null
+++ b/datafusion/storage/src/file_metadata.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use chrono::{DateTime, Utc};
+
+/// The metadata that describes a file inside a storage system.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StorageFileMetadata {
+    /// The full path to the object
+    pub location: String,
+    /// The last modified time
+    pub last_modified: DateTime<Utc>,
+    /// The size in bytes of the object
+    pub size: u64,
+    /// The unique identifier for the object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
+    pub e_tag: Option<String>,
+    /// A version indicator for this object
+    pub version: Option<String>,
+}
diff --git a/datafusion/storage/src/lib.rs b/datafusion/storage/src/lib.rs
new file mode 100644
index 000000000000..231bfe316ded
--- /dev/null
+++ b/datafusion/storage/src/lib.rs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod api;
+pub use api::Storage;
+
+mod file_metadata;
+pub use file_metadata::StorageFileMetadata;
+
+mod list;
+
+mod read;
+pub use read::ReadRange;
+pub use read::StorageReadOptions;
+
+mod write;
+pub use write::StorageFileWrite;
+pub use write::StorageFileWriter;
+pub use write::StorageWriteOptions;
+pub use write::StorageWriteResult;
+
+mod context;
+mod error;
+mod registry;
+mod stat;
+
+pub use registry::DefaultStorageRegistry;
+pub use registry::StorageRegistry;
+pub use registry::StorageUrl;
diff --git a/datafusion/storage/src/list.rs b/datafusion/storage/src/list.rs
new file mode 100644
index 000000000000..29e721d9c7e9
--- /dev/null
+++ b/datafusion/storage/src/list.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::context::StorageContext;
+use crate::file_metadata::StorageFileMetadata;
+use crate::StorageWriteResult;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+pub struct StorageListOptions {
+    pub recursive: bool,
+}
+
+#[async_trait]
+pub trait StorageFileList: Send + Sync + 'static {
+    /// Return the metadata of the next file in the listing
+    async fn next(
+        &mut self,
+        ctx: &StorageContext,
+    ) -> datafusion_common::Result<StorageFileMetadata>;
+}
+
+pub struct StorageFileLister {
+    location: String,
+    inner: Box<dyn StorageFileList>,
+}
+
+impl StorageFileLister {
+    pub fn into_stream(self) -> StorageFileMetadataStream {
+        StorageFileMetadataStream {}
+    }
+}
+
+/// Adapter to allow using `futures::Stream` with `StorageFileLister`
+pub struct StorageFileMetadataStream {}
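As a concrete illustration of the `StorageFileList` trait, here is a hedged, crate-internal sketch (not part of the PR) of a lister that drains a pre-computed vector of entries, as an in-memory test backend might. It also surfaces an open design point: the trait's return type has no `Option`/`None`, so end-of-listing is modelled here as an error.

```rust
use async_trait::async_trait;
use datafusion_common::{exec_err, Result};

use crate::context::StorageContext;
use crate::file_metadata::StorageFileMetadata;
use crate::list::StorageFileList;

/// Sketch: a lister over a fixed, pre-computed set of entries.
struct VecLister {
    entries: std::vec::IntoIter<StorageFileMetadata>,
}

#[async_trait]
impl StorageFileList for VecLister {
    async fn next(&mut self, _ctx: &StorageContext) -> Result<StorageFileMetadata> {
        match self.entries.next() {
            Some(meta) => Ok(meta),
            // No "end of listing" variant exists yet, so report it as an error.
            None => exec_err!("listing exhausted"),
        }
    }
}
```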
diff --git a/datafusion/storage/src/read.rs b/datafusion/storage/src/read.rs
new file mode 100644
index 000000000000..82c5aefda4e0
--- /dev/null
+++ b/datafusion/storage/src/read.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::context::StorageContext;
+use crate::file_metadata::StorageFileMetadata;
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use datafusion_common::Result;
+use futures::stream::BoxStream;
+use futures::{stream, Stream, StreamExt};
+use std::ops::{Range, RangeBounds};
+
+/// Request only a portion of an object's bytes
+///
+/// Implementations may wish to inspect [`ReadResult`] for the exact byte
+/// range returned.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum ReadRange {
+    /// Request a specific range of bytes
+    ///
+    /// If the given range is zero-length or starts after the end of the object,
+    /// an error will be returned. Additionally, if the range ends after the end
+    /// of the object, the entire remainder of the object will be returned.
+    /// Otherwise, the exact requested range will be returned.
+    Bounded(Range<u64>),
+    /// Request all bytes starting from a given byte offset
+    Offset(u64),
+    /// Request up to the last n bytes
+    Suffix(u64),
+}
+
+impl<T: RangeBounds<u64>> From<T> for ReadRange {
+    fn from(value: T) -> Self {
+        use std::ops::Bound::*;
+        let first = match value.start_bound() {
+            Included(i) => *i,
+            Excluded(i) => i + 1,
+            Unbounded => 0,
+        };
+        match value.end_bound() {
+            Included(i) => Self::Bounded(first..(i + 1)),
+            Excluded(i) => Self::Bounded(first..*i),
+            Unbounded => Self::Offset(first),
+        }
+    }
+}
+
+/// Options for a read request, such as range
+#[derive(Debug, Default, Clone)]
+pub struct StorageReadOptions {
+    /// Request will succeed if the `StorageFileMetadata::e_tag` matches
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+    ///
+    /// Examples:
+    ///
+    /// ```text
+    /// If-Match: "xyzzy"
+    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+    /// If-Match: *
+    /// ```
+    pub if_match: Option<String>,
+    /// Request will succeed if the `StorageFileMetadata::e_tag` does not match
+    /// otherwise returning [`Error::NotModified`]
+    ///
+    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-none-match>
+    ///
+    /// Examples:
+    ///
+    /// ```text
+    /// If-None-Match: "xyzzy"
+    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+    /// If-None-Match: *
+    /// ```
+    pub if_none_match: Option<String>,
+    /// Request will succeed if the object has been modified since
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-modified-since>
+    pub if_modified_since: Option<DateTime<Utc>>,
+    /// Request will succeed if the object has not been modified since
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// Some stores, such as S3, will only return `NotModified` for exact
+    /// timestamp matches, instead of for any timestamp greater than or equal.
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-unmodified-since>
+    pub if_unmodified_since: Option<DateTime<Utc>>,
+    /// Request transfer of only the specified range of bytes
+    /// otherwise returning [`Error::NotModified`]
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
+    pub range: Option<ReadRange>,
+    /// Request a particular object version
+    pub version: Option<String>,
+}
+
+#[async_trait]
+pub trait StorageFileRead: Send + Sync + 'static {
+    async fn read(&mut self, ctx: &StorageContext) -> Result<Bytes>;
+}
+
+pub struct StorageFileReader {
+    meta: StorageFileMetadata,
+    inner: Box<dyn StorageFileRead>,
+}
+
+/// Expose public API for reading from a file
+impl StorageFileReader {
+    pub fn metadata(&self) -> &StorageFileMetadata {
+        &self.meta
+    }
+
+    pub fn into_futures_reader(self) -> StorageFileFuturesReader {
+        todo!()
+    }
+
+    pub fn into_stream(self) -> StorageFileBytesStream {
+        todo!()
+    }
+
+    pub fn into_tokio_reader(self) -> StorageFileTokioReader {
+        todo!()
+    }
+}
+
+/// Adapter to allow using `futures::io::AsyncRead` with `StorageFileReader`
+pub struct StorageFileFuturesReader {}
+
+/// Adapter to allow using `futures::Stream` with `StorageFileReader`
+pub struct StorageFileBytesStream {}
+
+/// Adapter to allow using `tokio::io::AsyncRead` with `StorageFileReader`
+pub struct StorageFileTokioReader {}
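The blanket `RangeBounds<u64>` conversion in read.rs makes `ReadRange` ergonomic to build from ordinary Rust ranges. A small illustrative check of that behavior (not part of the PR):

```rust
use datafusion_storage::ReadRange;

fn main() {
    // Bounded ranges map directly onto `ReadRange::Bounded`.
    assert_eq!(ReadRange::from(10..20), ReadRange::Bounded(10..20));
    // Inclusive upper bounds are converted to exclusive ones.
    assert_eq!(ReadRange::from(10..=20), ReadRange::Bounded(10..21));
    // An open-ended range becomes an offset request.
    assert_eq!(ReadRange::from(5..), ReadRange::Offset(5));
    // A fully unbounded range reads the whole object from offset zero.
    assert_eq!(ReadRange::from(..), ReadRange::Offset(0));
}
```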
diff --git a/datafusion/storage/src/registry.rs b/datafusion/storage/src/registry.rs
new file mode 100644
index 000000000000..f89ad00439c1
--- /dev/null
+++ b/datafusion/storage/src/registry.rs
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Storage;
+use dashmap::DashMap;
+use datafusion_common::{exec_err, DataFusionError, Result};
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`Storage`] instance
+///
+/// For example:
+/// * `file://` for local file system
+/// * `s3://bucket` for AWS S3 bucket
+/// * `oss://bucket` for Aliyun OSS bucket
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct StorageUrl {
+    url: Url,
+}
+
+impl StorageUrl {
+    /// Parse a [`StorageUrl`] from a string
+    ///
+    /// # Example
+    /// ```
+    /// # use url::Url;
+    /// # use datafusion_storage::StorageUrl;
+    /// let storage_url = StorageUrl::parse("s3://bucket").unwrap();
+    /// assert_eq!(storage_url.as_str(), "s3://bucket/");
+    /// // can also access the underlying `Url`
+    /// let url: &Url = storage_url.as_ref();
+    /// assert_eq!(url.scheme(), "s3");
+    /// assert_eq!(url.host_str(), Some("bucket"));
+    /// assert_eq!(url.path(), "/");
+    /// ```
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return exec_err!(
+                "StorageUrl must only contain scheme and authority, got: {remaining}"
+            );
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// A [`StorageUrl`] for the local filesystem (`file://`)
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion_storage::StorageUrl;
+    /// let local_fs = StorageUrl::parse("file://").unwrap();
+    /// assert_eq!(local_fs, StorageUrl::local_filesystem())
+    /// ```
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`StorageUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for StorageUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for StorageUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for StorageUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// [`StorageRegistry`] maps a URL to a [`Storage`] instance,
+/// and allows DataFusion to read from different [`Storage`]
+/// instances. For example DataFusion might be configured so that
+///
+/// 1. `s3://my_bucket/lineitem/` mapped to the `/lineitem` path on an
+///    AWS S3 object store bound to `my_bucket`
+///
+/// 2. `s3://my_other_bucket/lineitem/` mapped to the (same)
+///    `/lineitem` path on a *different* AWS S3 object store bound to
+///    `my_other_bucket`
+///
+/// When given a [`ListingTableUrl`], DataFusion tries to find an
+/// appropriate [`Storage`]. For example
+///
+/// ```sql
+/// create external table unicorns stored as parquet location 's3://my_bucket/lineitem/';
+/// ```
+///
+/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to
+/// [`StorageRegistry::get_storage`] and one of three things will happen:
+///
+/// - If a [`Storage`] has been registered with [`StorageRegistry::register_storage`] with
+///   `s3://my_bucket`, that [`Storage`] will be returned
+///
+/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this
+///   object store will be registered with key `s3://my_bucket` and returned.
+///
+/// - Otherwise an error will be returned, indicating that no suitable [`Storage`] could
+///   be found
+///
+/// This allows for two different use-cases:
+///
+/// 1. Systems where object store buckets are explicitly created using DDL can register these
+///    buckets using [`StorageRegistry::register_storage`]
+///
+/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`Storage`]
+///    lazily by providing a custom implementation of [`StorageRegistry`]
+///
+/// [`ListingTableUrl`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTableUrl.html
+/// [`Storage`]: crate::Storage
+pub trait StorageRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same key existed before, it is replaced and returned
+    fn register_storage(
+        &self,
+        url: &Url,
+        store: Arc<dyn Storage>,
+    ) -> Option<Arc<dyn Storage>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+    ///
+    /// If no [`Storage`] is found for the `url`, ad-hoc discovery may be executed depending on
+    /// the `url` and [`StorageRegistry`] implementation. A [`Storage`] may be lazily
+    /// created and registered.
+    fn get_storage(&self, url: &Url) -> Result<Arc<dyn Storage>>;
+}
+
+/// The default [`StorageRegistry`]
+pub struct DefaultStorageRegistry {
+    /// A map from scheme to the store that serves list / read operations for that scheme
+    stores: DashMap<String, Arc<dyn Storage>>,
+}
+
+impl std::fmt::Debug for DefaultStorageRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("DefaultStorageRegistry")
+            .field(
+                "schemes",
+                &self
+                    .stores
+                    .iter()
+                    .map(|o| o.key().clone())
+                    .collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for DefaultStorageRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl DefaultStorageRegistry {
+    /// Create a new [`DefaultStorageRegistry`]
+    pub fn new() -> Self {
+        let stores: DashMap<String, Arc<dyn Storage>> = DashMap::new();
+        Self { stores }
+    }
+}
+
+/// Stores are registered based on the scheme, host and port of the provided URL.
+/// No store is registered automatically, so `file://` must be registered like any
+/// other scheme.
+///
+/// For example:
+///
+/// - `file:///my_path` will return the store registered with `file://` if any
+/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any
+/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any
+impl StorageRegistry for DefaultStorageRegistry {
+    fn register_storage(
+        &self,
+        url: &Url,
+        store: Arc<dyn Storage>,
+    ) -> Option<Arc<dyn Storage>> {
+        let s = get_url_key(url);
+        self.stores.insert(s, store)
+    }
+
+    fn get_storage(&self, url: &Url) -> Result<Arc<dyn Storage>> {
+        let s = get_url_key(url);
+        self.stores
+            .get(&s)
+            .map(|o| Arc::clone(o.value()))
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "No suitable object store found for {url}. See `StorageRegistry::register_storage`"
+                ))
+            })
+    }
+}
+
+/// Get the key of a url for object store registration.
+/// The credential info will be removed
+fn get_url_key(url: &Url) -> String {
+    format!(
+        "{}://{}",
+        url.scheme(),
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_storage_url() {
+        let file = StorageUrl::parse("file://").unwrap();
+        assert_eq!(file.as_str(), "file:///");
+
+        let url = StorageUrl::parse("s3://bucket").unwrap();
+        assert_eq!(url.as_str(), "s3://bucket/");
+
+        let url = StorageUrl::parse("s3://username:password@host:123").unwrap();
+        assert_eq!(url.as_str(), "s3://username:password@host:123/");
+
+        let err = StorageUrl::parse("s3://bucket:invalid").unwrap_err();
+        assert_eq!(err.strip_backtrace(), "External error: invalid port number");
+
+        let err = StorageUrl::parse("s3://bucket?").unwrap_err();
+        assert_eq!(
+            err.strip_backtrace(),
+            "Execution error: StorageUrl must only contain scheme and authority, got: ?"
+        );
+
+        let err = StorageUrl::parse("s3://bucket?foo=bar").unwrap_err();
+        assert_eq!(err.strip_backtrace(), "Execution error: StorageUrl must only contain scheme and authority, got: ?foo=bar");
+
+        let err = StorageUrl::parse("s3://host:123/foo").unwrap_err();
+        assert_eq!(err.strip_backtrace(), "Execution error: StorageUrl must only contain scheme and authority, got: /foo");
+
+        let err = StorageUrl::parse("s3://username:password@host:123/foo").unwrap_err();
+        assert_eq!(err.strip_backtrace(), "Execution error: StorageUrl must only contain scheme and authority, got: /foo");
+    }
+
+    #[test]
+    fn test_get_url_key() {
+        let file = StorageUrl::parse("file://").unwrap();
+        let key = get_url_key(&file.url);
+        assert_eq!(key.as_str(), "file://");
+
+        let url = StorageUrl::parse("s3://bucket").unwrap();
+        let key = get_url_key(&url.url);
+        assert_eq!(key.as_str(), "s3://bucket");
+
+        let url = StorageUrl::parse("s3://username:password@host:123").unwrap();
+        let key = get_url_key(&url.url);
+        assert_eq!(key.as_str(), "s3://host:123");
+    }
+}
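The registry and `StorageUrl` pieces above are the only fully implemented parts of this PR, so they can already be exercised end to end. A hedged sketch of how a caller might wire them together; `store` stands in for any concrete `Storage` implementation, none of which ships in this PR:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_storage::{DefaultStorageRegistry, Storage, StorageRegistry, StorageUrl};
use url::Url;

/// Register `store` for every `s3://my_bucket/...` location, then resolve it back.
fn register_and_resolve(store: Arc<dyn Storage>) -> Result<Arc<dyn Storage>> {
    let registry = DefaultStorageRegistry::new();

    // `StorageUrl` normalizes to scheme + authority only, e.g. "s3://my_bucket/".
    let storage_url = StorageUrl::parse("s3://my_bucket")?;
    let url: &Url = storage_url.as_ref();

    // The registry keys on scheme://host:port, so any credentials embedded in
    // the URL are dropped; a previously registered store would be returned here.
    let _previous = registry.register_storage(url, store);
    registry.get_storage(url)
}
```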
diff --git a/datafusion/storage/src/stat.rs b/datafusion/storage/src/stat.rs
new file mode 100644
index 000000000000..11fffe6ce0bb
--- /dev/null
+++ b/datafusion/storage/src/stat.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ReadRange;
+use chrono::{DateTime, Utc};
+
+pub struct StorageStatOptions {
+    /// Request will succeed if the `StorageFileMetadata::e_tag` matches
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+    ///
+    /// Examples:
+    ///
+    /// ```text
+    /// If-Match: "xyzzy"
+    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+    /// If-Match: *
+    /// ```
+    pub if_match: Option<String>,
+    /// Request will succeed if the `StorageFileMetadata::e_tag` does not match
+    /// otherwise returning [`Error::NotModified`]
+    ///
+    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-none-match>
+    ///
+    /// Examples:
+    ///
+    /// ```text
+    /// If-None-Match: "xyzzy"
+    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+    /// If-None-Match: *
+    /// ```
+    pub if_none_match: Option<String>,
+    /// Request will succeed if the object has been modified since
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-modified-since>
+    pub if_modified_since: Option<DateTime<Utc>>,
+    /// Request will succeed if the object has not been modified since
+    /// otherwise returning [`Error::Precondition`]
+    ///
+    /// Some stores, such as S3, will only return `NotModified` for exact
+    /// timestamp matches, instead of for any timestamp greater than or equal.
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-unmodified-since>
+    pub if_unmodified_since: Option<DateTime<Utc>>,
+    /// Request a particular object version
+    pub version: Option<String>,
+}
diff --git a/datafusion/storage/src/write.rs b/datafusion/storage/src/write.rs
new file mode 100644
index 000000000000..0a0a37a04430
--- /dev/null
+++ b/datafusion/storage/src/write.rs
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::context::StorageContext;
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use datafusion_common::Result;
+use futures::AsyncWrite;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub struct StorageWriteOptions {
+    /// A version indicator for the newly created object
+    pub version: Option<String>,
+}
+
+/// Result for a write request
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StorageWriteResult {
+    /// The unique identifier for the newly created object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
+    pub e_tag: Option<String>,
+    /// A version indicator for the newly created object
+    pub version: Option<String>,
+}
+
+#[async_trait]
+pub trait StorageFileWrite: Send + Sync + 'static {
+    /// Write a single chunk of data to the file
+    async fn write(&mut self, ctx: &StorageContext, data: Bytes) -> Result<()>;
+
+    /// Finish writing to the file and return the result
+    async fn finish(&mut self, ctx: &StorageContext) -> Result<StorageWriteResult>;
+}
+
+pub struct StorageFileWriter {
+    inner: Box<dyn StorageFileWrite>,
+}
+
+/// Expose public API for writing to a file
+impl StorageFileWriter {
+    pub fn into_future_writer(self) -> StorageFileFuturesWriter {
+        todo!()
+    }
+
+    pub fn into_sink(self) -> StorageFileBytesSink {
+        todo!()
+    }
+
+    pub fn into_tokio_writer(self) -> StorageFileTokioWriter {
+        todo!()
+    }
+}
+
+/// Adapter to allow using `futures::io::AsyncWrite` with `StorageFileWriter`
+pub struct StorageFileFuturesWriter {}
+
+/// Adapter to allow using `futures::Sink` with `StorageFileWriter`
+pub struct StorageFileBytesSink {}
+
+/// Adapter to allow using `tokio::io::AsyncWrite` with `StorageFileWriter`
+pub struct StorageFileTokioWriter {}
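To make the write side concrete, here is a hedged, crate-internal sketch (not part of the PR) of a minimal `StorageFileWrite` implementation that buffers all chunks in memory, as a test backend might; a real implementation would flush each chunk to the underlying service and report its etag/version in `finish`.

```rust
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use datafusion_common::Result;

use crate::context::StorageContext;
use crate::{StorageFileWrite, StorageWriteResult};

/// Sketch: an in-memory writer that accumulates every chunk it receives.
#[derive(Default)]
struct InMemoryWrite {
    buffer: BytesMut,
}

#[async_trait]
impl StorageFileWrite for InMemoryWrite {
    async fn write(&mut self, _ctx: &StorageContext, data: Bytes) -> Result<()> {
        // Accumulate the chunk; nothing is "visible" until `finish` is called.
        self.buffer.extend_from_slice(&data);
        Ok(())
    }

    async fn finish(&mut self, _ctx: &StorageContext) -> Result<StorageWriteResult> {
        // There is no real store behind this, so no etag or version to report.
        Ok(StorageWriteResult {
            e_tag: None,
            version: None,
        })
    }
}
```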