publish: lock source repos/snapshots

Concurrent tasks were not properly locking their resources, leading to
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

This changes introduces resource locking for publishing:
* SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources
* SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.
This commit is contained in:
André Roth
2026-05-25 10:15:59 +02:00
parent 826c6a19fd
commit 2266eba019
+42 -4
View File
@@ -457,6 +457,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
@@ -464,18 +465,29 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
resources := []string{string(published.Key())}
if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
_, err2 := snapshotCollection.ByName(snapshotInfo.Name)
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
} else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
@@ -484,7 +496,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
// Field mutations and fresh DB load are deferred to inside the task so
// they always operate on a consistent state after the lock is held.
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
@@ -1111,6 +1122,33 @@ func apiPublishUpdate(c *gin.Context) {
}
resources := []string{string(published.Key())}
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
// because published.Update() reads from them and concurrent modification
// would produce an inconsistent view.
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
if published.SourceKind == deb.SourceLocalRepo {
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, uuid := range published.Sources {
snapshot, err2 := snapshotCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()