Skip to content

Commit

Permalink
fix: retry failed repodata streaming on io error (#1017)
Browse files · Browse the repository at this point in the history
  • Loading branch information
baszalmstra authored Jan 8, 2025
1 parent df49f15 commit 2133141
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 237 deletions.
57 changes: 32 additions & 25 deletions crates/rattler_cache/src/package_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use rattler_digest::Sha256Hash;
use rattler_networking::retry_policies::{DoNotRetryPolicy, RetryDecision, RetryPolicy};
use rattler_package_streaming::{DownloadReporter, ExtractError};
pub use reporter::CacheReporter;
use reqwest::StatusCode;
use simple_spawn_blocking::Cancelled;
use tracing::instrument;
use url::Url;
Expand Down Expand Up @@ -210,6 +209,12 @@ impl PackageCache {
/// This is a convenience wrapper around `get_or_fetch` which fetches the
/// package from the given URL if the package could not be found in the
/// cache.
///
/// This function assumes that the `client` is already configured with a
/// retry middleware that will retry any request that fails. This function
/// uses the passed in `retry_policy` if, after the request has been sent
/// and the response is successful, streaming of the package data fails
/// and the whole request must be retried.
#[instrument(skip_all, fields(url=%url))]
pub async fn get_or_fetch_from_url_with_retry(
&self,
Expand Down Expand Up @@ -253,17 +258,12 @@ impl PackageCache {
// Extract any potential error
let Err(err) = result else { return Ok(()); };

// Only retry on certain errors.
if !matches!(
&err,
ExtractError::IoError(_) | ExtractError::CouldNotCreateDestination(_)
) && !matches!(&err, ExtractError::ReqwestError(err) if
err.is_timeout() ||
err.is_connect() ||
err
.status()
.map_or(false, |status| status.is_server_error() || status == StatusCode::TOO_MANY_REQUESTS || status == StatusCode::REQUEST_TIMEOUT)
) {
// Only retry on io errors. We assume that the user has
// middleware installed that handles connection retries.

if !matches!(&err,
ExtractError::IoError(_) | ExtractError::CouldNotCreateDestination(_)
) {
return Err(err);
}

Expand Down Expand Up @@ -502,14 +502,19 @@ mod test {
use rattler_conda_types::package::{ArchiveIdentifier, PackageFile, PathsJson};
use rattler_digest::{parse_digest_from_hex, Sha256};
use rattler_networking::retry_policies::{DoNotRetryPolicy, ExponentialBackoffBuilder};
use reqwest::Client;
use reqwest_middleware::ClientBuilder;
use reqwest_retry::RetryTransientMiddleware;
use tempfile::tempdir;
use tokio::sync::Mutex;
use tokio_stream::StreamExt;
use url::Url;

use super::PackageCache;
use crate::validation::ValidationMode;
use crate::{package_cache::CacheKey, validation::validate_package_directory};
use crate::{
package_cache::CacheKey,
validation::{validate_package_directory, ValidationMode},
};

fn get_test_data_dir() -> PathBuf {
Path::new(env!("CARGO_MANIFEST_DIR")).join("../../test-data")
Expand Down Expand Up @@ -630,23 +635,18 @@ mod test {
FailAfterBytes(usize),
}

async fn redirect_to_anaconda(
async fn redirect_to_prefix(
axum::extract::Path((channel, subdir, file)): axum::extract::Path<(String, String, String)>,
) -> Redirect {
Redirect::permanent(&format!(
"https://conda.anaconda.org/{channel}/{subdir}/{file}"
))
Redirect::permanent(&format!("https://prefix.dev/{channel}/{subdir}/{file}"))
}

async fn test_flaky_package_cache(archive_name: &str, middleware: Middleware) {
let static_dir = get_test_data_dir();
println!("Serving files from {}", static_dir.display());

// Construct a service that serves raw files from the test directory
// build our application with a route
let router = Router::new()
// `GET /` goes to `root`
.route("/{channel}/{subdir}/{file}", get(redirect_to_anaconda));
.route("/{channel}/{subdir}/{file}", get(redirect_to_prefix));

// Construct a router that returns data from the static dir but fails the first
// try.
Expand Down Expand Up @@ -679,12 +679,14 @@ mod test {

let server_url = Url::parse(&format!("http://localhost:{}", addr.port())).unwrap();

let client = ClientBuilder::new(Client::default()).build();

// Do the first request without a retry policy (presumably matching the
// `DoNotRetryPolicy` passed below — confirm against upstream source)
let result = cache
.get_or_fetch_from_url_with_retry(
ArchiveIdentifier::try_from_filename(archive_name).unwrap(),
server_url.join(archive_name).unwrap(),
reqwest::Client::default().into(),
client.clone(),
DoNotRetryPolicy,
None,
)
Expand All @@ -697,13 +699,18 @@ mod test {
assert_eq!(*request_count_lock, 1, "Expected there to be 1 request");
}

let retry_policy = ExponentialBackoffBuilder::default().build_with_max_retries(3);
let client = ClientBuilder::from_client(client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();

// The second one should fail after the 2nd try
let result = cache
.get_or_fetch_from_url_with_retry(
ArchiveIdentifier::try_from_filename(archive_name).unwrap(),
server_url.join(archive_name).unwrap(),
reqwest::Client::default().into(),
ExponentialBackoffBuilder::default().build_with_max_retries(3),
client,
retry_policy,
None,
)
.await;
Expand Down
48 changes: 30 additions & 18 deletions crates/rattler_package_streaming/src/reqwest/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
//! Functionality to stream and extract packages directly from a [`reqwest::Url`] within a [`tokio`]
//! async context.
//! Functionality to stream and extract packages directly from a
//! [`reqwest::Url`] within a [`tokio`] async context.
use std::{path::Path, sync::Arc};

use crate::{DownloadReporter, ExtractError, ExtractResult};
use fs_err::tokio as tokio_fs;
use futures_util::stream::TryStreamExt;
use rattler_conda_types::package::ArchiveType;
use rattler_digest::Sha256Hash;
use reqwest::Response;
use std::path::Path;
use std::sync::Arc;
use tokio::io::BufReader;
use tokio_util::either::Either;
use tokio_util::io::StreamReader;
use tokio_util::{either::Either, io::StreamReader};
use tracing;
use url::Url;
use zip::result::ZipError;

/// zipfiles may use data descriptors to signal that the decompressor needs to seek ahead in the buffer
/// to find the compressed data length.
/// Since we stream the package over a non seekable HTTP connection, this condition will cause an error during
/// decompression. In this case, we fallback to reading the whole data to a buffer before attempting decompression.
/// Read more in <https://github.com/conda/rattler/issues/794>
use crate::{DownloadReporter, ExtractError, ExtractResult};

/// zipfiles may use data descriptors to signal that the decompressor needs to
/// seek ahead in the buffer to find the compressed data length.
/// Since we stream the package over a non seekable HTTP connection, this
/// condition will cause an error during decompression. In this case, we
/// fallback to reading the whole data to a buffer before attempting
/// decompression. Read more in <https://github.com/conda/rattler/issues/794>
const DATA_DESCRIPTOR_ERROR_MESSAGE: &str = "The file length is not available in the local header";

fn error_for_status(response: reqwest::Response) -> reqwest_middleware::Result<Response> {
Expand Down Expand Up @@ -51,7 +52,8 @@ async fn get_reader(
let mut request = client.get(url.clone());

if let Some(sha256) = expected_sha256 {
// This is used by the OCI registry middleware to verify the sha256 of the response
// This is used by the OCI registry middleware to verify the sha256 of the
// response
request = request.header("X-Expected-Sha256", format!("{sha256:x}"));
}

Expand All @@ -72,12 +74,21 @@ async fn get_reader(

// Get the response as a stream
Ok(Either::Right(StreamReader::new(byte_stream.map_err(
|err| std::io::Error::new(std::io::ErrorKind::Other, err),
|err| {
if err.is_body() {
std::io::Error::new(std::io::ErrorKind::Interrupted, err)
} else if err.is_decode() {
std::io::Error::new(std::io::ErrorKind::InvalidData, err)
} else {
std::io::Error::new(std::io::ErrorKind::Other, err)
}
},
))))
}
}

/// Extracts the contents a `.tar.bz2` package archive from the specified remote location.
/// Extracts the contents of a `.tar.bz2` package archive from the specified
/// remote location.
///
/// ```rust,no_run
/// # #[tokio::main]
Expand Down Expand Up @@ -113,7 +124,8 @@ pub async fn extract_tar_bz2(
Ok(result)
}

/// Extracts the contents a `.conda` package archive from the specified remote location.
/// Extracts the contents of a `.conda` package archive from the specified
/// remote location.
///
/// ```rust,no_run
/// # #[tokio::main]
Expand Down Expand Up @@ -182,8 +194,8 @@ pub async fn extract_conda(
}
}

/// Extracts the contents a package archive from the specified remote location. The type of package
/// is determined based on the path of the url.
/// Extracts the contents of a package archive from the specified remote location.
/// The type of package is determined based on the path of the url.
///
/// ```rust,no_run
/// # #[tokio::main]
Expand Down
1 change: 1 addition & 0 deletions crates/rattler_repodata_gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tokio-util = { workspace = true, features = ["codec", "io"] }
tracing = { workspace = true }
url = { workspace = true, features = ["serde"] }
zstd = { workspace = true }
retry-policies = { workspace = true }
rattler_cache = { version = "0.3.1", path = "../rattler_cache" }
rattler_redaction = { version = "0.1.5", path = "../rattler_redaction", features = ["reqwest", "reqwest-middleware"] }

Expand Down
Loading

0 comments on commit 2133141

Please sign in to comment.