git_refs: read refs via git plumbing for files/reftable

Replace direct `packed-refs` file parsing with `git for-each-ref`
plumbing to support both `files` and `reftable` backends.

Bug: 476209856
Change-Id: I2ad8ff8f3382426600f15370c997f9bc17165485
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/550401
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Mike Frysinger <vapier@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
This commit is contained in:
Gavin Mak
2026-02-06 14:55:34 -08:00
committed by LUCI
parent 551087cd98
commit 67881c0c3b
4 changed files with 237 additions and 62 deletions
+88 -57
View File
@@ -14,6 +14,7 @@
import os
from git_command import GitCommand
import platform_utils
from repo_trace import Trace
@@ -86,9 +87,8 @@ class GitRefs:
self._symref = {}
self._mtime = {}
self._ReadPackedRefs()
self._ReadLoose("refs/")
self._ReadLoose1(os.path.join(self._gitdir, HEAD), HEAD)
self._ReadRefs()
self._ReadSymbolicRef(HEAD)
scan = self._symref
attempts = 0
@@ -102,64 +102,95 @@ class GitRefs:
scan = scan_next
attempts += 1
def _ReadPackedRefs(self):
path = os.path.join(self._gitdir, "packed-refs")
self._TrackMtime(HEAD)
self._TrackMtime("config")
self._TrackMtime("packed-refs")
self._TrackTreeMtimes("refs")
self._TrackTreeMtimes("reftable")
@staticmethod
def _IsNullRef(ref_id: str) -> bool:
"""Check if a ref_id is a null object ID."""
return ref_id and all(ch == "0" for ch in ref_id)
def _ReadRefs(self) -> None:
"""Read all references using git for-each-ref."""
p = GitCommand(
None,
["for-each-ref", "--format=%(objectname)%00%(refname)%00%(symref)"],
capture_stdout=True,
capture_stderr=True,
bare=True,
gitdir=self._gitdir,
)
if p.Wait() != 0:
return
for line in p.stdout.splitlines():
ref_id, name, symref = line.split("\0")
if symref:
self._symref[name] = symref
elif ref_id and not self._IsNullRef(ref_id):
self._phyref[name] = ref_id
def _ReadSymbolicRef(self, name: str) -> None:
"""Read a symbolic reference."""
p = GitCommand(
None,
["symbolic-ref", "-q", name],
capture_stdout=True,
capture_stderr=True,
bare=True,
gitdir=self._gitdir,
)
if p.Wait() == 0:
ref = p.stdout.strip()
if ref:
self._symref[name] = ref
return
p = GitCommand(
None,
["rev-parse", "--verify", "-q", name],
capture_stdout=True,
capture_stderr=True,
bare=True,
gitdir=self._gitdir,
)
if p.Wait() == 0:
ref_id = p.stdout.strip()
if ref_id:
self._phyref[name] = ref_id
def _TrackMtime(self, name: str) -> None:
"""Track the modification time of a specific gitdir path."""
path = os.path.join(self._gitdir, name)
try:
fd = open(path)
mtime = os.path.getmtime(path)
self._mtime[name] = os.path.getmtime(path)
except OSError:
return
def _TrackTreeMtimes(self, root: str) -> None:
"""Recursively track modification times for a directory tree."""
root_path = os.path.join(self._gitdir, root)
try:
for line in fd:
line = str(line)
if line[0] == "#":
continue
if line[0] == "^":
continue
line = line[:-1]
p = line.split(" ")
ref_id = p[0]
name = p[1]
self._phyref[name] = ref_id
finally:
fd.close()
self._mtime["packed-refs"] = mtime
def _ReadLoose(self, prefix):
base = os.path.join(self._gitdir, prefix)
for name in platform_utils.listdir(base):
p = os.path.join(base, name)
# We don't implement the full ref validation algorithm, just the
# simple rules that would show up in local filesystems.
# https://git-scm.com/docs/git-check-ref-format
if name.startswith(".") or name.endswith(".lock"):
pass
elif platform_utils.isdir(p):
self._mtime[prefix] = os.path.getmtime(base)
self._ReadLoose(prefix + name + "/")
else:
self._ReadLoose1(p, prefix + name)
def _ReadLoose1(self, path, name):
try:
with open(path) as fd:
mtime = os.path.getmtime(path)
ref_id = fd.readline()
except (OSError, UnicodeError):
if not platform_utils.isdir(root_path):
return
except OSError:
return
try:
ref_id = ref_id.decode()
except AttributeError:
pass
if not ref_id:
return
ref_id = ref_id[:-1]
to_scan = [root]
while to_scan:
name = to_scan.pop()
self._TrackMtime(name)
path = os.path.join(self._gitdir, name)
if not platform_utils.isdir(path):
continue
if ref_id.startswith("ref: "):
self._symref[name] = ref_id[5:]
else:
self._phyref[name] = ref_id
self._mtime[name] = mtime
for child in platform_utils.listdir(path):
child_name = os.path.join(name, child)
child_path = os.path.join(self._gitdir, child_name)
if platform_utils.isdir(child_path):
to_scan.append(child_name)
else:
self._TrackMtime(child_name)