Compare commits

...

12 Commits

Author SHA1 Message Date
Kuang-che Wu
ab2d321104 sync: fix connection error on macOS
With a large number of sync workers, the sync process may fail on
macOS due to connection errors. The root cause is that multiple
workers may attempt to connect to the multiprocessing manager server
at the same time when handling the first job. This can lead to
connection failures if there are too many pending connections, exceeding
the socket listening backlog.

Bug: 377538810
Change-Id: I1924d318d076ca3be61d75daa37bfa8d7dc23ed7
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/441541
Tested-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
2024-11-06 16:33:17 +00:00
Josip Sokcevic
aada468916 upload: Return correct tuple values in _ProcessResults
Incorrect tuple values were returned with http://go/grev/440221 -
instead of returning (Project, ReviewableBranch), _ProcessResults was
returning (int, ReviewableBranch).

R=jojwang@google.com

Bug: 376731172
Change-Id: I75205f42fd23f5ee6bd8d0c15b18066189b42bd9
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/441121
Reviewed-by: Sam Saccone <samccone@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Tested-by: Josip Sokcevic <sokcevic@google.com>
2024-10-31 21:18:53 +00:00
Allen Webb
1d5098617e worktree: Do not try to fix relative paths
--worktree was broken with incorrect paths in the .git files
whenever the local copy of git populated gitdir with relative paths
instead of absoulte paths.

Bug: 376251410
Change-Id: Id32dc1576315218967de2a9bfe43bf7a5a0e7aa6
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440801
Commit-Queue: Allen Webb <allenwebb@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Allen Webb <allenwebb@google.com>
2024-10-30 17:03:57 +00:00
Josip Sokcevic
e219c78fe5 forall: Fix returning results early
rc should be returned only after all results are processed.

R=jojwang@google.com

Bug: b/376454189
Change-Id: I8200b9954240dd3e8e9f2ab82494779a3cb38627
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440901
Tested-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Joanna Wang <jojwang@google.com>
2024-10-30 16:11:04 +00:00
joehsu
f9f4df62e0 Use full name of the revision when checking dest-branch
The manifest usually doesn't sepecify the revision with the full name
(e.g. refs/heads/REV).
However, when checking if the name of the merge branch, full name is
used on the merge branch.

The CL use full name of revision when comparing it with the merge
branch.

Bug: b/370919047
Test: repo upload on a project with `dest-branch` set
Change-Id: Ib6fa2f7246beb5bae0a26a70048a7ac03b6c5a2f
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/438401
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Joe Hsu <joehsu@google.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
2024-10-28 23:47:08 +00:00
Fredrik de Groot
ebdf0409d2 Add REPO_SKIP_SELF_UPDATE check in sync
The command _PostRepoFetch will try to self update
during repo sync. That is beneficial but adds
version uncertainty, fail potential and slow downs
in non-interactive scenarios.

Conditionally skip the update if env variable
REPO_SKIP_SELF_UPDATE is defined.

A call to selfupdate works as before, meaning even
with the variable set, it will run the update.

Change-Id: Iab0ef55dc3d3db3cbf1ba1f506c57fbb58a504c3
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439967
Tested-by: Fredrik de Groot <fredrik.de.groot@haleytek.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
2024-10-28 17:46:25 +00:00
Fredrik de Groot
303bd963d5 manifest: add optional base check on remove and extend
This adds an optional, built-in checker for
guarding against patches hanging on wrong
base revisions, which is useful if a lower layer of
the manifest changes after a patch was done.

When adding a patch with a new revision using
extend-project or remove-project/project:

          C---D---E patches in project bla
         /
    A---B project bla in manifest state 1

<extend-project name="bla" revision="E" base-rev="B">

If project bla gets updated, in a new snap ID
or by a supplier or similar, to a new state:

          C---D---E patches in project bla
         /
    A---B---F---G project bla in manifest state 2

Parsing will fail because revision of bla is now G,
giving the choice to create a new patch branch
from G and updating base-rev, or keeping previous
branch for some reason and only updating base-rev.

Intended for use in a layered manifest with
hashed revisions. Named refs like branches and tags
also work fine when comparing, but will be misleading
if a branch is used as base-rev.

Change-Id: Ic6211550a7d3cc9656057f6a2087c505b40cad2b
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/436777
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Fredrik de Groot <fredrik.de.groot@haleytek.com>
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
2024-10-28 16:55:10 +00:00
Josip Sokcevic
ae384f8623 [event_log] Stop leaking semaphore resources
With the global state and fork, we are left with uncleaned resources.
Isolate mulitprocessing.Value in a function so we stop the leak.

Bug: 353656374
Change-Id: If50bb544bda12b72f00c02bc1d2c0d19de000b88
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440261
Commit-Queue: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Gavin Mak <gavinmak@google.com>
Tested-by: Josip Sokcevic <sokcevic@google.com>
2024-10-24 16:58:17 +00:00
Kuang-che Wu
70a4e643e6 progress: always show done message
The done message was omitted if the task is shorter than 0.5s. This
might confuse users.

Bug: b/371638995
Change-Id: I3fdd2cd8daea16d34fba88457d09397fff71af15
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440222
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
2024-10-24 16:21:28 +00:00
Kuang-che Wu
8da4861b38 subcmds: reduce multiprocessing serialization overhead
Follow the same approach as 39ffd9977e to reduce serialization overhead.

Below benchmarks are tested with 2.7k projects on my workstation
(warm cache). git tracing is disabled for benchmark.

(seconds)              | v2.48 | v2.48 | this CL | this CL
	               |       |  -j32 |         |    -j32
-----------------------------------------------------------
with clean tree state:
branches (none)        |   5.6 |   5.9 |    1.0  |    0.9
status (clean)         |  21.3 |   9.4 |   19.4  |    4.7
diff (none)            |   7.6 |   7.2 |    5.7  |    2.2
prune (none)           |   5.7 |   6.1 |    1.3  |    1.2
abandon (none)         |  19.4 |  18.6 |    0.9  |    0.8
upload (none)          |  19.7 |  18.7 |    0.9  |    0.8
forall -c true         |   7.5 |   7.6 |    0.6  |    0.6
forall -c "git log -1" |  11.3 |  11.1 |    0.6  |    0.6

with branches:
start BRANCH --all     |  21.9 |  20.3 |   13.6  |    2.6
checkout BRANCH        |  29.1 |  27.8 |    1.1  |    1.0
branches (2)           |  28.0 |  28.6 |    1.5  |    1.3
abandon BRANCH         |  29.2 |  27.5 |    9.7  |    2.2

Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
2024-10-23 23:34:34 +00:00
Kuang-che Wu
39ffd9977e sync: reduce multiprocessing serialization overhead
Background:
 - Manifest object is large (for projects like Android) in terms of
   serialization cost and size (more than 1mb).
 - Lots of Project objects usually share only a few manifest objects.

Before this CL, Project objects were passed to workers via function
parameters. Function parameters are pickled separately (in chunk). In
other words, manifests are serialized again and again. The major
serialization overhead of repo sync was
  O(manifest_size * projects / chunksize)

This CL uses following tricks to reduce serialization overhead.
 - All projects are pickled in one invocation. Because Project objects
   share manifests, pickle library remembers which objects are already
   seen and avoid the serialization cost.
 - Pass the Project objects to workers at worker intialization time.
   And pass project index as function parameters instead. The number of
   workers is much smaller than the number of projects.
 - Worker init state are shared on Linux (fork based). So it requires
   zero serialization for Project objects.

On Linux (fork based), the serialization overhead is
  O(projects)  --- one int per project
On Windows (spawn based), the serialization overhead is
  O(manifest_size * min(workers, projects))

Moreover, use chunksize=1 to avoid the chance that some workers are idle
while other workers still have more than one job in their chunk queue.

Using 2.7k projects as the baseline, originally "repo sync" no-op
sync takes 31s for fetch and 25s for checkout on my Linux workstation.
With this CL, it takes 12s for fetch and 1s for checkout.

Bug: b/371638995
Change-Id: Ifa22072ea54eacb4a5c525c050d84de371e87caa
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439921
Tested-by: Kuang-che Wu <kcwu@google.com>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
2024-10-23 02:58:45 +00:00
Kaushik Lingarkar
584863fb5e Fix incremental syncs for prjs with submodules
When performing an incremental sync (re-running repo init with an
updated manifest revision) with --fetch-submodules or sync-s=true,
there is an attempt to get a list of all projects (including
submodules) before projects are actually fetched. However, we can
only list submodules of a project if we have already fetched its
revision. Instead of throwing an error when we don't have the
revision, assume there are no submodules for that project. In the
sync cmd, we already update the list of projects to include
submodules after fetching superprojects.

Change-Id: I48bc68c48b5b10117356b18f5375d17f9a89ec05
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/439761
Commit-Queue: Kaushik Lingarkar <kaushik.lingarkar@linaro.org>
Tested-by: Kaushik Lingarkar <kaushik.lingarkar@linaro.org>
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Reviewed-by: Nasser Grainawi <nasser.grainawi@linaro.org>
2024-10-18 03:55:10 +00:00
19 changed files with 563 additions and 293 deletions

View File

@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import multiprocessing
import optparse
import os
@@ -70,6 +71,14 @@ class Command:
# migrated subcommands can set it to False.
MULTI_MANIFEST_SUPPORT = True
# Shared data across parallel execution workers.
_parallel_context = None
@classmethod
def get_parallel_context(cls):
assert cls._parallel_context is not None
return cls._parallel_context
def __init__(
self,
repodir=None,
@@ -242,9 +251,39 @@ class Command:
"""Perform the action, after option parsing is complete."""
raise NotImplementedError
@staticmethod
@classmethod
@contextlib.contextmanager
def ParallelContext(cls):
"""Obtains the context, which is shared to ExecuteInParallel workers.
Callers can store data in the context dict before invocation of
ExecuteInParallel. The dict will then be shared to child workers of
ExecuteInParallel.
"""
assert cls._parallel_context is None
cls._parallel_context = {}
try:
yield
finally:
cls._parallel_context = None
@classmethod
def _InitParallelWorker(cls, context, initializer):
cls._parallel_context = context
if initializer:
initializer()
@classmethod
def ExecuteInParallel(
jobs, func, inputs, callback, output=None, ordered=False
cls,
jobs,
func,
inputs,
callback,
output=None,
ordered=False,
chunksize=WORKER_BATCH_SIZE,
initializer=None,
):
"""Helper for managing parallel execution boiler plate.
@@ -269,6 +308,9 @@ class Command:
output: An output manager. May be progress.Progess or
color.Coloring.
ordered: Whether the jobs should be processed in order.
chunksize: The number of jobs processed in batch by parallel
workers.
initializer: Worker initializer.
Returns:
The |callback| function's results are returned.
@@ -278,12 +320,16 @@ class Command:
if len(inputs) == 1 or jobs == 1:
return callback(None, output, (func(x) for x in inputs))
else:
with multiprocessing.Pool(jobs) as pool:
with multiprocessing.Pool(
jobs,
initializer=cls._InitParallelWorker,
initargs=(cls._parallel_context, initializer),
) as pool:
submit = pool.imap if ordered else pool.imap_unordered
return callback(
pool,
output,
submit(func, inputs, chunksize=WORKER_BATCH_SIZE),
submit(func, inputs, chunksize=chunksize),
)
finally:
if isinstance(output, progress.Progress):

View File

@@ -107,11 +107,13 @@ following DTD:
<!ATTLIST extend-project remote CDATA #IMPLIED>
<!ATTLIST extend-project dest-branch CDATA #IMPLIED>
<!ATTLIST extend-project upstream CDATA #IMPLIED>
<!ATTLIST extend-project base-rev CDATA #IMPLIED>
<!ELEMENT remove-project EMPTY>
<!ATTLIST remove-project name CDATA #IMPLIED>
<!ATTLIST remove-project path CDATA #IMPLIED>
<!ATTLIST remove-project optional CDATA #IMPLIED>
<!ATTLIST remove-project base-rev CDATA #IMPLIED>
<!ELEMENT repo-hooks EMPTY>
<!ATTLIST repo-hooks in-project CDATA #REQUIRED>
@@ -433,6 +435,14 @@ project. Same syntax as the corresponding element of `project`.
Attribute `upstream`: If specified, overrides the upstream of the original
project. Same syntax as the corresponding element of `project`.
Attribute `base-rev`: If specified, adds a check against the revision
to be extended. Manifest parse will fail and give a list of mismatch extends
if the revisions being extended have changed since base-rev was set.
Intended for use with layered manifests using hash revisions to prevent
patch branches hiding newer upstream revisions. Also compares named refs
like branches or tags but is misleading if branches are used as base-rev.
Same syntax as the corresponding element of `project`.
### Element annotation
Zero or more annotation elements may be specified as children of a
@@ -496,6 +506,14 @@ name. Logic otherwise behaves like both are specified.
Attribute `optional`: Set to true to ignore remove-project elements with no
matching `project` element.
Attribute `base-rev`: If specified, adds a check against the revision
to be removed. Manifest parse will fail and give a list of mismatch removes
if the revisions being removed have changed since base-rev was set.
Intended for use with layered manifests using hash revisions to prevent
patch branches hiding newer upstream revisions. Also compares named refs
like branches or tags but is misleading if branches are used as base-rev.
Same syntax as the corresponding element of `project`.
### Element repo-hooks
NB: See the [practical documentation](./repo-hooks.md) for using repo hooks.

View File

@@ -96,6 +96,9 @@ If that tag is valid, then repo will warn and use that commit instead.
If that tag cannot be verified, it gives up and forces the user to resolve.
If env variable `REPO_SKIP_SELF_UPDATE` is defined, this will
bypass the self update algorithm.
### Force an update
The `repo selfupdate` command can be used to force an immediate update.

View File

@@ -168,8 +168,10 @@ class EventLog:
f.write("\n")
# An integer id that is unique across this invocation of the program.
_EVENT_ID = multiprocessing.Value("i", 1)
# An integer id that is unique across this invocation of the program, to be set
# by the first Add event. We can't set it here since it results in leaked
# resources (see: https://issues.gerritcodereview.com/353656374).
_EVENT_ID = None
def _NextEventId():
@@ -178,6 +180,12 @@ def _NextEventId():
Returns:
A unique, to this invocation of the program, integer id.
"""
global _EVENT_ID
if _EVENT_ID is None:
# There is a small chance of race condition - two parallel processes
# setting up _EVENT_ID. However, we expect TASK_COMMAND to happen before
# mp kicks in.
_EVENT_ID = multiprocessing.Value("i", 1)
with _EVENT_ID.get_lock():
val = _EVENT_ID.value
_EVENT_ID.value += 1

View File

@@ -1445,6 +1445,7 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
repo_hooks_project = None
enabled_repo_hooks = None
failed_revision_changes = []
for node in itertools.chain(*node_list):
if node.nodeName == "project":
project = self._ParseProject(node)
@@ -1471,6 +1472,7 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
remote = self._get_remote(node)
dest_branch = node.getAttribute("dest-branch")
upstream = node.getAttribute("upstream")
base_revision = node.getAttribute("base-rev")
named_projects = self._projects[name]
if dest_path and not path and len(named_projects) > 1:
@@ -1484,6 +1486,13 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
if groups:
p.groups.extend(groups)
if revision:
if base_revision:
if p.revisionExpr != base_revision:
failed_revision_changes.append(
"extend-project name %s mismatch base "
"%s vs revision %s"
% (name, base_revision, p.revisionExpr)
)
p.SetRevision(revision)
if remote_name:
@@ -1558,6 +1567,7 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
if node.nodeName == "remove-project":
name = node.getAttribute("name")
path = node.getAttribute("path")
base_revision = node.getAttribute("base-rev")
# Name or path needed.
if not name and not path:
@@ -1571,6 +1581,13 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
for projname, projects in list(self._projects.items()):
for p in projects:
if name == projname and not path:
if base_revision:
if p.revisionExpr != base_revision:
failed_revision_changes.append(
"remove-project name %s mismatch base "
"%s vs revision %s"
% (name, base_revision, p.revisionExpr)
)
del self._paths[p.relpath]
if not removed_project:
del self._projects[name]
@@ -1578,6 +1595,17 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
elif path == p.relpath and (
name == projname or not name
):
if base_revision:
if p.revisionExpr != base_revision:
failed_revision_changes.append(
"remove-project path %s mismatch base "
"%s vs revision %s"
% (
p.relpath,
base_revision,
p.revisionExpr,
)
)
self._projects[projname].remove(p)
del self._paths[p.relpath]
removed_project = p.name
@@ -1597,6 +1625,13 @@ https://gerrit.googlesource.com/git-repo/+/HEAD/docs/manifest-format.md
"project: %s" % node.toxml()
)
if failed_revision_changes:
raise ManifestParseError(
"revision base check failed, rebase patches and update "
"base revs for: ",
failed_revision_changes,
)
# Store repo hooks project information.
if repo_hooks_project:
# Store a reference to the Project.

View File

@@ -100,6 +100,7 @@ class Progress:
self._show = not delay
self._units = units
self._elide = elide and _TTY
self._quiet = quiet
# Only show the active jobs section if we run more than one in parallel.
self._show_jobs = False
@@ -114,13 +115,7 @@ class Progress:
)
self._update_thread.daemon = True
# When quiet, never show any output. It's a bit hacky, but reusing the
# existing logic that delays initial output keeps the rest of the class
# clean. Basically we set the start time to years in the future.
if quiet:
self._show = False
self._start += 2**32
elif show_elapsed:
if not quiet and show_elapsed:
self._update_thread.start()
def _update_loop(self):
@@ -160,7 +155,7 @@ class Progress:
msg = self._last_msg
self._last_msg = msg
if not _TTY or IsTraceToStderr():
if not _TTY or IsTraceToStderr() or self._quiet:
return
elapsed_sec = time.time() - self._start
@@ -202,7 +197,7 @@ class Progress:
def end(self):
self._update_event.set()
if not _TTY or IsTraceToStderr() or not self._show:
if not _TTY or IsTraceToStderr() or self._quiet:
return
duration = duration_str(time.time() - self._start)

View File

@@ -2296,7 +2296,9 @@ class Project:
try:
rev = self.GetRevisionId()
except GitError:
except (GitError, ManifestInvalidRevisionError):
# The git repo may be outdated (i.e. not fetched yet) and querying
# its submodules using the revision may not work; so return here.
return []
return get_submodules(self.gitdir, rev)
@@ -3373,24 +3375,29 @@ class Project:
setting = fp.read()
assert setting.startswith("gitdir:")
git_worktree_path = setting.split(":", 1)[1].strip()
# Some platforms (e.g. Windows) won't let us update dotgit in situ
# because of file permissions. Delete it and recreate it from scratch
# to avoid.
platform_utils.remove(dotgit)
# Use relative path from checkout->worktree & maintain Unix line endings
# on all OS's to match git behavior.
with open(dotgit, "w", newline="\n") as fp:
print(
"gitdir:",
os.path.relpath(git_worktree_path, self.worktree),
file=fp,
)
# Use relative path from worktree->checkout & maintain Unix line endings
# on all OS's to match git behavior.
with open(
os.path.join(git_worktree_path, "gitdir"), "w", newline="\n"
) as fp:
print(os.path.relpath(dotgit, git_worktree_path), file=fp)
# `gitdir` maybe be either relative or absolute depending on the
# behavior of the local copy of git, so only convert the path to
# relative if it needs to be converted.
if os.path.isabs(git_worktree_path):
# Some platforms (e.g. Windows) won't let us update dotgit in situ
# because of file permissions. Delete it and recreate it from
# scratch to avoid.
platform_utils.remove(dotgit)
# Use relative path from checkout->worktree & maintain Unix line
# endings on all OS's to match git behavior.
with open(dotgit, "w", newline="\n") as fp:
print(
"gitdir:",
os.path.relpath(git_worktree_path, self.worktree),
file=fp,
)
# Use relative path from worktree->checkout & maintain Unix line
# endings on all OS's to match git behavior.
with open(
os.path.join(git_worktree_path, "gitdir"), "w", newline="\n"
) as fp:
print(os.path.relpath(dotgit, git_worktree_path), file=fp)
self._InitMRef()

View File

@@ -70,8 +70,10 @@ It is equivalent to "git branch -D <branchname>".
else:
args.insert(0, "'All local branches'")
def _ExecuteOne(self, all_branches, nb, project):
@classmethod
def _ExecuteOne(cls, all_branches, nb, project_idx):
"""Abandon one project."""
project = cls.get_parallel_context()["projects"][project_idx]
if all_branches:
branches = project.GetBranches()
else:
@@ -89,7 +91,7 @@ It is equivalent to "git branch -D <branchname>".
if status is not None:
ret[name] = status
return (ret, project, errors)
return (ret, project_idx, errors)
def Execute(self, opt, args):
nb = args[0].split()
@@ -102,7 +104,8 @@ It is equivalent to "git branch -D <branchname>".
_RelPath = lambda p: p.RelPath(local=opt.this_manifest_only)
def _ProcessResults(_pool, pm, states):
for results, project, errors in states:
for results, project_idx, errors in states:
project = all_projects[project_idx]
for branch, status in results.items():
if status:
success[branch].append(project)
@@ -111,15 +114,18 @@ It is equivalent to "git branch -D <branchname>".
aggregate_errors.extend(errors)
pm.update(msg="")
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, opt.all, nb),
all_projects,
callback=_ProcessResults,
output=Progress(
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, opt.all, nb),
range(len(all_projects)),
callback=_ProcessResults,
output=Progress(
f"Abandon {nb}", len(all_projects), quiet=opt.quiet
),
chunksize=1,
)
width = max(
itertools.chain(

View File

@@ -98,6 +98,22 @@ is shown, then the branch appears in all projects.
"""
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
@classmethod
def _ExpandProjectToBranches(cls, project_idx):
"""Expands a project into a list of branch names & associated info.
Args:
project_idx: project.Project index
Returns:
List[Tuple[str, git_config.Branch, int]]
"""
branches = []
project = cls.get_parallel_context()["projects"][project_idx]
for name, b in project.GetBranches().items():
branches.append((name, b, project_idx))
return branches
def Execute(self, opt, args):
projects = self.GetProjects(
args, all_manifests=not opt.this_manifest_only
@@ -107,17 +123,20 @@ is shown, then the branch appears in all projects.
project_cnt = len(projects)
def _ProcessResults(_pool, _output, results):
for name, b in itertools.chain.from_iterable(results):
for name, b, project_idx in itertools.chain.from_iterable(results):
b.project = projects[project_idx]
if name not in all_branches:
all_branches[name] = BranchInfo(name)
all_branches[name].add(b)
self.ExecuteInParallel(
opt.jobs,
expand_project_to_branches,
projects,
callback=_ProcessResults,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
self.ExecuteInParallel(
opt.jobs,
self._ExpandProjectToBranches,
range(len(projects)),
callback=_ProcessResults,
)
names = sorted(all_branches)
@@ -191,19 +210,3 @@ is shown, then the branch appears in all projects.
else:
out.write(" in all projects")
out.nl()
def expand_project_to_branches(project):
"""Expands a project into a list of branch names & associated information.
Args:
project: project.Project
Returns:
List[Tuple[str, git_config.Branch]]
"""
branches = []
for name, b in project.GetBranches().items():
b.project = project
branches.append((name, b))
return branches

View File

@@ -20,7 +20,6 @@ from command import DEFAULT_LOCAL_JOBS
from error import GitError
from error import RepoExitError
from progress import Progress
from project import Project
from repo_logging import RepoLogger
@@ -30,7 +29,7 @@ logger = RepoLogger(__file__)
class CheckoutBranchResult(NamedTuple):
# Whether the Project is on the branch (i.e. branch exists and no errors)
result: bool
project: Project
project_idx: int
error: Exception
@@ -62,15 +61,17 @@ The command is equivalent to:
if not args:
self.Usage()
def _ExecuteOne(self, nb, project):
@classmethod
def _ExecuteOne(cls, nb, project_idx):
"""Checkout one project."""
error = None
result = None
project = cls.get_parallel_context()["projects"][project_idx]
try:
result = project.CheckoutBranch(nb)
except GitError as e:
error = e
return CheckoutBranchResult(result, project, error)
return CheckoutBranchResult(result, project_idx, error)
def Execute(self, opt, args):
nb = args[0]
@@ -83,22 +84,25 @@ The command is equivalent to:
def _ProcessResults(_pool, pm, results):
for result in results:
project = all_projects[result.project_idx]
if result.error is not None:
err.append(result.error)
err_projects.append(result.project)
err_projects.append(project)
elif result.result:
success.append(result.project)
success.append(project)
pm.update(msg="")
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, nb),
all_projects,
callback=_ProcessResults,
output=Progress(
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, nb),
range(len(all_projects)),
callback=_ProcessResults,
output=Progress(
f"Checkout {nb}", len(all_projects), quiet=opt.quiet
),
)
if err_projects:
for p in err_projects:

View File

@@ -40,7 +40,8 @@ to the Unix 'patch' command.
help="paths are relative to the repository root",
)
def _ExecuteOne(self, absolute, local, project):
@classmethod
def _ExecuteOne(cls, absolute, local, project_idx):
"""Obtains the diff for a specific project.
Args:
@@ -48,12 +49,13 @@ to the Unix 'patch' command.
local: a boolean, if True, the path is relative to the local
(sub)manifest. If false, the path is relative to the outermost
manifest.
project: Project to get status of.
project_idx: Project index to get status of.
Returns:
The status of the project.
"""
buf = io.StringIO()
project = cls.get_parallel_context()["projects"][project_idx]
ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local)
return (ret, buf.getvalue())
@@ -71,12 +73,15 @@ to the Unix 'patch' command.
ret = 1
return ret
return self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._ExecuteOne, opt.absolute, opt.this_manifest_only
),
all_projects,
callback=_ProcessResults,
ordered=True,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
return self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._ExecuteOne, opt.absolute, opt.this_manifest_only
),
range(len(all_projects)),
callback=_ProcessResults,
ordered=True,
chunksize=1,
)

View File

@@ -15,7 +15,6 @@
import errno
import functools
import io
import multiprocessing
import os
import re
import signal
@@ -26,7 +25,6 @@ from color import Coloring
from command import Command
from command import DEFAULT_LOCAL_JOBS
from command import MirrorSafeCommand
from command import WORKER_BATCH_SIZE
from error import ManifestInvalidRevisionError
from repo_logging import RepoLogger
@@ -241,7 +239,6 @@ without iterating through the remaining projects.
cmd.insert(cmd.index(cn) + 1, "--color")
mirror = self.manifest.IsMirror
rc = 0
smart_sync_manifest_name = "smart_sync_override.xml"
smart_sync_manifest_path = os.path.join(
@@ -264,35 +261,44 @@ without iterating through the remaining projects.
os.environ["REPO_COUNT"] = str(len(projects))
def _ProcessResults(_pool, _output, results):
rc = 0
first = True
for r, output in results:
if output:
if first:
first = False
elif opt.project_header:
print()
# To simplify the DoWorkWrapper, take care of automatic
# newlines.
end = "\n"
if output[-1] == "\n":
end = ""
print(output, end=end)
rc = rc or r
if r != 0 and opt.abort_on_errors:
raise Exception("Aborting due to previous error")
return rc
try:
config = self.manifest.manifestProject.config
with multiprocessing.Pool(opt.jobs, InitWorker) as pool:
results_it = pool.imap(
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
rc = self.ExecuteInParallel(
opt.jobs,
functools.partial(
DoWorkWrapper, mirror, opt, cmd, shell, config
self.DoWorkWrapper, mirror, opt, cmd, shell, config
),
enumerate(projects),
chunksize=WORKER_BATCH_SIZE,
range(len(projects)),
callback=_ProcessResults,
ordered=True,
initializer=self.InitWorker,
chunksize=1,
)
first = True
for r, output in results_it:
if output:
if first:
first = False
elif opt.project_header:
print()
# To simplify the DoWorkWrapper, take care of automatic
# newlines.
end = "\n"
if output[-1] == "\n":
end = ""
print(output, end=end)
rc = rc or r
if r != 0 and opt.abort_on_errors:
raise Exception("Aborting due to previous error")
except (KeyboardInterrupt, WorkerKeyboardInterrupt):
# Catch KeyboardInterrupt raised inside and outside of workers
rc = rc or errno.EINTR
rc = errno.EINTR
except Exception as e:
# Catch any other exceptions raised
logger.error(
@@ -300,35 +306,35 @@ without iterating through the remaining projects.
type(e).__name__,
e,
)
rc = rc or getattr(e, "errno", 1)
rc = getattr(e, "errno", 1)
if rc != 0:
sys.exit(rc)
@classmethod
def InitWorker(cls):
signal.signal(signal.SIGINT, signal.SIG_IGN)
@classmethod
def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx):
"""A wrapper around the DoWork() method.
Catch the KeyboardInterrupt exceptions here and re-raise them as a
different, ``Exception``-based exception to stop it flooding the console
with stacktraces and making the parent hang indefinitely.
"""
project = cls.get_parallel_context()["projects"][project_idx]
try:
return DoWork(project, mirror, opt, cmd, shell, project_idx, config)
except KeyboardInterrupt:
print("%s: Worker interrupted" % project.name)
raise WorkerKeyboardInterrupt()
class WorkerKeyboardInterrupt(Exception):
"""Keyboard interrupt exception for worker processes."""
def InitWorker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def DoWorkWrapper(mirror, opt, cmd, shell, config, args):
"""A wrapper around the DoWork() method.
Catch the KeyboardInterrupt exceptions here and re-raise them as a
different, ``Exception``-based exception to stop it flooding the console
with stacktraces and making the parent hang indefinitely.
"""
cnt, project = args
try:
return DoWork(project, mirror, opt, cmd, shell, cnt, config)
except KeyboardInterrupt:
print("%s: Worker interrupted" % project.name)
raise WorkerKeyboardInterrupt()
def DoWork(project, mirror, opt, cmd, shell, cnt, config):
env = os.environ.copy()

View File

@@ -23,7 +23,6 @@ from error import GitError
from error import InvalidArgumentsError
from error import SilentRepoExitError
from git_command import GitCommand
from project import Project
from repo_logging import RepoLogger
@@ -40,7 +39,7 @@ class GrepColoring(Coloring):
class ExecuteOneResult(NamedTuple):
"""Result from an execute instance."""
project: Project
project_idx: int
rc: int
stdout: str
stderr: str
@@ -262,8 +261,10 @@ contain a line that matches both expressions:
help="Show only file names not containing matching lines",
)
def _ExecuteOne(self, cmd_argv, project):
@classmethod
def _ExecuteOne(cls, cmd_argv, project_idx):
"""Process one project."""
project = cls.get_parallel_context()["projects"][project_idx]
try:
p = GitCommand(
project,
@@ -274,7 +275,7 @@ contain a line that matches both expressions:
verify_command=True,
)
except GitError as e:
return ExecuteOneResult(project, -1, None, str(e), e)
return ExecuteOneResult(project_idx, -1, None, str(e), e)
try:
error = None
@@ -282,10 +283,12 @@ contain a line that matches both expressions:
except GitError as e:
rc = 1
error = e
return ExecuteOneResult(project, rc, p.stdout, p.stderr, error)
return ExecuteOneResult(project_idx, rc, p.stdout, p.stderr, error)
@staticmethod
def _ProcessResults(full_name, have_rev, opt, _pool, out, results):
def _ProcessResults(
full_name, have_rev, opt, projects, _pool, out, results
):
git_failed = False
bad_rev = False
have_match = False
@@ -293,9 +296,10 @@ contain a line that matches both expressions:
errors = []
for result in results:
project = projects[result.project_idx]
if result.rc < 0:
git_failed = True
out.project("--- project %s ---" % _RelPath(result.project))
out.project("--- project %s ---" % _RelPath(project))
out.nl()
out.fail("%s", result.stderr)
out.nl()
@@ -311,9 +315,7 @@ contain a line that matches both expressions:
):
bad_rev = True
else:
out.project(
"--- project %s ---" % _RelPath(result.project)
)
out.project("--- project %s ---" % _RelPath(project))
out.nl()
out.fail("%s", result.stderr.strip())
out.nl()
@@ -331,13 +333,13 @@ contain a line that matches both expressions:
rev, line = line.split(":", 1)
out.write("%s", rev)
out.write(":")
out.project(_RelPath(result.project))
out.project(_RelPath(project))
out.write("/")
out.write("%s", line)
out.nl()
elif full_name:
for line in r:
out.project(_RelPath(result.project))
out.project(_RelPath(project))
out.write("/")
out.write("%s", line)
out.nl()
@@ -381,16 +383,19 @@ contain a line that matches both expressions:
cmd_argv.extend(opt.revision)
cmd_argv.append("--")
git_failed, bad_rev, have_match, errors = self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, cmd_argv),
projects,
callback=functools.partial(
self._ProcessResults, full_name, have_rev, opt
),
output=out,
ordered=True,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
git_failed, bad_rev, have_match, errors = self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, cmd_argv),
range(len(projects)),
callback=functools.partial(
self._ProcessResults, full_name, have_rev, opt, projects
),
output=out,
ordered=True,
chunksize=1,
)
if git_failed:
raise GrepCommandError(

View File

@@ -27,8 +27,10 @@ class Prune(PagedCommand):
"""
PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
def _ExecuteOne(self, project):
@classmethod
def _ExecuteOne(cls, project_idx):
"""Process one project."""
project = cls.get_parallel_context()["projects"][project_idx]
return project.PruneHeads()
def Execute(self, opt, args):
@@ -41,13 +43,15 @@ class Prune(PagedCommand):
def _ProcessResults(_pool, _output, results):
return list(itertools.chain.from_iterable(results))
all_branches = self.ExecuteInParallel(
opt.jobs,
self._ExecuteOne,
projects,
callback=_ProcessResults,
ordered=True,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
all_branches = self.ExecuteInParallel(
opt.jobs,
self._ExecuteOne,
range(len(projects)),
callback=_ProcessResults,
ordered=True,
)
if not all_branches:
return

View File

@@ -21,7 +21,6 @@ from error import RepoExitError
from git_command import git
from git_config import IsImmutable
from progress import Progress
from project import Project
from repo_logging import RepoLogger
@@ -29,7 +28,7 @@ logger = RepoLogger(__file__)
class ExecuteOneResult(NamedTuple):
project: Project
project_idx: int
error: Exception
@@ -80,18 +79,20 @@ revision specified in the manifest.
if not git.check_ref_format("heads/%s" % nb):
self.OptionParser.error("'%s' is not a valid name" % nb)
def _ExecuteOne(self, revision, nb, project):
@classmethod
def _ExecuteOne(cls, revision, nb, default_revisionExpr, project_idx):
"""Start one project."""
# If the current revision is immutable, such as a SHA1, a tag or
# a change, then we can't push back to it. Substitute with
# dest_branch, if defined; or with manifest default revision instead.
branch_merge = ""
error = None
project = cls.get_parallel_context()["projects"][project_idx]
if IsImmutable(project.revisionExpr):
if project.dest_branch:
branch_merge = project.dest_branch
else:
branch_merge = self.manifest.default.revisionExpr
branch_merge = default_revisionExpr
try:
project.StartBranch(
@@ -100,7 +101,7 @@ revision specified in the manifest.
except Exception as e:
logger.error("error: unable to checkout %s: %s", project.name, e)
error = e
return ExecuteOneResult(project, error)
return ExecuteOneResult(project_idx, error)
def Execute(self, opt, args):
nb = args[0]
@@ -120,19 +121,28 @@ revision specified in the manifest.
def _ProcessResults(_pool, pm, results):
for result in results:
if result.error:
err_projects.append(result.project)
project = all_projects[result.project_idx]
err_projects.append(project)
err.append(result.error)
pm.update(msg="")
self.ExecuteInParallel(
opt.jobs,
functools.partial(self._ExecuteOne, opt.revision, nb),
all_projects,
callback=_ProcessResults,
output=Progress(
f"Starting {nb}", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._ExecuteOne,
opt.revision,
nb,
self.manifest.default.revisionExpr,
),
range(len(all_projects)),
callback=_ProcessResults,
output=Progress(
f"Starting {nb}", len(all_projects), quiet=opt.quiet
),
chunksize=1,
)
if err_projects:
for p in err_projects:

View File

@@ -88,7 +88,8 @@ the following meanings:
"projects",
)
def _StatusHelper(self, quiet, local, project):
@classmethod
def _StatusHelper(cls, quiet, local, project_idx):
"""Obtains the status for a specific project.
Obtains the status for a project, redirecting the output to
@@ -99,12 +100,13 @@ the following meanings:
local: a boolean, if True, the path is relative to the local
(sub)manifest. If false, the path is relative to the outermost
manifest.
project: Project to get status of.
project_idx: Project index to get status of.
Returns:
The status of the project.
"""
buf = io.StringIO()
project = cls.get_parallel_context()["projects"][project_idx]
ret = project.PrintWorkTreeStatus(
quiet=quiet, output_redir=buf, local=local
)
@@ -143,15 +145,18 @@ the following meanings:
ret += 1
return ret
counter = self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._StatusHelper, opt.quiet, opt.this_manifest_only
),
all_projects,
callback=_ProcessResults,
ordered=True,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = all_projects
counter = self.ExecuteInParallel(
opt.jobs,
functools.partial(
self._StatusHelper, opt.quiet, opt.this_manifest_only
),
range(len(all_projects)),
callback=_ProcessResults,
ordered=True,
chunksize=1,
)
if not opt.quiet and len(all_projects) == counter:
print("nothing to commit (working directory clean)")

View File

@@ -141,7 +141,7 @@ class _FetchOneResult(NamedTuple):
Attributes:
success (bool): True if successful.
project (Project): The fetched project.
project_idx (int): The fetched project index.
start (float): The starting time.time().
finish (float): The ending time.time().
remote_fetched (bool): True if the remote was actually queried.
@@ -149,7 +149,7 @@ class _FetchOneResult(NamedTuple):
success: bool
errors: List[Exception]
project: Project
project_idx: int
start: float
finish: float
remote_fetched: bool
@@ -182,14 +182,14 @@ class _CheckoutOneResult(NamedTuple):
Attributes:
success (bool): True if successful.
project (Project): The project.
project_idx (int): The project index.
start (float): The starting time.time().
finish (float): The ending time.time().
"""
success: bool
errors: List[Exception]
project: Project
project_idx: int
start: float
finish: float
@@ -592,7 +592,8 @@ later is required to fix a server side protocol bug.
branch = branch[len(R_HEADS) :]
return branch
def _GetCurrentBranchOnly(self, opt, manifest):
@classmethod
def _GetCurrentBranchOnly(cls, opt, manifest):
"""Returns whether current-branch or use-superproject options are
enabled.
@@ -710,7 +711,8 @@ later is required to fix a server side protocol bug.
if need_unload:
m.outer_client.manifest.Unload()
def _FetchProjectList(self, opt, projects):
@classmethod
def _FetchProjectList(cls, opt, projects):
"""Main function of the fetch worker.
The projects we're given share the same underlying git object store, so
@@ -722,21 +724,23 @@ later is required to fix a server side protocol bug.
opt: Program options returned from optparse. See _Options().
projects: Projects to fetch.
"""
return [self._FetchOne(opt, x) for x in projects]
return [cls._FetchOne(opt, x) for x in projects]
def _FetchOne(self, opt, project):
@classmethod
def _FetchOne(cls, opt, project_idx):
"""Fetch git objects for a single project.
Args:
opt: Program options returned from optparse. See _Options().
project: Project object for the project to fetch.
project_idx: Project index for the project to fetch.
Returns:
Whether the fetch was successful.
"""
project = cls.get_parallel_context()["projects"][project_idx]
start = time.time()
k = f"{project.name} @ {project.relpath}"
self._sync_dict[k] = start
cls.get_parallel_context()["sync_dict"][k] = start
success = False
remote_fetched = False
errors = []
@@ -746,7 +750,7 @@ later is required to fix a server side protocol bug.
quiet=opt.quiet,
verbose=opt.verbose,
output_redir=buf,
current_branch_only=self._GetCurrentBranchOnly(
current_branch_only=cls._GetCurrentBranchOnly(
opt, project.manifest
),
force_sync=opt.force_sync,
@@ -756,7 +760,7 @@ later is required to fix a server side protocol bug.
optimized_fetch=opt.optimized_fetch,
retry_fetches=opt.retry_fetches,
prune=opt.prune,
ssh_proxy=self.ssh_proxy,
ssh_proxy=cls.get_parallel_context()["ssh_proxy"],
clone_filter=project.manifest.CloneFilter,
partial_clone_exclude=project.manifest.PartialCloneExclude,
clone_filter_for_depth=project.manifest.CloneFilterForDepth,
@@ -788,24 +792,20 @@ later is required to fix a server side protocol bug.
type(e).__name__,
e,
)
del self._sync_dict[k]
errors.append(e)
raise
finally:
del cls.get_parallel_context()["sync_dict"][k]
finish = time.time()
del self._sync_dict[k]
return _FetchOneResult(
success, errors, project, start, finish, remote_fetched
success, errors, project_idx, start, finish, remote_fetched
)
@classmethod
def _FetchInitChild(cls, ssh_proxy):
cls.ssh_proxy = ssh_proxy
def _GetSyncProgressMessage(self):
earliest_time = float("inf")
earliest_proj = None
items = self._sync_dict.items()
items = self.get_parallel_context()["sync_dict"].items()
for project, t in items:
if t < earliest_time:
earliest_time = t
@@ -813,7 +813,7 @@ later is required to fix a server side protocol bug.
if not earliest_proj:
# This function is called when sync is still running but in some
# cases (by chance), _sync_dict can contain no entries. Return some
# cases (by chance), sync_dict can contain no entries. Return some
# text to indicate that sync is still working.
return "..working.."
@@ -821,6 +821,16 @@ later is required to fix a server side protocol bug.
jobs = jobs_str(len(items))
return f"{jobs} | {elapsed_str(elapsed)} {earliest_proj}"
@classmethod
def InitWorker(cls):
# Force connect to the manager server now.
# This is good because workers are initialized one by one. Without this,
# multiple workers may connect to the manager when handling the first
# job at the same time. Then the connection may fail if too many
# connections are pending and execeeded the socket listening backlog,
# especially on MacOS.
len(cls.get_parallel_context()["sync_dict"])
def _Fetch(self, projects, opt, err_event, ssh_proxy, errors):
ret = True
@@ -835,7 +845,6 @@ later is required to fix a server side protocol bug.
elide=True,
)
self._sync_dict = multiprocessing.Manager().dict()
sync_event = _threading.Event()
def _MonitorSyncLoop():
@@ -846,21 +855,13 @@ later is required to fix a server side protocol bug.
sync_progress_thread = _threading.Thread(target=_MonitorSyncLoop)
sync_progress_thread.daemon = True
sync_progress_thread.start()
objdir_project_map = dict()
for project in projects:
objdir_project_map.setdefault(project.objdir, []).append(project)
projects_list = list(objdir_project_map.values())
jobs = min(opt.jobs_network, len(projects_list))
def _ProcessResults(results_sets):
def _ProcessResults(pool, pm, results_sets):
ret = True
for results in results_sets:
for result in results:
success = result.success
project = result.project
project = projects[result.project_idx]
start = result.start
finish = result.finish
self._fetch_times.Set(project, finish - start)
@@ -884,45 +885,50 @@ later is required to fix a server side protocol bug.
fetched.add(project.gitdir)
pm.update()
if not ret and opt.fail_fast:
if pool:
pool.close()
break
return ret
# We pass the ssh proxy settings via the class. This allows
# multiprocessing to pickle it up when spawning children. We can't pass
# it as an argument to _FetchProjectList below as multiprocessing is
# unable to pickle those.
Sync.ssh_proxy = None
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
self.get_parallel_context()[
"sync_dict"
] = multiprocessing.Manager().dict()
# NB: Multiprocessing is heavy, so don't spin it up for one job.
if jobs == 1:
self._FetchInitChild(ssh_proxy)
if not _ProcessResults(
self._FetchProjectList(opt, x) for x in projects_list
):
ret = False
else:
objdir_project_map = dict()
for index, project in enumerate(projects):
objdir_project_map.setdefault(project.objdir, []).append(index)
projects_list = list(objdir_project_map.values())
jobs = min(opt.jobs_network, len(projects_list))
# We pass the ssh proxy settings via the class. This allows
# multiprocessing to pickle it up when spawning children. We can't
# pass it as an argument to _FetchProjectList below as
# multiprocessing is unable to pickle those.
self.get_parallel_context()["ssh_proxy"] = ssh_proxy
sync_progress_thread.start()
if not opt.quiet:
pm.update(inc=0, msg="warming up")
with multiprocessing.Pool(
jobs, initializer=self._FetchInitChild, initargs=(ssh_proxy,)
) as pool:
results = pool.imap_unordered(
try:
ret = self.ExecuteInParallel(
jobs,
functools.partial(self._FetchProjectList, opt),
projects_list,
chunksize=_chunksize(len(projects_list), jobs),
callback=_ProcessResults,
output=pm,
# Use chunksize=1 to avoid the chance that some workers are
# idle while other workers still have more than one job in
# their chunk queue.
chunksize=1,
initializer=self.InitWorker,
)
if not _ProcessResults(results):
ret = False
pool.close()
finally:
sync_event.set()
sync_progress_thread.join()
# Cleanup the reference now that we're done with it, and we're going to
# release any resources it points to. If we don't, later
# multiprocessing usage (e.g. checkouts) will try to pickle and then
# crash.
del Sync.ssh_proxy
sync_event.set()
pm.end()
self._fetch_times.Save()
self._local_sync_state.Save()
@@ -963,7 +969,9 @@ later is required to fix a server side protocol bug.
if not success:
err_event.set()
_PostRepoFetch(rp, opt.repo_verify)
# Call self update, unless requested not to
if os.environ.get("REPO_SKIP_SELF_UPDATE", "0") == "0":
_PostRepoFetch(rp, opt.repo_verify)
if opt.network_only:
# Bail out now; the rest touches the working tree.
if err_event.is_set():
@@ -1008,14 +1016,15 @@ later is required to fix a server side protocol bug.
return _FetchMainResult(all_projects)
@classmethod
def _CheckoutOne(
self,
cls,
detach_head,
force_sync,
force_checkout,
force_rebase,
verbose,
project,
project_idx,
):
"""Checkout work tree for one project
@@ -1027,11 +1036,12 @@ later is required to fix a server side protocol bug.
force_checkout: Force checking out of the repo content.
force_rebase: Force rebase.
verbose: Whether to show verbose messages.
project: Project object for the project to checkout.
project_idx: Project index for the project to checkout.
Returns:
Whether the fetch was successful.
"""
project = cls.get_parallel_context()["projects"][project_idx]
start = time.time()
syncbuf = SyncBuffer(
project.manifest.manifestProject.config, detach_head=detach_head
@@ -1065,7 +1075,7 @@ later is required to fix a server side protocol bug.
if not success:
logger.error("error: Cannot checkout %s", project.name)
finish = time.time()
return _CheckoutOneResult(success, errors, project, start, finish)
return _CheckoutOneResult(success, errors, project_idx, start, finish)
def _Checkout(self, all_projects, opt, err_results, checkout_errors):
"""Checkout projects listed in all_projects
@@ -1083,7 +1093,9 @@ later is required to fix a server side protocol bug.
ret = True
for result in results:
success = result.success
project = result.project
project = self.get_parallel_context()["projects"][
result.project_idx
]
start = result.start
finish = result.finish
self.event_log.AddSync(
@@ -1110,22 +1122,28 @@ later is required to fix a server side protocol bug.
return ret
for projects in _SafeCheckoutOrder(all_projects):
proc_res = self.ExecuteInParallel(
opt.jobs_checkout,
functools.partial(
self._CheckoutOne,
opt.detach_head,
opt.force_sync,
opt.force_checkout,
opt.rebase,
opt.verbose,
),
projects,
callback=_ProcessResults,
output=Progress(
"Checking out", len(all_projects), quiet=opt.quiet
),
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
proc_res = self.ExecuteInParallel(
opt.jobs_checkout,
functools.partial(
self._CheckoutOne,
opt.detach_head,
opt.force_sync,
opt.force_checkout,
opt.rebase,
opt.verbose,
),
range(len(projects)),
callback=_ProcessResults,
output=Progress(
"Checking out", len(all_projects), quiet=opt.quiet
),
# Use chunksize=1 to avoid the chance that some workers are
# idle while other workers still have more than one job in
# their chunk queue.
chunksize=1,
)
self._local_sync_state.Save()
return proc_res and not err_results

View File

@@ -603,19 +603,22 @@ Gerrit Code Review: https://www.gerritcodereview.com/
full_dest = destination
if not full_dest.startswith(R_HEADS):
full_dest = R_HEADS + full_dest
full_revision = branch.project.revisionExpr
if not full_revision.startswith(R_HEADS):
full_revision = R_HEADS + full_revision
# If the merge branch of the local branch is different from
# the project's revision AND destination, this might not be
# intentional.
if (
merge_branch
and merge_branch != branch.project.revisionExpr
and merge_branch != full_revision
and merge_branch != full_dest
):
print(
f"For local branch {branch.name}: merge branch "
f"{merge_branch} does not match destination branch "
f"{destination}"
f"{destination} and revision {branch.project.revisionExpr}"
)
print("skipping upload.")
print(
@@ -713,16 +716,17 @@ Gerrit Code Review: https://www.gerritcodereview.com/
merge_branch = p.stdout.strip()
return merge_branch
@staticmethod
def _GatherOne(opt, project):
@classmethod
def _GatherOne(cls, opt, project_idx):
"""Figure out the upload status for |project|."""
project = cls.get_parallel_context()["projects"][project_idx]
if opt.current_branch:
cbr = project.CurrentBranch
up_branch = project.GetUploadableBranch(cbr)
avail = [up_branch] if up_branch else None
else:
avail = project.GetUploadableBranches(opt.branch)
return (project, avail)
return (project_idx, avail)
def Execute(self, opt, args):
projects = self.GetProjects(
@@ -732,7 +736,8 @@ Gerrit Code Review: https://www.gerritcodereview.com/
def _ProcessResults(_pool, _out, results):
pending = []
for result in results:
project, avail = result
project_idx, avail = result
project = projects[project_idx]
if avail is None:
logger.error(
'repo: error: %s: Unable to upload branch "%s". '
@@ -743,15 +748,17 @@ Gerrit Code Review: https://www.gerritcodereview.com/
project.manifest.branch,
)
elif avail:
pending.append(result)
pending.append((project, avail))
return pending
pending = self.ExecuteInParallel(
opt.jobs,
functools.partial(self._GatherOne, opt),
projects,
callback=_ProcessResults,
)
with self.ParallelContext():
self.get_parallel_context()["projects"] = projects
pending = self.ExecuteInParallel(
opt.jobs,
functools.partial(self._GatherOne, opt),
range(len(projects)),
callback=_ProcessResults,
)
if not pending:
if opt.branch is None:

View File

@@ -1049,6 +1049,91 @@ class RemoveProjectElementTests(ManifestParseTestCase):
self.assertTrue(found_proj1_path1)
self.assertTrue(found_proj2)
def test_base_revision_checks_on_patching(self):
manifest_fail_wrong_tag = self.getXmlManifest(
"""
<manifest>
<remote name="default-remote" fetch="http://localhost" />
<default remote="default-remote" revision="tag.002" />
<project name="project1" path="tests/path1" />
<extend-project name="project1" revision="new_hash" base-rev="tag.001" />
</manifest>
"""
)
with self.assertRaises(error.ManifestParseError):
manifest_fail_wrong_tag.ToXml()
manifest_fail_remove = self.getXmlManifest(
"""
<manifest>
<remote name="default-remote" fetch="http://localhost" />
<default remote="default-remote" revision="refs/heads/main" />
<project name="project1" path="tests/path1" revision="hash1" />
<remove-project name="project1" base-rev="wrong_hash" />
</manifest>
"""
)
with self.assertRaises(error.ManifestParseError):
manifest_fail_remove.ToXml()
manifest_fail_extend = self.getXmlManifest(
"""
<manifest>
<remote name="default-remote" fetch="http://localhost" />
<default remote="default-remote" revision="refs/heads/main" />
<project name="project1" path="tests/path1" revision="hash1" />
<extend-project name="project1" revision="new_hash" base-rev="wrong_hash" />
</manifest>
"""
)
with self.assertRaises(error.ManifestParseError):
manifest_fail_extend.ToXml()
manifest_fail_unknown = self.getXmlManifest(
"""
<manifest>
<remote name="default-remote" fetch="http://localhost" />
<default remote="default-remote" revision="refs/heads/main" />
<project name="project1" path="tests/path1" />
<extend-project name="project1" revision="new_hash" base-rev="any_hash" />
</manifest>
"""
)
with self.assertRaises(error.ManifestParseError):
manifest_fail_unknown.ToXml()
manifest_ok = self.getXmlManifest(
"""
<manifest>
<remote name="default-remote" fetch="http://localhost" />
<default remote="default-remote" revision="refs/heads/main" />
<project name="project1" path="tests/path1" revision="hash1" />
<project name="project2" path="tests/path2" revision="hash2" />
<project name="project3" path="tests/path3" revision="hash3" />
<project name="project4" path="tests/path4" revision="hash4" />
<remove-project name="project1" />
<remove-project name="project2" base-rev="hash2" />
<project name="project2" path="tests/path2" revision="new_hash2" />
<extend-project name="project3" base-rev="hash3" revision="new_hash3" />
<extend-project name="project3" base-rev="new_hash3" revision="newer_hash3" />
<remove-project path="tests/path4" base-rev="hash4" />
</manifest>
"""
)
found_proj2 = False
found_proj3 = False
for proj in manifest_ok.projects:
if proj.name == "project2":
found_proj2 = True
if proj.name == "project3":
found_proj3 = True
self.assertNotEqual(proj.name, "project1")
self.assertNotEqual(proj.name, "project4")
self.assertTrue(found_proj2)
self.assertTrue(found_proj3)
self.assertTrue(len(manifest_ok.projects) == 2)
class ExtendProjectElementTests(ManifestParseTestCase):
"""Tests for <extend-project>."""