fix(publish): reload published inside task for create/drop endpoints

Affected endpoints: apiPublishRepoOrSnapshot (POST /api/publish/{prefix}),
apiPublishDrop (DELETE /api/publish/{prefix}/{distribution}).

Both handlers used the outer-scoped collectionFactory and collection
variables inside the task closure.  These were captured before the task
lock was acquired, so under concurrent load each task operated on a
stale DB view:

  apiPublishRepoOrSnapshot:  snapshot/localRepo LoadComplete,
  NewPublishedRepo, CheckDuplicate, Publish, and collection.Add all
  used the pre-lock collectionFactory/collection.  Two concurrent
  POST to same prefix could both pass CheckDuplicate (neither sees
  the other in the stale DB view) and race on disk writes.

  apiPublishDrop:  collection.Remove used pre-lock collection,
  potentially racing with concurrent updates/other drops.

Fix: inside the task closure create a fresh taskCollectionFactory and
taskCollection.  All DB reads (LoadComplete) and writes
(CheckDuplicate, Add, Remove, Publish) now run against the authoritative
DB state after the lock is held.
This commit is contained in:
André Roth
2026-05-24 19:46:09 +00:00
parent b7969c7a2d
commit 9e91ee4c4a
+15 -11
View File
@@ -298,8 +298,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist multiDist = *b.MultiDist
} }
collection := collectionFactory.PublishedRepoCollection()
// Pre-register the published repo key in resources so that concurrent // Pre-register the published repo key in resources so that concurrent
// POST requests for the same prefix/distribution are serialized by the // POST requests for the same prefix/distribution are serialized by the
// task queue rather than racing on CheckDuplicate + Add. // task queue rather than racing on CheckDuplicate + Add.
@@ -314,6 +312,9 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
taskDetail := task.PublishDetail{ taskDetail := task.PublishDetail{
Detail: detail, Detail: detail,
} }
@@ -325,10 +326,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
for _, source := range sources { for _, source := range sources {
switch s := source.(type) { switch s := source.(type) {
case *deb.Snapshot: case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := taskCollectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s) err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo: case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection() localCollection := taskCollectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s) err = localCollection.LoadComplete(s)
default: default:
err = fmt.Errorf("unexpected type for source: %T", source) err = fmt.Errorf("unexpected type for source: %T", source)
@@ -338,7 +339,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} }
} }
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
@@ -376,18 +377,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.Version = b.Version published.Version = b.Version
} }
duplicate := collection.CheckDuplicate(published) duplicate := taskCollection.CheckDuplicate(published)
if duplicate != nil { if duplicate != nil {
_ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
} }
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
err = collection.Add(published) err = taskCollection.Add(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -615,8 +616,11 @@ func apiPublishDrop(c *gin.Context) {
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution, taskCollectionFactory := context.NewCollectionFactory()
collectionFactory, out, force, skipCleanup) taskCollection := taskCollectionFactory.PublishedRepoCollection()
err := taskCollection.Remove(context, storage, prefix, distribution,
taskCollectionFactory, out, force, skipCleanup)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
} }