mirror of
https://git.yoctoproject.org/poky
synced 2026-06-02 01:19:52 +00:00
bitbake: prserv: add "upstream" server support
Introduce a PRSERVER_UPSTREAM variable that makes the local PR server connect to an "upstream" one. This makes it possible to implement local fixes to an upstream package (revision "x", in a way that gives the local update priority (revision "x.y"). Update the calculation of the new revisions to support the case when prior revisions are not integers, but have an "x.y..." format." Set the comments in the handle_get_pr() function in serv.py for details about the calculation of the local revision. This is done by going on supporting the "history" mode that wasn't used so far (revisions can return to a previous historical value), in addition to the default "no history" mode (revisions can never decrease). Rather than storing the history mode in the database table itself (i.e. "PRMAIN_hist" and "PRMAIN_nohist"), the history mode is now passed through the client requests. As a consequence, the table name is now "PRMAIN", which is incompatible with what was generated before, but avoids confusion if we kept the "PRMAIN_nohist" name for both "history" and "no history" modes. Update the server version to "2.0.0". (Bitbake rev: 48857ec3e075791bd73d92747c609a0a4fda0e0c) Signed-off-by: Michael Opdenacker <michael.opdenacker@bootlin.com> Cc: Joshua Watt <JPEWhacker@gmail.com> Cc: Tim Orling <ticotimo@gmail.com> Cc: Thomas Petazzoni <thomas.petazzoni@bootlin.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
committed by
Richard Purdie
parent
5f99010e41
commit
4cbce9cdf7
+117
-17
@@ -12,6 +12,7 @@ import sqlite3
|
||||
import prserv
|
||||
import prserv.db
|
||||
import errno
|
||||
from . import create_async_client, revision_smaller, increase_revision
|
||||
import bb.asyncrpc
|
||||
|
||||
logger = logging.getLogger("BitBake.PRserv")
|
||||
@@ -51,8 +52,9 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
|
||||
version = request["version"]
|
||||
pkgarch = request["pkgarch"]
|
||||
checksum = request["checksum"]
|
||||
history = request["history"]
|
||||
|
||||
value = self.server.table.find_value(version, pkgarch, checksum)
|
||||
value = self.server.table.find_value(version, pkgarch, checksum, history)
|
||||
return {"value": value}
|
||||
|
||||
async def handle_test_package(self, request):
|
||||
@@ -68,22 +70,110 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
|
||||
version = request["version"]
|
||||
pkgarch = request["pkgarch"]
|
||||
|
||||
value = self.server.table.find_max_value(version, pkgarch)
|
||||
value = self.server.table.find_package_max_value(version, pkgarch)
|
||||
return {"value": value}
|
||||
|
||||
async def handle_get_pr(self, request):
|
||||
version = request["version"]
|
||||
pkgarch = request["pkgarch"]
|
||||
checksum = request["checksum"]
|
||||
history = request["history"]
|
||||
|
||||
if self.upstream_client is None:
|
||||
value = self.server.table.get_value(version, pkgarch, checksum, history)
|
||||
return {"value": value}
|
||||
|
||||
# We have an upstream server.
|
||||
# Check whether the local server already knows the requested configuration.
|
||||
# If the configuration is a new one, the generated value we will add will
|
||||
# depend on what's on the upstream server. That's why we're calling find_value()
|
||||
# instead of get_value() directly.
|
||||
|
||||
value = self.server.table.find_value(version, pkgarch, checksum, history)
|
||||
upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
|
||||
|
||||
if value is not None:
|
||||
|
||||
# The configuration is already known locally.
|
||||
|
||||
if history:
|
||||
value = self.server.table.get_value(version, pkgarch, checksum, history)
|
||||
else:
|
||||
existing_value = value
|
||||
# In "no history", we need to make sure the value doesn't decrease
|
||||
# and is at least greater than the maximum upstream value
|
||||
# and the maximum local value
|
||||
|
||||
local_max = self.server.table.find_package_max_value(version, pkgarch)
|
||||
if revision_smaller(value, local_max):
|
||||
value = increase_revision(local_max)
|
||||
|
||||
if revision_smaller(value, upstream_max):
|
||||
# Ask upstream whether it knows the checksum
|
||||
upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
|
||||
if upstream_value is None:
|
||||
# Upstream doesn't have our checksum, let create a new one
|
||||
value = upstream_max + ".0"
|
||||
else:
|
||||
# Fine to take the same value as upstream
|
||||
value = upstream_max
|
||||
|
||||
if not value == existing_value and not self.server.read_only:
|
||||
self.server.table.store_value(version, pkgarch, checksum, value)
|
||||
|
||||
return {"value": value}
|
||||
|
||||
# The configuration is a new one for the local server
|
||||
# Let's ask the upstream server whether it knows it
|
||||
|
||||
known_upstream = await self.upstream_client.test_package(version, pkgarch)
|
||||
|
||||
if not known_upstream:
|
||||
|
||||
# The package is not known upstream, must be a local-only package
|
||||
# Let's compute the PR number using the local-only method
|
||||
|
||||
value = self.server.table.get_value(version, pkgarch, checksum, history)
|
||||
return {"value": value}
|
||||
|
||||
# The package is known upstream, let's ask the upstream server
|
||||
# whether it knows our new output hash
|
||||
|
||||
value = await self.upstream_client.test_pr(version, pkgarch, checksum)
|
||||
|
||||
if value is not None:
|
||||
|
||||
# Upstream knows this output hash, let's store it and use it too.
|
||||
|
||||
if not self.server.read_only:
|
||||
self.server.table.store_value(version, pkgarch, checksum, value)
|
||||
# If the local server is read only, won't be able to store the new
|
||||
# value in the database and will have to keep asking the upstream server
|
||||
return {"value": value}
|
||||
|
||||
# The output hash doesn't exist upstream, get the most recent number from upstream (x)
|
||||
# Then, we want to have a new PR value for the local server: x.y
|
||||
|
||||
upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
|
||||
# Here we know that the package is known upstream, so upstream_max can't be None
|
||||
subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)
|
||||
|
||||
if not self.server.read_only:
|
||||
self.server.table.store_value(version, pkgarch, checksum, subvalue)
|
||||
|
||||
return {"value": subvalue}
|
||||
|
||||
async def process_requests(self):
|
||||
if self.server.upstream is not None:
|
||||
self.upstream_client = await create_async_client(self.server.upstream)
|
||||
else:
|
||||
self.upstream_client = None
|
||||
|
||||
response = None
|
||||
try:
|
||||
value = self.server.table.get_value(version, pkgarch, checksum)
|
||||
response = {"value": value}
|
||||
except prserv.NotFoundError:
|
||||
self.logger.error("failure storing value in database for (%s, %s)",version, checksum)
|
||||
|
||||
return response
|
||||
await super().process_requests()
|
||||
finally:
|
||||
if self.upstream_client is not None:
|
||||
await self.upstream_client.close()
|
||||
|
||||
async def handle_import_one(self, request):
|
||||
response = None
|
||||
@@ -92,8 +182,9 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
|
||||
pkgarch = request["pkgarch"]
|
||||
checksum = request["checksum"]
|
||||
value = request["value"]
|
||||
history = request["history"]
|
||||
|
||||
value = self.server.table.importone(version, pkgarch, checksum, value)
|
||||
value = self.server.table.importone(version, pkgarch, checksum, value, history)
|
||||
if value is not None:
|
||||
response = {"value": value}
|
||||
|
||||
@@ -104,9 +195,10 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
|
||||
pkgarch = request["pkgarch"]
|
||||
checksum = request["checksum"]
|
||||
colinfo = request["colinfo"]
|
||||
history = request["history"]
|
||||
|
||||
try:
|
||||
(metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo)
|
||||
(metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
|
||||
except sqlite3.Error as exc:
|
||||
self.logger.error(str(exc))
|
||||
metainfo = datainfo = None
|
||||
@@ -117,11 +209,12 @@ class PRServerClient(bb.asyncrpc.AsyncServerConnection):
|
||||
return {"readonly": self.server.read_only}
|
||||
|
||||
class PRServer(bb.asyncrpc.AsyncServer):
|
||||
def __init__(self, dbfile, read_only=False):
|
||||
def __init__(self, dbfile, read_only=False, upstream=None):
|
||||
super().__init__(logger)
|
||||
self.dbfile = dbfile
|
||||
self.table = None
|
||||
self.read_only = read_only
|
||||
self.upstream = upstream
|
||||
|
||||
def accept_client(self, socket):
|
||||
return PRServerClient(socket, self)
|
||||
@@ -134,6 +227,9 @@ class PRServer(bb.asyncrpc.AsyncServer):
|
||||
self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
|
||||
(self.dbfile, self.address, str(os.getpid())))
|
||||
|
||||
if self.upstream is not None:
|
||||
self.logger.info("And upstream PRServer: %s " % (self.upstream))
|
||||
|
||||
return tasks
|
||||
|
||||
async def stop(self):
|
||||
@@ -147,14 +243,15 @@ class PRServer(bb.asyncrpc.AsyncServer):
|
||||
self.table.sync()
|
||||
|
||||
class PRServSingleton(object):
|
||||
def __init__(self, dbfile, logfile, host, port):
|
||||
def __init__(self, dbfile, logfile, host, port, upstream):
|
||||
self.dbfile = dbfile
|
||||
self.logfile = logfile
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.upstream = upstream
|
||||
|
||||
def start(self):
|
||||
self.prserv = PRServer(self.dbfile)
|
||||
self.prserv = PRServer(self.dbfile, upstream=self.upstream)
|
||||
self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
|
||||
self.process = self.prserv.serve_as_process(log_level=logging.WARNING)
|
||||
|
||||
@@ -233,7 +330,7 @@ def run_as_daemon(func, pidfile, logfile):
|
||||
os.remove(pidfile)
|
||||
os._exit(0)
|
||||
|
||||
def start_daemon(dbfile, host, port, logfile, read_only=False):
|
||||
def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
|
||||
ip = socket.gethostbyname(host)
|
||||
pidfile = PIDPREFIX % (ip, port)
|
||||
try:
|
||||
@@ -249,7 +346,7 @@ def start_daemon(dbfile, host, port, logfile, read_only=False):
|
||||
|
||||
dbfile = os.path.abspath(dbfile)
|
||||
def daemon_main():
|
||||
server = PRServer(dbfile, read_only=read_only)
|
||||
server = PRServer(dbfile, read_only=read_only, upstream=upstream)
|
||||
server.start_tcp_server(ip, port)
|
||||
server.serve_forever()
|
||||
|
||||
@@ -336,6 +433,9 @@ def auto_start(d):
|
||||
|
||||
host = host_params[0].strip().lower()
|
||||
port = int(host_params[1])
|
||||
|
||||
upstream = d.getVar("PRSERV_UPSTREAM") or None
|
||||
|
||||
if is_local_special(host, port):
|
||||
import bb.utils
|
||||
cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
|
||||
@@ -350,7 +450,7 @@ def auto_start(d):
|
||||
auto_shutdown()
|
||||
if not singleton:
|
||||
bb.utils.mkdirhier(cachedir)
|
||||
singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port)
|
||||
singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
|
||||
singleton.start()
|
||||
if singleton:
|
||||
host = singleton.host
|
||||
|
||||
Reference in New Issue
Block a user