1
0
mirror of https://git.yoctoproject.org/meta-arm synced 2026-06-02 13:30:09 +00:00
Files
meta-arm/scripts/runfvp
T
Ross Burton 0a64644bc9 runfvp: reset the process group on startup
So that it is easy to kill runfvp and everything it starts (such as
telnet or the FVP itself), reset the process group on startup.

Signed-off-by: Ross Burton <ross.burton@arm.com>
Signed-off-by: Jon Mason <jon.mason@arm.com>
2021-10-20 14:07:11 -04:00

226 lines
7.5 KiB
Python
Executable File

#! /usr/bin/env python3
import asyncio
import json
import os
import re
import sys
import subprocess
import pathlib
import logging
# Logger shared by every function in this script.
logger = logging.getLogger("RunFVP")

# Command used to attach to an FVP serial port.
# NOTE(review): "%port" (and "%title" below) look like placeholders that the
# FVP expands itself; nothing in this script substitutes them -- confirm.
# TODO: option to switch between telnet and netcat
connect_command = "telnet localhost %port"

# Terminal name (as selectable with --terminals) -> command template used to
# open that terminal running connect_command. "none" maps to an empty
# template.
terminal_map = {
    "tmux": 'tmux new-window -n "%title" "' + connect_command + '"',
    "xterm": 'xterm -title "%title" -e ' + connect_command,
    "none": "",
    # TODO more terminals
}
def get_default_terminal():
    """
    Pick the terminal to use when --terminals is not given.

    Returns the first candidate terminal whose executable (the first word of
    its terminal_map command) is found on PATH, or "none" if there isn't one.
    Only "xterm" is currently a candidate.
    """
    import shutil
    import shlex

    def is_installed(terminal):
        # The executable is the first token of the command template
        executable = shlex.split(terminal_map[terminal])[0]
        return bool(shutil.which(executable))

    return next(filter(is_installed, ("xterm",)), "none")
def get_image_directory(machine=None):
    """
    Look up DEPLOY_DIR_IMAGE through BitBake and return it as a pathlib.Path.

    If machine is set it is exported as MACHINE in the environment before
    BitBake is queried, otherwise whatever machine is already configured is
    used.

    Logs an error and exits with status 1 if bb.tinfoil cannot be imported.
    """
    try:
        import bb.tinfoil
    except ImportError:
        logger.error("Cannot connect to BitBake, did you oe-init-build-env?")
        sys.exit(1)

    # Select the requested machine via the environment
    if machine:
        os.environ["MACHINE"] = machine

    with bb.tinfoil.Tinfoil() as session:
        # Only the configuration is prepared (config_only=True)
        session.prepare(config_only=True)
        deploy_dir = session.config_data.getVar("DEPLOY_DIR_IMAGE")
        logger.debug(f"Got DEPLOY_DIR_IMAGE {deploy_dir}")
        return pathlib.Path(deploy_dir)
def parse_args(arguments):
    """
    Parse the runfvp command line.

    arguments is the argument list without the program name. If it contains
    "--", everything after the first "--" is left unparsed and returned
    verbatim so that it can be passed to the FVP binary.

    Also configures logging: DEBUG if --verbose was given, WARNING otherwise.

    Returns a tuple (args, fvp_args) where args is the argparse namespace
    (config, terminals, console, verbose) and fvp_args is the list of
    arguments for the FVP. When --console is used, args.terminals is forced
    to "none".
    """
    import argparse

    parser = argparse.ArgumentParser(description="Run images in a FVP")
    parser.add_argument("config", nargs="?", help="Machine name or path to .fvpconf file")
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-t", "--terminals", choices=terminal_map.keys(), default=get_default_terminal(), help="Automatically start terminals (default: %(default)s)")
    group.add_argument("-c", "--console", action="store_true", help="Attach the first uart to stdin/stdout")
    parser.add_argument("--verbose", action="store_true", help="Output verbose logging")

    # Extend the generated usage line. format_usage() returns text that
    # already starts with argparse's "usage: " prefix, and argparse adds that
    # prefix again when it prints a custom usage string, so strip it here to
    # avoid "usage: usage: ...".
    usage = parser.format_usage().strip()
    prefix = "usage: "
    if usage.startswith(prefix):
        usage = usage[len(prefix):]
    # A custom usage string is %-formatted by argparse, so escape literal %.
    usage = usage.replace("%", "%%")
    parser.usage = f"{usage} -- [ arguments passed to FVP ]"
    # TODO option for telnet vs netcat

    # If the arguments contains -- then everything after it should be passed to the FVP binary directly.
    if "--" in arguments:
        i = arguments.index("--")
        fvp_args = arguments[i+1:]
        arguments = arguments[:i]
    else:
        fvp_args = []

    args = parser.parse_args(args=arguments)
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)

    # If we're hooking up the console, don't start any terminals
    if args.console:
        args.terminals = "none"

    logger.debug(f"Parsed arguments: {vars(args)}")
    logger.debug(f"FVP arguments: {fvp_args}")
    return args, fvp_args
def find_config(args):
    """
    Work out which .fvpconf file to use.

    If args.config names an existing path it is returned unchanged.
    Otherwise args.config is treated as a machine name (it may be None) and
    the most recently modified regular, non-symlink *.fvpconf in that
    machine's image directory is returned as a pathlib.Path.

    Prints a message and exits with status 1 if no .fvpconf is found.
    """
    if args.config and os.path.exists(args.config):
        return args.config

    image_dir = get_image_directory(args.config)

    # Keep just the real files out of all the .fvpconf entries
    candidates = []
    for path in image_dir.glob("*.fvpconf"):
        if not path.is_file() or path.is_symlink():
            continue
        candidates.append(path)

    if not candidates:
        print(f"Cannot find any .fvpconf in {image_dir}")
        sys.exit(1)

    # Oldest first, so the newest ends up last
    candidates.sort(key=lambda path: path.stat().st_mtime)
    return candidates[-1]
def load_config(config_file):
    """
    Read a .fvpconf (JSON) file and return its contents as a dictionary.

    Any expected key that is missing, or present but null, is filled in with
    an empty default so callers can index the result unconditionally.

    Logs an error and exits with status 1 if "exe" ends up empty.
    """
    logger.debug(f"Loading {config_file}")
    with open(config_file) as f:
        config = json.load(f)

    # Ensure that all expected keys are present
    defaults = {
        "fvp-bindir": "",
        "exe": "",
        "parameters": {},
        "data": {},
        "applications": {},
        "terminals": {},
        "args": [],
        "console": "",
    }
    for key, fallback in defaults.items():
        if key not in config or config[key] is None:
            config[key] = fallback

    if not config["exe"]:
        logger.error("Required value FVP_EXE not set in machine configuration")
        sys.exit(1)

    return config
def parse_config(args, config):
    """
    Turn a loaded configuration into the FVP command line.

    args is the parsed argument namespace (only args.terminals is read) and
    config is the dictionary returned by load_config().

    Returns a list: the FVP executable followed by --parameter, --data and
    --application options, one --parameter per configured terminal, and
    finally the extra arguments from config["args"].
    """
    bindir = config["fvp-bindir"]
    executable = os.path.join(bindir, config["exe"]) if bindir else config["exe"]
    cli = [executable]

    for key, setting in config["parameters"].items():
        cli += ["--parameter", f"{key}={setting}"]

    for item in config["data"]:
        cli += ["--data", item]

    for key, setting in config["applications"].items():
        cli += ["--application", f"{key}={setting}"]

    for terminal, name in config["terminals"].items():
        if args.terminals == "none" or not name:
            # Terminals are disabled or this one is unnamed: disable it
            option = f"{terminal}.start_telnet=0"
        else:
            # TODO if raw mode
            # cli.extend(["--parameter", f"{terminal}.mode=raw"])
            # TODO put name into terminal title
            option = f"{terminal}.terminal_command={terminal_map[args.terminals]}"
        cli += ["--parameter", option]

    cli += config["args"]
    return cli
async def start_fvp(cli, console_cb):
    """
    Run the FVP and process its output until it exits.

    cli is the complete command line as a list, with the executable first.
    The FVP is started with no stdin and with stderr merged into stdout.

    console_cb is either falsy, in which case every line of FVP output is
    printed, or a coroutine function. In the latter case the output is only
    logged at debug level, and console_cb(terminal, port) is scheduled each
    time the FVP reports that it is listening for a serial connection.

    When the output ends, or this coroutine is cancelled or raises, the FVP
    is terminated and waited for. Exceptions raised while starting the FVP
    (such as the executable not being found) propagate to the caller.
    """
    serial_re = re.compile(r"^(\S+): Listening for serial connection on port (\d+)$")

    # Start the FVP outside of the try block: if this fails there is no
    # process to clean up, and the finally clause must not mask the real
    # error by referring to a process that was never created.
    fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    try:
        async for line in fvp_process.stdout:
            line = line.strip().decode("utf-8", errors="replace")
            if not console_cb:
                print(line)
                continue

            logger.debug(f"FVP output: {line}")
            # Look for serial connections opening
            m = serial_re.match(line)
            if m:
                terminal = m.group(1)
                port = int(m.group(2))
                logger.debug(f"Console for {terminal} started on port {port}")
                # When we can assume Py3.7+, this can be create_task
                asyncio.ensure_future(console_cb(terminal, port))
    finally:
        # If we get cancelled or throw an exception, kill the FVP
        logger.debug(f"Killing FVP PID {fvp_process.pid}")
        try:
            fvp_process.terminate()
        except ProcessLookupError:
            # The FVP has already exited by itself, nothing to kill
            pass

        if await fvp_process.wait() != 0:
            logger.info(f"{cli[0]} quit with code {fvp_process.returncode}")
def runfvp(cli_args):
    """
    Entry point: parse cli_args, locate and load the FVP configuration,
    build the FVP command line and run the FVP until it exits.

    With --console, telnet is attached to stdin/stdout once the terminal
    named by the configuration's "console" value starts listening, and
    everything is cancelled when that telnet exits. Logs an error and exits
    with status 1 if --console is used but no console is configured.

    Without --console no console callback is installed, so the FVP output is
    printed instead of only being logged at debug level.
    """
    args, fvp_args = parse_args(cli_args)
    config_file = find_config(args)
    config = load_config(config_file)
    cli = parse_config(args, config)
    cli.extend(fvp_args)
    logger.debug(f"Constructed FVP call: {cli}")

    if args.console:
        expected_terminal = config["console"]
        if not expected_terminal:
            logger.error("--console used but FVP_CONSOLE not set in machine configuration")
            sys.exit(1)
    else:
        expected_terminal = None

    async def console_started(name, port):
        # Only the terminal chosen as the console is attached to
        if name == expected_terminal:
            telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout)
            await telnet.wait()
            # The user has left the console, so stop everything else
            logger.debug(f"Telnet quit, cancelling tasks")
            for t in asyncio.all_tasks():
                logger.debug(f"Cancelling {t}")
                t.cancel()

    # Only watch for consoles when one is wanted. Passing the callback
    # unconditionally would hide all FVP output behind debug logging even
    # though the callback could never match a terminal.
    console_cb = console_started if expected_terminal else None

    try:
        # When we can assume Py3.7+, this can simply be asyncio.run()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.gather(start_fvp(cli, console_cb=console_cb)))
    except asyncio.CancelledError:
        # Raised when console_started() cancels the tasks; a normal exit
        pass
if __name__ == "__main__":
    try:
        # Put runfvp into a process group of its own, so that runfvp and
        # whatever it starts can all be killed together.
        os.setpgid(0, 0)
        cli_arguments = sys.argv[1:]
        runfvp(cli_arguments)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop; exit without a traceback
        pass