1
0
mirror of https://git.yoctoproject.org/poky synced 2026-05-09 17:39:31 +00:00

bitbake: hashserv: tests: Allow authentication for external server tests

If BB_TEST_HASHSERV_USERNAME and BB_TEST_HASHSERV_PASSWORD are provided
for a server admin user, the authentication tests for the external
hashserver will run. In addition, any users that get created will now be
deleted when the test finishes.

(Bitbake rev: 0e945d3dec02479df1157f48fd44223c2bfb34a3)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
This commit is contained in:
Joshua Watt
2023-11-03 08:26:38 -06:00
committed by Richard Purdie
parent 407afec92a
commit 1699870a0c
+74 -35
View File
@@ -84,17 +84,13 @@ class HashEquivalenceTestSetup(object):
return self.server.address return self.server.address
def start_auth_server(self): def start_auth_server(self):
self.auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password") auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
self.admin_client = self.start_client(self.auth_server.address, username="admin", password="password") self.auth_server_address = auth_server.address
self.admin_client = self.start_client(auth_server.address, username="admin", password="password")
return self.admin_client return self.admin_client
def auth_client(self, user): def auth_client(self, user):
return self.start_client(self.auth_server.address, user["username"], user["token"]) return self.start_client(self.auth_server_address, user["username"], user["token"])
def auth_perms(self, *permissions):
self.client_index += 1
user = self.admin_client.new_user(f"user-{self.client_index}", permissions)
return self.auth_client(user)
def setUp(self): def setUp(self):
if sys.version_info < (3, 5, 0): if sys.version_info < (3, 5, 0):
@@ -120,11 +116,11 @@ class HashEquivalenceTestSetup(object):
}) })
def assertUserCanAuth(self, user): def assertUserCanAuth(self, user):
with self.start_client(self.auth_server.address) as client: with self.start_client(self.auth_server_address) as client:
client.auth(user["username"], user["token"]) client.auth(user["username"], user["token"])
def assertUserCannotAuth(self, user): def assertUserCannotAuth(self, user):
with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError): with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
client.auth(user["username"], user["token"]) client.auth(user["username"], user["token"])
def create_test_hash(self, client): def create_test_hash(self, client):
@@ -157,6 +153,26 @@ class HashEquivalenceTestSetup(object):
class HashEquivalenceCommonTests(object): class HashEquivalenceCommonTests(object):
def auth_perms(self, *permissions):
self.client_index += 1
user = self.create_user(f"user-{self.client_index}", permissions)
return self.auth_client(user)
def create_user(self, username, permissions, *, client=None):
def remove_user(username):
try:
self.admin_client.delete_user(username)
except bb.asyncrpc.InvokeError:
pass
if client is None:
client = self.admin_client
user = client.new_user(username, permissions)
self.addCleanup(remove_user, username)
return user
def test_create_hash(self): def test_create_hash(self):
return self.create_test_hash(self.client) return self.create_test_hash(self.client)
@@ -571,14 +587,14 @@ class HashEquivalenceCommonTests(object):
def test_auth_no_token_refresh_from_anon_user(self): def test_auth_no_token_refresh_from_anon_user(self):
self.start_auth_server() self.start_auth_server()
with self.start_client(self.auth_server.address) as client, self.assertRaises(InvokeError): with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
client.refresh_token() client.refresh_token()
def test_auth_self_token_refresh(self): def test_auth_self_token_refresh(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
# Create a new user with no permissions # Create a new user with no permissions
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
with self.auth_client(user) as client: with self.auth_client(user) as client:
new_user = client.refresh_token() new_user = client.refresh_token()
@@ -601,7 +617,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_token_refresh(self): def test_auth_token_refresh(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
with self.auth_perms() as client, self.assertRaises(InvokeError): with self.auth_perms() as client, self.assertRaises(InvokeError):
client.refresh_token(user["username"]) client.refresh_token(user["username"])
@@ -617,7 +633,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_self_get_user(self): def test_auth_self_get_user(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
user_info = user.copy() user_info = user.copy()
del user_info["token"] del user_info["token"]
@@ -632,7 +648,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_get_user(self): def test_auth_get_user(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
user_info = user.copy() user_info = user.copy()
del user_info["token"] del user_info["token"]
@@ -649,7 +665,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_reconnect(self): def test_auth_reconnect(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
user_info = user.copy() user_info = user.copy()
del user_info["token"] del user_info["token"]
@@ -665,7 +681,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_delete_user(self): def test_auth_delete_user(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
# No self service # No self service
with self.auth_client(user) as client, self.assertRaises(InvokeError): with self.auth_client(user) as client, self.assertRaises(InvokeError):
@@ -685,7 +701,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_set_user_perms(self): def test_auth_set_user_perms(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
self.assertUserPerms(user, []) self.assertUserPerms(user, [])
@@ -710,7 +726,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_get_all_users(self): def test_auth_get_all_users(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", []) user = self.create_user("test-user", [])
with self.auth_client(user) as client, self.assertRaises(InvokeError): with self.auth_client(user) as client, self.assertRaises(InvokeError):
client.get_all_users() client.get_all_users()
@@ -744,10 +760,10 @@ class HashEquivalenceCommonTests(object):
permissions.sort() permissions.sort()
with self.auth_perms() as client, self.assertRaises(InvokeError): with self.auth_perms() as client, self.assertRaises(InvokeError):
client.new_user("test-user", permissions) self.create_user("test-user", permissions, client=client)
with self.auth_perms("@user-admin") as client: with self.auth_perms("@user-admin") as client:
user = client.new_user("test-user", permissions) user = self.create_user("test-user", permissions, client=client)
self.assertIn("token", user) self.assertIn("token", user)
self.assertEqual(user["username"], "test-user") self.assertEqual(user["username"], "test-user")
self.assertEqual(user["permissions"], permissions) self.assertEqual(user["permissions"], permissions)
@@ -755,7 +771,7 @@ class HashEquivalenceCommonTests(object):
def test_auth_become_user(self): def test_auth_become_user(self):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
user = admin_client.new_user("test-user", ["@read", "@report"]) user = self.create_user("test-user", ["@read", "@report"])
user_info = user.copy() user_info = user.copy()
del user_info["token"] del user_info["token"]
@@ -898,7 +914,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read", "@report"]) user = admin_client.new_user("test-user", ["@read", "@report"])
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", user["username"], "--login", user["username"],
"--password", user["token"], "--password", user["token"],
"refresh-token" "refresh-token"
@@ -916,7 +932,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
print("New token is %r" % new_token) print("New token is %r" % new_token)
self.run_hashclient([ self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", user["username"], "--login", user["username"],
"--password", new_token, "--password", new_token,
"get-user" "get-user"
@@ -928,7 +944,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"]) user = admin_client.new_user("test-user", ["@read"])
self.run_hashclient([ self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", admin_client.username, "--login", admin_client.username,
"--password", admin_client.password, "--password", admin_client.password,
"set-user-perms", "set-user-perms",
@@ -946,7 +962,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"]) user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", admin_client.username, "--login", admin_client.username,
"--password", admin_client.password, "--password", admin_client.password,
"get-user", "get-user",
@@ -957,7 +973,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
self.assertIn("Permissions:", p.stdout) self.assertIn("Permissions:", p.stdout)
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", user["username"], "--login", user["username"],
"--password", user["token"], "--password", user["token"],
"get-user", "get-user",
@@ -973,7 +989,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
admin_client.new_user("test-user2", ["@read"]) admin_client.new_user("test-user2", ["@read"])
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", admin_client.username, "--login", admin_client.username,
"--password", admin_client.password, "--password", admin_client.password,
"get-all-users", "get-all-users",
@@ -987,7 +1003,7 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
admin_client = self.start_auth_server() admin_client = self.start_auth_server()
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", admin_client.username, "--login", admin_client.username,
"--password", admin_client.password, "--password", admin_client.password,
"new-user", "new-user",
@@ -1017,14 +1033,13 @@ class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
user = admin_client.new_user("test-user", ["@read"]) user = admin_client.new_user("test-user", ["@read"])
p = self.run_hashclient([ p = self.run_hashclient([
"--address", self.auth_server.address, "--address", self.auth_server_address,
"--login", admin_client.username, "--login", admin_client.username,
"--password", admin_client.password, "--password", admin_client.password,
"delete-user", "delete-user",
"-u", user["username"], "-u", user["username"],
], check=True) ], check=True)
self.assertIsNone(admin_client.get_user(user["username"])) self.assertIsNone(admin_client.get_user(user["username"]))
def test_get_db_usage(self): def test_get_db_usage(self):
@@ -1104,19 +1119,43 @@ class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocket
class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase): class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
def start_test_server(self): def get_env(self, name):
if 'BB_TEST_HASHSERV' not in os.environ: v = os.environ.get(name)
self.skipTest('BB_TEST_HASHSERV not defined to test an external server') if not v:
self.skipTest(f'{name} not defined to test an external server')
return v
return os.environ['BB_TEST_HASHSERV'] def start_test_server(self):
return self.get_env('BB_TEST_HASHSERV')
def start_server(self, *args, **kwargs): def start_server(self, *args, **kwargs):
self.skipTest('Cannot start local server when testing external servers') self.skipTest('Cannot start local server when testing external servers')
def start_auth_server(self):
self.auth_server_address = self.server_address
self.admin_client = self.start_client(
self.server_address,
username=self.get_env('BB_TEST_HASHSERV_USERNAME'),
password=self.get_env('BB_TEST_HASHSERV_PASSWORD'),
)
return self.admin_client
def setUp(self): def setUp(self):
super().setUp() super().setUp()
if "BB_TEST_HASHSERV_USERNAME" in os.environ:
self.client = self.start_client(
self.server_address,
username=os.environ["BB_TEST_HASHSERV_USERNAME"],
password=os.environ["BB_TEST_HASHSERV_PASSWORD"],
)
self.client.remove({"method": self.METHOD}) self.client.remove({"method": self.METHOD})
def tearDown(self): def tearDown(self):
self.client.remove({"method": self.METHOD}) self.client.remove({"method": self.METHOD})
super().tearDown() super().tearDown()
def test_auth_get_all_users(self):
self.skipTest("Cannot test all users with external server")