diff --git a/data/Cargo.toml b/data/Cargo.toml index bc9abb4..03b2f4e 100644 --- a/data/Cargo.toml +++ b/data/Cargo.toml @@ -54,16 +54,20 @@ glob = "0.3.1" reqwest-middleware = "0.3.3" chrono = "0.4.39" +# Needed for cleaning files from urls +reqwest = { version = "0.12.12", features = ["stream"] } +futures = "0.3.31" + [target.'cfg(not(windows))'.dependencies] openssl = "0.10" # use embedded webpki root certs for MacOS as native certs take a very long time # to load, which affects startup time significantly [target.'cfg(macos)'.dependencies] -reqwest = { version = "0.11.4", features = ["json", "webpki-roots"] } +reqwest = { version = "0.12.12", features = ["json", "rustls-tls-webpki-roots"] } [target.'cfg(not(macos))'.dependencies] -reqwest = { version = "0.11.4", features = ["json"] } +reqwest = { version = "0.12.12", features = ["json"] } # Windows doesn't support assembly for compilation [target.'cfg(not(target_os = "windows"))'.dependencies] diff --git a/data/src/data_client.rs b/data/src/data_client.rs index e50f2ac..921a6fa 100644 --- a/data/src/data_client.rs +++ b/data/src/data_client.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use cas_client::CacheConfig; use dirs::home_dir; +use futures::{AsyncReadExt, TryStreamExt}; use lazy_static::lazy_static; use merkledb::constants::IDEAL_CAS_BLOCK_SIZE; use parutils::{tokio_par_for_each, ParallelError}; @@ -200,6 +201,39 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res Ok(pf) } +async fn clean_file_from_url( + processor: &PointerFileTranslator, + file_url: String, + path: String, +) -> errors::Result { + let path = PathBuf::from(path); + let mut read_buf = vec![0u8; READ_BLOCK_SIZE]; + + let response = reqwest::get(file_url).await?; + + let mut reader = response.bytes_stream().map_err(std::io::Error::other).into_async_read(); + + let handle = processor + .start_clean( + IDEAL_CAS_BLOCK_SIZE / READ_BLOCK_SIZE, // enough to fill one CAS block + Some(&path), // for logging & telemetry + ) + .await?; + + loop { + let bytes = reader.read(&mut read_buf).await?; + if bytes == 0 { + break; + } + + handle.add_bytes(read_buf[0..bytes].to_vec()).await?; + } + + let pf_str = handle.result().await?; + let pf = PointerFile::init_from_string(&pf_str, path.to_str().unwrap()); + Ok(pf) +} + async fn smudge_file( proc: &PointerFileTranslator, pointer_file: &PointerFile, diff --git a/data/src/errors.rs b/data/src/errors.rs index b310a5d..ac45ac7 100644 --- a/data/src/errors.rs +++ b/data/src/errors.rs @@ -72,6 +72,9 @@ pub enum DataProcessingError { #[error("AuthError: {0}")] AuthError(#[from] AuthError), + + #[error("Reqwest: {0}")] + ReqwestError(#[from] reqwest::Error), } pub type Result = std::result::Result;