Skip to content

Commit

Permalink
Objectstore fixes (#23)
Browse files Browse the repository at this point in the history
* Fix handling of list results regarding escape

* Fix listing root

* Add tests for listing non-existent prefix

* Add feature flag to linking gsasl
  • Loading branch information
Kimahriman authored Sep 25, 2023
1 parent d0d393b commit 7f4ab65
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 51 deletions.
1 change: 1 addition & 0 deletions rust/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io::Result;

fn main() -> Result<()> {
#[cfg(feature = "token")]
println!("cargo:rustc-link-lib=gsasl");

#[cfg(feature = "generate-protobuf")]
Expand Down
4 changes: 4 additions & 0 deletions rust/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ pub enum HdfsError {
DataTransferError(String),
#[error("invalid path")]
InvalidPath(String),
#[error("invalid argument")]
InvalidArgument(String),
#[error("file already exists")]
AlreadyExists(String),
#[error("operation failed")]
OperationFailed(String),
#[error("file not found")]
Expand Down
9 changes: 7 additions & 2 deletions rust/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::hdfs::datanode::{BlockReader, BlockWriter};
use crate::hdfs::ec::EcSchema;
use crate::hdfs::protocol::NamenodeProtocol;
use crate::proto::hdfs;
use crate::Result;
use crate::{HdfsError, Result};

pub struct FileReader {
status: hdfs::HdfsFileStatusProto,
Expand Down Expand Up @@ -74,7 +74,12 @@ impl FileReader {
/// could be smaller than `len` if `offset + len` extends beyond the end of the file.
pub async fn read_range(&self, offset: usize, len: usize) -> Result<Bytes> {
let end = usize::min(self.file_length(), offset + len);
assert!(offset <= end);
if offset >= end {
return Err(HdfsError::InvalidArgument(
"Offset is past the end of the file".to_string(),
));
}

let buf_size = end - offset;
let mut buf = BytesMut::zeroed(buf_size);
self.read_range_buf(&mut buf, offset).await?;
Expand Down
139 changes: 90 additions & 49 deletions rust/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ impl HdfsObjectStore {
client: Arc::new(client),
}
}

async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
// TODO: Batch this instead of loading the whole thing
let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
Ok(_) if overwrite => true,
Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to)))?,
Err(HdfsError::FileNotFound(_)) => false,
Err(e) => Err(e)?,
};

let mut write_options = WriteOptions::default();
write_options.overwrite = overwrite;

let data = {
let mut file = self.client.read(&make_absolute_file(from)).await?;
file.read(file.remaining()).await?
};

let mut new_file = self
.client
.create(&make_absolute_file(to), write_options)
.await?;

new_file.write(data).await?;
new_file.close().await?;

Ok(())
}
}

impl Display for HdfsObjectStore {
Expand Down Expand Up @@ -225,16 +253,19 @@ impl ObjectStore for HdfsObjectStore {
)
.into_stream()
.filter(|res| {
let result = if let Ok(status) = res {
!status.isdir
} else {
true
let result = match res {
Ok(status) => !status.isdir,
// Listing by prefix should just return an empty list if the prefix isn't found
Err(HdfsError::FileNotFound(_)) => false,
_ => true,
};
future::ready(result)
})
.map(move |res| {
res.map(|s| s.into())
.map_err(|err| object_store::Error::from(err))
.map(|res| {
res.map_or_else(
|e| Err(object_store::Error::from(e)),
|s| get_object_meta(&s),
)
});

Ok(Box::pin(status_stream))
Expand All @@ -247,42 +278,45 @@ impl ObjectStore for HdfsObjectStore {
/// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
/// `foo/bar_baz/x`.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
let mut status_stream = self.client.list_status_iter(
&prefix
.map(|p| make_absolute_dir(p))
.unwrap_or("".to_string()),
false,
);
let mut status_stream = self
.client
.list_status_iter(
&prefix
.map(|p| make_absolute_dir(p))
.unwrap_or("".to_string()),
false,
)
.into_stream()
.filter(|res| {
let result = match res {
// Listing by prefix should just return an empty list if the prefix isn't found
Err(HdfsError::FileNotFound(_)) => false,
_ => true,
};
future::ready(result)
});

let mut statuses = Vec::<FileStatus>::new();
while let Some(status) = status_stream.next().await {
statuses.push(status?);
}

let dirs: Vec<Path> = statuses
.iter()
.filter(|s| s.isdir)
.map(|s| Path::from(s.path.as_ref()))
.collect();
let files: Vec<ObjectMeta> = statuses
.iter()
.filter(|s| !s.isdir)
.map(|s| s.into())
.collect();
let mut dirs: Vec<Path> = Vec::new();
for status in statuses.iter().filter(|s| s.isdir) {
dirs.push(Path::parse(&status.path)?)
}

let mut files: Vec<ObjectMeta> = Vec::new();
for status in statuses.iter().filter(|s| !s.isdir) {
files.push(get_object_meta(status)?)
}

Ok(ListResult {
common_prefixes: dirs,
objects: files,
})
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
Err(object_store::Error::NotImplemented)
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
Ok(self
.client
Expand All @@ -297,15 +331,22 @@ impl ObjectStore for HdfsObjectStore {
.await?)
}

/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.internal_copy(from, to, true).await
}

/// Copy an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
///
/// Performs an atomic operation if the underlying object storage supports it.
/// If atomic operations are not supported by the underlying object storage (like S3)
/// it will return an error.
async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> {
Err(object_store::Error::NotImplemented)
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.internal_copy(from, to, false).await
}
}

Expand All @@ -316,6 +357,10 @@ impl From<HdfsError> for object_store::Error {
path: path.clone(),
source: Box::new(HdfsError::FileNotFound(path)),
},
HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
path: path.clone(),
source: Box::new(HdfsError::AlreadyExists(path)),
},
_ => object_store::Error::Generic {
store: "HdfsObjectStore",
source: Box::new(value),
Expand Down Expand Up @@ -391,24 +436,20 @@ fn make_absolute_file(path: &Path) -> String {
}

fn make_absolute_dir(path: &Path) -> String {
format!("/{}/", path.as_ref())
}

impl From<&FileStatus> for ObjectMeta {
fn from(status: &FileStatus) -> Self {
ObjectMeta {
location: Path::from(status.path.clone()),
last_modified: Utc.from_utc_datetime(
&NaiveDateTime::from_timestamp_opt(status.modification_time as i64, 0).unwrap(),
),
size: status.length,
e_tag: None,
}
if path.parts().count() > 0 {
format!("/{}/", path.as_ref())
} else {
"/".to_string()
}
}

impl From<FileStatus> for ObjectMeta {
fn from(status: FileStatus) -> Self {
(&status).into()
}
fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
Ok(ObjectMeta {
location: Path::parse(&status.path)?,
last_modified: Utc.from_utc_datetime(
&NaiveDateTime::from_timestamp_opt(status.modification_time as i64, 0).unwrap(),
),
size: status.length,
e_tag: None,
})
}
17 changes: 17 additions & 0 deletions rust/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,23 @@ async fn test_object_store_list(store: &HdfsObjectStore) -> object_store::Result
assert_eq!(list.len(), 1);
assert_eq!(list[0].as_ref().unwrap().location, Path::from("/testfile"));

// Listing of a prefix that doesn't exist should return an empty result, not an error
assert_eq!(
store
.list(Some(&Path::from("/doesnt/exist")))
.await?
.count()
.await,
0
);

let list = store
.list_with_delimiter(Some(&Path::from("/doesnt/exist")))
.await?;

assert_eq!(list.common_prefixes.len(), 0);
assert_eq!(list.objects.len(), 0);

Ok(())
}

Expand Down

0 comments on commit 7f4ab65

Please sign in to comment.