1
0
mirror of https://git.yoctoproject.org/poky synced 2026-05-07 16:59:22 +00:00

oeqa/core/threaded: Add support of OETestRunnerThreaded

The OETestRunnerThreaded overrides the run method of OETestRunner
it recieves a list of suites to be executed by a ThreadPool.

The new run method handles the ThreadPool creation and the
OETestResultThreaded fill.

[YOCTO #11450]

(From OE-Core rev: 48b7a407d692e6c49c41b16f2bd11e8c3f47a421)

Signed-off-by: Aníbal Limón <anibal.limon@linux.intel.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Aníbal Limón
2017-05-26 15:37:37 -05:00
committed by Richard Purdie
parent 44285351f5
commit 6632277986
+74 -1
View File
@@ -3,11 +3,13 @@
import threading
import multiprocessing
import queue
import time
from unittest.suite import TestSuite
from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OEStreamLogger, OETestResult
from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
class OETestLoaderThreaded(OETestLoader):
def __init__(self, tc, module_paths, modules, tests, modules_required,
@@ -185,3 +187,74 @@ class OETestResultThreaded(object):
tid = list(self._results)[0]
result = self._results[tid]['result']
result.logDetails()
class _Worker(threading.Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks, result, stream):
threading.Thread.__init__(self)
self.tasks = tasks
self.result = result
self.stream = stream
def run(self):
while True:
try:
func, args, kargs = self.tasks.get(block=False)
except queue.Empty:
break
try:
run_start_time = time.time()
rc = func(*args, **kargs)
run_end_time = time.time()
self.result.addResult(rc, run_start_time, run_end_time)
self.stream.finish()
except Exception as e:
print(e)
finally:
self.tasks.task_done()
class _ThreadedPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_workers, num_tasks, stream=None, result=None):
self.tasks = queue.Queue(num_tasks)
self.workers = []
for _ in range(num_workers):
worker = _Worker(self.tasks, result, stream)
self.workers.append(worker)
def start(self):
for worker in self.workers:
worker.start()
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
for worker in self.workers:
worker.join()
class OETestRunnerThreaded(OETestRunner):
streamLoggerClass = OEStreamLoggerThreaded
def __init__(self, tc, *args, **kwargs):
super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
def run(self, suites):
result = OETestResultThreaded(self.tc)
pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
result=result)
for s in suites:
pool.add_task(super(OETestRunnerThreaded, self).run, s)
pool.start()
pool.wait_completion()
result._fill_tc_results()
return result