From 2266eba0190b1306aac66dc64334214f3f0d1ff2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 10:15:59 +0200 Subject: [PATCH] publish: lock source repos/snapshots Concurrent tasks were not properly locking their resources, leading to inconsistent published indexes: Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot Request B: modifies same source repo or snapshot (add/remove packages, etc) Task A: Update() + Publish() reads stale/modified source -> inconsistent published index, or partial write if source deleted mid-task. This changes introduces resource locking for publishing: * SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append string(repo.Key()) to resources * SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append string(snapshot.ResourceKey()) to resources. --- api/publish.go | 46 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/api/publish.go b/api/publish.go index 0454200e..86fa3f8f 100644 --- a/api/publish.go +++ b/api/publish.go @@ -457,6 +457,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -464,18 +465,29 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } - } else if published.SourceKind == deb.SourceSnapshot { - for _, snapshotInfo := range b.Snapshots { - _, err2 := snapshotCollection.ByName(snapshotInfo.Name) + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, snapshotInfo := range b.Snapshots { + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.ResourceKey())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) @@ -484,7 +496,6 @@ func apiPublishUpdateSwitch(c *gin.Context) { // Field mutations and fresh DB load are deferred to inside the task so // they always operate on a consistent state after the lock is held. - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { taskCollectionFactory := context.NewCollectionFactory() @@ -1111,6 +1122,33 @@ func apiPublishUpdate(c *gin.Context) { } resources := []string{string(published.Key())} + + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. + snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() + + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.ResourceKey())) + } + } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { taskCollectionFactory := context.NewCollectionFactory()