1
0
mirror of https://git.yoctoproject.org/poky synced 2026-05-31 12:49:46 +00:00

bitbake: Switch to use subprocess for forking tasks and FAKEROOTENV to run shell and python under a fakeroot environment

Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
This commit is contained in:
Richard Purdie
2010-08-16 16:37:29 +01:00
parent a45e1d54e1
commit b6bfe14205
7 changed files with 198 additions and 109 deletions
+77
View File
@@ -0,0 +1,77 @@
#!/usr/bin/env python
import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
class BBConfiguration(object):
"""
Manages build options and configurations for one run
"""
def __init__(self):
setattr(self, "data", {})
setattr(self, "file", [])
setattr(self, "cmd", None)
_warnings_showwarning = warnings.showwarning
def _showwarning(message, category, filename, lineno, file=None, line=None):
"""Display python warning messages using bb.msg"""
if file is not None:
if _warnings_showwarning is not None:
_warnings_showwarning(message, category, filename, lineno, file, line)
else:
s = warnings.formatwarning(message, category, filename, lineno)
s = s.split("\n")[0]
bb.msg.warn(None, s)
warnings.showwarning = _showwarning
warnings.simplefilter("ignore", DeprecationWarning)
import bb.event
# Need to map our I/O correctly. Currently stdout is a pipe to
# the server expecting events. We save this and map stdout to stderr.
eventfd = os.dup(sys.stdout.fileno())
bb.event.worker_pipe = os.fdopen(eventfd, 'w', 0)
# Replace those fds with our own
os.dup2(sys.stderr.fileno(), sys.stdout.fileno())
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.usestdout = False
import bb.cooker
cooker = bb.cooker.BBCooker(BBConfiguration(), None)
buildfile = sys.argv[1]
taskname = sys.argv[2]
cooker.parseConfiguration()
cooker.bb_cache = bb.cache.init(cooker)
cooker.status = bb.cache.CacheData()
(fn, cls) = cooker.bb_cache.virtualfn2realfn(buildfile)
buildfile = cooker.matchFile(fn)
fn = cooker.bb_cache.realfn2virtual(buildfile, cls)
cooker.buildSetVars()
# Load data into the cache for fn and parse the loaded cache data
the_data = cooker.bb_cache.loadDataFull(fn, cooker.get_file_appends(fn), cooker.configuration.data)
cooker.bb_cache.setData(fn, buildfile, the_data)
cooker.bb_cache.handle_data(fn, cooker.status)
if taskname.endswith("_setscene"):
the_data.setVarFlag(taskname, "quieterrors", "1")
ret = 0
if sys.argv[3] != "True":
ret = bb.build.exec_task(taskname, the_data)
sys.exit(ret)
+40 -22
View File
@@ -44,12 +44,6 @@ class FuncFailed(Exception):
Second paramter is a logfile (optional) Second paramter is a logfile (optional)
""" """
class EventException(Exception):
"""Exception which is associated with an Event."""
def __init__(self, msg, event):
self.args = msg, event
class TaskBase(event.Event): class TaskBase(event.Event):
"""Base class for task events""" """Base class for task events"""
@@ -80,7 +74,7 @@ class TaskFailed(TaskBase):
self.msg = msg self.msg = msg
TaskBase.__init__(self, t, d) TaskBase.__init__(self, t, d)
class InvalidTask(TaskBase): class TaskInvalid(TaskBase):
"""Invalid Task""" """Invalid Task"""
# functions # functions
@@ -94,7 +88,7 @@ def exec_func(func, d, dirs = None):
return return
flags = data.getVarFlags(func, d) flags = data.getVarFlags(func, d)
for item in ['deps', 'check', 'interactive', 'python', 'cleandirs', 'dirs', 'lockfiles', 'fakeroot']: for item in ['deps', 'check', 'interactive', 'python', 'cleandirs', 'dirs', 'lockfiles', 'fakeroot', 'task']:
if not item in flags: if not item in flags:
flags[item] = None flags[item] = None
@@ -138,7 +132,7 @@ def exec_func(func, d, dirs = None):
# Handle logfiles # Handle logfiles
si = file('/dev/null', 'r') si = file('/dev/null', 'r')
try: try:
if bb.msg.debug_level['default'] > 0 or ispython: if bb.msg.debug_level['default'] > 0 and not ispython:
so = os.popen("tee \"%s\"" % logfile, "w") so = os.popen("tee \"%s\"" % logfile, "w")
else: else:
so = file(logfile, 'w') so = file(logfile, 'w')
@@ -158,6 +152,8 @@ def exec_func(func, d, dirs = None):
os.dup2(so.fileno(), oso[1]) os.dup2(so.fileno(), oso[1])
os.dup2(se.fileno(), ose[1]) os.dup2(se.fileno(), ose[1])
bb.event.useStdout = True
locks = [] locks = []
lockfiles = flags['lockfiles'] lockfiles = flags['lockfiles']
if lockfiles: if lockfiles:
@@ -183,6 +179,8 @@ def exec_func(func, d, dirs = None):
for lock in locks: for lock in locks:
bb.utils.unlockfile(lock) bb.utils.unlockfile(lock)
bb.event.useStdout = False
# Restore the backup fds # Restore the backup fds
os.dup2(osi[0], osi[1]) os.dup2(osi[0], osi[1])
os.dup2(oso[0], oso[1]) os.dup2(oso[0], oso[1])
@@ -221,6 +219,7 @@ def exec_func_python(func, d, runfile, logfile):
raise raise
raise FuncFailed("Function %s failed" % func, logfile) raise FuncFailed("Function %s failed" % func, logfile)
def exec_func_shell(func, d, runfile, logfile, flags): def exec_func_shell(func, d, runfile, logfile, flags):
"""Execute a shell BB 'function' Returns true if execution was successful. """Execute a shell BB 'function' Returns true if execution was successful.
@@ -251,12 +250,11 @@ def exec_func_shell(func, d, runfile, logfile, flags):
raise FuncFailed("Function not specified for exec_func_shell") raise FuncFailed("Function not specified for exec_func_shell")
# execute function # execute function
if flags['fakeroot']: if flags['fakeroot'] and not flags['task']:
maybe_fakeroot = "PATH=\"%s\" %s " % (bb.data.getVar("PATH", d, 1), bb.data.getVar("FAKEROOT", d, 1) or "fakeroot") bb.fatal("Function %s specifies fakeroot but isn't a task?!" % func)
else:
maybe_fakeroot = ''
lang_environment = "LC_ALL=C " lang_environment = "LC_ALL=C "
ret = os.system('%s%ssh -e %s' % (lang_environment, maybe_fakeroot, runfile)) ret = os.system('%ssh -e %s' % (lang_environment, runfile))
if ret == 0: if ret == 0:
return return
@@ -273,7 +271,13 @@ def exec_task(task, d):
# Check whther this is a valid task # Check whther this is a valid task
if not data.getVarFlag(task, 'task', d): if not data.getVarFlag(task, 'task', d):
raise EventException("No such task", InvalidTask(task, d)) event.fire(TaskInvalid(task, d), d)
bb.msg.error(bb.msg.domain.Build, "No such task: %s" % task)
return 1
quieterr = False
if d.getVarFlag(task, "quieterrors") is not None:
quieterr = True
try: try:
bb.msg.debug(1, bb.msg.domain.Build, "Executing task %s" % task) bb.msg.debug(1, bb.msg.domain.Build, "Executing task %s" % task)
@@ -292,6 +296,11 @@ def exec_task(task, d):
for func in postfuncs: for func in postfuncs:
exec_func(func, localdata) exec_func(func, localdata)
event.fire(TaskSucceeded(task, localdata), localdata) event.fire(TaskSucceeded(task, localdata), localdata)
# make stamp, or cause event and raise exception
if not data.getVarFlag(task, 'nostamp', d) and not data.getVarFlag(task, 'selfstamp', d):
make_stamp(task, d)
except FuncFailed as message: except FuncFailed as message:
# Try to extract the optional logfile # Try to extract the optional logfile
try: try:
@@ -299,14 +308,22 @@ def exec_task(task, d):
except: except:
logfile = None logfile = None
msg = message msg = message
bb.msg.note(1, bb.msg.domain.Build, "Task failed: %s" % message ) if not quieterr:
failedevent = TaskFailed(msg, logfile, task, d) bb.msg.error(bb.msg.domain.Build, "Task failed: %s" % message )
event.fire(failedevent, d) failedevent = TaskFailed(msg, logfile, task, d)
raise EventException("Function failed in task: %s" % message, failedevent) event.fire(failedevent, d)
return 1
# make stamp, or cause event and raise exception except Exception:
if not data.getVarFlag(task, 'nostamp', d) and not data.getVarFlag(task, 'selfstamp', d): from traceback import format_exc
make_stamp(task, d) if not quieterr:
bb.msg.error(bb.msg.domain.Build, "Build of %s failed" % (task))
bb.msg.error(bb.msg.domain.Build, format_exc())
failedevent = TaskFailed("Task Failed", None, task, d)
event.fire(failedevent, d)
return 1
return 0
def extract_stamp(d, fn): def extract_stamp(d, fn):
""" """
@@ -380,6 +397,7 @@ def add_tasks(tasklist, d):
getTask('rdeptask') getTask('rdeptask')
getTask('recrdeptask') getTask('recrdeptask')
getTask('nostamp') getTask('nostamp')
getTask('fakeroot')
task_deps['parents'][task] = [] task_deps['parents'][task] = []
for dep in flags['deps']: for dep in flags['deps']:
dep = data.expand(dep, d) dep = data.expand(dep, d)
+5 -2
View File
@@ -70,12 +70,16 @@ class BBCooker:
self.cache = None self.cache = None
self.bb_cache = None self.bb_cache = None
self.server = server.BitBakeServer(self) if server:
self.server = server.BitBakeServer(self)
self.configuration = configuration self.configuration = configuration
self.configuration.data = bb.data.init() self.configuration.data = bb.data.init()
if not server:
bb.data.setVar("BB_WORKERCONTEXT", "1", self.configuration.data)
bb.data.inheritFromOS(self.configuration.data) bb.data.inheritFromOS(self.configuration.data)
self.parseConfigurationFiles(self.configuration.file) self.parseConfigurationFiles(self.configuration.file)
@@ -544,7 +548,6 @@ class BBCooker:
bb.event.fire(bb.event.ConfigParsed(), self.configuration.data) bb.event.fire(bb.event.ConfigParsed(), self.configuration.data)
except IOError as e: except IOError as e:
bb.msg.fatal(bb.msg.domain.Parsing, "Error when parsing %s: %s" % (files, str(e))) bb.msg.fatal(bb.msg.domain.Parsing, "Error when parsing %s: %s" % (files, str(e)))
except bb.parse.ParseError as details: except bb.parse.ParseError as details:
+9
View File
@@ -229,6 +229,15 @@ def emit_env(o=sys.__stdout__, d = init(), all=False):
for key in keys: for key in keys:
emit_var(key, o, d, all and not isfunc) and o.write('\n') emit_var(key, o, d, all and not isfunc) and o.write('\n')
def export_vars(d):
keys = (key for key in d.keys() if d.getVarFlag(key, "export"))
ret = {}
for k in keys:
v = d.getVar(k, True)
if v:
ret[k] = v
return ret
def update_data(d): def update_data(d):
"""Performs final steps upon the datastore, including application of overrides""" """Performs final steps upon the datastore, including application of overrides"""
d.finalize() d.finalize()
+5 -6
View File
@@ -31,6 +31,7 @@ import pickle
# the runqueue forks off. # the runqueue forks off.
worker_pid = 0 worker_pid = 0
worker_pipe = None worker_pipe = None
useStdout = True
class Event: class Event:
"""Base class for events""" """Base class for events"""
@@ -102,15 +103,12 @@ def fire(event, d):
def worker_fire(event, d): def worker_fire(event, d):
data = "<event>" + pickle.dumps(event) + "</event>" data = "<event>" + pickle.dumps(event) + "</event>"
try: worker_pipe.write(data)
if os.write(worker_pipe, data) != len (data): worker_pipe.flush()
print("Error sending event to server (short write)")
except OSError:
sys.exit(1)
def fire_from_worker(event, d): def fire_from_worker(event, d):
if not event.startswith("<event>") or not event.endswith("</event>"): if not event.startswith("<event>") or not event.endswith("</event>"):
print("Error, not an event") print("Error, not an event %s" % event)
return return
event = pickle.loads(event[7:-8]) event = pickle.loads(event[7:-8])
fire_ui_handlers(event, d) fire_ui_handlers(event, d)
@@ -140,6 +138,7 @@ def remove(name, handler):
def register_UIHhandler(handler): def register_UIHhandler(handler):
bb.event._ui_handler_seq = bb.event._ui_handler_seq + 1 bb.event._ui_handler_seq = bb.event._ui_handler_seq + 1
_ui_handlers[_ui_handler_seq] = handler _ui_handlers[_ui_handler_seq] = handler
bb.event.useStdout = False
return _ui_handler_seq return _ui_handler_seq
def unregister_UIHhandler(handlerNum): def unregister_UIHhandler(handlerNum):
+6 -5
View File
@@ -109,7 +109,7 @@ def debug(level, msgdomain, msg, fn = None):
if debug_level[msgdomain] >= level: if debug_level[msgdomain] >= level:
bb.event.fire(MsgDebug(msg), None) bb.event.fire(MsgDebug(msg), None)
if not bb.event._ui_handlers: if bb.event.useStdout:
print('DEBUG: %s' % (msg)) print('DEBUG: %s' % (msg))
def note(level, msgdomain, msg, fn = None): def note(level, msgdomain, msg, fn = None):
@@ -118,17 +118,18 @@ def note(level, msgdomain, msg, fn = None):
if level == 1 or verbose or debug_level[msgdomain] >= 1: if level == 1 or verbose or debug_level[msgdomain] >= 1:
bb.event.fire(MsgNote(msg), None) bb.event.fire(MsgNote(msg), None)
if not bb.event._ui_handlers: if bb.event.useStdout:
print('NOTE: %s' % (msg)) print('NOTE: %s' % (msg))
def warn(msgdomain, msg, fn = None): def warn(msgdomain, msg, fn = None):
bb.event.fire(MsgWarn(msg), None) bb.event.fire(MsgWarn(msg), None)
if not bb.event._ui_handlers: if bb.event.useStdout:
print('WARNING: %s' % (msg)) print('WARNING: %s' % (msg))
def error(msgdomain, msg, fn = None): def error(msgdomain, msg, fn = None):
bb.event.fire(MsgError(msg), None) bb.event.fire(MsgError(msg), None)
print('ERROR: %s' % (msg)) if bb.event.useStdout:
print('ERROR: %s' % (msg))
def fatal(msgdomain, msg, fn = None): def fatal(msgdomain, msg, fn = None):
bb.event.fire(MsgFatal(msg), None) bb.event.fire(MsgFatal(msg), None)
@@ -137,5 +138,5 @@ def fatal(msgdomain, msg, fn = None):
def plain(msg, fn = None): def plain(msg, fn = None):
bb.event.fire(MsgPlain(msg), None) bb.event.fire(MsgPlain(msg), None)
if not bb.event._ui_handlers: if bb.event.useStdout:
print(msg) print(msg)
+56 -74
View File
@@ -23,6 +23,7 @@ Handles preparation and execution of a queue of tasks
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import bb, os, sys import bb, os, sys
import subprocess
from bb import msg, data, event from bb import msg, data, event
import signal import signal
import stat import stat
@@ -937,6 +938,7 @@ class RunQueueExecute:
self.runq_complete = [] self.runq_complete = []
self.build_pids = {} self.build_pids = {}
self.build_pipes = {} self.build_pipes = {}
self.build_procs = {}
self.failed_fnids = [] self.failed_fnids = []
def runqueue_process_waitpid(self): def runqueue_process_waitpid(self):
@@ -944,19 +946,22 @@ class RunQueueExecute:
Return none is there are no processes awaiting result collection, otherwise Return none is there are no processes awaiting result collection, otherwise
collect the process exit codes and close the information pipe. collect the process exit codes and close the information pipe.
""" """
result = os.waitpid(-1, os.WNOHANG) for pid in self.build_procs.keys():
if result[0] is 0 and result[1] is 0: proc = self.build_procs[pid]
return None proc.poll()
task = self.build_pids[result[0]] if proc.returncode is not None:
del self.build_pids[result[0]] task = self.build_pids[pid]
self.build_pipes[result[0]].close() del self.build_pids[pid]
del self.build_pipes[result[0]] self.build_pipes[pid].close()
if result[1] != 0: del self.build_pipes[pid]
self.task_fail(task, result[1]) del self.build_procs[pid]
else: if proc.returncode != 0:
self.task_complete(task) self.task_fail(task, proc.returncode)
self.stats.taskCompleted() else:
bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) self.task_complete(task)
self.stats.taskCompleted()
bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
def finish_now(self): def finish_now(self):
if self.stats.active: if self.stats.active:
@@ -990,31 +995,8 @@ class RunQueueExecute:
def fork_off_task(self, fn, task, taskname): def fork_off_task(self, fn, task, taskname):
sys.stdout.flush() sys.stdout.flush()
sys.stderr.flush() sys.stderr.flush()
try: try:
pipein, pipeout = os.pipe()
pid = os.fork()
except OSError as e:
bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
if pid == 0:
os.close(pipein)
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
bb.event.worker_pipe = pipeout
self.rq.state = runQueueChildProcess
# Make the child the process group leader
os.setpgid(0, 0)
# No stdin
newsi = os.open('/dev/null', os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
# Stdout to a logfile
#logout = data.expand("${TMPDIR}/log/stdout.%s" % os.getpid(), self.cfgData, True)
#mkdirhier(os.path.dirname(logout))
#newso = open(logout, 'w')
#os.dup2(newso.fileno(), sys.stdout.fileno())
#os.dup2(newso.fileno(), sys.stderr.fileno())
bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData) bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
bb.msg.note(1, bb.msg.domain.RunQueue, bb.msg.note(1, bb.msg.domain.RunQueue,
"Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.stats.active + self.stats.failed + 1,
@@ -1022,26 +1004,25 @@ class RunQueueExecute:
task, task,
self.rqdata.get_user_idstring(task))) self.rqdata.get_user_idstring(task)))
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
try:
the_data = self.cooker.bb_cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
if not self.cooker.configuration.dry_run: env = bb.data.export_vars(the_data)
bb.build.exec_task(taskname, the_data)
os._exit(0)
except bb.build.EventException as e: taskdep = self.rqdata.dataCache.task_deps[fn]
event = e.args[1] if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
bb.msg.error(bb.msg.domain.Build, "%s event exception, aborting" % bb.event.getName(event)) envvars = the_data.getVar("FAKEROOTENV", True).split()
os._exit(1) for var in envvars:
except Exception: comps = var.split("=")
from traceback import format_exc env[comps[0]] = comps[1]
bb.msg.error(bb.msg.domain.Build, "Build of %s %s failed" % (fn, taskname))
bb.msg.error(bb.msg.domain.Build, format_exc()) proc = subprocess.Popen(["bitbake-runtask", fn, taskname, str(self.cooker.configuration.dry_run)], env=env, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
os._exit(1) pipein = proc.stdout
os._exit(0) pipeout = proc.stdin
return pid, pipein, pipeout pid = proc.pid
except OSError as e:
bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
return proc
class RunQueueExecuteTasks(RunQueueExecute): class RunQueueExecuteTasks(RunQueueExecute):
def __init__(self, rq): def __init__(self, rq):
@@ -1153,10 +1134,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
self.task_skip(task) self.task_skip(task)
continue continue
pid, pipein, pipeout = self.fork_off_task(fn, task, taskname) proc = self.fork_off_task(fn, task, taskname)
self.build_pids[pid] = task self.build_pids[proc.pid] = task
self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.build_procs[proc.pid] = proc
self.build_pipes[proc.pid] = runQueuePipe(proc.stdout, proc.stdin, self.cfgData)
self.runq_running[task] = 1 self.runq_running[task] = 1
self.stats.taskActive() self.stats.taskActive()
if self.stats.active < self.number_tasks: if self.stats.active < self.number_tasks:
@@ -1356,10 +1338,11 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
self.task_skip(task) self.task_skip(task)
return True return True
pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname) proc = self.fork_off_task(fn, realtask, taskname)
self.build_pids[pid] = task self.build_pids[proc.pid] = task
self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.build_procs[proc.pid] = proc
self.build_pipes[proc.pid] = runQueuePipe(proc.stdout, proc.stdin, self.cfgData)
self.runq_running[task] = 1 self.runq_running[task] = 1
self.stats.taskActive() self.stats.taskActive()
if self.stats.active < self.number_tasks: if self.stats.active < self.number_tasks:
@@ -1384,7 +1367,6 @@ class RunQueueExecuteScenequeue(RunQueueExecute):
self.rq.state = runQueueRunInit self.rq.state = runQueueRunInit
return True return True
class TaskFailure(Exception): class TaskFailure(Exception):
""" """
Exception raised when a task in a runqueue fails Exception raised when a task in a runqueue fails
@@ -1437,14 +1419,14 @@ class runQueueTaskCompleted(runQueueEvent):
runQueueEvent.__init__(self, task, stats, rq) runQueueEvent.__init__(self, task, stats, rq)
self.message = "Task %s completed (%s)" % (task, self.taskstring) self.message = "Task %s completed (%s)" % (task, self.taskstring)
def check_stamp_fn(fn, taskname, d): #def check_stamp_fn(fn, taskname, d):
rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d) # rq = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", d)
fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d) # fn = bb.data.getVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", d)
fnid = rq.rqdata.taskData.getfn_id(fn) # fnid = rq.rqdata.taskData.getfn_id(fn)
taskid = rq.get_task_id(fnid, taskname) # taskid = rq.get_task_id(fnid, taskname)
if taskid is not None: # if taskid is not None:
return rq.check_stamp_task(taskid) # return rq.check_stamp_task(taskid)
return None # return None
class runQueuePipe(): class runQueuePipe():
""" """
@@ -1452,7 +1434,7 @@ class runQueuePipe():
""" """
def __init__(self, pipein, pipeout, d): def __init__(self, pipein, pipeout, d):
self.fd = pipein self.fd = pipein
os.close(pipeout) pipeout.close()
fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK) fcntl.fcntl(self.fd, fcntl.F_SETFL, fcntl.fcntl(self.fd, fcntl.F_GETFL) | os.O_NONBLOCK)
self.queue = "" self.queue = ""
self.d = d self.d = d
@@ -1460,8 +1442,8 @@ class runQueuePipe():
def read(self): def read(self):
start = len(self.queue) start = len(self.queue)
try: try:
self.queue = self.queue + os.read(self.fd, 1024) self.queue = self.queue + self.fd.read(1024)
except OSError: except IOError:
pass pass
end = len(self.queue) end = len(self.queue)
index = self.queue.find("</event>") index = self.queue.find("</event>")
@@ -1476,4 +1458,4 @@ class runQueuePipe():
continue continue
if len(self.queue) > 0: if len(self.queue) > 0:
print("Warning, worker left partial message") print("Warning, worker left partial message")
os.close(self.fd) self.fd.close()