From 01a2da5dc941bf84b86bed57283589acec40154b Mon Sep 17 00:00:00 2001
From: Pavel Zwerschke
Date: Wed, 5 Mar 2025 01:39:10 +0100
Subject: [PATCH] Add repodata patching support and fail-fast parallel indexing to rattler-index (WIP)

---
 Cargo.lock                       |   7 --
 crates/rattler_index/src/lib.rs  | 185 +++++++++++++++++++------------
 crates/rattler_index/src/main.rs |   9 +-
 3 files changed, 123 insertions(+), 78 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5915dd13c..c9ca34d54 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6462,13 +6462,6 @@ dependencies = [
  "wit-bindgen-rt",
 ]
 
-[[package]]
-name = "wasm-bin"
-version = "0.1.0"
-dependencies = [
- "rattler_solve",
-]
-
 [[package]]
 name = "wasm-bindgen"
 version = "0.2.100"
diff --git a/crates/rattler_index/src/lib.rs b/crates/rattler_index/src/lib.rs
index c6e0dc72e..f9f84b39e 100644
--- a/crates/rattler_index/src/lib.rs
+++ b/crates/rattler_index/src/lib.rs
@@ -5,7 +5,7 @@
 use anyhow::Result;
 use bytes::buf::Buf;
 use fs_err::{self as fs};
-use futures::future::try_join_all;
+use futures::{stream::FuturesUnordered, StreamExt};
 use fxhash::FxHashMap;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use rattler_conda_types::{
@@ -74,12 +74,11 @@ pub fn package_record_from_index_json(
     Ok(package_record)
 }
 
-fn repodata_patch_from_package_stream<'a>(
+fn repodata_patch_from_conda_package_stream<'a>(
     package: impl Read + Seek + 'a,
 ) -> std::io::Result<RepoDataPatch> {
     let mut subdirs = FxHashMap::default();
 
-    // todo: what about .tar.bz2?
     let mut content_reader = stream_conda_content(package).unwrap();
     let entries = content_reader.entries().unwrap();
     for entry in entries {
@@ -90,17 +89,16 @@
         let mut buf = Vec::new();
         entry.read_to_end(&mut buf).unwrap();
         let path = entry.path().unwrap();
-        let components = path.components().into_iter().collect::<Vec<_>>();
-        let subdir = if components.len() != 2 {
-            todo!();
-        } else {
+        let components = path.components().collect::<Vec<_>>();
+        let subdir = if components.len() == 2 {
             if components[1].as_os_str() != "patch_instructions.json" {
                 todo!();
             }
             components[0].as_os_str().to_string_lossy().to_string()
+        } else {
+            todo!();
         };
 
-        // println!("Contents: {:?}", String::from_utf8_lossy(&buf));
         let instructions: PatchInstructions = serde_json::from_slice(&buf).unwrap();
         subdirs.insert(subdir, instructions);
     }
@@ -271,49 +269,72 @@ async fn index_subdir(
         .progress_chars("##-");
     pb.set_style(sty);
 
-    let tasks = packages_to_add
-        .iter()
-        .map(|filename| {
-            tokio::spawn({
-                let op = op.clone();
-                let filename = filename.clone();
-                let pb = pb.clone();
-                let semaphore = semaphore.clone();
-                {
-                    async move {
-                        let _permit = semaphore
-                            .acquire()
-                            .await
-                            .expect("Semaphore was unexpectedly closed");
-                        pb.set_message(format!(
-                            "Indexing {} {}",
-                            subdir.as_str(),
-                            console::style(filename.clone()).dim()
-                        ));
-                        let file_path = format!("{subdir}/{filename}");
-                        let buffer = op.read(&file_path).await?;
-                        let reader = buffer.reader();
-                        // We already know it's not None
-                        let archive_type = ArchiveType::try_from(&filename).unwrap();
-                        let record = match archive_type {
-                            ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
-                            ArchiveType::Conda => package_record_from_conda_reader(reader),
-                        }?;
-                        pb.inc(1);
-                        Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
-                    }
-                }
-            })
-        })
-        .collect::<Vec<_>>();
-    let results = try_join_all(tasks).await?;
-    let results = results.into_iter().collect::<std::io::Result<Vec<_>>>();
-    if let Err(err) = results {
-        pb.set_message("Failed");
-        return Err(err.into());
+    let mut tasks = FuturesUnordered::new();
+    for filename in packages_to_add.iter() {
+        let task = {
+            let op = op.clone();
+            let filename = filename.clone();
+            let pb = pb.clone();
+            let semaphore = semaphore.clone();
+            {
+                async move {
+                    let _permit = semaphore
+                        .acquire()
+                        .await
+                        .expect("Semaphore was unexpectedly closed");
+                    pb.set_message(format!(
+                        "Indexing {} {}",
+                        subdir.as_str(),
+                        console::style(filename.clone()).dim()
+                    ));
+                    let file_path = format!("{subdir}/{filename}");
+                    let buffer = op.read(&file_path).await?;
+                    let reader = buffer.reader();
+                    // We already know it's not None
+                    let archive_type = ArchiveType::try_from(&filename).unwrap();
+                    let record = match archive_type {
+                        ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
+                        ArchiveType::Conda => package_record_from_conda_reader(reader),
+                    }?;
+                    pb.inc(1);
+                    // todo: make this future ok/err instead of results
+                    Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
+                }
+            }
+        };
+        tasks.push(tokio::spawn(task));
+    }
+    let mut results = Vec::new();
+    while let Some(join_result) = tasks.next().await {
+        match join_result {
+            Ok(Ok(result)) => results.push(result),
+            Ok(Err(e)) => {
+                tasks.clear();
+                tracing::error!("Failed to process package: {}", e);
+                pb.abandon_with_message(format!(
+                    "{} {}",
+                    console::style("Failed to index").red(),
+                    console::style(subdir.as_str()).dim()
+                ));
+                return Err(e.into());
+            }
+            Err(join_err) => {
+                tasks.clear();
+                tracing::error!("Task panicked: {}", join_err);
+                pb.abandon_with_message(format!(
+                    "{} {}",
+                    console::style("Failed to index").red(),
+                    console::style(subdir.as_str()).dim()
+                ));
+                return Err(anyhow::anyhow!("Task panicked: {}", join_err));
+            }
+        }
     }
-    let results = results.unwrap();
-    pb.finish_with_message(format!("Finished {}", subdir.as_str()));
+    pb.finish_with_message(format!(
+        "{} {}",
+        console::style("Finished").green(),
+        subdir.as_str()
+    ));
 
     tracing::info!(
         "Successfully added {} packages to subdir {}.",
@@ -373,6 +394,7 @@ async fn index_subdir(
 pub async fn index_fs(
     channel: impl Into<PathBuf>,
     target_platform: Option<Platform>,
+    repodata_patch: Option<String>,
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
@@ -385,7 +407,7 @@
         force,
         max_parallel,
         multi_progress,
-        Some("conda-forge-repodata-patches-20250228.14.29.06-hd8ed1ab_1.conda"),
+        repodata_patch,
     )
     .await
 }
@@ -401,6 +423,7 @@ pub async fn index_s3(
     secret_access_key: Option<String>,
     session_token: Option<String>,
     target_platform: Option<Platform>,
+    repodata_patch: Option<String>,
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
@@ -443,7 +466,7 @@
         force,
         max_parallel,
         multi_progress,
-        Some("conda-forge-repodata-patches-20250228.14.29.06-hd8ed1ab_1.conda"),
+        repodata_patch,
     )
     .await
 }
@@ -466,7 +489,7 @@
     force: bool,
     max_parallel: usize,
     multi_progress: Option<MultiProgress>,
-    repodata_patch: Option<&str>,
+    repodata_patch: Option<String>,
 ) -> anyhow::Result<()> {
     let builder = config.into_builder();
 
@@ -510,10 +533,19 @@ pub async fn index(
     }
 
     let repodata_patch = if let Some(path) = repodata_patch {
+        match ArchiveType::try_from(path.clone()) {
+            Some(ArchiveType::Conda) => {}
+            Some(ArchiveType::TarBz2) | None => {
+                return Err(anyhow::anyhow!(
+                    "Only .conda packages are supported for repodata patches. Got: {}",
+                    path
+                ))
+            }
+        }
         let repodata_patch_path = format!("noarch/{path}");
         let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes();
         let reader = Cursor::new(repodata_patch_bytes);
-        let repodata_patch = repodata_patch_from_package_stream(reader)?;
+        let repodata_patch = repodata_patch_from_conda_package_stream(reader)?;
         Some(repodata_patch)
     } else {
         None
@@ -522,22 +554,35 @@
     let semaphore = Semaphore::new(max_parallel);
     let semaphore = Arc::new(semaphore);
 
-    let tasks = subdirs
-        .iter()
-        .map(|subdir| {
-            tokio::spawn(index_subdir(
-                *subdir,
-                op.clone(),
-                force,
-                repodata_patch
-                    .as_ref()
-                    .and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
-                multi_progress.clone(),
-                semaphore.clone(),
-            ))
-        })
-        .collect::<Vec<_>>();
-    let results = try_join_all(tasks).await?;
-    results.into_iter().collect::<Result<Vec<_>>>()?;
+    let mut tasks = FuturesUnordered::new();
+    for subdir in subdirs.iter() {
+        let task = index_subdir(
+            *subdir,
+            op.clone(),
+            force,
+            repodata_patch
+                .as_ref()
+                .and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
+            multi_progress.clone(),
+            semaphore.clone(),
+        );
+        tasks.push(tokio::spawn(task));
+    }
+
+    while let Some(join_result) = tasks.next().await {
+        match join_result {
+            Ok(Ok(_)) => {}
+            Ok(Err(e)) => {
+                tracing::error!("Failed to process subdir: {}", e);
+                tasks.clear();
+                return Err(e);
+            }
+            Err(join_err) => {
+                tracing::error!("Task panicked: {}", join_err);
+                tasks.clear();
+                return Err(anyhow::anyhow!("Task panicked: {}", join_err));
+            }
+        }
+    }
     Ok(())
 }
diff --git a/crates/rattler_index/src/main.rs b/crates/rattler_index/src/main.rs
index c72fc1d47..c8c08754c 100644
--- a/crates/rattler_index/src/main.rs
+++ b/crates/rattler_index/src/main.rs
@@ -32,13 +32,18 @@ struct Cli {
 
     /// The maximum number of packages to process in-memory simultaneously.
     /// This is necessary to limit memory usage when indexing large channels.
-    #[arg(long, default_value = "128", global = true)]
+    #[arg(long, default_value = "32", global = true)]
     max_parallel: usize,
 
     /// A specific platform to index.
     /// Defaults to all platforms available in the channel.
     #[arg(long, global = true)]
     target_platform: Option<Platform>,
+
+    /// The name of the conda package (expected to be in the `noarch` subdir) that should be used for repodata patching.
+    /// For more information, see `https://prefix.dev/blog/repodata_patching`.
+    #[arg(long, global = true)]
+    repodata_patch: Option<String>,
 }
 
 /// The subcommands for the `rattler-index` CLI.
@@ -106,6 +111,7 @@ async fn main() -> anyhow::Result<()> {
             index_fs(
                 channel,
                 cli.target_platform,
+                cli.repodata_patch,
                 cli.force,
                 cli.max_parallel,
                 Some(multi_progress),
@@ -130,6 +136,7 @@
                 secret_access_key,
                 session_token,
                 cli.target_platform,
+                cli.repodata_patch,
                 cli.force,
                 cli.max_parallel,
                 Some(multi_progress),
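
A minimal sketch of how the new repodata_patch parameter can be exercised from Rust, assuming a crate that depends on rattler_index, tokio, and anyhow. The channel path and the patch package name are placeholders; the argument order mirrors the index_fs signature introduced by this patch.

    // Hypothetical driver program; paths and package names below are made up.
    use rattler_index::index_fs;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        index_fs(
            "/path/to/channel",      // channel: impl Into<PathBuf>
            None,                    // target_platform: None indexes all platforms found in the channel
            // File name of a .conda package in the channel's noarch subdir that
            // contains per-subdir patch_instructions.json entries; .tar.bz2 patches are rejected.
            Some("my-repodata-patches-0.1.0-0.conda".to_string()),
            false,                   // force
            32,                      // max_parallel (the new default)
            None,                    // multi_progress: Option<indicatif::MultiProgress>
        )
        .await
    }

The same value is exposed on the CLI as the global --repodata-patch option added to main.rs above.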