sync: fix saving of fetch times and local state

Interleaved sync didn't save _fetch_times and _local_sync_state to disk.
Phased sync saved them, but incorrectly applied moving average smoothing
repeatedly when fetching submodules, and discarded historical data
during partial syncs.

Move the .Save() calls into finally blocks around the main sync loops
so they run exactly once per sync. Update _FetchTimes.Save() to merge
new data with the existing on-disk history, preventing data loss.
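
To illustrate the smoothing bug: _FetchTimes keeps an exponentially
smoothed fetch time per project, so calling Save() after every fetch
pass applies the decay more than once to the same observation. A
minimal sketch (hypothetical names; the real factor is
_FetchTimes._ALPHA):

    ALPHA = 0.5  # stands in for _FetchTimes._ALPHA

    def smooth(old: float, observed: float) -> float:
        """One exponential moving-average step."""
        return ALPHA * observed + (1 - ALPHA) * old

    once = smooth(old=10.0, observed=20.0)  # 15.0: one sync, one step
    # A second Save() in the same sync re-applies the decay to the same
    # observation, skewing the estimate toward it:
    twice = smooth(old=once, observed=20.0)  # 17.5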

Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
Author: Gavin Mak <gavinmak@google.com>
Date: 2025-10-20 11:13:09 -07:00
Committed by: LUCI
Parent: 2719a8e203
Commit: 1afe96a7e9
2 changed files with 165 additions and 145 deletions

subcmds/sync.py

@@ -975,9 +975,6 @@ later is required to fix a server side protocol bug.
             sync_event.set()
             sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
@@ -1003,53 +1000,58 @@ later is required to fix a server side protocol bug.
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
-            success = result.success
-            new_fetched = result.projects
-            if not success:
-                err_event.set()
-            fetched.update(new_fetched)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
+            success = result.success
+            fetched = result.projects
+            if not success:
+                err_event.set()
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
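
The fix above follows a save-once pattern: accumulate state across the
whole fetch loop and persist it in a single finally block that runs on
every exit path (normal completion, the network-only early return, or
a raised SyncError). A self-contained sketch with hypothetical
helpers, not the repo code itself:

    def sync_all(projects, fetch, save):
        """Fetch each project and persist timing state exactly once."""
        times = {}
        try:
            for name in projects:
                times[name] = fetch(name)  # may raise partway through
        finally:
            save(times)  # runs on early return, exception, or normal exit

    # Even though "b" fails, the time recorded for "a" is still saved:
    def _demo():
        def fetch(name):
            if name == "b":
                raise RuntimeError("fetch failed")
            return 1.25

        try:
            sync_all(["a", "b"], fetch, lambda t: print("saved:", t))
        except RuntimeError:
            pass

    _demo()  # prints: saved: {'a': 1.25}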
@@ -2491,107 +2493,120 @@ later is required to fix a server side protocol bug.
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
-
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
-
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f"  - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
-                                )
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
-                                err_event.set()
-
-                            if err_event.is_set() and opt.fail_fast:
-                                raise SyncFailFastError(aggregate_errors=errors)
-
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
+
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
+
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f"  - {path}"
+                                    for path in sorted(list(pending_relpaths))
+                                )
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
+                                err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
+
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
+
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                                if err_event.is_set() and opt.fail_fast:
+                                    raise SyncFailFastError(
+                                        aggregate_errors=errors
+                                    )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
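
For readers unfamiliar with _SafeCheckoutOrder in the inner loop
above: it yields projects level by level so a parent working tree
(e.g. 'foo') finishes before anything nested under it (e.g.
'foo/bar'). A rough approximation of that contract, grouping by path
depth (hypothetical helper, not the actual implementation):

    import collections

    def checkout_levels(relpaths):
        """Group checkout paths so parents complete before nested children."""
        levels = collections.defaultdict(list)
        for path in relpaths:
            levels[path.count("/")].append(path)
        return [sorted(levels[depth]) for depth in sorted(levels)]

    print(checkout_levels(["foo/bar", "foo", "baz"]))
    # [['baz', 'foo'], ['foo/bar']]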
@@ -2695,17 +2710,19 @@ class _FetchTimes:
         self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
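
The corrected Save() above re-reads the on-disk history
(self._Load()), smooths only the projects seen in this run, and
writes back the merged dict, so times for projects skipped by a
partial sync survive. A standalone sketch of the merge (hypothetical
function; error handling simplified):

    import json

    ALPHA = 0.5  # stands in for _FetchTimes._ALPHA

    def save_merged(path, seen):
        """Merge this run's fetch times into the saved history on disk."""
        try:
            with open(path) as f:
                saved = json.load(f)
        except (OSError, ValueError):
            saved = {}
        for name, t in seen.items():
            old = saved.get(name, t)
            # Smooth exactly once per run; projects absent from `seen`
            # keep their previously saved times.
            saved[name] = ALPHA * t + (1 - ALPHA) * old
        with open(path, "w") as f:
            json.dump(saved, f, indent=2)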

tests/test_subcmds_sync.py

@@ -681,6 +681,9 @@ class InterleavedSyncTest(unittest.TestCase):
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)
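
The mock.Mock() stand-ins added in setUp keep the new unconditional
Save() calls in the finally blocks from touching disk during tests,
and they make the persistence observable. A self-contained sketch of
that assertion style (FakeSync is hypothetical, not the test class
above):

    from unittest import mock

    class FakeSync:
        """Hypothetical stand-in mirroring the mocked attributes above."""

        def __init__(self):
            self._fetch_times = mock.Mock()

        def run(self):
            try:
                pass  # fetch work would happen here
            finally:
                self._fetch_times.Save()

    cmd = FakeSync()
    cmd.run()
    cmd._fetch_times.Save.assert_called_once()  # saved exactly once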