api:publish: test concurrency

This commit introduces a test which runs concurrent publishes (which
could be parallell with multiproccessing, python is fun).
The test purposly fails (at the point in history that this patch is
written) in order to make it as easy as possible to verify later patches,
which hopefully addresses concurrency problems.

The same behaviour can easily be tested outside of the system tests with
the following (or similar) shell

$ aptly serve -listen=:8080 -no-lock
$ aptly repo create create -distributions=testing local-repo
$ atply publish repo -architectures=amd64 local-repo
$ apt download aptly
$ aptly repo add local-repo ./aptly*.deb
$ for _ in $(seq 10); do curl -X PUT 127.0.0.1:8080/api/publish//testing

In the local testing perfomed (on a dual core vm) the first 1-4 jobs
would typically succeed and the rest would error out.
This commit is contained in:
Ramón N.Rodriguez
2024-04-02 00:25:00 +02:00
committed by André Roth
parent 787f954833
commit 47696a3303

View File

@@ -1,5 +1,6 @@
import inspect
import os
import threading
from api_lib import TASK_SUCCEEDED, APITest
@@ -282,6 +283,107 @@ class PublishUpdateAPITestRepo(APITest):
self.check_not_exists("public/" + prefix + "dists/")
class PublishConcurrentUpdateAPITestRepo(APITest):
"""
PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True
def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], TASK_SUCCEEDED)
prefix = self.random_name()
resp = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "local",
"Sources": [{"Name": repo_name}],
"Signing": DefaultSigningOptions,
}
)
self.check_equal(resp.json()['State'], TASK_SUCCEEDED)
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
d = self.random_name()
self.check_equal(self.upload("/api/files/" + d,
"libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200)
self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], TASK_SUCCEEDED)
self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/",
json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], TASK_SUCCEEDED)
def _do_update(result, index):
resp = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"AcquireByHash": True,
"Signing": DefaultSigningOptions,
}
)
try:
self.check_equal(resp.json()['State'], TASK_SUCCEEDED)
except BaseException as e:
result[index] = e
n_workers = 10
worker_results = [None] * n_workers
tasks = [threading.Thread(target=_do_update, args=(worker_results, i,)) for i in range(n_workers)]
[task.start() for task in tasks]
[task.join() for task in tasks]
for result in worker_results:
if isinstance(result, BaseException):
raise result
repo_expected = {
'AcquireByHash': True,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'Origin': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SkipContents': False,
'SourceKind': 'local',
'Sources': [{'Component': 'main', 'Name': repo_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
self.check_exists("public/" + prefix +
"/dists/wheezy/main/binary-i386/by-hash")
self.check_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists(
"public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy").json()['State'], TASK_SUCCEEDED)
self.check_not_exists("public/" + prefix + "dists/")
class PublishUpdateSkipCleanupAPITestRepo(APITest):
"""
PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution