From 7f4ab65b7995e20a598c4ad9688267d1358d284b Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Mon, 25 Sep 2023 17:38:05 -0400 Subject: [PATCH] Objectstore fixes (#23) * Fix handling of list results regarding escape * Fix listing root * Add tests for listing non-existent prefix * Add feature flag to linking gsasl --- rust/build.rs | 1 + rust/src/error.rs | 4 ++ rust/src/file.rs | 9 ++- rust/src/object_store.rs | 139 +++++++++++++++++++++++++-------------- rust/tests/common/mod.rs | 17 +++++ 5 files changed, 119 insertions(+), 51 deletions(-) diff --git a/rust/build.rs b/rust/build.rs index 185ae5c..40f8d6c 100644 --- a/rust/build.rs +++ b/rust/build.rs @@ -1,6 +1,7 @@ use std::io::Result; fn main() -> Result<()> { + #[cfg(feature = "token")] println!("cargo:rustc-link-lib=gsasl"); #[cfg(feature = "generate-protobuf")] diff --git a/rust/src/error.rs b/rust/src/error.rs index 6ed8c22..807b870 100644 --- a/rust/src/error.rs +++ b/rust/src/error.rs @@ -15,6 +15,10 @@ pub enum HdfsError { DataTransferError(String), #[error("invalid path")] InvalidPath(String), + #[error("invalid argument")] + InvalidArgument(String), + #[error("file already exists")] + AlreadyExists(String), #[error("operation failed")] OperationFailed(String), #[error("file not found")] diff --git a/rust/src/file.rs b/rust/src/file.rs index 36a284e..5ab8750 100644 --- a/rust/src/file.rs +++ b/rust/src/file.rs @@ -8,7 +8,7 @@ use crate::hdfs::datanode::{BlockReader, BlockWriter}; use crate::hdfs::ec::EcSchema; use crate::hdfs::protocol::NamenodeProtocol; use crate::proto::hdfs; -use crate::Result; +use crate::{HdfsError, Result}; pub struct FileReader { status: hdfs::HdfsFileStatusProto, @@ -74,7 +74,12 @@ impl FileReader { /// could be smaller than `len` if `offset + len` extends beyond the end of the file. pub async fn read_range(&self, offset: usize, len: usize) -> Result { let end = usize::min(self.file_length(), offset + len); - assert!(offset <= end); + if offset >= end { + return Err(HdfsError::InvalidArgument( + "Offset is past the end of the file".to_string(), + )); + } + let buf_size = end - offset; let mut buf = BytesMut::zeroed(buf_size); self.read_range_buf(&mut buf, offset).await?; diff --git a/rust/src/object_store.rs b/rust/src/object_store.rs index 6aa2dec..e3222fc 100644 --- a/rust/src/object_store.rs +++ b/rust/src/object_store.rs @@ -32,6 +32,34 @@ impl HdfsObjectStore { client: Arc::new(client), } } + + async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { + // TODO: Batch this instead of loading the whole thing + let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await { + Ok(_) if overwrite => true, + Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to)))?, + Err(HdfsError::FileNotFound(_)) => false, + Err(e) => Err(e)?, + }; + + let mut write_options = WriteOptions::default(); + write_options.overwrite = overwrite; + + let data = { + let mut file = self.client.read(&make_absolute_file(from)).await?; + file.read(file.remaining()).await? + }; + + let mut new_file = self + .client + .create(&make_absolute_file(to), write_options) + .await?; + + new_file.write(data).await?; + new_file.close().await?; + + Ok(()) + } } impl Display for HdfsObjectStore { @@ -225,16 +253,19 @@ impl ObjectStore for HdfsObjectStore { ) .into_stream() .filter(|res| { - let result = if let Ok(status) = res { - !status.isdir - } else { - true + let result = match res { + Ok(status) => !status.isdir, + // Listing by prefix should just return an empty list if the prefix isn't found + Err(HdfsError::FileNotFound(_)) => false, + _ => true, }; future::ready(result) }) - .map(move |res| { - res.map(|s| s.into()) - .map_err(|err| object_store::Error::from(err)) + .map(|res| { + res.map_or_else( + |e| Err(object_store::Error::from(e)), + |s| get_object_meta(&s), + ) }); Ok(Box::pin(status_stream)) @@ -247,28 +278,38 @@ impl ObjectStore for HdfsObjectStore { /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - let mut status_stream = self.client.list_status_iter( - &prefix - .map(|p| make_absolute_dir(p)) - .unwrap_or("".to_string()), - false, - ); + let mut status_stream = self + .client + .list_status_iter( + &prefix + .map(|p| make_absolute_dir(p)) + .unwrap_or("".to_string()), + false, + ) + .into_stream() + .filter(|res| { + let result = match res { + // Listing by prefix should just return an empty list if the prefix isn't found + Err(HdfsError::FileNotFound(_)) => false, + _ => true, + }; + future::ready(result) + }); let mut statuses = Vec::::new(); while let Some(status) = status_stream.next().await { statuses.push(status?); } - let dirs: Vec = statuses - .iter() - .filter(|s| s.isdir) - .map(|s| Path::from(s.path.as_ref())) - .collect(); - let files: Vec = statuses - .iter() - .filter(|s| !s.isdir) - .map(|s| s.into()) - .collect(); + let mut dirs: Vec = Vec::new(); + for status in statuses.iter().filter(|s| s.isdir) { + dirs.push(Path::parse(&status.path)?) + } + + let mut files: Vec = Vec::new(); + for status in statuses.iter().filter(|s| !s.isdir) { + files.push(get_object_meta(status)?) + } Ok(ListResult { common_prefixes: dirs, @@ -276,13 +317,6 @@ impl ObjectStore for HdfsObjectStore { }) } - /// Copy an object from one path to another in the same object store. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> { - Err(object_store::Error::NotImplemented) - } - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { Ok(self .client @@ -297,6 +331,13 @@ impl ObjectStore for HdfsObjectStore { .await?) } + /// Copy an object from one path to another in the same object store. + /// + /// If there exists an object at the destination, it will be overwritten. + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.internal_copy(from, to, true).await + } + /// Copy an object from one path to another, only if destination is empty. /// /// Will return an error if the destination already has an object. @@ -304,8 +345,8 @@ impl ObjectStore for HdfsObjectStore { /// Performs an atomic operation if the underlying object storage supports it. /// If atomic operations are not supported by the underlying object storage (like S3) /// it will return an error. - async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> { - Err(object_store::Error::NotImplemented) + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.internal_copy(from, to, false).await } } @@ -316,6 +357,10 @@ impl From for object_store::Error { path: path.clone(), source: Box::new(HdfsError::FileNotFound(path)), }, + HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists { + path: path.clone(), + source: Box::new(HdfsError::AlreadyExists(path)), + }, _ => object_store::Error::Generic { store: "HdfsObjectStore", source: Box::new(value), @@ -391,24 +436,20 @@ fn make_absolute_file(path: &Path) -> String { } fn make_absolute_dir(path: &Path) -> String { - format!("/{}/", path.as_ref()) -} - -impl From<&FileStatus> for ObjectMeta { - fn from(status: &FileStatus) -> Self { - ObjectMeta { - location: Path::from(status.path.clone()), - last_modified: Utc.from_utc_datetime( - &NaiveDateTime::from_timestamp_opt(status.modification_time as i64, 0).unwrap(), - ), - size: status.length, - e_tag: None, - } + if path.parts().count() > 0 { + format!("/{}/", path.as_ref()) + } else { + "/".to_string() } } -impl From for ObjectMeta { - fn from(status: FileStatus) -> Self { - (&status).into() - } +fn get_object_meta(status: &FileStatus) -> Result { + Ok(ObjectMeta { + location: Path::parse(&status.path)?, + last_modified: Utc.from_utc_datetime( + &NaiveDateTime::from_timestamp_opt(status.modification_time as i64, 0).unwrap(), + ), + size: status.length, + e_tag: None, + }) } diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index 9c8adf7..2298369 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -258,6 +258,23 @@ async fn test_object_store_list(store: &HdfsObjectStore) -> object_store::Result assert_eq!(list.len(), 1); assert_eq!(list[0].as_ref().unwrap().location, Path::from("/testfile")); + // Listing of a prefix that doesn't exist should return an empty result, not an error + assert_eq!( + store + .list(Some(&Path::from("/doesnt/exist"))) + .await? + .count() + .await, + 0 + ); + + let list = store + .list_with_delimiter(Some(&Path::from("/doesnt/exist"))) + .await?; + + assert_eq!(list.common_prefixes.len(), 0); + assert_eq!(list.objects.len(), 0); + Ok(()) }