mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-30 04:20:53 +00:00
fix(mirror): eliminate race conditions by using fresh factory inside task closures
Affected endpoints: apiMirrorsDrop, apiMirrorsUpdate. Both endpoints shared the same architectural flaw as the previously fixed publish, repos, and snapshot endpoints: operations were performed outside the task lock, with stale DB state used inside the lock. Issues Fixed: 1. apiMirrorsDrop - Collections created before task lock Problem: mirrorCollection and snapshotCollection created before task lock. Snapshot dependency check done with stale factory. Concurrent drops both load pre-task state, both see same snapshot dependencies. If snapshots created after pre-task check, can delete mirror used by snapshots. Fix: Create fresh taskCollectionFactory inside task, fresh load of mirror after lock acquired, fresh snapshot check with current factory, drop using fresh collections. 2. apiMirrorsUpdate - Mirror loaded before task lock Problem: remote loaded outside task, rename duplicate check with stale factory. Concurrent updates both load pre-task state, long-running update uses stale mirror reference. TOCTOU race: rename check passes, another creates mirror with same name, update saves with stale data. Fix: Create fresh taskCollectionFactory inside task, fresh load of mirror after lock acquired, pre-task rename validation, fresh rename check inside lock, use fresh mirror and collections for all operations. Root cause analysis: The fundamental issue is the split between pre-task work and task-protected work. Collections and objects were being loaded before lock acquisition, then stale copies used inside the lock. Correct pattern (from fixed publish.go, repos.go, and snapshot.go): 1. HTTP Handler (before task lock): - Shallow load for 404 check only - Extract resource keys - Submit task with resources 2. Task Closure (after lock acquired): - Create fresh collectionFactory - Fresh load of all objects - LoadComplete on fresh copies - All mutations on fresh state - All checks atomic inside lock - Save using fresh collections This ensures: - Concurrent operations are serialized by task queue - No stale DB state used for mutations - No lost updates from concurrent modifications - No TOCTOU races on duplicate checks - No loss of mirrors used by snapshots - No stale data in long-running updates
This commit is contained in:
+38
-7
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
|
||||
name := c.Params.ByName("name")
|
||||
force := c.Request.URL.Query().Get("force") == "1"
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
repo, err := mirrorCollection.ByName(name)
|
||||
if err != nil {
|
||||
@@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) {
|
||||
|
||||
resources := []string{string(repo.Key())}
|
||||
taskName := fmt.Sprintf("Delete mirror %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err := repo.CheckLock()
|
||||
// Phase 2: Inside task lock - create fresh collections
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
repo, err := taskMirrorCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
|
||||
err = repo.CheckLock()
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
|
||||
if !force {
|
||||
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
|
||||
// Fresh checks with current collections
|
||||
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
|
||||
|
||||
if len(snapshots) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
||||
}
|
||||
}
|
||||
|
||||
err = mirrorCollection.Drop(repo)
|
||||
err = taskMirrorCollection.Drop(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
@@ -550,6 +563,7 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Pre-task validation of new name if provided
|
||||
if b.Name != remote.Name {
|
||||
_, err = collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
@@ -566,9 +580,26 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
|
||||
resources := []string{string(remote.Key())}
|
||||
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
remote, err := taskCollection.ByName(c.Params.ByName("name"))
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
|
||||
// Fresh rename check inside lock (if renaming)
|
||||
if b.Name != remote.Name {
|
||||
_, err := taskCollection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
|
||||
}
|
||||
}
|
||||
|
||||
downloader := context.NewDownloader(out)
|
||||
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
@@ -780,8 +811,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
}
|
||||
|
||||
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
||||
_ = remote.FinalizeDownload(collectionFactory, out)
|
||||
err = collectionFactory.RemoteRepoCollection().Update(remote)
|
||||
_ = remote.FinalizeDownload(taskCollectionFactory, out)
|
||||
err = taskCollection.Update(remote)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user