1
0
mirror of https://git.yoctoproject.org/poky synced 2026-06-01 13:09:50 +00:00

bitbake: bitbake: hashserve: Add async client

Adds support for create a client that operates using Python asynchronous
I/O.

(Bitbake rev: cf9bc0310b0092bf52b61057405aeb51c86ba137)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt
2020-11-10 08:59:55 -06:00
committed by Richard Purdie
parent 451af0105b
commit 859f43e176
2 changed files with 139 additions and 104 deletions
+13
View File
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: GPL-2.0-only # SPDX-License-Identifier: GPL-2.0-only
# #
import asyncio
from contextlib import closing from contextlib import closing
import re import re
import sqlite3 import sqlite3
@@ -113,3 +114,15 @@ def create_client(addr):
c.connect_tcp(*a) c.connect_tcp(*a)
return c return c
async def create_async_client(addr):
from . import client
c = client.AsyncClient()
(typ, a) = parse_address(addr)
if typ == ADDR_TYPE_UNIX:
await c.connect_unix(*a)
else:
await c.connect_tcp(*a)
return c
+126 -104
View File
@@ -3,189 +3,211 @@
# SPDX-License-Identifier: GPL-2.0-only # SPDX-License-Identifier: GPL-2.0-only
# #
import asyncio
import json import json
import logging import logging
import socket import socket
import os import os
from . import chunkify, DEFAULT_MAX_CHUNK from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
logger = logging.getLogger('hashserv.client') logger = logging.getLogger("hashserv.client")
class HashConnectionError(Exception): class HashConnectionError(Exception):
pass pass
class Client(object): class AsyncClient(object):
MODE_NORMAL = 0 MODE_NORMAL = 0
MODE_GET_STREAM = 1 MODE_GET_STREAM = 1
def __init__(self): def __init__(self):
self._socket = None
self.reader = None self.reader = None
self.writer = None self.writer = None
self.mode = self.MODE_NORMAL self.mode = self.MODE_NORMAL
self.max_chunk = DEFAULT_MAX_CHUNK self.max_chunk = DEFAULT_MAX_CHUNK
def connect_tcp(self, address, port): async def connect_tcp(self, address, port):
def connect_sock(): async def connect_sock():
s = socket.create_connection((address, port)) return await asyncio.open_connection(address, port)
s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
return s
self._connect_sock = connect_sock self._connect_sock = connect_sock
def connect_unix(self, path): async def connect_unix(self, path):
def connect_sock(): async def connect_sock():
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) return await asyncio.open_unix_connection(path)
# AF_UNIX has path length issues so chdir here to workaround
cwd = os.getcwd()
try:
os.chdir(os.path.dirname(path))
s.connect(os.path.basename(path))
finally:
os.chdir(cwd)
return s
self._connect_sock = connect_sock self._connect_sock = connect_sock
def connect(self): async def _connect(self):
if self._socket is None: if self.reader is None or self.writer is None:
self._socket = self._connect_sock() (self.reader, self.writer) = await self._connect_sock()
self.reader = self._socket.makefile('r', encoding='utf-8') self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
self.writer = self._socket.makefile('w', encoding='utf-8') await self.writer.drain()
self.writer.write('OEHASHEQUIV 1.1\n\n')
self.writer.flush()
# Restore mode if the socket is being re-created
cur_mode = self.mode cur_mode = self.mode
self.mode = self.MODE_NORMAL self.mode = self.MODE_NORMAL
self._set_mode(cur_mode) await self._set_mode(cur_mode)
return self._socket async def close(self):
self.reader = None
def close(self): if self.writer is not None:
if self._socket is not None: self.writer.close()
self._socket.close()
self._socket = None
self.reader = None
self.writer = None self.writer = None
def _send_wrapper(self, proc): async def _send_wrapper(self, proc):
count = 0 count = 0
while True: while True:
try: try:
self.connect() await self._connect()
return proc() return await proc()
except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e: except (
logger.warning('Error talking to server: %s' % e) OSError,
HashConnectionError,
json.JSONDecodeError,
UnicodeDecodeError,
) as e:
logger.warning("Error talking to server: %s" % e)
if count >= 3: if count >= 3:
if not isinstance(e, HashConnectionError): if not isinstance(e, HashConnectionError):
raise HashConnectionError(str(e)) raise HashConnectionError(str(e))
raise e raise e
self.close() await self.close()
count += 1 count += 1
def send_message(self, msg): async def send_message(self, msg):
def get_line(): async def get_line():
line = self.reader.readline() line = await self.reader.readline()
if not line: if not line:
raise HashConnectionError('Connection closed') raise HashConnectionError("Connection closed")
if not line.endswith('\n'): line = line.decode("utf-8")
raise HashConnectionError('Bad message %r' % message)
if not line.endswith("\n"):
raise HashConnectionError("Bad message %r" % message)
return line return line
def proc(): async def proc():
for c in chunkify(json.dumps(msg), self.max_chunk): for c in chunkify(json.dumps(msg), self.max_chunk):
self.writer.write(c) self.writer.write(c.encode("utf-8"))
self.writer.flush() await self.writer.drain()
l = get_line() l = await get_line()
m = json.loads(l) m = json.loads(l)
if 'chunk-stream' in m: if "chunk-stream" in m:
lines = [] lines = []
while True: while True:
l = get_line().rstrip('\n') l = (await get_line()).rstrip("\n")
if not l: if not l:
break break
lines.append(l) lines.append(l)
m = json.loads(''.join(lines)) m = json.loads("".join(lines))
return m return m
return self._send_wrapper(proc) return await self._send_wrapper(proc)
def send_stream(self, msg): async def send_stream(self, msg):
def proc(): async def proc():
self.writer.write("%s\n" % msg) self.writer.write(("%s\n" % msg).encode("utf-8"))
self.writer.flush() await self.writer.drain()
l = self.reader.readline() l = await self.reader.readline()
if not l: if not l:
raise HashConnectionError('Connection closed') raise HashConnectionError("Connection closed")
return l.rstrip() return l.decode("utf-8").rstrip()
return self._send_wrapper(proc) return await self._send_wrapper(proc)
def _set_mode(self, new_mode): async def _set_mode(self, new_mode):
if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
r = self.send_stream('END') r = await self.send_stream("END")
if r != 'ok': if r != "ok":
raise HashConnectionError('Bad response from server %r' % r) raise HashConnectionError("Bad response from server %r" % r)
elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
r = self.send_message({'get-stream': None}) r = await self.send_message({"get-stream": None})
if r != 'ok': if r != "ok":
raise HashConnectionError('Bad response from server %r' % r) raise HashConnectionError("Bad response from server %r" % r)
elif new_mode != self.mode: elif new_mode != self.mode:
raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode)) raise Exception(
"Undefined mode transition %r -> %r" % (self.mode, new_mode)
)
self.mode = new_mode self.mode = new_mode
def get_unihash(self, method, taskhash): async def get_unihash(self, method, taskhash):
self._set_mode(self.MODE_GET_STREAM) await self._set_mode(self.MODE_GET_STREAM)
r = self.send_stream('%s %s' % (method, taskhash)) r = await self.send_stream("%s %s" % (method, taskhash))
if not r: if not r:
return None return None
return r return r
def report_unihash(self, taskhash, method, outhash, unihash, extra={}): async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
self._set_mode(self.MODE_NORMAL) await self._set_mode(self.MODE_NORMAL)
m = extra.copy() m = extra.copy()
m['taskhash'] = taskhash m["taskhash"] = taskhash
m['method'] = method m["method"] = method
m['outhash'] = outhash m["outhash"] = outhash
m['unihash'] = unihash m["unihash"] = unihash
return self.send_message({'report': m}) return await self.send_message({"report": m})
def report_unihash_equiv(self, taskhash, method, unihash, extra={}): async def report_unihash_equiv(self, taskhash, method, unihash, extra={}):
self._set_mode(self.MODE_NORMAL) await self._set_mode(self.MODE_NORMAL)
m = extra.copy() m = extra.copy()
m['taskhash'] = taskhash m["taskhash"] = taskhash
m['method'] = method m["method"] = method
m['unihash'] = unihash m["unihash"] = unihash
return self.send_message({'report-equiv': m}) return await self.send_message({"report-equiv": m})
def get_taskhash(self, method, taskhash, all_properties=False): async def get_taskhash(self, method, taskhash, all_properties=False):
self._set_mode(self.MODE_NORMAL) await self._set_mode(self.MODE_NORMAL)
return self.send_message({'get': { return await self.send_message(
'taskhash': taskhash, {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
'method': method, )
'all': all_properties
}})
def get_stats(self): async def get_stats(self):
self._set_mode(self.MODE_NORMAL) await self._set_mode(self.MODE_NORMAL)
return self.send_message({'get-stats': None}) return await self.send_message({"get-stats": None})
def reset_stats(self): async def reset_stats(self):
self._set_mode(self.MODE_NORMAL) await self._set_mode(self.MODE_NORMAL)
return self.send_message({'reset-stats': None}) return await self.send_message({"reset-stats": None})
class Client(object):
def __init__(self):
self.client = AsyncClient()
self.loop = asyncio.new_event_loop()
def get_wrapper(self, downcall):
def wrapper(*args, **kwargs):
return self.loop.run_until_complete(downcall(*args, **kwargs))
return wrapper
for call in (
"connect_tcp",
"connect_unix",
"close",
"get_unihash",
"report_unihash",
"report_unihash_equiv",
"get_taskhash",
"get_stats",
"reset_stats",
):
downcall = getattr(self.client, call)
setattr(self, call, get_wrapper(self, downcall))
@property
def max_chunk(self):
return self.client.max_chunk
@max_chunk.setter
def max_chunk(self, value):
self.client.max_chunk = value