mirror of
https://git.yoctoproject.org/poky
synced 2026-07-01 10:57:21 +00:00
5cc40d3e64
Fixes [YOCTO #16130] When extracting the instance name from a template instance such as 'example@host.domain.com.service', the systemctl replacement script will split the instance on the first period, producing an instance argument of 'host' and a template of 'example@.domain.com.service'. This is incorrect, as systemd will split on the last period, producing an instance argument of 'host.domain.com' and a template of 'example@.service'. When constructing the template name, the script will also pass the string as is to re.sub(), which will try to process any backslash escapes in the string. These are legal in systemd unit names and should be preserved. They also are not valid Python escape sequences. Use re.escape() to preserve anything in the unit name that might be considered a regex escape. (From OE-Core rev: 0514c317523330f75937123c45bb0528e4830f61) Signed-off-by: Trent Piepho <trent.piepho@igorinstitute.com> Signed-off-by: Yoann Congal <yoann.congal@smile.fr> Signed-off-by: Paul Barker <paul@pbarker.dev>
364 lines
12 KiB
Python
Executable File
364 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""systemctl: subset of systemctl used for image construction
|
|
|
|
Mask/preset systemd units
|
|
"""
|
|
|
|
import argparse
|
|
import fnmatch
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
from collections import namedtuple
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
|
|
# Script version.
version = 1.0

# Filesystem root, plus the configuration and library directories written
# as relative paths so they can be joined onto any root directory.
ROOT = Path("/")
SYSCONFDIR = Path("etc")
BASE_LIBDIR = Path("lib")
LIBDIR = Path("usr") / "lib"

# Directories (relative to the root) that are searched for systemd files.
# Starts out empty; main() appends the search path at startup.
locations = []
|
|
|
|
|
|
class SystemdFile():
    """Class representing a single systemd configuration file

    The file itself is parsed first, followed by every ``*.conf`` drop-in
    found in the matching ``<name>.d`` directories of each search location.
    Parsed data is kept in ``self.sections`` as
    ``{section name: {key: [whitespace separated values]}}``.
    """

    # Keys for which an empty assignment ("Key=") discards earlier values
    _clearable_keys = ['WantedBy']

    def __init__(self, root, path, instance_unit_name):
        """Parse a unit file and its drop-in configuration

        Args:
            root: A pathlib.Path object for the root directory being operated on
            path: A pathlib.Path object pointing to the unit file
            instance_unit_name: Full name of the template instance being
                processed, or a false value when there is none.  When set,
                drop-ins from ``<instance_unit_name>.d`` are read as well.
        """
        self.sections = dict()
        self._parse(root, path)
        dirname = os.path.basename(path.name) + ".d"
        for location in locations:
            files = (root / location / "system" / dirname).glob("*.conf")
            if instance_unit_name:
                inst_dirname = instance_unit_name + ".d"
                files = chain(files, (root / location / "system" / inst_dirname).glob("*.conf"))
            for path2 in sorted(files):
                self._parse(root, path2)

    def _parse(self, root, path):
        """Parse a systemd syntax configuration file

        Results are merged into ``self.sections``.  Lines that cannot be
        interpreted (no ``=``, or an assignment before any section header)
        are ignored rather than aborting the parse.

        Args:
            root: A pathlib.Path object for the root directory, used to
                re-anchor absolute symlinks that only resolve inside it
            path: A pathlib.Path object pointing to the file
        """
        skip_re = re.compile(r"^\s*([#;]|$)")
        section_re = re.compile(r"^\s*\[(?P<section>.*)\]")
        # The key stops at the first '=' so that values may contain '='.
        kv_re = re.compile(r"^\s*(?P<key>[^\s=]+)\s*=\s*(?P<value>.*)")
        section = None

        # Path.resolve() is non-strict by default on newer Python versions
        # and does not raise for a dangling link, so test for existence.
        if path.is_symlink() and not path.exists():
            # broken symlink, try relative to root
            link_target = Path(os.readlink(str(path)))
            if link_target.is_absolute():
                path = root / link_target.relative_to(ROOT)

        with path.open() as f:
            for line in f:
                if skip_re.match(line):
                    continue

                line = line.strip()
                m = section_re.match(line)
                if m:
                    section = self.sections.setdefault(m.group('section'), dict())
                    continue

                # Join continuation lines: the trailing backslash is replaced
                # by a space and the following line is appended.
                while line.endswith("\\"):
                    line = line[:-1] + " "
                    continuation = f.readline()
                    if not continuation:
                        # The file ended in the middle of a continued line;
                        # stop here instead of looping forever.
                        break
                    line += continuation.strip()

                m = kv_re.match(line)
                if m is None or section is None:
                    # Not an assignment, or an assignment outside of any
                    # section: nothing we can store, so skip the line.
                    continue

                k = m.group('key')
                v = m.group('value').strip()

                # If we come across a "key=" line for a "clearable key", then
                # forget all preceding assignments. This works because we are
                # processing files in correct parse order.
                if k in self._clearable_keys and not v:
                    section.pop(k, None)
                    continue

                section.setdefault(k, list()).extend(v.split())

    def get(self, section, prop):
        """Get a property from section

        Args:
            section: Section to retrieve property from
            prop: Property to retrieve

        Returns:
            List representing all properties of type prop in section.

        Raises:
            KeyError: if ``section`` or ``prop`` not found
        """
        return self.sections[section][prop]
|
|
|
|
|
|
class Presets():
    """Class representing all systemd presets"""

    def __init__(self, scope, root):
        """Load every preset directive found for ``scope`` below ``root``"""
        self.directives = list()
        self._collect_presets(scope, root)

    def _parse_presets(self, presets):
        """Read preset files and record their enable/disable directives

        Args:
            presets: iterable of pathlib.Path objects, in the order in which
                the files should be read

        Directives are appended to ``self.directives`` in file order.  A line
        that is neither a directive, a comment nor blank terminates the
        program.
        """
        blank_or_comment = re.compile(r"^\s*([#;]|$)")
        rule = re.compile(r"^\s*(?P<action>enable|disable)\s+(?P<unit_name>(.+))")

        Directive = namedtuple("Directive", "action unit_name")
        for preset_file in presets:
            with preset_file.open() as handle:
                for text in handle:
                    found = rule.match(text)
                    if found is not None:
                        self.directives.append(
                            Directive(action=found.group('action'),
                                      unit_name=found.group('unit_name')))
                        continue
                    if not blank_or_comment.match(text):
                        sys.exit("Unparsed preset line in {}".format(preset_file))

    def _collect_presets(self, scope, root):
        """Find the preset files for ``scope`` and parse them in name order"""
        by_name = dict()
        for location in locations:
            for candidate in (root / location / scope).glob("*.preset"):
                # a file name seen in an earlier location wins over the same
                # name in a later one
                by_name.setdefault(candidate.name, candidate)

        self._parse_presets([by_name[name] for name in sorted(by_name)])

    def state(self, unit_name):
        """Return state of preset for unit_name

        The first directive whose glob pattern matches decides the result.

        Args:
            unit_name: name of the unit

        Returns:
            None: no matching preset
            `enable`: unit_name is enabled
            `disable`: unit_name is disabled
        """
        matching_actions = (rule.action for rule in self.directives
                            if fnmatch.fnmatch(unit_name, rule.unit_name))
        return next(matching_actions, None)
|
|
|
|
|
|
def add_link(path, target):
    """Create the symlink ``path`` pointing at ``target``

    Missing parent directories are created.  If ``path`` is already a
    symlink it is left untouched; otherwise the equivalent ``ln -s`` command
    is printed before the link is made.

    Args:
        path: A pathlib.Path object naming the link to create
        target: What the link should point to
    """
    parent = path.parent
    try:
        parent.mkdir(parents=True)
    except FileExistsError:
        # the directory is already in place
        pass

    if path.is_symlink():
        return

    print("ln -s {} {}".format(target, path))
    path.symlink_to(target)
|
|
|
|
|
|
class SystemdUnitNotFoundError(Exception):
    """Error carrying the ``path`` and ``unit`` of a unit that was not found"""

    def __init__(self, path, unit):
        # keep both values so handlers can report them
        self.path, self.unit = path, unit
|
|
|
|
|
|
class SystemdUnit():
    """A single systemd unit that can be enabled or masked below a root"""

    def __init__(self, root, unit):
        """
        Args:
            root: A pathlib.Path object for the root directory to operate on
            unit: Name of the unit, e.g. ``foo.service`` or ``foo@bar.service``
        """
        self.root = root
        self.unit = unit
        self.config = None

    def _path_for_unit(self, unit):
        """Return the path of ``unit`` in the first location that has it

        Raises:
            SystemdUnitNotFoundError: if no location contains the unit
        """
        for location in locations:
            path = self.root / location / "system" / unit
            if path.exists() or path.is_symlink():
                return path

        raise SystemdUnitNotFoundError(self.root, unit)

    @staticmethod
    def _expand_instance(dependent, instance):
        """Expand any %i in ``dependent`` to ``instance``

        A ``%i`` that is part of an escaped ``%%`` sequence is left alone.
        The replacement is supplied through a function so that ``instance``
        is inserted verbatim: a replacement *string* would have its
        backslash escapes interpreted by re.sub(), and the output of
        re.escape() would leave stray backslashes in front of characters
        such as '.' and '-'.
        """
        return re.sub(r"(?<!%)((?:%%)*)%i",
                      lambda m: m.group(1) + instance,
                      dependent)

    def _process_deps(self, config, service, location, prop, dirstem, instance):
        """Create the ``.wants``/``.requires`` style links for one property

        Args:
            config: SystemdFile holding the parsed unit
            service: Name of the link to create
            location: Path of the unit file; the link points at it
            prop: ``[Install]`` property listing the dependent units
            dirstem: Suffix of the dependency directory (``wants``, ``requires``)
            instance: Instance string substituted for ``%i``, or None
        """
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"

        target = ROOT / location.relative_to(self.root)
        try:
            dependents = config.get('Install', prop)
        except KeyError:
            # property not present, nothing to link
            return

        for dependent in dependents:
            # expand any %i to instance (ignoring escape sequence %%)
            if instance is not None:
                dependent = self._expand_instance(dependent, instance)
            wants = systemdir / "{}.{}".format(dependent, dirstem) / service
            add_link(wants, target)

    def enable(self, units_enabled=None):
        """Enable the unit by creating the links its [Install] section asks for

        Args:
            units_enabled: List of units already being enabled, used to stop
                ``Also=`` references from recursing endlessly.  The list is
                extended in place; a fresh one is used when omitted.
        """
        # A default of [] would be shared by every call, so create it here.
        if units_enabled is None:
            units_enabled = []

        # if we're enabling an instance, first extract the actual instance
        # then figure out what the template unit is
        template = re.match(r"[^@]+@(?P<instance>.*)\.", self.unit)
        instance_unit_name = None
        if template:
            instance = template.group('instance')
            if instance != "":
                instance_unit_name = self.unit
            unit = re.sub(r"@{}\.".format(re.escape(instance)), "@.", self.unit, 1)
        else:
            instance = None
            unit = self.unit

        path = self._path_for_unit(unit)

        if path.is_symlink():
            # ignore aliases
            return

        config = SystemdFile(self.root, path, instance_unit_name)
        if instance == "":
            try:
                default_instance = config.get('Install', 'DefaultInstance')[0]
            except KeyError:
                # no default instance, so nothing to enable
                return

            service = self.unit.replace("@.",
                                        "@{}.".format(default_instance))
            # NOTE(review): %i in WantedBy/RequiredBy is still expanded with
            # the empty instance here, not with default_instance - confirm
            # whether that is intended.
        else:
            service = self.unit

        self._process_deps(config, service, path, 'WantedBy', 'wants', instance)
        self._process_deps(config, service, path, 'RequiredBy', 'requires', instance)

        try:
            also_units = config.get('Install', 'Also')
        except KeyError:
            also_units = []

        # Record this unit before following Also= so that units referring
        # back to it do not recurse.
        if also_units and unit not in units_enabled:
            units_enabled.append(unit)
        for also in also_units:
            if also in units_enabled:
                continue
            try:
                SystemdUnit(self.root, also).enable(units_enabled)
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl also enable issue with %s (%s)" % (service, e.unit))

        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        target = ROOT / path.relative_to(self.root)
        try:
            aliases = config.get('Install', 'Alias')
        except KeyError:
            aliases = []
        for dest in aliases:
            alias = systemdir / dest
            add_link(alias, target)

    def mask(self):
        """Mask the unit by linking its name to /dev/null"""
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        add_link(systemdir / self.unit, "/dev/null")
|
|
|
|
|
|
def collect_services(root):
    """Collect list of service files

    Args:
        root: A pathlib.Path object for the root directory to search

    Returns:
        Set with the name of every non-directory entry found in the
        ``system`` directory of each search location.
    """
    return {
        entry.name
        for location in locations
        for entry in (root / location / "system").glob("*")
        if not entry.is_dir()
    }
|
|
|
|
|
|
def preset_all(root):
    """Enable every unit that the presets enable or do not mention

    Args:
        root: A pathlib.Path object for the root directory to operate on
    """
    presets = Presets('system-preset', root)

    for service in collect_services(root):
        # only an explicit "disable" keeps a unit from being enabled
        if presets.state(service) not in ("enable", None):
            continue
        try:
            SystemdUnit(root, service).enable()
        except SystemdUnitNotFoundError:
            sys.exit("Error: Systemctl preset_all issue in %s" % service)

    # Populating the systemd links goes together with creating
    # /etc/machine-id, so that systemd can boot with a read-only filesystem
    # before it generates a real value and commits it back.
    #
    # In the stateless configuration, where /etc is generated at runtime
    # (for example on a tmpfs), this script shouldn't run at all and systemd
    # is left to populate /etc completely.
    machine_id = root / SYSCONFDIR / "machine-id"
    machine_id.touch()
|
|
|
|
|
|
def main():
    """Parse the command line and run the requested command

    Returns:
        0 after printing the help text when no command was given, otherwise
        None.  Failures terminate the program through sys.exit().
    """
    if sys.version_info < (3, 4, 0):
        sys.exit("Python 3.4 or greater is required")

    parser = argparse.ArgumentParser()
    parser.add_argument('command', nargs='?',
                        choices=['enable', 'mask', 'preset-all'])
    parser.add_argument('service', nargs=argparse.REMAINDER)
    parser.add_argument('--root')
    parser.add_argument('--preset-mode',
                        choices=['full', 'enable-only', 'disable-only'],
                        default='full')

    args = parser.parse_args()

    root = ROOT if not args.root else Path(args.root)

    # Build the search path, most specific directory first.
    search_dirs = [SYSCONFDIR / "systemd"]
    # Handle the usrmerge case by ignoring /lib when it's a symlink
    if not (root / BASE_LIBDIR).is_symlink():
        search_dirs.append(BASE_LIBDIR / "systemd")
    search_dirs.append(LIBDIR / "systemd")
    locations.extend(search_dirs)

    command = args.command
    if not command:
        parser.print_help()
        return 0

    if command == "preset-all":
        if args.service:
            sys.exit("Too many arguments.")
        if args.preset_mode != "enable-only":
            sys.exit("Only enable-only is supported as preset-mode.")
        preset_all(root)
    elif command == "enable":
        for service in args.service:
            try:
                SystemdUnit(root, service).enable()
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl main enable issue in %s (%s)" % (service, e.unit))
    elif command == "mask":
        for service in args.service:
            try:
                SystemdUnit(root, service).mask()
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl main mask issue in %s (%s)" % (service, e.unit))
    else:
        raise RuntimeError()
|
|
|
|
|
|
# Run only when executed as a script.  main() returns 0 on the help path and
# None otherwise; hand that to sys.exit() so it becomes the exit status
# instead of being silently dropped.
if __name__ == '__main__':
    sys.exit(main())
|