Compare commits

...

9 Commits

Author SHA1 Message Date
André Roth c77d788493 publish: lock all distributions with MultiDist 2026-05-05 11:04:36 +02:00
André Roth 5ff552d919 publish: fix locking of snapshots
snapshots come in 3 kinds: local, remote and snapshot

find resources to be locked for each kind, recursively for the snapshot kind
2026-05-05 00:24:01 +02:00
André Roth 4defa49b7f publish: lock resources from all SourceKinds 2026-05-04 20:48:05 +02:00
André Roth 6fbcbc108c more debug 2026-05-04 18:41:50 +02:00
André Roth 41f5d22637 publish: remove useless ressource assignment 2026-05-04 17:19:36 +02:00
André Roth 8179f73bf0 publish: cleanup 2026-05-04 17:19:15 +02:00
André Roth f8efb3e9b7 publish update: lock all snapshots and repos as well 2026-05-04 16:12:54 +02:00
André Roth 55b2943f44 more debug 2026-05-04 13:49:24 +02:00
André Roth 9280231c1d publish: debug locking 2026-05-04 12:49:16 +02:00
4 changed files with 303 additions and 17 deletions
+60 -10
View File
@@ -255,13 +255,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.SourceKind == deb.SourceSnapshot { if b.SourceKind == deb.SourceSnapshot {
var snapshot *deb.Snapshot var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection() tmpCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
snapshot, err = snapshotCollection.ByName(source.Name) snapshot, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -273,13 +273,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo { } else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo var localRepo *deb.LocalRepo
localCollection := collectionFactory.LocalRepoCollection() tmpCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
localRepo, err = localCollection.ByName(source.Name) localRepo, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -332,8 +332,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
resources = append(resources, string(published.Key()))
if b.Origin != "" { if b.Origin != "" {
published.Origin = b.Origin published.Origin = b.Origin
} }
@@ -387,6 +385,46 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}) })
} }
// Return resources to be locked for a Snapshot name
func getSnapshotResources(snapshotCollection *deb.SnapshotCollection, snapshotName string) (resources []string, err error) {
snapshot, err := snapshotCollection.ByName(snapshotName)
if err != nil {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
for _, sourceID := range snapshot.SourceIDs {
if snapshot.SourceKind == deb.SourceSnapshot {
snapshot2, err2 := snapshotCollection.ByUUID(sourceID)
if err2 != nil {
err = err2
return
}
res, err3 := getSnapshotResources(snapshotCollection, snapshot2.Name)
if err3 != nil {
err = err3
return
}
resources = append(resources, res...)
} else if snapshot.SourceKind == deb.SourceLocalRepo {
var repo *deb.LocalRepo
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(repo.Key()))
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
var mirror *deb.RemoteRepo
mirror, err = context.NewCollectionFactory().RemoteRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(mirror.Key()))
}
}
return
}
type publishedRepoUpdateSwitchParams struct { type publishedRepoUpdateSwitchParams struct {
// when publishing, overwrite files in pool/ directory without notice // when publishing, overwrite files in pool/ directory without notice
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"` ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
@@ -465,18 +503,31 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return return
} }
resources := []string{string(published.Key())}
if published.SourceKind == deb.SourceLocalRepo { if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 { if len(b.Snapshots) > 0 {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return return
} }
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots { localCollection := collectionFactory.LocalRepoCollection()
_, err2 := snapshotCollection.ByName(snapshotInfo.Name) for _, sourceID := range published.Sources {
localRepo, err2 := localCollection.ByUUID(sourceID)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(localRepo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
res, err2 := getSnapshotResources(snapshotCollection, snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, res...)
} }
} else { } else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
@@ -515,7 +566,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.Version = *b.Version published.Version = *b.Version
} }
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory) err = collection.LoadComplete(published, collectionFactory)
+5
View File
@@ -609,7 +609,12 @@ func (p *PublishedRepo) StoragePrefix() string {
// Key returns unique key identifying PublishedRepo // Key returns unique key identifying PublishedRepo
func (p *PublishedRepo) Key() []byte { func (p *PublishedRepo) Key() []byte {
if p.MultiDist {
// do not lock Distribution in MultiDist
return []byte("UM" + p.StoragePrefix())
} else {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
}
} }
// RefKey is a unique id for package reference list // RefKey is a unique id for package reference list
+226
View File
@@ -992,6 +992,232 @@ class PublishSwitchAPITestRepo(APITest):
self.check_not_exists("public/" + prefix + "dists/") self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestMirror(APITest):
"""
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True
def check(self):
mirror_name = self.random_name()
mirror_desc = {'Name': mirror_name,
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
'Distribution': 'wheezy',
'Keyrings': ["aptlytest.gpg"],
'Architectures': ["amd64"],
'Components': ['main']}
mirror_desc['IgnoreSignatures'] = True
# Create Mirror
resp = self.post("/api/mirrors", json=mirror_desc)
self.check_equal(resp.status_code, 201)
# Get Mirror
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
self.check_equal(resp.status_code, 404)
# Update Mirror
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
self.check_task(resp)
# Snapshot Mirror
snapshot1_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
# Publish Snapshot
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': 'packagecloud.io/varnishcache/varnish30',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# Snapshot Mirror 2
snapshot2_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestSnapshot(APITest):
"""
publish snapshot of snapshot
"""
fixtureGpg = True
def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
self.check_task(task)
snapshot1_name = self.random_name()
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': '',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
snapshot2_name = self.random_name()
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestRepoSignedBy(APITest): class PublishSwitchAPITestRepoSignedBy(APITest):
""" """
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
+5
View File
@@ -65,6 +65,7 @@ func (list *List) consumer() {
task.State = SUCCEEDED task.State = SUCCEEDED
} }
fmt.Printf("RACE DEBUG: Task Done '%s', freeing %s\n", task.Name, task.resources)
list.usedResources.Free(task.resources) list.usedResources.Free(task.resources)
task.wgTask.Done() task.wgTask.Done()
@@ -77,6 +78,8 @@ func (list *List) consumer() {
blockingTasks := list.usedResources.UsedBy(t.resources) blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 { if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t) list.usedResources.MarkInUse(t.resources, t)
fmt.Printf("RACE DEBUG: Task Resuming '%s', locking %s\n", t.Name, t.resources)
// unlock list since queueing may block // unlock list since queueing may block
list.Unlock() list.Unlock()
unlocked = true unlocked = true
@@ -209,10 +212,12 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
tasks := list.usedResources.UsedBy(resources) tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 { if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task) list.usedResources.MarkInUse(task.resources, task)
fmt.Printf("RACE DEBUG: Task Starting '%s', locking %s\n", name, resources)
// queueing task might block if channel not ready, unlock list before queueing // queueing task might block if channel not ready, unlock list before queueing
list.Unlock() list.Unlock()
list.queue <- task list.queue <- task
} else { } else {
fmt.Printf("RACE DEBUG: Task Queued '%s', waiting on %s\n", name, resources)
list.Unlock() list.Unlock()
} }