fix(snapshot): eliminate race conditions by using fresh factory inside task closures

Affected endpoints: apiSnapshotsCreate, apiSnapshotsUpdate, apiSnapshotsDrop,
apiSnapshotsMerge, apiSnapshotsPull.

All five endpoints shared the same architectural flaw as the previously fixed
repos and publish endpoints: operations were performed outside the task lock,
with stale DB state used inside the lock.

Issues Fixed:

1. apiSnapshotsCreate - Source snapshots loaded before task lock
   Problem: snapshotCollection and collectionFactory created before task lock.
   Source snapshots and destination check done with stale factory.
   Concurrent creates both load pre-task state, second overwrites first.

   Fix: Create fresh taskCollectionFactory inside task, fresh loads of all
   sources after lock acquired, pre-task duplicate check for destination,
   use fresh sources and collections for snapshot creation.

2. apiSnapshotsUpdate - Snapshot loaded before task lock
   Problem: snapshot loaded outside task, duplicate check with stale factory.
   Concurrent renames both load pre-task state, both pass check, second
   overwrites first.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of snapshot
   after lock acquired, fresh duplicate check inside lock, pre-task validation
   of new name, atomic rename with fresh copy.

3. apiSnapshotsDrop - Collections created before task lock
   Problem: snapshotCollection and publishedCollection created before task lock.
   Concurrent snapshot/published modifications not detected. Can delete snapshot
   that becomes published between pre-task and task.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of snapshot,
   fresh collections for all checks (published, source dependency), all checks
   inside lock.

4. apiSnapshotsMerge - Source snapshots loaded before task lock
   Problem: snapshotCollection created before task lock. Source snapshots
   loaded outside task, LoadComplete called on stale copies. Concurrent
   merges both load pre-task state, merge result doesn't include source changes.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of all
   sources after lock acquired, LoadComplete on fresh copies, merge using
   fresh RefLists, save using fresh factory.

5. apiSnapshotsPull - Snapshots loaded before task lock
   Problem: toSnapshot and sourceSnapshot loaded outside task,
   collectionFactory created before task. LoadComplete called on stale copies.
   Concurrent pulls load pre-task state, pull doesn't include source changes.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of both
   snapshots after lock acquired, LoadComplete on fresh copies, all filtering
   and pulling on fresh RefLists, save using fresh factory.

Root cause analysis:

The fundamental issue is the split between pre-task work and task-protected
work. Collections and objects were being loaded before lock acquisition, then
stale copies used inside the lock.

Correct pattern (from fixed publish.go and repos.go):

1. HTTP Handler (before task lock):
   - Shallow load for 404 check only
   - Extract resource keys
   - Submit task with resources

2. Task Closure (after lock acquired):
   - Create fresh collectionFactory
   - Fresh load of all objects
   - LoadComplete on fresh copies
   - All mutations on fresh state
   - All checks atomic inside lock
   - Save using fresh collections

This ensures:
- Concurrent operations are serialized by task queue
- No stale DB state used for mutations
- No lost updates from concurrent modifications
- No TOCTOU races on duplicate checks
- No DB handle issues from pre-task factory capture
This commit is contained in:
André Roth
2026-05-25 12:13:41 +00:00
parent 8477274bb0
commit 38ba5bbcc6
+105 -33
View File
@@ -165,6 +165,7 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
var resources []string
@@ -182,8 +183,20 @@ func apiSnapshotsCreate(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
for i := range sources {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPackageCollection := taskCollectionFactory.PackageCollection()
// Fresh load of all sources after lock acquired
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
for i := range b.SourceSnapshots {
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -191,9 +204,9 @@ func apiSnapshotsCreate(c *gin.Context) {
list := deb.NewPackageList()
// verify package refs and build package list
// verify package refs and build package list using fresh factory
for _, ref := range b.PackageRefs {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
@@ -206,9 +219,9 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = snapshotCollection.Add(snapshot)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
@@ -315,6 +328,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
@@ -325,14 +339,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Pre-task validation of new name if provided
if b.Name != "" && b.Name != name {
_, err = collection.ByName(b.Name)
if err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
return
}
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
snapshot, err = taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh duplicate check inside lock (if renaming)
if b.Name != "" && b.Name != name {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
}
// Update fresh copy
if b.Name != "" {
snapshot.Name = b.Name
}
@@ -341,7 +379,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description
}
err = collectionFactory.SnapshotCollection().Update(snapshot)
err = taskCollection.Update(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -395,9 +433,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name)
if err != nil {
@@ -407,21 +445,35 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.BySnapshot(snapshot)
// Phase 2: Inside task lock - create fresh collections
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Fresh load after lock acquired
snapshot, err := taskSnapshotCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh checks with current collections
published := taskPublishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
// Using fresh collection for dependency check
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
err = snapshotCollection.Drop(snapshot)
err = taskSnapshotCollection.Drop(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -576,6 +628,7 @@ func apiSnapshotsMerge(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
@@ -592,32 +645,43 @@ func apiSnapshotsMerge(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = snapshotCollection.LoadComplete(sources[0])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result := sources[0].RefList()
for i := 1; i < len(sources); i++ {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load of all sources inside task
freshSources := make([]*deb.Snapshot, len(body.Sources))
for i := range body.Sources {
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result = result.Merge(sources[i].RefList(), overrideMatching, false)
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
}
// Merge using fresh sources
result := freshSources[0].RefList()
for i := 1; i < len(freshSources); i++ {
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
}
if latest {
result.FilterLatestRefs()
}
sourceDescription := make([]string, len(sources))
for i, s := range sources {
sourceDescription := make([]string, len(freshSources))
for i, s := range freshSources {
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
}
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
err = collectionFactory.SnapshotCollection().Add(snapshot)
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
}
@@ -701,21 +765,29 @@ func apiSnapshotsPull(c *gin.Context) {
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of snapshots after lock acquired
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -812,10 +884,10 @@ func apiSnapshotsPull(c *gin.Context) {
}
// Create <destination> snapshot
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}