mirror of
https://git.yoctoproject.org/poky
synced 2026-06-01 00:59:48 +00:00
oeqa: Add selftest parallelisation support
This allows oe-selftest to take a -j option which specifies how much test parallelisation to use. Currently this is "module" based with each module being split and run in a separate build directory. Further splitting could be done but this seems a good compromise between test setup and parallelism. You need python-testtools and python-subunit installed to use this but only when the -j option is specified. See notes posted to the openedmbedded-architecture list for more details about the design choices here. Some of this functionality may make more sense in the oeqa core ultimately. (From OE-Core rev: 326ababfd620ae5ea29bf486b9d68ba3d60cad30) Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
@@ -58,14 +58,20 @@ class OETestContext(object):
|
|||||||
modules_required, filters)
|
modules_required, filters)
|
||||||
self.suites = self.loader.discover()
|
self.suites = self.loader.discover()
|
||||||
|
|
||||||
def runTests(self, skips=[]):
|
def runTests(self, processes=None, skips=[]):
|
||||||
self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
|
self.runner = self.runnerClass(self, descriptions=False, verbosity=2, buffer=True)
|
||||||
|
|
||||||
# Dinamically skip those tests specified though arguments
|
# Dinamically skip those tests specified though arguments
|
||||||
self.skipTests(skips)
|
self.skipTests(skips)
|
||||||
|
|
||||||
self._run_start_time = time.time()
|
self._run_start_time = time.time()
|
||||||
result = self.runner.run(self.suites)
|
if processes:
|
||||||
|
from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
|
||||||
|
|
||||||
|
concurrent_suite = ConcurrentTestSuite(self.suites, processes)
|
||||||
|
result = self.runner.run(concurrent_suite)
|
||||||
|
else:
|
||||||
|
result = self.runner.run(self.suites)
|
||||||
self._run_end_time = time.time()
|
self._run_end_time = time.time()
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -43,11 +43,17 @@ class OETestResult(_TestResult):
|
|||||||
super(OETestResult, self).__init__(*args, **kwargs)
|
super(OETestResult, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
self.successes = []
|
self.successes = []
|
||||||
|
self.starttime = {}
|
||||||
|
self.endtime = {}
|
||||||
|
self.progressinfo = {}
|
||||||
|
|
||||||
self.tc = tc
|
self.tc = tc
|
||||||
self._tc_map_results()
|
self._tc_map_results()
|
||||||
|
|
||||||
def startTest(self, test):
|
def startTest(self, test):
|
||||||
|
# May have been set by concurrencytest
|
||||||
|
if test.id() not in self.starttime:
|
||||||
|
self.starttime[test.id()] = time.time()
|
||||||
super(OETestResult, self).startTest(test)
|
super(OETestResult, self).startTest(test)
|
||||||
|
|
||||||
def _tc_map_results(self):
|
def _tc_map_results(self):
|
||||||
@@ -57,6 +63,12 @@ class OETestResult(_TestResult):
|
|||||||
self.tc._results['expectedFailures'] = self.expectedFailures
|
self.tc._results['expectedFailures'] = self.expectedFailures
|
||||||
self.tc._results['successes'] = self.successes
|
self.tc._results['successes'] = self.successes
|
||||||
|
|
||||||
|
def stopTest(self, test):
|
||||||
|
self.endtime[test.id()] = time.time()
|
||||||
|
super(OETestResult, self).stopTest(test)
|
||||||
|
if test.id() in self.progressinfo:
|
||||||
|
print(self.progressinfo[test.id()])
|
||||||
|
|
||||||
def logSummary(self, component, context_msg=''):
|
def logSummary(self, component, context_msg=''):
|
||||||
elapsed_time = self.tc._run_end_time - self.tc._run_start_time
|
elapsed_time = self.tc._run_end_time - self.tc._run_start_time
|
||||||
self.tc.logger.info("SUMMARY:")
|
self.tc.logger.info("SUMMARY:")
|
||||||
@@ -141,12 +153,16 @@ class OETestResult(_TestResult):
|
|||||||
if hasattr(d, 'oeid'):
|
if hasattr(d, 'oeid'):
|
||||||
oeid = d.oeid
|
oeid = d.oeid
|
||||||
|
|
||||||
|
t = ""
|
||||||
|
if case.id() in self.starttime and case.id() in self.endtime:
|
||||||
|
t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
|
||||||
|
|
||||||
if fail:
|
if fail:
|
||||||
self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
|
self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
|
||||||
oeid, desc))
|
oeid, desc, t))
|
||||||
else:
|
else:
|
||||||
self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
|
self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
|
||||||
oeid, 'UNKNOWN'))
|
oeid, 'UNKNOWN', t))
|
||||||
|
|
||||||
class OEListTestsResult(object):
|
class OEListTestsResult(object):
|
||||||
def wasSuccessful(self):
|
def wasSuccessful(self):
|
||||||
|
|||||||
@@ -0,0 +1,254 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
#
|
||||||
|
# Modified for use in OE by Richard Purdie, 2018
|
||||||
|
#
|
||||||
|
# Modified by: Corey Goldberg, 2013
|
||||||
|
# License: GPLv2+
|
||||||
|
#
|
||||||
|
# Original code from:
|
||||||
|
# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
|
||||||
|
# Copyright (C) 2005-2011 Canonical Ltd
|
||||||
|
# License: GPLv2+
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
import unittest
|
||||||
|
import subprocess
|
||||||
|
import testtools
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import io
|
||||||
|
|
||||||
|
from queue import Queue
|
||||||
|
from itertools import cycle
|
||||||
|
from subunit import ProtocolTestCase, TestProtocolClient
|
||||||
|
from subunit.test_results import AutoTimingTestResultDecorator
|
||||||
|
from testtools import ThreadsafeForwardingResult, iterate_tests
|
||||||
|
|
||||||
|
import bb.utils
|
||||||
|
import oe.path
|
||||||
|
|
||||||
|
_all__ = [
|
||||||
|
'ConcurrentTestSuite',
|
||||||
|
'fork_for_tests',
|
||||||
|
'partition_tests',
|
||||||
|
]
|
||||||
|
|
||||||
|
#
|
||||||
|
# Patch the version from testtools to allow access to _test_start and allow
|
||||||
|
# computation of timing information and threading progress
|
||||||
|
#
|
||||||
|
class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
|
||||||
|
|
||||||
|
def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests):
|
||||||
|
super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
|
||||||
|
self.threadnum = threadnum
|
||||||
|
self.totalinprocess = totalinprocess
|
||||||
|
self.totaltests = totaltests
|
||||||
|
|
||||||
|
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
|
||||||
|
self.semaphore.acquire()
|
||||||
|
try:
|
||||||
|
self.result.starttime[test.id()] = self._test_start.timestamp()
|
||||||
|
self.result.threadprogress[self.threadnum].append(test.id())
|
||||||
|
totalprogress = sum(len(x) for x in self.result.threadprogress.values())
|
||||||
|
self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s)" % (
|
||||||
|
self.threadnum,
|
||||||
|
len(self.result.threadprogress[self.threadnum]),
|
||||||
|
self.totalinprocess,
|
||||||
|
totalprogress,
|
||||||
|
self.totaltests,
|
||||||
|
"{0:.2f}".format(time.time()-self._test_start.timestamp()),
|
||||||
|
test.id())
|
||||||
|
finally:
|
||||||
|
self.semaphore.release()
|
||||||
|
super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
|
||||||
|
|
||||||
|
#
|
||||||
|
# A dummy structure to add to io.StringIO so that the .buffer object
|
||||||
|
# is available and accepts writes. This allows unittest with buffer=True
|
||||||
|
# to interact ok with subunit which wants to access sys.stdout.buffer.
|
||||||
|
#
|
||||||
|
class dummybuf(object):
|
||||||
|
def __init__(self, parent):
|
||||||
|
self.p = parent
|
||||||
|
def write(self, data):
|
||||||
|
self.p.write(data.decode("utf-8"))
|
||||||
|
|
||||||
|
#
|
||||||
|
# Taken from testtools.ConncurrencyTestSuite but modified for OE use
|
||||||
|
#
|
||||||
|
class ConcurrentTestSuite(unittest.TestSuite):
|
||||||
|
|
||||||
|
def __init__(self, suite, processes):
|
||||||
|
super(ConcurrentTestSuite, self).__init__([suite])
|
||||||
|
self.processes = processes
|
||||||
|
|
||||||
|
def run(self, result):
|
||||||
|
tests, totaltests = fork_for_tests(self.processes, self)
|
||||||
|
try:
|
||||||
|
threads = {}
|
||||||
|
queue = Queue()
|
||||||
|
semaphore = threading.Semaphore(1)
|
||||||
|
result.threadprogress = {}
|
||||||
|
for i, (test, testnum) in enumerate(tests):
|
||||||
|
result.threadprogress[i] = []
|
||||||
|
process_result = BBThreadsafeForwardingResult(result, semaphore, i, testnum, totaltests)
|
||||||
|
# Force buffering of stdout/stderr so the console doesn't get corrupted by test output
|
||||||
|
# as per default in parent code
|
||||||
|
process_result.buffer = True
|
||||||
|
# We have to add a buffer object to stdout to keep subunit happy
|
||||||
|
process_result._stderr_buffer = io.StringIO()
|
||||||
|
process_result._stderr_buffer.buffer = dummybuf(process_result._stderr_buffer)
|
||||||
|
process_result._stdout_buffer = io.StringIO()
|
||||||
|
process_result._stdout_buffer.buffer = dummybuf(process_result._stdout_buffer)
|
||||||
|
reader_thread = threading.Thread(
|
||||||
|
target=self._run_test, args=(test, process_result, queue))
|
||||||
|
threads[test] = reader_thread, process_result
|
||||||
|
reader_thread.start()
|
||||||
|
while threads:
|
||||||
|
finished_test = queue.get()
|
||||||
|
threads[finished_test][0].join()
|
||||||
|
del threads[finished_test]
|
||||||
|
except:
|
||||||
|
for thread, process_result in threads.values():
|
||||||
|
process_result.stop()
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _run_test(self, test, process_result, queue):
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
test.run(process_result)
|
||||||
|
except Exception:
|
||||||
|
# The run logic itself failed
|
||||||
|
case = testtools.ErrorHolder(
|
||||||
|
"broken-runner",
|
||||||
|
error=sys.exc_info())
|
||||||
|
case.run(process_result)
|
||||||
|
finally:
|
||||||
|
queue.put(test)
|
||||||
|
|
||||||
|
def removebuilddir(d):
|
||||||
|
delay = 5
|
||||||
|
while delay and os.path.exists(d + "/bitbake.lock"):
|
||||||
|
time.sleep(1)
|
||||||
|
delay = delay - 1
|
||||||
|
bb.utils.prunedir(d)
|
||||||
|
|
||||||
|
def fork_for_tests(concurrency_num, suite):
|
||||||
|
result = []
|
||||||
|
test_blocks = partition_tests(suite, concurrency_num)
|
||||||
|
# Clear the tests from the original suite so it doesn't keep them alive
|
||||||
|
suite._tests[:] = []
|
||||||
|
totaltests = sum(len(x) for x in test_blocks)
|
||||||
|
for process_tests in test_blocks:
|
||||||
|
numtests = len(process_tests)
|
||||||
|
process_suite = unittest.TestSuite(process_tests)
|
||||||
|
# Also clear each split list so new suite has only reference
|
||||||
|
process_tests[:] = []
|
||||||
|
c2pread, c2pwrite = os.pipe()
|
||||||
|
# Clear buffers before fork to avoid duplicate output
|
||||||
|
sys.stdout.flush()
|
||||||
|
sys.stderr.flush()
|
||||||
|
pid = os.fork()
|
||||||
|
if pid == 0:
|
||||||
|
ourpid = os.getpid()
|
||||||
|
try:
|
||||||
|
newbuilddir = None
|
||||||
|
stream = os.fdopen(c2pwrite, 'wb', 1)
|
||||||
|
os.close(c2pread)
|
||||||
|
|
||||||
|
# Create a new separate BUILDDIR for each group of tests
|
||||||
|
if 'BUILDDIR' in os.environ:
|
||||||
|
builddir = os.environ['BUILDDIR']
|
||||||
|
newbuilddir = builddir + "-st-" + str(ourpid)
|
||||||
|
selftestdir = os.path.abspath(builddir + "/../meta-selftest")
|
||||||
|
newselftestdir = newbuilddir + "/meta-selftest"
|
||||||
|
|
||||||
|
bb.utils.mkdirhier(newbuilddir)
|
||||||
|
oe.path.copytree(builddir + "/conf", newbuilddir + "/conf")
|
||||||
|
oe.path.copytree(builddir + "/cache", newbuilddir + "/cache")
|
||||||
|
oe.path.copytree(selftestdir, newselftestdir)
|
||||||
|
|
||||||
|
for e in os.environ:
|
||||||
|
if builddir in os.environ[e]:
|
||||||
|
os.environ[e] = os.environ[e].replace(builddir, newbuilddir)
|
||||||
|
|
||||||
|
subprocess.check_output("git init; git add *; git commit -a -m 'initial'", cwd=newselftestdir, shell=True)
|
||||||
|
|
||||||
|
# Tried to used bitbake-layers add/remove but it requires recipe parsing and hence is too slow
|
||||||
|
subprocess.check_output("sed %s/conf/bblayers.conf -i -e 's#%s#%s#g'" % (newbuilddir, selftestdir, newselftestdir), cwd=newbuilddir, shell=True)
|
||||||
|
|
||||||
|
os.chdir(newbuilddir)
|
||||||
|
|
||||||
|
for t in process_suite:
|
||||||
|
if not hasattr(t, "tc"):
|
||||||
|
continue
|
||||||
|
cp = t.tc.config_paths
|
||||||
|
for p in cp:
|
||||||
|
if selftestdir in cp[p] and newselftestdir not in cp[p]:
|
||||||
|
cp[p] = cp[p].replace(selftestdir, newselftestdir)
|
||||||
|
if builddir in cp[p] and newbuilddir not in cp[p]:
|
||||||
|
cp[p] = cp[p].replace(builddir, newbuilddir)
|
||||||
|
|
||||||
|
# Leave stderr and stdout open so we can see test noise
|
||||||
|
# Close stdin so that the child goes away if it decides to
|
||||||
|
# read from stdin (otherwise its a roulette to see what
|
||||||
|
# child actually gets keystrokes for pdb etc).
|
||||||
|
newsi = os.open(os.devnull, os.O_RDWR)
|
||||||
|
os.dup2(newsi, sys.stdin.fileno())
|
||||||
|
|
||||||
|
subunit_client = TestProtocolClient(stream)
|
||||||
|
# Force buffering of stdout/stderr so the console doesn't get corrupted by test output
|
||||||
|
# as per default in parent code
|
||||||
|
subunit_client.buffer = True
|
||||||
|
subunit_result = AutoTimingTestResultDecorator(subunit_client)
|
||||||
|
process_suite.run(subunit_result)
|
||||||
|
if ourpid != os.getpid():
|
||||||
|
os._exit(0)
|
||||||
|
if newbuilddir:
|
||||||
|
removebuilddir(newbuilddir)
|
||||||
|
except:
|
||||||
|
# Don't do anything with process children
|
||||||
|
if ourpid != os.getpid():
|
||||||
|
os._exit(1)
|
||||||
|
# Try and report traceback on stream, but exit with error
|
||||||
|
# even if stream couldn't be created or something else
|
||||||
|
# goes wrong. The traceback is formatted to a string and
|
||||||
|
# written in one go to avoid interleaving lines from
|
||||||
|
# multiple failing children.
|
||||||
|
try:
|
||||||
|
stream.write(traceback.format_exc().encode('utf-8'))
|
||||||
|
except:
|
||||||
|
sys.stderr.write(traceback.format_exc())
|
||||||
|
finally:
|
||||||
|
if newbuilddir:
|
||||||
|
removebuilddir(newbuilddir)
|
||||||
|
os._exit(1)
|
||||||
|
os._exit(0)
|
||||||
|
else:
|
||||||
|
os.close(c2pwrite)
|
||||||
|
stream = os.fdopen(c2pread, 'rb', 1)
|
||||||
|
test = ProtocolTestCase(stream)
|
||||||
|
result.append((test, numtests))
|
||||||
|
return result, totaltests
|
||||||
|
|
||||||
|
def partition_tests(suite, count):
|
||||||
|
# Keep tests from the same class together but allow tests from modules
|
||||||
|
# to go to different processes to aid parallelisation.
|
||||||
|
modules = {}
|
||||||
|
for test in iterate_tests(suite):
|
||||||
|
m = test.__module__ + "." + test.__class__.__name__
|
||||||
|
if m not in modules:
|
||||||
|
modules[m] = []
|
||||||
|
modules[m].append(test)
|
||||||
|
|
||||||
|
# Simply divide the test blocks between the available processes
|
||||||
|
partitions = [list() for _ in range(count)]
|
||||||
|
for partition, m in zip(cycle(partitions), modules):
|
||||||
|
partition.extend(modules[m])
|
||||||
|
|
||||||
|
# No point in empty threads so drop them
|
||||||
|
return [p for p in partitions if p]
|
||||||
|
|
||||||
@@ -25,14 +25,14 @@ class OESelftestTestContext(OETestContext):
|
|||||||
self.custommachine = None
|
self.custommachine = None
|
||||||
self.config_paths = config_paths
|
self.config_paths = config_paths
|
||||||
|
|
||||||
def runTests(self, machine=None, skips=[]):
|
def runTests(self, processes=None, machine=None, skips=[]):
|
||||||
if machine:
|
if machine:
|
||||||
self.custommachine = machine
|
self.custommachine = machine
|
||||||
if machine == 'random':
|
if machine == 'random':
|
||||||
self.custommachine = choice(self.machines)
|
self.custommachine = choice(self.machines)
|
||||||
self.logger.info('Run tests with custom MACHINE set to: %s' % \
|
self.logger.info('Run tests with custom MACHINE set to: %s' % \
|
||||||
self.custommachine)
|
self.custommachine)
|
||||||
return super(OESelftestTestContext, self).runTests(skips)
|
return super(OESelftestTestContext, self).runTests(processes, skips)
|
||||||
|
|
||||||
def listTests(self, display_type, machine=None):
|
def listTests(self, display_type, machine=None):
|
||||||
return super(OESelftestTestContext, self).listTests(display_type)
|
return super(OESelftestTestContext, self).listTests(display_type)
|
||||||
@@ -68,6 +68,9 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
|
|||||||
action="store_true", default=False,
|
action="store_true", default=False,
|
||||||
help='List all available tests.')
|
help='List all available tests.')
|
||||||
|
|
||||||
|
parser.add_argument('-j', '--num-processes', dest='processes', action='store',
|
||||||
|
type=int, help="number of processes to execute in parallel with")
|
||||||
|
|
||||||
parser.add_argument('--machine', required=False, choices=['random', 'all'],
|
parser.add_argument('--machine', required=False, choices=['random', 'all'],
|
||||||
help='Run tests on different machines (random/all).')
|
help='Run tests on different machines (random/all).')
|
||||||
|
|
||||||
@@ -137,6 +140,7 @@ class OESelftestTestContextExecutor(OETestContextExecutor):
|
|||||||
self.tc_kwargs['init']['config_paths']['bblayers_backup'])
|
self.tc_kwargs['init']['config_paths']['bblayers_backup'])
|
||||||
|
|
||||||
self.tc_kwargs['run']['skips'] = args.skips
|
self.tc_kwargs['run']['skips'] = args.skips
|
||||||
|
self.tc_kwargs['run']['processes'] = args.processes
|
||||||
|
|
||||||
def _pre_run(self):
|
def _pre_run(self):
|
||||||
def _check_required_env_variables(vars):
|
def _check_required_env_variables(vars):
|
||||||
|
|||||||
Reference in New Issue
Block a user