diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd0579..f9500314d 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
                 sync_event.set()
                 sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
             success = result.success
-            new_fetched = result.projects
+            fetched = result.projects
             if not success:
                 err_event.set()
-            fetched.update(new_fetched)
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
 
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
 
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
                                 )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
                                 err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
 
-                            if err_event.is_set() and opt.fail_fast:
-                                raise SyncFailFastError(aggregate_errors=errors)
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
 
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                                if err_event.is_set() and opt.fail_fast:
+                                    raise SyncFailFastError(
+                                        aggregate_errors=errors
+                                    )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
@@ -2695,17 +2710,19 @@ class _FetchTimes:
                 self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9ab6..6eb8a5a71 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
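
For reference (not part of the patch itself): a minimal, self-contained sketch of the load-merge-write behaviour the corrected _FetchTimes.Save() is aiming for. The helper name save_fetch_times, the module-level ALPHA constant, and the example path used below are invented purely for illustration; the real implementation lives on the _FetchTimes class and uses platform_utils. The sketch shows why Save() now reloads the previously saved times before blending: entries for projects not fetched in the current (possibly interrupted) run survive, and repeated measurements are folded in with the same 0.5 moving-average weight.

import json
import os

# Weight given to the most recent measurement, mirroring _FetchTimes._ALPHA.
ALPHA = 0.5


def save_fetch_times(path, seen):
    """Merge newly observed fetch times into the JSON file at `path`."""
    if not seen:
        return

    # Load previously saved times; start fresh if the file is missing/corrupt.
    try:
        with open(path) as f:
            saved = json.load(f)
    except (OSError, ValueError):
        saved = {}

    # Blend each new measurement with the prior value for that project.
    for name, t in seen.items():
        old = saved.get(name, t)
        saved[name] = (ALPHA * t) + ((1 - ALPHA) * old)

    # Write the merged dict; on failure, drop the state file like the patch does.
    try:
        with open(path, "w") as f:
            json.dump(saved, f, indent=2)
    except (OSError, TypeError):
        if os.path.exists(path):
            os.remove(path)


# Example: a project fetched in a prior run keeps its entry even though only
# one project was fetched this run, and the repeated project is averaged.
if __name__ == "__main__":
    state = "/tmp/fetchtimes_example.json"  # illustrative path only
    save_fetch_times(state, {"platform/foo": 10.0, "platform/bar": 30.0})
    save_fetch_times(state, {"platform/foo": 20.0})
    with open(state) as f:
        print(json.load(f))  # foo averaged to 15.0, bar preserved at 30.0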