From 8477274bb0d7939e8c1af893ef5980cb2279b6e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 12:01:49 +0000 Subject: [PATCH] fix(repos): eliminate race conditions by using fresh factory inside task closures Affected endpoints: apiReposDrop, apiReposPackagesAddDelete, apiReposPackageFromDir, apiReposCopyPackage, apiReposIncludePackageFromDir, apiReposEdit, apiReposCreate. All seven endpoints shared the same architectural flaw as the previously fixed publish endpoints: operations were performed outside the task lock, with stale DB state used inside the lock. Issues Fixed: 1. apiReposDrop - Collections created before task lock Problem: snapshotCollection, publishedCollection captured from pre-task factory. Concurrent snapshot/published modifications not detected. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh collections for checks. 2. apiReposPackagesAddDelete - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent add/delete operations both load same pre-task state, last write wins, packages lost. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh factory for all operations. 3. apiReposPackageFromDir - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent file imports both load same pre-task state, last write wins. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh factory for imports. 4. apiReposCopyPackage - Both repos and factory stale before lock Problem: dstRepo and srcRepo loaded outside task, collectionFactory created before lock. Concurrent copy operations race on stale state. Fix: Create fresh taskCollectionFactory inside task, re-read both repos after lock acquired, use fresh factory for all operations. 5. apiReposIncludePackageFromDir - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent .changes file processing races on stale state. Fix: Create fresh taskCollectionFactory inside task, use fresh factory for import operations. 6. apiReposEdit - No serialization, concurrent modification race Problem: Direct update without task locking. Two concurrent renames can both pass duplicate check, second overwrites first. Fix: Convert to async task. Duplicate check and update now atomic inside lock, after fresh load from DB. 7. apiReposCreate - No serialization, TOCTOU on duplicate check Problem: Duplicate check outside task lock, add outside lock. Two concurrent creates with same name both pass check, second overwrites first. Fix: Convert to async task. Duplicate check and add now atomic inside lock, after fresh load from DB. Root cause analysis: The fundamental issue is the split between pre-task work and task-protected work. Collections and objects were being loaded before lock acquisition, then stale copies used inside the lock. Correct pattern (now applied consistently across all 7 endpoints): 1. HTTP Handler (before task lock): - Shallow load for 404 check only - Extract resource keys - Submit task with resources 2. Task Closure (after lock acquired): - Create fresh collectionFactory - Fresh load of all objects - LoadComplete on fresh copies - All mutations on fresh state - All checks atomic inside lock - Save using fresh collections This ensures: - Concurrent operations are serialized by task queue - No stale DB state used for mutations - No lost updates from concurrent modifications - No TOCTOU races on duplicate checks - No DB handle issues from pre-task factory capture --- api/repos.go | 230 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 163 insertions(+), 67 deletions(-) diff --git a/api/repos.go b/api/repos.go index 1d6dfae3..05452890 100644 --- a/api/repos.go +++ b/api/repos.go @@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) { return } - repo := deb.NewLocalRepo(b.Name, b.Comment) - repo.DefaultComponent = b.DefaultComponent - repo.DefaultDistribution = b.DefaultDistribution - + // Handler: Pre-task validations (shallow) collectionFactory := context.NewCollectionFactory() if b.FromSnapshot != "" { - var snapshot *deb.Snapshot - snapshotCollection := collectionFactory.SnapshotCollection() - snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + _, err := snapshotCollection.ByName(b.FromSnapshot) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) return } + // Just verify it exists - don't load here + } - err = snapshotCollection.LoadComplete(snapshot) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err)) - return + // Use generated key resource for repo being created + resources := []string{"LocalRepo:" + b.Name} + if b.FromSnapshot != "" { + resources = append(resources, "Snapshot:"+b.FromSnapshot) + } + + taskName := fmt.Sprintf("Create repository %s", b.Name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection and check/create ATOMIC inside task + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Check duplicate inside lock + if _, err := taskCollection.ByName(b.Name); err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %s already exists", b.Name) } - repo.UpdateRefList(snapshot.RefList()) - } + // Create repo + repo := deb.NewLocalRepo(b.Name, b.Comment) + repo.DefaultComponent = b.DefaultComponent + repo.DefaultDistribution = b.DefaultDistribution - localRepoCollection := collectionFactory.LocalRepoCollection() + if b.FromSnapshot != "" { + snapshotCollection := taskCollectionFactory.SnapshotCollection() - if _, err := localRepoCollection.ByName(b.Name); err == nil { - AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name)) - return - } + snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, + fmt.Errorf("source snapshot not found: %s", err) + } - err := localRepoCollection.Add(repo) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, err) - return - } + err = snapshotCollection.LoadComplete(snapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, + fmt.Errorf("unable to load source snapshot: %s", err) + } - c.JSON(http.StatusCreated, repo) + repo.UpdateRefList(snapshot.RefList()) + } + + err := taskCollection.Add(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil + }) } type reposEditParams struct { @@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Mutation and duplicate check happen inside the task for atomicity. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) { return } - if b.Name != nil && *b.Name != name { - _, err := collection.ByName(*b.Name) - if err == nil { - // already exists - AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name)) - return + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Edit repository %s", name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err } - repo.Name = *b.Name - } - if b.Comment != nil { - repo.Comment = *b.Comment - } - if b.DefaultDistribution != nil { - repo.DefaultDistribution = *b.DefaultDistribution - } - if b.DefaultComponent != nil { - repo.DefaultComponent = *b.DefaultComponent - } - err = collection.Update(repo) - if err != nil { - AbortWithJSONError(c, 500, err) - return - } + // Check and update ATOMIC (inside lock) + if b.Name != nil && *b.Name != name { + _, err := taskCollection.ByName(*b.Name) + if err == nil { + // already exists + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %q already exists", *b.Name) + } + repo.Name = *b.Name + } + if b.Comment != nil { + repo.Comment = *b.Comment + } + if b.DefaultDistribution != nil { + repo.DefaultDistribution = *b.DefaultDistribution + } + if b.DefaultComponent != nil { + repo.DefaultComponent = *b.DefaultComponent + } - c.JSON(200, repo) + err = taskCollection.Update(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil + }) } // GET /api/repos/:name @@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" name := c.Params.ByName("name") + // Load shallowly for 404 check, resource key, and task name. + // Full checks (published/snapshots) happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() repo, err := collection.ByName(name) if err != nil { @@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.ByLocalRepo(repo) + // Task: Create fresh collections inside task after lock acquired + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Re-read repo with fresh collection after lock + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err) + } + + // Check with fresh collections + published := taskPublishedCollection.ByLocalRepo(repo) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published") } if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) + snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo) + return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) }) } @@ -361,6 +414,8 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -373,13 +428,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li resources := []string{string(repo.Key())} maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(c.Params.ByName("name")) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } out.Printf("Loading packages...\n") - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -388,7 +453,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li for _, ref := range b.PackageRefs { var p *deb.Package - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) @@ -404,7 +469,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -511,6 +576,8 @@ func apiReposPackageFromDir(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -534,7 +601,17 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -555,13 +632,13 @@ func apiReposPackageFromDir(c *gin.Context) { packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) @@ -571,7 +648,7 @@ func apiReposPackageFromDir(c *gin.Context) { repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -650,6 +727,8 @@ func apiReposCopyPackage(c *gin.Context) { return } + // Load shallowly for 404 check and resource keys. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { @@ -673,12 +752,26 @@ func apiReposCopyPackage(c *gin.Context) { resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + // Task: Create fresh factory and collections inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of both repos after lock acquired + dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) } - err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo) + srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) } @@ -691,12 +784,12 @@ func apiReposCopyPackage(c *gin.Context) { RemovedLines: []string{}, } - dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress()) + dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) } - srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress()) + srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) } @@ -764,7 +857,7 @@ func apiReposCopyPackage(c *gin.Context) { } else { dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) - err = collectionFactory.LocalRepoCollection().Update(dstRepo) + err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -867,6 +960,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) { resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + var ( err error verifier = context.GetVerifier() @@ -882,8 +978,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) { changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), + context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) if err != nil {