mirror of
https://git.yoctoproject.org/poky
synced 2026-05-07 16:59:22 +00:00
bitbake: hashserv: Add Client Pool
Implements a Client Pool derived from the AsyncRPC client pool that allows querying for multiple equivalent hashes in parallel (Bitbake rev: ba4c764d8061c7b88cd4985ca493d6ea6e317106) Signed-off-by: Joshua Watt <JPEWhacker@gmail.com> Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
committed by
Richard Purdie
parent
2406bd1055
commit
37b4d7e493
@@ -283,3 +283,83 @@ class Client(bb.asyncrpc.Client):
|
||||
|
||||
def _get_async_client(self):
|
||||
return AsyncClient(self.username, self.password)
|
||||
|
||||
|
||||
class ClientPool(bb.asyncrpc.ClientPool):
|
||||
def __init__(
|
||||
self,
|
||||
address,
|
||||
max_clients,
|
||||
*,
|
||||
username=None,
|
||||
password=None,
|
||||
become=None,
|
||||
):
|
||||
super().__init__(max_clients)
|
||||
self.address = address
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.become = become
|
||||
|
||||
async def _new_client(self):
|
||||
client = await create_async_client(
|
||||
self.address,
|
||||
username=self.username,
|
||||
password=self.password,
|
||||
)
|
||||
if self.become:
|
||||
await client.become_user(self.become)
|
||||
return client
|
||||
|
||||
def _run_key_tasks(self, queries, call):
|
||||
results = {key: None for key in queries.keys()}
|
||||
|
||||
def make_task(key, args):
|
||||
async def task(client):
|
||||
nonlocal results
|
||||
unihash = await call(client, args)
|
||||
results[key] = unihash
|
||||
|
||||
return task
|
||||
|
||||
def gen_tasks():
|
||||
for key, args in queries.items():
|
||||
yield make_task(key, args)
|
||||
|
||||
self.run_tasks(gen_tasks())
|
||||
return results
|
||||
|
||||
def get_unihashes(self, queries):
|
||||
"""
|
||||
Query multiple unihashes in parallel.
|
||||
|
||||
The queries argument is a dictionary with arbitrary key. The values
|
||||
must be a tuple of (method, taskhash).
|
||||
|
||||
Returns a dictionary with a corresponding key for each input key, and
|
||||
the value is the queried unihash (which might be none if the query
|
||||
failed)
|
||||
"""
|
||||
|
||||
async def call(client, args):
|
||||
method, taskhash = args
|
||||
return await client.get_unihash(method, taskhash)
|
||||
|
||||
return self._run_key_tasks(queries, call)
|
||||
|
||||
def unihashes_exist(self, queries):
|
||||
"""
|
||||
Query multiple unihash existence checks in parallel.
|
||||
|
||||
The queries argument is a dictionary with arbitrary key. The values
|
||||
must be a unihash.
|
||||
|
||||
Returns a dictionary with a corresponding key for each input key, and
|
||||
the value is True or False if the unihash is known by the server (or
|
||||
None if there was a failure)
|
||||
"""
|
||||
|
||||
async def call(client, unihash):
|
||||
return await client.unihash_exists(unihash)
|
||||
|
||||
return self._run_key_tasks(queries, call)
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
from . import create_server, create_client
|
||||
from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
|
||||
from bb.asyncrpc import InvokeError
|
||||
from .client import ClientPool
|
||||
import hashlib
|
||||
import logging
|
||||
import multiprocessing
|
||||
@@ -554,6 +555,88 @@ class HashEquivalenceCommonTests(object):
|
||||
# shares a taskhash with Task 2
|
||||
self.assertClientGetHash(self.client, taskhash2, unihash2)
|
||||
|
||||
|
||||
def test_client_pool_get_unihashes(self):
|
||||
TEST_INPUT = (
|
||||
# taskhash outhash unihash
|
||||
('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
|
||||
# Duplicated taskhash with multiple output hashes and unihashes.
|
||||
('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
|
||||
# Equivalent hash
|
||||
("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
|
||||
("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
|
||||
('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
|
||||
('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
|
||||
('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
|
||||
)
|
||||
EXTRA_QUERIES = (
|
||||
"6b6be7a84ab179b4240c4302518dc3f6",
|
||||
)
|
||||
|
||||
with ClientPool(self.server_address, 10) as client_pool:
|
||||
for taskhash, outhash, unihash in TEST_INPUT:
|
||||
self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
|
||||
|
||||
query = {idx: (self.METHOD, data[0]) for idx, data in enumerate(TEST_INPUT)}
|
||||
for idx, taskhash in enumerate(EXTRA_QUERIES):
|
||||
query[idx + len(TEST_INPUT)] = (self.METHOD, taskhash)
|
||||
|
||||
result = client_pool.get_unihashes(query)
|
||||
|
||||
self.assertDictEqual(result, {
|
||||
0: "218e57509998197d570e2c98512d0105985dffc9",
|
||||
1: "218e57509998197d570e2c98512d0105985dffc9",
|
||||
2: "218e57509998197d570e2c98512d0105985dffc9",
|
||||
3: "3b5d3d83f07f259e9086fcb422c855286e18a57d",
|
||||
4: "f46d3fbb439bd9b921095da657a4de906510d2cd",
|
||||
5: "f46d3fbb439bd9b921095da657a4de906510d2cd",
|
||||
6: "05d2a63c81e32f0a36542ca677e8ad852365c538",
|
||||
7: None,
|
||||
})
|
||||
|
||||
def test_client_pool_unihash_exists(self):
|
||||
TEST_INPUT = (
|
||||
# taskhash outhash unihash
|
||||
('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
|
||||
# Duplicated taskhash with multiple output hashes and unihashes.
|
||||
('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
|
||||
# Equivalent hash
|
||||
("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
|
||||
("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
|
||||
('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
|
||||
('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
|
||||
('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
|
||||
)
|
||||
EXTRA_QUERIES = (
|
||||
"6b6be7a84ab179b4240c4302518dc3f6",
|
||||
)
|
||||
|
||||
result_unihashes = set()
|
||||
|
||||
|
||||
with ClientPool(self.server_address, 10) as client_pool:
|
||||
for taskhash, outhash, unihash in TEST_INPUT:
|
||||
result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
|
||||
result_unihashes.add(result["unihash"])
|
||||
|
||||
query = {}
|
||||
expected = {}
|
||||
|
||||
for _, _, unihash in TEST_INPUT:
|
||||
idx = len(query)
|
||||
query[idx] = unihash
|
||||
expected[idx] = unihash in result_unihashes
|
||||
|
||||
|
||||
for unihash in EXTRA_QUERIES:
|
||||
idx = len(query)
|
||||
query[idx] = unihash
|
||||
expected[idx] = False
|
||||
|
||||
result = client_pool.unihashes_exist(query)
|
||||
self.assertDictEqual(result, expected)
|
||||
|
||||
|
||||
def test_auth_read_perms(self):
|
||||
admin_client = self.start_auth_server()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user