From 7732c612d6190ea103236e095bf106ef2f697f95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Sun, 24 May 2026 19:46:09 +0000 Subject: [PATCH] fix(publish): reload published inside task for create/drop endpoints Affected endpoints: apiPublishRepoOrSnapshot (POST /api/publish/{prefix}), apiPublishDrop (DELETE /api/publish/{prefix}/{distribution}). Both handlers used the outer-scoped collectionFactory and collection variables inside the task closure. These were captured before the task lock was acquired, so under concurrent load each task operated on a stale DB view: apiPublishRepoOrSnapshot: snapshot/localRepo LoadComplete, NewPublishedRepo, CheckDuplicate, Publish, and collection.Add all used the pre-lock collectionFactory/collection. Two concurrent POST to same prefix could both pass CheckDuplicate (neither sees the other in the stale DB view) and race on disk writes. apiPublishDrop: collection.Remove used pre-lock collection, potentially racing with concurrent updates/other drops. Fix: inside the task closure create a fresh taskCollectionFactory and taskCollection. All DB reads (LoadComplete) and writes (CheckDuplicate, Add, Remove, Publish) now run against the authoritative DB state after the lock is held. --- api/publish.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/api/publish.go b/api/publish.go index 4e4b75f8..2c463b64 100644 --- a/api/publish.go +++ b/api/publish.go @@ -314,6 +314,9 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -325,10 +328,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -338,7 +341,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } @@ -376,18 +379,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -615,8 +618,11 @@ func apiPublishDrop(c *gin.Context) { resources := []string{string(published.Key())} taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) }