diff --git a/crates/collab/.env.toml b/crates/collab/.env.toml index e775179f6c2504..168e14415430e8 100644 --- a/crates/collab/.env.toml +++ b/crates/collab/.env.toml @@ -18,6 +18,7 @@ SEED_PATH = "crates/collab/seed.default.json" LLM_DATABASE_URL = "postgres://postgres@localhost/zed_llm" LLM_DATABASE_MAX_CONNECTIONS = 5 LLM_API_SECRET = "llm-secret" +OPENAI_API_KEY = "llm-secret" # SLACK_PANICS_WEBHOOK = "" diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 53bd58596c3bff..1b8f7e9479fed4 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -1772,6 +1772,7 @@ impl Workspace { self.project.read(cx).visible_worktrees(cx) } + #[cfg(any(test, feature = "test-support"))] pub fn worktree_scans_complete(&self, cx: &App) -> impl Future + 'static { let futures = self .worktrees(cx) diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index c3e444caf1d19b..16f9a27ceb9344 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -13,6 +13,7 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, + future::join_all, select_biased, task::Poll, FutureExt as _, Stream, StreamExt, @@ -59,7 +60,7 @@ use std::{ path::{Path, PathBuf}, pin::Pin, sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, + atomic::{self, AtomicU32, AtomicUsize, Ordering::SeqCst}, Arc, }, time::{Duration, Instant}, @@ -559,6 +560,7 @@ struct BackgroundScannerState { changed_paths: Vec>, prev_snapshot: Snapshot, git_hosting_provider_registry: Option>, + repository_scans: HashMap>, } #[derive(Debug, Clone)] @@ -623,6 +625,7 @@ impl DerefMut for LocalSnapshot { } } +#[derive(Debug)] enum ScanState { Started, Updated { @@ -1445,7 +1448,7 @@ impl LocalWorktree { scan_requests_rx, path_prefixes_to_scan_rx, next_entry_id, - state: Mutex::new(BackgroundScannerState { + state: Arc::new(Mutex::new(BackgroundScannerState { prev_snapshot: snapshot.snapshot.clone(), snapshot, scanned_dirs: Default::default(), @@ -1453,8 +1456,9 @@ impl LocalWorktree { paths_to_scan: Default::default(), removed_entries: Default::default(), changed_paths: Default::default(), + repository_scans: HashMap::default(), git_hosting_provider_registry, - }), + })), phase: BackgroundScannerPhase::InitialScan, share_private_files, settings, @@ -2227,14 +2231,12 @@ impl LocalWorktree { let _maintain_remote_snapshot = cx.background_executor().spawn(async move { let mut is_first = true; while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await { - let update; - if is_first { - update = snapshot.build_initial_update(project_id, worktree_id); + let update = if is_first { is_first = false; + snapshot.build_initial_update(project_id, worktree_id) } else { - update = - snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes); - } + snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes) + }; for update in proto::split_worktree_update(update) { let _ = resume_updates_rx.try_recv(); @@ -2604,7 +2606,7 @@ impl Snapshot { mut update: proto::UpdateWorktree, always_included_paths: &PathMatcher, ) -> Result<()> { - log::trace!( + log::debug!( "applying remote worktree update. {} entries updated, {} removed", update.updated_entries.len(), update.removed_entries.len() @@ -4163,6 +4165,11 @@ struct PathEntry { scan_id: usize, } +#[derive(Debug, Default)] +struct FsScanned { + status_scans: Arc, +} + impl sum_tree::Item for PathEntry { type Summary = PathEntrySummary; @@ -4226,7 +4233,7 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey { } struct BackgroundScanner { - state: Mutex, + state: Arc>, fs: Arc, fs_case_sensitive: bool, status_updates_tx: UnboundedSender, @@ -4240,7 +4247,7 @@ struct BackgroundScanner { share_private_files: bool, } -#[derive(PartialEq)] +#[derive(Copy, Clone, PartialEq)] enum BackgroundScannerPhase { InitialScan, EventsReceivedDuringInitialScan, @@ -4249,8 +4256,6 @@ enum BackgroundScannerPhase { impl BackgroundScanner { async fn run(&mut self, mut fs_events_rx: Pin>>>) { - use futures::FutureExt as _; - // If the worktree root does not contain a git repository, then find // the git repository in an ancestor directory. Find any gitignore files // in ancestor directories. @@ -4328,13 +4333,14 @@ impl BackgroundScanner { // Perform an initial scan of the directory. drop(scan_job_tx); - self.scan_dirs(true, scan_job_rx).await; + let scans_running = self.scan_dirs(true, scan_job_rx).await; { let mut state = self.state.lock(); state.snapshot.completed_scan_id = state.snapshot.scan_id; } - self.send_status_update(false, SmallVec::new()); + let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + self.send_status_update(scanning, SmallVec::new()); // Process any any FS events that occurred while performing the initial scan. // For these events, update events cannot be as precise, because we didn't @@ -4360,7 +4366,8 @@ impl BackgroundScanner { // these before handling changes reported by the filesystem. request = self.next_scan_request().fuse() => { let Ok(request) = request else { break }; - if !self.process_scan_request(request, false).await { + let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + if !self.process_scan_request(request, scanning).await { return; } } @@ -4382,7 +4389,8 @@ impl BackgroundScanner { self.process_events(vec![abs_path]).await; } } - self.send_status_update(false, request.done); + let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + self.send_status_update(scanning, request.done); } paths = fs_events_rx.next().fuse() => { @@ -4575,24 +4583,36 @@ impl BackgroundScanner { .await; self.update_ignore_statuses(scan_job_tx).await; - self.scan_dirs(false, scan_job_rx).await; + let scans_running = self.scan_dirs(false, scan_job_rx).await; - if !dot_git_abs_paths.is_empty() { - self.update_git_repositories(dot_git_abs_paths).await; - } - - { - let mut state = self.state.lock(); - state.snapshot.completed_scan_id = state.snapshot.scan_id; - for (_, entry) in mem::take(&mut state.removed_entries) { - state.scanned_dirs.remove(&entry.id); - } - } + let status_update = if !dot_git_abs_paths.is_empty() { + Some(self.update_git_repositories(dot_git_abs_paths)) + } else { + None + }; - #[cfg(test)] - self.state.lock().snapshot.check_git_invariants(); + let phase = self.phase; + let status_update_tx = self.status_updates_tx.clone(); + let state = self.state.clone(); + self.executor + .spawn(async move { + if let Some(status_update) = status_update { + status_update.await; + } - self.send_status_update(false, SmallVec::new()); + { + let mut state = state.lock(); + state.snapshot.completed_scan_id = state.snapshot.scan_id; + for (_, entry) in mem::take(&mut state.removed_entries) { + state.scanned_dirs.remove(&entry.id); + } + #[cfg(test)] + state.snapshot.check_git_invariants(); + } + let scanning = scans_running.status_scans.load(atomic::Ordering::Acquire) > 0; + send_status_update_inner(phase, state, status_update_tx, scanning, SmallVec::new()); + }) + .detach(); } async fn forcibly_load_paths(&self, paths: &[Arc]) -> bool { @@ -4614,8 +4634,9 @@ impl BackgroundScanner { } drop(scan_job_tx); } + let scans_running = Arc::new(AtomicU32::new(0)); while let Ok(job) = scan_job_rx.recv().await { - self.scan_dir(&job).await.log_err(); + self.scan_dir(&scans_running, &job).await.log_err(); } !mem::take(&mut self.state.lock().paths_to_scan).is_empty() @@ -4625,17 +4646,16 @@ impl BackgroundScanner { &self, enable_progress_updates: bool, scan_jobs_rx: channel::Receiver, - ) { - use futures::FutureExt as _; - + ) -> FsScanned { if self .status_updates_tx .unbounded_send(ScanState::Started) .is_err() { - return; + return FsScanned::default(); } + let scans_running = Arc::new(AtomicU32::new(1)); let progress_update_count = AtomicUsize::new(0); self.executor .scoped(|scope| { @@ -4680,7 +4700,7 @@ impl BackgroundScanner { // Recursively load directories from the file system. job = scan_jobs_rx.recv().fuse() => { let Ok(job) = job else { break }; - if let Err(err) = self.scan_dir(&job).await { + if let Err(err) = self.scan_dir(&scans_running, &job).await { if job.path.as_ref() != Path::new("") { log::error!("error scanning directory {:?}: {}", job.abs_path, err); } @@ -4688,34 +4708,28 @@ impl BackgroundScanner { } } } - }) + }); } }) .await; - } - fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool { - let mut state = self.state.lock(); - if state.changed_paths.is_empty() && scanning { - return true; + scans_running.fetch_sub(1, atomic::Ordering::Release); + FsScanned { + status_scans: scans_running, } + } - let new_snapshot = state.snapshot.clone(); - let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); - let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths); - state.changed_paths.clear(); - - self.status_updates_tx - .unbounded_send(ScanState::Updated { - snapshot: new_snapshot, - changes, - scanning, - barrier, - }) - .is_ok() + fn send_status_update(&self, scanning: bool, barrier: SmallVec<[barrier::Sender; 1]>) -> bool { + send_status_update_inner( + self.phase, + self.state.clone(), + self.status_updates_tx.clone(), + scanning, + barrier, + ) } - async fn scan_dir(&self, job: &ScanJob) -> Result<()> { + async fn scan_dir(&self, scans_running: &Arc, job: &ScanJob) -> Result<()> { let root_abs_path; let root_char_bag; { @@ -4755,22 +4769,25 @@ impl BackgroundScanner { swap_to_front(&mut child_paths, *GITIGNORE); swap_to_front(&mut child_paths, *DOT_GIT); + let mut git_status_update_jobs = Vec::new(); for child_abs_path in child_paths { let child_abs_path: Arc = child_abs_path.into(); let child_name = child_abs_path.file_name().unwrap(); let child_path: Arc = job.path.join(child_name).into(); if child_name == *DOT_GIT { - let repo = self.state.lock().insert_git_repository( - child_path.clone(), - self.fs.as_ref(), - self.watcher.as_ref(), - ); - - if let Some(local_repo) = repo { - self.update_git_repository(UpdateGitRepoJob { - local_repository: local_repo, - }); + { + let mut state = self.state.lock(); + let repo = state.insert_git_repository( + child_path.clone(), + self.fs.as_ref(), + self.watcher.as_ref(), + ); + if let Some(local_repo) = repo { + scans_running.fetch_add(1, atomic::Ordering::Release); + git_status_update_jobs + .push(self.schedule_git_statuses_update(&mut state, local_repo)); + } } } else if child_name == *GITIGNORE { match build_gitignore(&child_abs_path, self.fs.as_ref()).await { @@ -4887,6 +4904,32 @@ impl BackgroundScanner { new_entries.push(child_entry); } + let task_state = self.state.clone(); + let phase = self.phase; + let status_updates_tx = self.status_updates_tx.clone(); + let scans_running = scans_running.clone(); + self.executor + .spawn(async move { + if !git_status_update_jobs.is_empty() { + let status_updates = join_all(git_status_update_jobs).await; + let status_updated = status_updates + .iter() + .any(|update_result| update_result.is_ok()); + scans_running.fetch_sub(status_updates.len() as u32, atomic::Ordering::Release); + if status_updated { + let scanning = scans_running.load(atomic::Ordering::Acquire) > 0; + send_status_update_inner( + phase, + task_state, + status_updates_tx, + scanning, + SmallVec::new(), + ); + } + } + }) + .detach(); + let mut state = self.state.lock(); // Identify any subdirectories that should not be scanned. @@ -5127,8 +5170,6 @@ impl BackgroundScanner { } async fn update_ignore_statuses(&self, scan_job_tx: Sender) { - use futures::FutureExt as _; - let mut ignores_to_update = Vec::new(); let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded(); let prev_snapshot; @@ -5278,10 +5319,10 @@ impl BackgroundScanner { state.snapshot.entries_by_id.edit(entries_by_id_edits, &()); } - async fn update_git_repositories(&self, dot_git_paths: Vec) { + fn update_git_repositories(&self, dot_git_paths: Vec) -> Task<()> { log::debug!("reloading repositories: {dot_git_paths:?}"); - let mut repo_updates = Vec::new(); + let mut status_updates = Vec::new(); { let mut state = self.state.lock(); let scan_id = state.snapshot.scan_id; @@ -5305,7 +5346,7 @@ impl BackgroundScanner { None => { let Ok(relative) = dot_git_dir.strip_prefix(state.snapshot.abs_path()) else { - return; + return Task::ready(()); }; match state.insert_git_repository( relative.into(), @@ -5334,7 +5375,8 @@ impl BackgroundScanner { } }; - repo_updates.push(UpdateGitRepoJob { local_repository }); + status_updates + .push(self.schedule_git_statuses_update(&mut state, local_repository)); } // Remove any git repositories whose .git entry no longer exists. @@ -5365,257 +5407,109 @@ impl BackgroundScanner { }); } - let (mut updates_done_tx, mut updates_done_rx) = barrier::channel(); - self.executor - .scoped(|scope| { - scope.spawn(async { - for repo_update in repo_updates { - self.update_git_repository(repo_update); - } - updates_done_tx.blocking_send(()).ok(); - }); - - scope.spawn(async { - loop { - select_biased! { - // Process any path refresh requests before moving on to process - // the queue of git statuses. - request = self.next_scan_request().fuse() => { - let Ok(request) = request else { break }; - if !self.process_scan_request(request, true).await { - return; - } - } - _ = updates_done_rx.recv().fuse() => break, - } - } - }); - }) - .await; - } - - fn update_branches(&self, job: &UpdateGitRepoJob) -> Result<()> { - let branches = job.local_repository.repo().branches()?; - let snapshot = self.state.lock().snapshot.snapshot.clone(); - - let mut repository = snapshot - .repository(job.local_repository.work_directory.path_key()) - .context("Missing repository")?; - - repository.branch = branches.into_iter().find(|branch| branch.is_head); - - let mut state = self.state.lock(); - state - .snapshot - .repositories - .insert_or_replace(repository, &()); - - Ok(()) + self.executor.spawn(async move { + let _updates_finished: Vec> = + join_all(status_updates).await; + }) } - fn update_statuses(&self, job: &UpdateGitRepoJob) -> Result<()> { - log::trace!( - "updating git statuses for repo {:?}", - job.local_repository.work_directory.display_name() - ); - let t0 = Instant::now(); - - let statuses = job - .local_repository - .repo() - .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()])?; + /// Update the git statuses for a given batch of entries. + fn schedule_git_statuses_update( + &self, + state: &mut BackgroundScannerState, + mut local_repository: LocalRepositoryEntry, + ) -> oneshot::Receiver<()> { + let repository_name = local_repository.work_directory.display_name(); + let path_key = local_repository.work_directory.path_key(); - log::trace!( - "computed git statuses for repo {:?} in {:?}", - job.local_repository.work_directory.display_name(), - t0.elapsed() - ); + let job_state = self.state.clone(); + let (tx, rx) = oneshot::channel(); - let t0 = Instant::now(); - let mut changed_paths = Vec::new(); - let snapshot = self.state.lock().snapshot.snapshot.clone(); + state.repository_scans.insert( + path_key.clone(), + self.executor.spawn(async move { + update_branches(&job_state, &mut local_repository).log_err(); + log::trace!("updating git statuses for repo {repository_name}",); + let t0 = Instant::now(); + + let Some(statuses) = local_repository + .repo() + .status(&[git::WORK_DIRECTORY_REPO_PATH.clone()]) + .log_err() + else { + return; + }; - let mut repository = snapshot - .repository(job.local_repository.work_directory.path_key()) - .context("Got an UpdateGitStatusesJob for a repository that isn't in the snapshot")?; + log::trace!( + "computed git statuses for repo {repository_name} in {:?}", + t0.elapsed() + ); - let merge_head_shas = job.local_repository.repo().merge_head_shas(); - if merge_head_shas != job.local_repository.current_merge_head_shas { - mem::take(&mut repository.current_merge_conflicts); - } + let t0 = Instant::now(); + let mut changed_paths = Vec::new(); + let snapshot = job_state.lock().snapshot.snapshot.clone(); - let mut new_entries_by_path = SumTree::new(&()); - for (repo_path, status) in statuses.entries.iter() { - let project_path = repository.work_directory.unrelativize(repo_path); + let Some(mut repository) = snapshot + .repository(path_key) + .context( + "Tried to update git statuses for a repository that isn't in the snapshot", + ) + .log_err() + else { + return; + }; - new_entries_by_path.insert_or_replace( - StatusEntry { - repo_path: repo_path.clone(), - status: *status, - }, - &(), - ); - if status.is_conflicted() { - repository.current_merge_conflicts.insert(repo_path.clone()); - } + let merge_head_shas = local_repository.repo().merge_head_shas(); + if merge_head_shas != local_repository.current_merge_head_shas { + mem::take(&mut repository.current_merge_conflicts); + } - if let Some(path) = project_path { - changed_paths.push(path); - } - } + let mut new_entries_by_path = SumTree::new(&()); + for (repo_path, status) in statuses.entries.iter() { + let project_path = repository.work_directory.unrelativize(repo_path); - repository.statuses_by_path = new_entries_by_path; + new_entries_by_path.insert_or_replace( + StatusEntry { + repo_path: repo_path.clone(), + status: *status, + }, + &(), + ); - let mut state = self.state.lock(); - state - .snapshot - .repositories - .insert_or_replace(repository, &()); + if let Some(path) = project_path { + changed_paths.push(path); + } + } - state - .snapshot - .git_repositories - .update(&job.local_repository.work_directory_id, |entry| { - entry.current_merge_head_shas = merge_head_shas; - }); + repository.statuses_by_path = new_entries_by_path; + let mut state = job_state.lock(); + state + .snapshot + .repositories + .insert_or_replace(repository, &()); + state.snapshot.git_repositories.update( + &local_repository.work_directory_id, + |entry| { + entry.current_merge_head_shas = merge_head_shas; + entry.status_scan_id += 1; + }, + ); - util::extend_sorted( - &mut state.changed_paths, - changed_paths, - usize::MAX, - Ord::cmp, - ); + util::extend_sorted( + &mut state.changed_paths, + changed_paths, + usize::MAX, + Ord::cmp, + ); - log::trace!( - "applied git status updates for repo {:?} in {:?}", - job.local_repository.work_directory.display_name(), - t0.elapsed(), + log::trace!( + "applied git status updates for repo {repository_name} in {:?}", + t0.elapsed(), + ); + tx.send(()).ok(); + }), ); - Ok(()) - } - - /// Update the git statuses for a given batch of entries. - fn update_git_repository(&self, job: UpdateGitRepoJob) { - self.update_branches(&job).log_err(); - self.update_statuses(&job).log_err(); - } - - fn build_change_set( - &self, - old_snapshot: &Snapshot, - new_snapshot: &Snapshot, - event_paths: &[Arc], - ) -> UpdatedEntriesSet { - use BackgroundScannerPhase::*; - use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; - - // Identify which paths have changed. Use the known set of changed - // parent paths to optimize the search. - let mut changes = Vec::new(); - let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); - let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); - let mut last_newly_loaded_dir_path = None; - old_paths.next(&()); - new_paths.next(&()); - for path in event_paths { - let path = PathKey(path.clone()); - if old_paths.item().map_or(false, |e| e.path < path.0) { - old_paths.seek_forward(&path, Bias::Left, &()); - } - if new_paths.item().map_or(false, |e| e.path < path.0) { - new_paths.seek_forward(&path, Bias::Left, &()); - } - loop { - match (old_paths.item(), new_paths.item()) { - (Some(old_entry), Some(new_entry)) => { - if old_entry.path > path.0 - && new_entry.path > path.0 - && !old_entry.path.starts_with(&path.0) - && !new_entry.path.starts_with(&path.0) - { - break; - } - - match Ord::cmp(&old_entry.path, &new_entry.path) { - Ordering::Less => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - Ordering::Equal => { - if self.phase == EventsReceivedDuringInitialScan { - if old_entry.id != new_entry.id { - changes.push(( - old_entry.path.clone(), - old_entry.id, - Removed, - )); - } - // If the worktree was not fully initialized when this event was generated, - // we can't know whether this entry was added during the scan or whether - // it was merely updated. - changes.push(( - new_entry.path.clone(), - new_entry.id, - AddedOrUpdated, - )); - } else if old_entry.id != new_entry.id { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - changes.push((new_entry.path.clone(), new_entry.id, Added)); - } else if old_entry != new_entry { - if old_entry.kind.is_unloaded() { - last_newly_loaded_dir_path = Some(&new_entry.path); - changes.push(( - new_entry.path.clone(), - new_entry.id, - Loaded, - )); - } else { - changes.push(( - new_entry.path.clone(), - new_entry.id, - Updated, - )); - } - } - old_paths.next(&()); - new_paths.next(&()); - } - Ordering::Greater => { - let is_newly_loaded = self.phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - } - } - (Some(old_entry), None) => { - changes.push((old_entry.path.clone(), old_entry.id, Removed)); - old_paths.next(&()); - } - (None, Some(new_entry)) => { - let is_newly_loaded = self.phase == InitialScan - || last_newly_loaded_dir_path - .as_ref() - .map_or(false, |dir| new_entry.path.starts_with(dir)); - changes.push(( - new_entry.path.clone(), - new_entry.id, - if is_newly_loaded { Loaded } else { Added }, - )); - new_paths.next(&()); - } - (None, None) => break, - } - } - } - - changes.into() + rx } async fn progress_timer(&self, running: bool) { @@ -5645,6 +5539,159 @@ impl BackgroundScanner { } } +fn send_status_update_inner( + phase: BackgroundScannerPhase, + state: Arc>, + status_updates_tx: UnboundedSender, + scanning: bool, + barrier: SmallVec<[barrier::Sender; 1]>, +) -> bool { + let mut state = state.lock(); + if state.changed_paths.is_empty() && scanning { + return true; + } + + let new_snapshot = state.snapshot.clone(); + let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone()); + let changes = build_diff(phase, &old_snapshot, &new_snapshot, &state.changed_paths); + state.changed_paths.clear(); + + status_updates_tx + .unbounded_send(ScanState::Updated { + snapshot: new_snapshot, + changes, + scanning, + barrier, + }) + .is_ok() +} + +fn update_branches( + state: &Mutex, + repository: &mut LocalRepositoryEntry, +) -> Result<()> { + let branches = repository.repo().branches()?; + let snapshot = state.lock().snapshot.snapshot.clone(); + let mut repository = snapshot + .repository(repository.work_directory.path_key()) + .context("Missing repository")?; + repository.branch = branches.into_iter().find(|branch| branch.is_head); + + let mut state = state.lock(); + state + .snapshot + .repositories + .insert_or_replace(repository, &()); + + Ok(()) +} + +fn build_diff( + phase: BackgroundScannerPhase, + old_snapshot: &Snapshot, + new_snapshot: &Snapshot, + event_paths: &[Arc], +) -> UpdatedEntriesSet { + use BackgroundScannerPhase::*; + use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated}; + + // Identify which paths have changed. Use the known set of changed + // parent paths to optimize the search. + let mut changes = Vec::new(); + let mut old_paths = old_snapshot.entries_by_path.cursor::(&()); + let mut new_paths = new_snapshot.entries_by_path.cursor::(&()); + let mut last_newly_loaded_dir_path = None; + old_paths.next(&()); + new_paths.next(&()); + for path in event_paths { + let path = PathKey(path.clone()); + if old_paths.item().map_or(false, |e| e.path < path.0) { + old_paths.seek_forward(&path, Bias::Left, &()); + } + if new_paths.item().map_or(false, |e| e.path < path.0) { + new_paths.seek_forward(&path, Bias::Left, &()); + } + loop { + match (old_paths.item(), new_paths.item()) { + (Some(old_entry), Some(new_entry)) => { + if old_entry.path > path.0 + && new_entry.path > path.0 + && !old_entry.path.starts_with(&path.0) + && !new_entry.path.starts_with(&path.0) + { + break; + } + + match Ord::cmp(&old_entry.path, &new_entry.path) { + Ordering::Less => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + Ordering::Equal => { + if phase == EventsReceivedDuringInitialScan { + if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + } + // If the worktree was not fully initialized when this event was generated, + // we can't know whether this entry was added during the scan or whether + // it was merely updated. + changes.push(( + new_entry.path.clone(), + new_entry.id, + AddedOrUpdated, + )); + } else if old_entry.id != new_entry.id { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + changes.push((new_entry.path.clone(), new_entry.id, Added)); + } else if old_entry != new_entry { + if old_entry.kind.is_unloaded() { + last_newly_loaded_dir_path = Some(&new_entry.path); + changes.push((new_entry.path.clone(), new_entry.id, Loaded)); + } else { + changes.push((new_entry.path.clone(), new_entry.id, Updated)); + } + } + old_paths.next(&()); + new_paths.next(&()); + } + Ordering::Greater => { + let is_newly_loaded = phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + } + } + (Some(old_entry), None) => { + changes.push((old_entry.path.clone(), old_entry.id, Removed)); + old_paths.next(&()); + } + (None, Some(new_entry)) => { + let is_newly_loaded = phase == InitialScan + || last_newly_loaded_dir_path + .as_ref() + .map_or(false, |dir| new_entry.path.starts_with(dir)); + changes.push(( + new_entry.path.clone(), + new_entry.id, + if is_newly_loaded { Loaded } else { Added }, + )); + new_paths.next(&()); + } + (None, None) => break, + } + } + } + + changes.into() +} + fn swap_to_front(child_paths: &mut Vec, file: &OsStr) { let position = child_paths .iter() @@ -5691,6 +5738,7 @@ impl RepoPaths { } } +#[derive(Debug)] struct ScanJob { abs_path: Arc, path: Arc, @@ -5707,10 +5755,6 @@ struct UpdateIgnoreStatusJob { scan_queue: Sender, } -struct UpdateGitRepoJob { - local_repository: LocalRepositoryEntry, -} - pub trait WorktreeModelHandle { #[cfg(any(test, feature = "test-support"))] fn flush_fs_events<'a>( diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs index f4e6da23455c5b..2a704c62e898ef 100644 --- a/crates/worktree/src/worktree_tests.rs +++ b/crates/worktree/src/worktree_tests.rs @@ -24,6 +24,7 @@ use std::{ mem, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use util::{test::TempTree, ResultExt}; @@ -1504,6 +1505,7 @@ async fn test_bump_mtime_of_git_repo_workdir(cx: &mut TestAppContext) { &[(Path::new("b/c.txt"), StatusCode::Modified.index())], ); cx.executor().run_until_parked(); + cx.executor().advance_clock(Duration::from_secs(1)); let snapshot = tree.read_with(cx, |tree, _| tree.snapshot());