Skip to content

Commit

Permalink
Better Storage errors (#680)
Browse files Browse the repository at this point in the history
Some conditions weren't really errors, but expected results. Changing
those cases to use richer result types.
  • Loading branch information
paraseba authored Feb 5, 2025
1 parent e1afdae commit 4c705da
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 112 deletions.
56 changes: 27 additions & 29 deletions icechunk/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TryFromInto};
use thiserror::Error;

use crate::{format::SnapshotId, storage, Storage, StorageError};
use crate::{
format::SnapshotId,
storage::{self, GetRefResult, WriteRefResult},
Storage, StorageError,
};

fn crock_encode_int(n: u64) -> String {
// skip the first 3 bytes (zeroes)
Expand Down Expand Up @@ -179,21 +183,21 @@ pub async fn create_tag(
let key = tag_key(name)?;
let data = RefData { snapshot };
let content = serde_json::to_vec(&data)?;
storage
match storage
.write_ref(
storage_settings,
key.as_str(),
overwrite_refs,
Bytes::copy_from_slice(&content),
)
.await
.map_err(|e| match e {
StorageError::RefAlreadyExists(_) => {
RefError::TagAlreadyExists(name.to_string())
}
err => err.into(),
})?;
Ok(())
{
Ok(WriteRefResult::Written) => Ok(()),
Ok(WriteRefResult::WontOverwrite) => {
Err(RefError::TagAlreadyExists(name.to_string()))
}
Err(err) => Err(err.into()),
}
}

#[async_recursion]
Expand Down Expand Up @@ -237,8 +241,8 @@ pub async fn update_branch(
)
.await
{
Ok(_) => Ok(new_version),
Err(StorageError::RefAlreadyExists(_)) => {
Ok(WriteRefResult::Written) => Ok(new_version),
Ok(WriteRefResult::WontOverwrite) => {
// If the branch version already exists, an update happened since we checked
// we can just try again and the conflict will be reported
update_branch(
Expand Down Expand Up @@ -370,19 +374,19 @@ pub async fn delete_tag(

// no race condition: delete_tag ^ 2 = delete_tag
let key = tag_delete_marker_key(tag)?;
storage
match storage
.write_ref(
storage_settings,
key.as_str(),
overwrite_refs,
Bytes::from_static(&[]),
)
.await
.map_err(|e| match e {
StorageError::RefAlreadyExists(_) => RefError::RefNotFound(tag.to_string()),
err => err.into(),
})?;
Ok(())
{
Ok(WriteRefResult::Written) => Ok(()),
Ok(WriteRefResult::WontOverwrite) => Err(RefError::RefNotFound(tag.to_string())),
Err(err) => Err(err.into()),
}
}

pub async fn fetch_tag(
Expand All @@ -395,20 +399,16 @@ pub async fn fetch_tag(

let fut1: Pin<Box<dyn Future<Output = RefResult<Bytes>>>> = async move {
match storage.get_ref(storage_settings, ref_path.as_str()).await {
Ok(data) => Ok(data),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
}
Ok(GetRefResult::Found { bytes }) => Ok(bytes),
Ok(GetRefResult::NotFound) => Err(RefError::RefNotFound(name.to_string())),
Err(err) => Err(err.into()),
}
}
.boxed();
let fut2 = async move {
match storage.get_ref(storage_settings, delete_marker_path.as_str()).await {
Ok(_) => Ok(Bytes::new()),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
}
Ok(GetRefResult::Found { .. }) => Ok(Bytes::new()),
Ok(GetRefResult::NotFound) => Err(RefError::RefNotFound(name.to_string())),
Err(err) => Err(err.into()),
}
}
Expand Down Expand Up @@ -441,10 +441,8 @@ async fn fetch_branch(
) -> RefResult<RefData> {
let path = version.to_path(name)?;
match storage.get_ref(storage_settings, path.as_str()).await {
Ok(data) => Ok(serde_json::from_slice(data.as_ref())?),
Err(StorageError::RefNotFound(..)) => {
Err(RefError::RefNotFound(name.to_string()))
}
Ok(GetRefResult::Found { bytes }) => Ok(serde_json::from_slice(bytes.as_ref())?),
Ok(GetRefResult::NotFound) => Err(RefError::RefNotFound(name.to_string())),
Err(err) => Err(err.into()),
}
}
Expand Down
19 changes: 13 additions & 6 deletions icechunk/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
list_branches, list_tags, update_branch, BranchVersion, Ref, RefError,
},
session::Session,
storage::{self, ETag},
storage::{self, ETag, FetchConfigResult, UpdateConfigResult},
virtual_chunks::{ContainerName, VirtualChunkResolver},
Storage, StorageError,
};
Expand Down Expand Up @@ -66,6 +66,8 @@ pub enum RepositoryError {
ConflictingPathNotFound(NodeId),
#[error("error in config deserialization `{0}`")]
ConfigDeserializationError(#[from] serde_yml::Error),
#[error("config was updated by other session")]
ConfigWasUpdated,
#[error("branch update conflict: `({expected_parent:?}) != ({actual_parent:?})`")]
Conflict { expected_parent: Option<SnapshotId>, actual_parent: Option<SnapshotId> },
#[error("I/O error `{0}`")]
Expand Down Expand Up @@ -267,11 +269,11 @@ impl Repository {
storage: &(dyn Storage + Send + Sync),
) -> RepositoryResult<Option<(RepositoryConfig, ETag)>> {
match storage.fetch_config(&storage.default_settings()).await? {
Some((bytes, etag)) => {
FetchConfigResult::Found { bytes, etag } => {
let config = serde_yml::from_slice(&bytes)?;
Ok(Some((config, etag)))
}
None => Ok(None),
FetchConfigResult::NotFound => Ok(None),
}
}

Expand All @@ -290,14 +292,19 @@ impl Repository {
config_etag: Option<&ETag>,
) -> RepositoryResult<ETag> {
let bytes = Bytes::from(serde_yml::to_string(config)?);
let res = storage
match storage
.update_config(
&storage.default_settings(),
bytes,
config_etag.map(|e| e.as_str()),
)
.await?;
Ok(res)
.await?
{
UpdateConfigResult::Updated { new_etag } => Ok(new_etag),
UpdateConfigResult::NotOnLatestVersion => {
Err(RepositoryError::ConfigWasUpdated)
}
}
}

pub fn config(&self) -> &RepositoryConfig {
Expand Down
17 changes: 12 additions & 5 deletions icechunk/src/storage/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;

use super::{ETag, ListInfo, Reader, Settings, Storage, StorageError, StorageResult};
use super::{
FetchConfigResult, GetRefResult, ListInfo, Reader, Settings, Storage, StorageError,
StorageResult, UpdateConfigResult, WriteRefResult,
};
use crate::{
format::{ChunkId, ChunkOffset, ManifestId, SnapshotId},
private,
Expand Down Expand Up @@ -46,15 +49,15 @@ impl Storage for LoggingStorage {
async fn fetch_config(
&self,
settings: &Settings,
) -> StorageResult<Option<(Bytes, ETag)>> {
) -> StorageResult<FetchConfigResult> {
self.backend.fetch_config(settings).await
}
async fn update_config(
&self,
settings: &Settings,
config: Bytes,
etag: Option<&str>,
) -> StorageResult<ETag> {
) -> StorageResult<UpdateConfigResult> {
self.backend.update_config(settings, config, etag).await
}

Expand Down Expand Up @@ -159,7 +162,11 @@ impl Storage for LoggingStorage {
self.backend.write_chunk(settings, id, bytes).await
}

async fn get_ref(&self, settings: &Settings, ref_key: &str) -> StorageResult<Bytes> {
async fn get_ref(
&self,
settings: &Settings,
ref_key: &str,
) -> StorageResult<GetRefResult> {
self.backend.get_ref(settings, ref_key).await
}

Expand All @@ -173,7 +180,7 @@ impl Storage for LoggingStorage {
ref_key: &str,
overwrite_refs: bool,
bytes: Bytes,
) -> StorageResult<()> {
) -> StorageResult<WriteRefResult> {
self.backend.write_ref(settings, ref_key, overwrite_refs, bytes).await
}

Expand Down
48 changes: 34 additions & 14 deletions icechunk/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ pub enum StorageError {
S3DeleteObjectError(#[from] SdkError<DeleteObjectsError, HttpResponse>),
#[error("error streaming bytes from object store {0}")]
S3StreamError(#[from] ByteStreamError),
#[error("cannot overwrite ref: {0}")]
RefAlreadyExists(String),
#[error("ref not found: {0}")]
RefNotFound(String),
#[error("the etag does not match")]
ConfigUpdateConflict,
#[error("I/O error: {0}")]
IOError(#[from] std::io::Error),
#[error("unknown storage error: {0}")]
Expand Down Expand Up @@ -192,6 +186,30 @@ impl Reader {
}
}

/// Outcome of [`Storage::fetch_config`].
///
/// A missing config file is an expected result, not a storage error,
/// so it is modeled as a variant rather than a `StorageError`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FetchConfigResult {
    /// The config exists; `etag` identifies this stored version and can
    /// be passed back to `update_config` for a conditional write.
    Found { bytes: Bytes, etag: ETag },
    /// No config is stored yet (callers map this to `Ok(None)`).
    NotFound,
}

/// Outcome of [`Storage::update_config`], an ETag-conditional write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpdateConfigResult {
    /// The config was written; `new_etag` identifies the new stored version.
    Updated { new_etag: ETag },
    /// The stored config does not match the ETag the caller supplied —
    /// another session updated it first — so nothing was written.
    /// (Callers map this to `RepositoryError::ConfigWasUpdated`.)
    NotOnLatestVersion,
}

/// Outcome of [`Storage::get_ref`].
///
/// A ref that does not exist is an expected result, not a storage
/// error; callers translate `NotFound` into domain errors such as
/// `RefError::RefNotFound`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GetRefResult {
    /// The ref exists; `bytes` is its stored content.
    Found { bytes: Bytes },
    /// No ref stored under the requested key.
    NotFound,
}

/// Outcome of [`Storage::write_ref`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteRefResult {
    /// The ref was written.
    Written,
    /// The ref already exists and overwriting was not requested, so
    /// nothing was written. Callers decide what this means in context
    /// (e.g. `TagAlreadyExists` when creating a tag, or a retry when
    /// updating a branch).
    WontOverwrite,
}

/// Fetch and write the parquet files that represent the repository in object store
///
/// Different implementation can cache the files differently, or not at all.
Expand All @@ -202,16 +220,14 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send {
fn default_settings(&self) -> Settings {
Default::default()
}
async fn fetch_config(
&self,
settings: &Settings,
) -> StorageResult<Option<(Bytes, ETag)>>;
async fn fetch_config(&self, settings: &Settings)
-> StorageResult<FetchConfigResult>;
async fn update_config(
&self,
settings: &Settings,
config: Bytes,
etag: Option<&str>,
) -> StorageResult<ETag>;
) -> StorageResult<UpdateConfigResult>;
async fn fetch_snapshot(
&self,
settings: &Settings,
Expand Down Expand Up @@ -273,7 +289,11 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send {
bytes: Bytes,
) -> StorageResult<()>;

async fn get_ref(&self, settings: &Settings, ref_key: &str) -> StorageResult<Bytes>;
async fn get_ref(
&self,
settings: &Settings,
ref_key: &str,
) -> StorageResult<GetRefResult>;
async fn ref_names(&self, settings: &Settings) -> StorageResult<Vec<String>>;
async fn ref_versions(
&self,
Expand All @@ -286,7 +306,7 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send {
ref_key: &str,
overwrite_refs: bool,
bytes: Bytes,
) -> StorageResult<()>;
) -> StorageResult<WriteRefResult>;

async fn list_objects<'a>(
&'a self,
Expand Down Expand Up @@ -666,7 +686,7 @@ pub fn new_gcs_storage(
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {

use std::collections::HashSet;
Expand Down
Loading

0 comments on commit 4c705da

Please sign in to comment.