mirror of
https://git.yoctoproject.org/meta-arm
synced 2026-04-20 11:29:54 +00:00
scripts,arm/lib: Refactor runfvp into FVPRunner
Refactor runfvp into a "fvp" library inside meta-arm. Split into terminal, conffile and runner. Issue-Id: SCM-4957 Signed-off-by: Peter Hoyes <Peter.Hoyes@arm.com> Change-Id: I797f9a4eab810f3cc331b7db140f59c9911231fd Signed-off-by: Jon Mason <jon.mason@arm.com>
This commit is contained in:
0
meta-arm/lib/fvp/__init__.py
Normal file
0
meta-arm/lib/fvp/__init__.py
Normal file
58
meta-arm/lib/fvp/conffile.py
Normal file
58
meta-arm/lib/fvp/conffile.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import json
|
||||
import pathlib
|
||||
import os
|
||||
|
||||
|
||||
def get_image_directory(machine=None):
|
||||
"""
|
||||
Get the DEPLOY_DIR_IMAGE for the specified machine
|
||||
(or the configured machine if not set).
|
||||
"""
|
||||
try:
|
||||
import bb.tinfoil
|
||||
except ImportError as e:
|
||||
raise RuntimeError("Cannot connect to BitBake, did you oe-init-build-env?") from e
|
||||
|
||||
if machine:
|
||||
os.environ["MACHINE"] = machine
|
||||
|
||||
with bb.tinfoil.Tinfoil() as tinfoil:
|
||||
tinfoil.prepare(config_only=True)
|
||||
image_dir = tinfoil.config_data.getVar("DEPLOY_DIR_IMAGE")
|
||||
return pathlib.Path(image_dir)
|
||||
|
||||
def find(machine):
|
||||
image_dir = get_image_directory(machine)
|
||||
# All .fvpconf configuration files
|
||||
configs = image_dir.glob("*.fvpconf")
|
||||
# Just the files
|
||||
configs = [p for p in configs if p.is_file() and not p.is_symlink()]
|
||||
if not configs:
|
||||
print(f"Cannot find any .fvpconf in {image_dir}")
|
||||
raise RuntimeError()
|
||||
# Sorted by modification time
|
||||
configs = sorted(configs, key=lambda p: p.stat().st_mtime)
|
||||
return configs[-1]
|
||||
|
||||
|
||||
def load(config_file):
|
||||
with open(config_file) as f:
|
||||
config = json.load(f)
|
||||
|
||||
# Ensure that all expected keys are present
|
||||
def sanitise(key, value):
|
||||
if key not in config or config[key] is None:
|
||||
config[key] = value
|
||||
sanitise("fvp-bindir", "")
|
||||
sanitise("exe", "")
|
||||
sanitise("parameters", {})
|
||||
sanitise("data", {})
|
||||
sanitise("applications", {})
|
||||
sanitise("terminals", {})
|
||||
sanitise("args", [])
|
||||
sanitise("console", "")
|
||||
|
||||
if not config["exe"]:
|
||||
raise ValueError("Required value FVP_EXE not set in machine configuration")
|
||||
|
||||
return config
|
||||
115
meta-arm/lib/fvp/runner.py
Normal file
115
meta-arm/lib/fvp/runner.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import asyncio
|
||||
import re
|
||||
import subprocess
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
from .terminal import terminals
|
||||
|
||||
|
||||
def cli_from_config(config, terminal_choice):
|
||||
cli = []
|
||||
if config["fvp-bindir"]:
|
||||
cli.append(os.path.join(config["fvp-bindir"], config["exe"]))
|
||||
else:
|
||||
cli.append(config["exe"])
|
||||
|
||||
for param, value in config["parameters"].items():
|
||||
cli.extend(["--parameter", f"{param}={value}"])
|
||||
|
||||
for value in config["data"]:
|
||||
cli.extend(["--data", value])
|
||||
|
||||
for param, value in config["applications"].items():
|
||||
cli.extend(["--application", f"{param}={value}"])
|
||||
|
||||
for terminal, name in config["terminals"].items():
|
||||
# If terminals are enabled and this terminal has been named
|
||||
if terminal_choice != "none" and name:
|
||||
# TODO if raw mode
|
||||
# cli.extend(["--parameter", f"{terminal}.mode=raw"])
|
||||
# TODO put name into terminal title
|
||||
cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[terminal_choice].command}"])
|
||||
else:
|
||||
# Disable terminal
|
||||
cli.extend(["--parameter", f"{terminal}.start_telnet=0"])
|
||||
|
||||
cli.extend(config["args"])
|
||||
|
||||
return cli
|
||||
|
||||
def check_telnet():
|
||||
# Check that telnet is present
|
||||
if not bool(shutil.which("telnet")):
|
||||
raise RuntimeError("Cannot find telnet, this is needed to connect to the FVP.")
|
||||
|
||||
class FVPRunner:
|
||||
def __init__(self, logger):
|
||||
self._terminal_ports = {}
|
||||
self._line_callbacks = []
|
||||
self._logger = logger
|
||||
self._fvp_process = None
|
||||
self._telnets = []
|
||||
|
||||
def add_line_callback(self, callback):
|
||||
self._line_callbacks.append(callback)
|
||||
|
||||
async def start(self, config, extra_args=[], terminal_choice="none"):
|
||||
cli = cli_from_config(config, terminal_choice)
|
||||
cli += extra_args
|
||||
self._logger.debug(f"Constructed FVP call: {cli}")
|
||||
self._fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
|
||||
def detect_terminals(line):
|
||||
m = re.match(r"^(\S+): Listening for serial connection on port (\d+)$", line)
|
||||
if m:
|
||||
terminal = m.group(1)
|
||||
port = int(m.group(2))
|
||||
self._terminal_ports[terminal] = port
|
||||
self.add_line_callback(detect_terminals)
|
||||
|
||||
async def stop(self):
|
||||
if self._fvp_process:
|
||||
self._logger.debug(f"Killing FVP PID {self._fvp_process.pid}")
|
||||
try:
|
||||
self._fvp_process.terminate()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
|
||||
if await self._fvp_process.wait() != 0:
|
||||
self._logger.info(f"FVP quit with code {self._fvp_process.returncode}")
|
||||
return self._fvp_process.returncode
|
||||
else:
|
||||
return 0
|
||||
|
||||
for telnet in self._telnets:
|
||||
await telnet.terminate()
|
||||
await telnet.wait()
|
||||
|
||||
async def run(self, until=None):
|
||||
if until and until():
|
||||
return
|
||||
|
||||
async for line in self._fvp_process.stdout:
|
||||
line = line.strip().decode("utf-8", errors="replace")
|
||||
for callback in self._line_callbacks:
|
||||
callback(line)
|
||||
if until and until():
|
||||
return
|
||||
|
||||
async def _get_terminal_port(self, terminal, timeout):
|
||||
def terminal_exists():
|
||||
return terminal in self._terminal_ports
|
||||
await asyncio.wait_for(self.run(terminal_exists), timeout)
|
||||
return self._terminal_ports[terminal]
|
||||
|
||||
async def create_telnet(self, terminal, timeout=15.0):
|
||||
check_telnet()
|
||||
port = await self._get_terminal_port(terminal, timeout)
|
||||
telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout)
|
||||
self._telnets.append(telnet)
|
||||
return telnet
|
||||
|
||||
def pid(self):
|
||||
return self._fvp_process.pid
|
||||
59
meta-arm/lib/fvp/terminal.py
Normal file
59
meta-arm/lib/fvp/terminal.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import shutil
|
||||
import collections
|
||||
import pathlib
|
||||
import os
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
def get_config_dir() -> pathlib.Path:
|
||||
value = os.environ.get("XDG_CONFIG_HOME")
|
||||
if value and os.path.isabs(value):
|
||||
return pathlib.Path(value)
|
||||
else:
|
||||
return pathlib.Path.home() / ".config"
|
||||
|
||||
class Terminals:
|
||||
Terminal = collections.namedtuple("Terminal", ["priority", "name", "command"])
|
||||
|
||||
def __init__(self):
|
||||
self.terminals = []
|
||||
|
||||
def add_terminal(self, priority, name, command):
|
||||
self.terminals.append(Terminals.Terminal(priority, name, command))
|
||||
# Keep this list sorted by priority
|
||||
self.terminals.sort(reverse=True, key=lambda t: t.priority)
|
||||
self.name_map = {t.name: t for t in self.terminals}
|
||||
|
||||
def configured_terminal(self) -> Optional[str]:
|
||||
import configparser
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(get_config_dir() / "runfvp.conf")
|
||||
return config.get("RunFVP", "Terminal", fallback=None)
|
||||
|
||||
def preferred_terminal(self) -> str:
|
||||
import shlex
|
||||
|
||||
preferred = self.configured_terminal()
|
||||
if preferred:
|
||||
return preferred
|
||||
|
||||
for t in self.terminals:
|
||||
if t.command and shutil.which(shlex.split(t.command)[0]):
|
||||
return t.name
|
||||
return self.terminals[-1].name
|
||||
|
||||
def all_terminals(self) -> List[str]:
|
||||
return self.name_map.keys()
|
||||
|
||||
def __getitem__(self, name: str):
|
||||
return self.name_map[name]
|
||||
|
||||
terminals = Terminals()
|
||||
# TODO: option to switch between telnet and netcat
|
||||
connect_command = "telnet localhost %port"
|
||||
terminals.add_terminal(2, "tmux", f"tmux new-window -n \"%title\" \"{connect_command}\""),
|
||||
terminals.add_terminal(2, "gnome-terminal", f"gnome-terminal --window --title \"%title\" --command \"{connect_command}\""),
|
||||
terminals.add_terminal(1, "xterm", f"xterm -title \"%title\" -e {connect_command}"),
|
||||
terminals.add_terminal(0, "none", None)
|
||||
244
scripts/runfvp
244
scripts/runfvp
@@ -1,96 +1,23 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import pathlib
|
||||
import signal
|
||||
import sys
|
||||
import subprocess
|
||||
import pathlib
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("RunFVP")
|
||||
|
||||
from typing import List, Optional
|
||||
|
||||
def get_config_dir() -> pathlib.Path:
|
||||
value = os.environ.get("XDG_CONFIG_HOME")
|
||||
if value and os.path.isabs(value):
|
||||
return pathlib.Path(value)
|
||||
else:
|
||||
return pathlib.Path.home() / ".config"
|
||||
|
||||
class Terminals:
|
||||
Terminal = collections.namedtuple("Terminal", ["priority", "name", "command"])
|
||||
|
||||
def __init__(self):
|
||||
self.terminals = []
|
||||
|
||||
def add_terminal(self, priority, name, command):
|
||||
self.terminals.append(Terminals.Terminal(priority, name, command))
|
||||
# Keep this list sorted by priority
|
||||
self.terminals.sort(reverse=True, key=lambda t: t.priority)
|
||||
self.name_map = {t.name: t for t in self.terminals}
|
||||
|
||||
def configured_terminal(self) -> Optional[str]:
|
||||
import configparser
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(get_config_dir() / "runfvp.conf")
|
||||
return config.get("RunFVP", "Terminal", fallback=None)
|
||||
|
||||
def preferred_terminal(self) -> str:
|
||||
import shlex
|
||||
|
||||
preferred = self.configured_terminal()
|
||||
if preferred:
|
||||
return preferred
|
||||
|
||||
for t in self.terminals:
|
||||
if t.command and shutil.which(shlex.split(t.command)[0]):
|
||||
return t.name
|
||||
return self.terminals[-1].name
|
||||
|
||||
def all_terminals(self) -> List[str]:
|
||||
return self.name_map.keys()
|
||||
|
||||
def __getitem__(self, name: str):
|
||||
return self.name_map[name]
|
||||
|
||||
terminals = Terminals()
|
||||
# TODO: option to switch between telnet and netcat
|
||||
connect_command = "telnet localhost %port"
|
||||
terminals.add_terminal(2, "tmux", f"tmux new-window -n \"%title\" \"{connect_command}\""),
|
||||
terminals.add_terminal(2, "gnome-terminal", f"gnome-terminal --window --title \"%title\" --command \"{connect_command}\""),
|
||||
terminals.add_terminal(1, "xterm", f"xterm -title \"%title\" -e {connect_command}"),
|
||||
terminals.add_terminal(0, "none", None)
|
||||
|
||||
def get_image_directory(machine=None):
|
||||
"""
|
||||
Get the DEPLOY_DIR_IMAGE for the specified machine
|
||||
(or the configured machine if not set).
|
||||
"""
|
||||
try:
|
||||
import bb.tinfoil
|
||||
except ImportError:
|
||||
logger.error("Cannot connect to BitBake, did you oe-init-build-env?")
|
||||
sys.exit(1)
|
||||
|
||||
if machine:
|
||||
os.environ["MACHINE"] = machine
|
||||
|
||||
with bb.tinfoil.Tinfoil() as tinfoil:
|
||||
tinfoil.prepare(config_only=True)
|
||||
image_dir = tinfoil.config_data.getVar("DEPLOY_DIR_IMAGE")
|
||||
logger.debug(f"Got DEPLOY_DIR_IMAGE {image_dir}")
|
||||
return pathlib.Path(image_dir)
|
||||
# Add meta-arm/lib/ to path
|
||||
libdir = pathlib.Path(__file__).parents[1] / "meta-arm" / "lib"
|
||||
sys.path.insert(0, str(libdir))
|
||||
|
||||
from fvp import terminal, runner, conffile
|
||||
|
||||
def parse_args(arguments):
|
||||
import argparse
|
||||
terminals = terminal.terminals
|
||||
|
||||
parser = argparse.ArgumentParser(description="Run images in a FVP")
|
||||
parser.add_argument("config", nargs="?", help="Machine name or path to .fvpconf file")
|
||||
@@ -120,148 +47,41 @@ def parse_args(arguments):
|
||||
logger.debug(f"FVP arguments: {fvp_args}")
|
||||
return args, fvp_args
|
||||
|
||||
def find_config(args):
|
||||
if args.config and os.path.exists(args.config):
|
||||
return args.config
|
||||
else:
|
||||
image_dir = get_image_directory(args.config)
|
||||
# All .fvpconf configuration files
|
||||
configs = image_dir.glob("*.fvpconf")
|
||||
# Just the files
|
||||
configs = [p for p in configs if p.is_file() and not p.is_symlink()]
|
||||
if not configs:
|
||||
print(f"Cannot find any .fvpconf in {image_dir}")
|
||||
sys.exit(1)
|
||||
# Sorted by modification time
|
||||
configs = sorted(configs, key=lambda p: p.stat().st_mtime)
|
||||
return configs[-1]
|
||||
|
||||
|
||||
def load_config(config_file):
|
||||
logger.debug(f"Loading {config_file}")
|
||||
with open(config_file) as f:
|
||||
config = json.load(f)
|
||||
|
||||
# Ensure that all expected keys are present
|
||||
def sanitise(key, value):
|
||||
if key not in config or config[key] is None:
|
||||
config[key] = value
|
||||
sanitise("fvp-bindir", "")
|
||||
sanitise("exe", "")
|
||||
sanitise("parameters", {})
|
||||
sanitise("data", {})
|
||||
sanitise("applications", {})
|
||||
sanitise("terminals", {})
|
||||
sanitise("args", [])
|
||||
sanitise("console", "")
|
||||
|
||||
if not config["exe"]:
|
||||
logger.error("Required value FVP_EXE not set in machine configuration")
|
||||
sys.exit(1)
|
||||
|
||||
return config
|
||||
|
||||
def parse_config(args, config):
|
||||
cli = []
|
||||
if config["fvp-bindir"]:
|
||||
cli.append(os.path.join(config["fvp-bindir"], config["exe"]))
|
||||
else:
|
||||
cli.append(config["exe"])
|
||||
|
||||
for param, value in config["parameters"].items():
|
||||
cli.extend(["--parameter", f"{param}={value}"])
|
||||
|
||||
for value in config["data"]:
|
||||
cli.extend(["--data", value])
|
||||
|
||||
for param, value in config["applications"].items():
|
||||
cli.extend(["--application", f"{param}={value}"])
|
||||
|
||||
for terminal, name in config["terminals"].items():
|
||||
# If terminals are enabled and this terminal has been named
|
||||
if args.terminals != "none" and name:
|
||||
# TODO if raw mode
|
||||
# cli.extend(["--parameter", f"{terminal}.mode=raw"])
|
||||
# TODO put name into terminal title
|
||||
cli.extend(["--parameter", f"{terminal}.terminal_command={terminals[args.terminals].command}"])
|
||||
else:
|
||||
# Disable terminal
|
||||
cli.extend(["--parameter", f"{terminal}.start_telnet=0"])
|
||||
|
||||
cli.extend(config["args"])
|
||||
|
||||
return cli
|
||||
|
||||
async def start_fvp(cli, console_cb):
|
||||
async def start_fvp(args, config, extra_args):
|
||||
fvp = runner.FVPRunner(logger)
|
||||
try:
|
||||
fvp_process = await asyncio.create_subprocess_exec(*cli, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
||||
await fvp.start(config, extra_args, args.terminals)
|
||||
|
||||
async for line in fvp_process.stdout:
|
||||
line = line.strip().decode("utf-8", errors="replace")
|
||||
if console_cb:
|
||||
logger.debug(f"FVP output: {line}")
|
||||
else:
|
||||
print(line)
|
||||
|
||||
# Look for serial connections opening
|
||||
if console_cb:
|
||||
m = re.match(r"^(\S+): Listening for serial connection on port (\d+)$", line)
|
||||
if m:
|
||||
terminal = m.group(1)
|
||||
port = int(m.group(2))
|
||||
logger.debug(f"Console for {terminal} started on port {port}")
|
||||
# When we can assume Py3.7+, this can be create_task
|
||||
asyncio.ensure_future(console_cb(terminal, port))
|
||||
finally:
|
||||
# If we get cancelled or throw an exception, kill the FVP
|
||||
logger.debug(f"Killing FVP PID {fvp_process.pid}")
|
||||
try:
|
||||
fvp_process.terminate()
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
|
||||
if await fvp_process.wait() != 0:
|
||||
logger.info(f"{cli[0]} quit with code {fvp_process.returncode}")
|
||||
return fvp_process.returncode
|
||||
else:
|
||||
return 0
|
||||
|
||||
def runfvp(cli_args):
|
||||
args, fvp_args = parse_args(cli_args)
|
||||
config_file = find_config(args)
|
||||
config = load_config(config_file)
|
||||
cli = parse_config(args, config)
|
||||
cli.extend(fvp_args)
|
||||
logger.debug(f"Constructed FVP call: {cli}")
|
||||
|
||||
# Check that telnet is present
|
||||
if not bool(shutil.which("telnet")):
|
||||
logger.error("Cannot find telnet, this is needed to connect to the FVP.")
|
||||
return 1
|
||||
|
||||
if args.console:
|
||||
expected_terminal = config["console"]
|
||||
if not expected_terminal:
|
||||
logger.error("--console used but FVP_CONSOLE not set in machine configuration")
|
||||
return 1
|
||||
else:
|
||||
expected_terminal = None
|
||||
|
||||
async def console_started(name, port):
|
||||
if name == expected_terminal:
|
||||
telnet = await asyncio.create_subprocess_exec("telnet", "localhost", str(port), stdin=sys.stdin, stdout=sys.stdout)
|
||||
if args.console:
|
||||
fvp.add_line_callback(lambda line: logger.debug(f"FVP output: {line}"))
|
||||
expected_terminal = config["console"]
|
||||
if not expected_terminal:
|
||||
logger.error("--console used but FVP_CONSOLE not set in machine configuration")
|
||||
return 1
|
||||
telnet = await fvp.create_telnet(expected_terminal)
|
||||
await telnet.wait()
|
||||
logger.debug(f"Telnet quit, cancelling tasks")
|
||||
# TODO: this is 3.7+
|
||||
for t in asyncio.all_tasks():
|
||||
logger.debug(f"Cancelling {t}")
|
||||
t.cancel()
|
||||
else:
|
||||
fvp.add_line_callback(lambda line: print(line))
|
||||
|
||||
await fvp.run()
|
||||
finally:
|
||||
await fvp.stop()
|
||||
|
||||
def runfvp(cli_args):
|
||||
args, extra_args = parse_args(cli_args)
|
||||
if args.config and pathlib.Path(args.config).exists():
|
||||
config_file = args.config
|
||||
else:
|
||||
config_file = conffile.find(args.config)
|
||||
logger.debug(f"Loading {config_file}")
|
||||
config = conffile.load(config_file)
|
||||
|
||||
try:
|
||||
# When we can assume Py3.7+, this can simply be asyncio.run()
|
||||
loop = asyncio.get_event_loop()
|
||||
console_cb = expected_terminal and console_started or None
|
||||
return loop.run_until_complete(start_fvp(cli, console_cb=console_cb))
|
||||
return loop.run_until_complete(start_fvp(args, config, extra_args))
|
||||
except asyncio.CancelledError:
|
||||
# This means telnet exited, which isn't an error
|
||||
return 0
|
||||
|
||||
Reference in New Issue
Block a user