Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelzw committed Mar 5, 2025
1 parent 29430c4 commit 01a2da5
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 78 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 115 additions & 70 deletions crates/rattler_index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use anyhow::Result;
use bytes::buf::Buf;
use fs_err::{self as fs};
use futures::future::try_join_all;
use futures::{stream::FuturesUnordered, StreamExt};
use fxhash::FxHashMap;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use rattler_conda_types::{
Expand Down Expand Up @@ -74,12 +74,11 @@ pub fn package_record_from_index_json<T: Read>(
Ok(package_record)
}

fn repodata_patch_from_package_stream<'a>(
fn repodata_patch_from_conda_package_stream<'a>(
package: impl Read + Seek + 'a,
) -> std::io::Result<rattler_conda_types::RepoDataPatch> {
let mut subdirs = FxHashMap::default();

// todo: what about .tar.bz2?
let mut content_reader = stream_conda_content(package).unwrap();
let entries = content_reader.entries().unwrap();
for entry in entries {
Expand All @@ -90,17 +89,16 @@ fn repodata_patch_from_package_stream<'a>(
let mut buf = Vec::new();
entry.read_to_end(&mut buf).unwrap();
let path = entry.path().unwrap();
let components = path.components().into_iter().collect::<Vec<_>>();
let subdir = if components.len() != 2 {
todo!();
} else {
let components = path.components().collect::<Vec<_>>();
let subdir = if components.len() == 2 {
if components[1].as_os_str() != "patch_instructions.json" {
todo!();
}
components[0].as_os_str().to_string_lossy().to_string()
} else {
todo!();
};

// println!("Contents: {:?}", String::from_utf8_lossy(&buf));
let instructions: PatchInstructions = serde_json::from_slice(&buf).unwrap();
subdirs.insert(subdir, instructions);
}
Expand Down Expand Up @@ -271,49 +269,72 @@ async fn index_subdir(
.progress_chars("##-");
pb.set_style(sty);

let tasks = packages_to_add
.iter()
.map(|filename| {
tokio::spawn({
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
                    let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
let mut tasks = FuturesUnordered::new();
for filename in packages_to_add.iter() {
let task = {
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
                    let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
// todo: make this future ok/err instead of results
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
})
})
.collect::<Vec<_>>();
let results = try_join_all(tasks).await?;
let results = results.into_iter().collect::<std::io::Result<Vec<_>>>();
if let Err(err) = results {
pb.set_message("Failed");
return Err(err.into());
}
};
tasks.push(tokio::spawn(task));
}
let mut results = Vec::new();
while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(result)) => results.push(result),
Ok(Err(e)) => {
tasks.clear();
tracing::error!("Failed to process package: {}", e);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
return Err(e.into());
}
Err(join_err) => {
tasks.clear();
tracing::error!("Task panicked: {}", join_err);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
let results = results.unwrap();
pb.finish_with_message(format!("Finished {}", subdir.as_str()));
pb.finish_with_message(format!(
"{} {}",
console::style("Finished").green(),
subdir.as_str()
));

tracing::info!(
"Successfully added {} packages to subdir {}.",
Expand Down Expand Up @@ -373,6 +394,7 @@ async fn index_subdir(
pub async fn index_fs(
channel: impl Into<PathBuf>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
Expand All @@ -385,7 +407,7 @@ pub async fn index_fs(
force,
max_parallel,
multi_progress,
Some("conda-forge-repodata-patches-20250228.14.29.06-hd8ed1ab_1.conda"),
repodata_patch,
)
.await
}
Expand All @@ -401,6 +423,7 @@ pub async fn index_s3(
secret_access_key: Option<String>,
session_token: Option<String>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
Expand Down Expand Up @@ -443,7 +466,7 @@ pub async fn index_s3(
force,
max_parallel,
multi_progress,
Some("conda-forge-repodata-patches-20250228.14.29.06-hd8ed1ab_1.conda"),
repodata_patch,
)
.await
}
Expand All @@ -466,7 +489,7 @@ pub async fn index<T: Configurator>(
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
repodata_patch: Option<&str>,
repodata_patch: Option<String>,
) -> anyhow::Result<()> {
let builder = config.into_builder();

Expand Down Expand Up @@ -510,10 +533,19 @@ pub async fn index<T: Configurator>(
}

let repodata_patch = if let Some(path) = repodata_patch {
match ArchiveType::try_from(path.clone()) {
Some(ArchiveType::Conda) => {}
Some(ArchiveType::TarBz2) | None => {
return Err(anyhow::anyhow!(
"Only .conda packages are supported for repodata patches. Got: {}",
path
))
}
}
let repodata_patch_path = format!("noarch/{path}");
let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes();
let reader = Cursor::new(repodata_patch_bytes);
let repodata_patch = repodata_patch_from_package_stream(reader)?;
let repodata_patch = repodata_patch_from_conda_package_stream(reader)?;
Some(repodata_patch)
} else {
None
Expand All @@ -522,22 +554,35 @@ pub async fn index<T: Configurator>(
let semaphore = Semaphore::new(max_parallel);
let semaphore = Arc::new(semaphore);

let tasks = subdirs
.iter()
.map(|subdir| {
tokio::spawn(index_subdir(
*subdir,
op.clone(),
force,
repodata_patch
.as_ref()
.and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
multi_progress.clone(),
semaphore.clone(),
))
})
.collect::<Vec<_>>();
let results = try_join_all(tasks).await?;
results.into_iter().collect::<anyhow::Result<Vec<_>>>()?;
let mut tasks = FuturesUnordered::new();
for subdir in subdirs.iter() {
let task = index_subdir(
*subdir,
op.clone(),
force,
repodata_patch
.as_ref()
.and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
multi_progress.clone(),
semaphore.clone(),
);
tasks.push(tokio::spawn(task));
}

while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::error!("Failed to process subdir: {}", e);
tasks.clear();
return Err(e);
}
Err(join_err) => {
tracing::error!("Task panicked: {}", join_err);
tasks.clear();
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
Ok(())
}
9 changes: 8 additions & 1 deletion crates/rattler_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@ struct Cli {

/// The maximum number of packages to process in-memory simultaneously.
/// This is necessary to limit memory usage when indexing large channels.
#[arg(long, default_value = "128", global = true)]
#[arg(long, default_value = "32", global = true)]
max_parallel: usize,

/// A specific platform to index.
/// Defaults to all platforms available in the channel.
#[arg(long, global = true)]
target_platform: Option<Platform>,

/// The name of the conda package (expected to be in the `noarch` subdir) that should be used for repodata patching.
/// For more information, see `https://prefix.dev/blog/repodata_patching`.
#[arg(long, global = true)]
repodata_patch: Option<String>,
}

/// The subcommands for the `rattler-index` CLI.
Expand Down Expand Up @@ -106,6 +111,7 @@ async fn main() -> anyhow::Result<()> {
index_fs(
channel,
cli.target_platform,
cli.repodata_patch,
cli.force,
cli.max_parallel,
Some(multi_progress),
Expand All @@ -130,6 +136,7 @@ async fn main() -> anyhow::Result<()> {
secret_access_key,
session_token,
cli.target_platform,
cli.repodata_patch,
cli.force,
cli.max_parallel,
Some(multi_progress),
Expand Down

0 comments on commit 01a2da5

Please sign in to comment.