Skip to content

Commit

Permalink
add clean_file_from_url
Browse files Browse the repository at this point in the history
  • Loading branch information
assafvayner committed Feb 4, 2025
1 parent 0bc9a42 commit 173196b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
8 changes: 6 additions & 2 deletions data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ glob = "0.3.1"
reqwest-middleware = "0.3.3"
chrono = "0.4.39"

# Needed for cleaning files from urls
reqwest = { version = "0.12.12", features = ["stream"] }
futures = "0.3.31"

[target.'cfg(not(windows))'.dependencies]
openssl = "0.10"

# use embedded webpki root certs for MacOS as native certs take a very long time
# to load, which affects startup time significantly
[target.'cfg(macos)'.dependencies]
reqwest = { version = "0.11.4", features = ["json", "webpki-roots"] }
reqwest = { version = "0.12.12", features = ["json", "rustls-tls-webpki-roots"] }

[target.'cfg(not(macos))'.dependencies]
reqwest = { version = "0.11.4", features = ["json"] }
reqwest = { version = "0.12.12", features = ["json"] }

# Windows doesn't support assembly for compilation
[target.'cfg(not(target_os = "windows"))'.dependencies]
Expand Down
34 changes: 34 additions & 0 deletions data/src/data_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;

use cas_client::CacheConfig;
use dirs::home_dir;
use futures::{AsyncReadExt, TryStreamExt};
use lazy_static::lazy_static;
use merkledb::constants::IDEAL_CAS_BLOCK_SIZE;
use parutils::{tokio_par_for_each, ParallelError};
Expand Down Expand Up @@ -200,6 +201,39 @@ async fn clean_file(processor: &PointerFileTranslator, f: String) -> errors::Res
Ok(pf)
}

async fn clean_file_from_url(
processor: &PointerFileTranslator,
file_url: String,
path: String,
) -> errors::Result<PointerFile> {
let path = PathBuf::from(path);
let mut read_buf = vec![0u8; READ_BLOCK_SIZE];

let response = reqwest::get(file_url).await?;

let mut reader = response.bytes_stream().map_err(std::io::Error::other).into_async_read();

let handle = processor
.start_clean(
IDEAL_CAS_BLOCK_SIZE / READ_BLOCK_SIZE, // enough to fill one CAS block
Some(&path), // for logging & telemetry
)
.await?;

loop {
let bytes = reader.read(&mut read_buf).await?;
if bytes == 0 {
break;
}

handle.add_bytes(read_buf[0..bytes].to_vec()).await?;
}

let pf_str = handle.result().await?;
let pf = PointerFile::init_from_string(&pf_str, path.to_str().unwrap());
Ok(pf)
}

async fn smudge_file(
proc: &PointerFileTranslator,
pointer_file: &PointerFile,
Expand Down
3 changes: 3 additions & 0 deletions data/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub enum DataProcessingError {

#[error("AuthError: {0}")]
AuthError(#[from] AuthError),

#[error("Reqwest: {0}")]
ReqwestError(#[from] reqwest::Error),
}

pub type Result<T> = std::result::Result<T, DataProcessingError>;
Expand Down

0 comments on commit 173196b

Please sign in to comment.