From fb3b05fe8da817967c9f90d4c4c0c1fee87c9f01 Mon Sep 17 00:00:00 2001 From: Paul Barker Date: Thu, 19 Aug 2021 12:46:43 -0400 Subject: [PATCH] bitbake: prserv: Replace XML RPC with modern asyncrpc implementation Update the prserv client and server classes to use the modern json and asyncio based RPC system implemented by the asyncrpc module. (Bitbake rev: 6a2b23e27bb61185b8afb382e20ce79f996d9183) Signed-off-by: Paul Barker [updated for asyncrpc changes, client split to separate file] Signed-off-by: Scott Murray Signed-off-by: Richard Purdie --- bitbake/lib/prserv/client.py | 41 ++++++ bitbake/lib/prserv/serv.py | 252 ++++++++++++++++------------------- 2 files changed, 156 insertions(+), 137 deletions(-) create mode 100644 bitbake/lib/prserv/client.py diff --git a/bitbake/lib/prserv/client.py b/bitbake/lib/prserv/client.py new file mode 100644 index 0000000000..285dce72f6 --- /dev/null +++ b/bitbake/lib/prserv/client.py @@ -0,0 +1,41 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import logging +import bb.asyncrpc + +logger = logging.getLogger("BitBake.PRserv") + +class PRAsyncClient(bb.asyncrpc.AsyncClient): + def __init__(self): + super().__init__('PRSERVICE', '1.0', logger) + + async def getPR(self, version, pkgarch, checksum): + response = await self.send_message( + {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} + ) + if response: + return response['value'] + + async def importone(self, version, pkgarch, checksum, value): + response = await self.send_message( + {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} + ) + if response: + return response['value'] + + async def export(self, version, pkgarch, checksum, colinfo): + response = await self.send_message( + {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} + ) + if response: + return (response['metainfo'], response['datainfo']) + +class PRClient(bb.asyncrpc.Client): + def __init__(self): + super().__init__() + self._add_methods('getPR', 'importone', 'export') + + def _get_async_client(self): + return PRAsyncClient() diff --git a/bitbake/lib/prserv/serv.py b/bitbake/lib/prserv/serv.py index 5e322bf83d..1fa4e1766c 100644 --- a/bitbake/lib/prserv/serv.py +++ b/bitbake/lib/prserv/serv.py @@ -4,157 +4,125 @@ import os,sys,logging import signal, time -from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler import socket import io import sqlite3 -import bb.server.xmlrpcclient import prserv import prserv.db import errno -import multiprocessing +import bb.asyncrpc logger = logging.getLogger("BitBake.PRserv") -class Handler(SimpleXMLRPCRequestHandler): - def _dispatch(self,method,params): - try: - value=self.server.funcs[method](*params) - except: - import traceback - traceback.print_exc() - raise - return value - PIDPREFIX = "/tmp/PRServer_%s_%s.pid" singleton = None +class PRServerClient(bb.asyncrpc.AsyncServerConnection): + def __init__(self, reader, writer, table): + super().__init__(reader, writer, 'PRSERVICE', logger) + self.handlers.update({ + 'get-pr': self.handle_get_pr, + 'import-one': self.handle_import_one, + 'export': self.handle_export, + }) + self.table = table -class PRServer(SimpleXMLRPCServer): - def __init__(self, dbfile, logfile, interface): - ''' constructor ''' + def validate_proto_version(self): + return (self.proto_version == (1, 0)) + + async def dispatch_message(self, msg): try: - SimpleXMLRPCServer.__init__(self, interface, - logRequests=False, allow_none=True) - except socket.error: - ip=socket.gethostbyname(interface[0]) - port=interface[1] - msg="PR Server unable to bind to %s:%s\n" % (ip, port) - sys.stderr.write(msg) - raise PRServiceConfigError - - self.dbfile=dbfile - self.logfile=logfile - self.host, self.port = self.socket.getsockname() - - self.register_function(self.getPR, "getPR") - self.register_function(self.ping, "ping") - self.register_function(self.export, "export") - self.register_function(self.importone, "importone") - self.register_introspection_functions() - - self.iter_count = 0 - # 60 iterations between syncs or sync if dirty every ~30 seconds - self.iterations_between_sync = 60 - - def sigint_handler(self, signum, stack): - if self.table: - self.table.sync() - - def sigterm_handler(self, signum, stack): - if self.table: - self.table.sync() - raise(SystemExit) - - def process_request(self, request, client_address): - if request is None: - return - try: - self.finish_request(request, client_address) - self.shutdown_request(request) - self.iter_count = (self.iter_count + 1) % self.iterations_between_sync - if self.iter_count == 0: - self.table.sync_if_dirty() + await super().dispatch_message(msg) except: - self.handle_error(request, client_address) - self.shutdown_request(request) self.table.sync() + raise + self.table.sync_if_dirty() - def serve_forever(self, poll_interval=0.5): - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) + async def handle_get_pr(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] - self.db = prserv.db.PRData(self.dbfile) - self.table = self.db["PRMAIN"] - return super().serve_forever(poll_interval) - - def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): + response = None try: - return self.table.export(version, pkgarch, checksum, colinfo) - except sqlite3.Error as exc: - logger.error(str(exc)) - return None - - def importone(self, version, pkgarch, checksum, value): - return self.table.importone(version, pkgarch, checksum, value) - - def ping(self): - return True - - def getinfo(self): - return (self.host, self.port) - - def getPR(self, version, pkgarch, checksum): - try: - return self.table.getValue(version, pkgarch, checksum) + value = self.table.getValue(version, pkgarch, checksum) + response = {'value': value} except prserv.NotFoundError: logger.error("can not find value for (%s, %s)",version, checksum) - return None except sqlite3.Error as exc: logger.error(str(exc)) - return None + + self.write_message(response) + + async def handle_import_one(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + value = request['value'] + + value = self.table.importone(version, pkgarch, checksum, value) + if value is not None: + response = {'value': value} + else: + response = None + self.write_message(response) + + async def handle_export(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + colinfo = request['colinfo'] + + try: + (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) + except sqlite3.Error as exc: + logger.error(str(exc)) + metainfo = datainfo = None + + response = {'metainfo': metainfo, 'datainfo': datainfo} + self.write_message(response) + +class PRServer(bb.asyncrpc.AsyncServer): + def __init__(self, dbfile): + super().__init__(logger) + self.dbfile = dbfile + self.table = None + + def accept_client(self, reader, writer): + return PRServerClient(reader, writer, self.table) + + def _serve_forever(self): + self.db = prserv.db.PRData(self.dbfile) + self.table = self.db["PRMAIN"] + + logger.debug("Started PRServer with DBfile: %s, Address: %s, PID: %s" % + (self.dbfile, self.address, str(os.getpid()))) + + super()._serve_forever() + + self.table.sync_if_dirty() + self.db.disconnect() + + def signal_handler(self): + super().signal_handler() + if self.table: + self.table.sync() class PRServSingleton(object): - def __init__(self, dbfile, logfile, interface): + def __init__(self, dbfile, logfile, host, port): self.dbfile = dbfile self.logfile = logfile - self.interface = interface - self.host = None - self.port = None - - def start(self): - self.prserv = PRServer(self.dbfile, self.logfile, self.interface) - self.process = multiprocessing.Process(target=self.prserv.serve_forever) - self.process.start() - - self.host, self.port = self.prserv.getinfo() - - def getinfo(self): - return (self.host, self.port) - -class PRServerConnection(object): - def __init__(self, host, port): - if is_local_special(host, port): - host, port = singleton.getinfo() self.host = host self.port = port - self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) - def getPR(self, version, pkgarch, checksum): - return self.connection.getPR(version, pkgarch, checksum) + def start(self): + self.prserv = PRServer(self.dbfile) + self.prserv.start_tcp_server(self.host, self.port) + self.process = self.prserv.serve_as_process() - def ping(self): - return self.connection.ping() - - def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): - return self.connection.export(version, pkgarch, checksum, colinfo) - - def importone(self, version, pkgarch, checksum, value): - return self.connection.importone(version, pkgarch, checksum, value) - - def getinfo(self): - return self.host, self.port + if not self.port: + self.port = int(self.prserv.address.rsplit(':', 1)[1]) def run_as_daemon(func, pidfile, logfile): """ @@ -240,15 +208,13 @@ def start_daemon(dbfile, host, port, logfile): % pidfile) return 1 - server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) - run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) + dbfile = os.path.abspath(dbfile) + def daemon_main(): + server = PRServer(dbfile) + server.start_tcp_server(host, port) + server.serve_forever() - # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with - # the one the server actually is listening, so at least warn the user about it - _,rport = server.getinfo() - if port != rport: - sys.stdout.write("Server is listening at port %s instead of %s\n" - % (rport,port)) + run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile)) return 0 def stop_daemon(host, port): @@ -302,7 +268,7 @@ def is_running(pid): return True def is_local_special(host, port): - if host.strip().upper() == 'localhost'.upper() and (not port): + if host.strip().lower() == 'localhost' and not port: return True else: return False @@ -340,20 +306,19 @@ def auto_start(d): auto_shutdown() if not singleton: bb.utils.mkdirhier(cachedir) - singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) + singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), "localhost", 0) singleton.start() if singleton: - host, port = singleton.getinfo() + host = singleton.host + port = singleton.port else: host = host_params[0] port = int(host_params[1]) try: - connection = PRServerConnection(host,port) - connection.ping() - realhost, realport = connection.getinfo() - return str(realhost) + ":" + str(realport) - + ping(host, port) + return str(host) + ":" + str(port) + except Exception: logger.critical("PRservice %s:%d not available" % (host, port)) raise PRServiceConfigError @@ -366,8 +331,21 @@ def auto_shutdown(): singleton = None def ping(host, port): - conn=PRServerConnection(host, port) + from . import client + + conn = client.PRClient() + conn.connect_tcp(host, port) return conn.ping() def connect(host, port): - return PRServerConnection(host, port) + from . import client + + global singleton + + if host.strip().lower() == 'localhost' and not port: + host = 'localhost' + port = singleton.port + + conn = client.PRClient() + conn.connect_tcp(host, port) + return conn