Compare commits

...

7 Commits

Author SHA1 Message Date
André Roth d5f1929dd1 tests: add api repo edit test 2026-05-26 00:32:47 +02:00
André Roth 13cf5cf76c fix(snapshot): allow same-name in pre-task check for update
The pre-task validation in apiSnapshotsUpdate was incorrectly rejecting
PUT requests that set the Name to the snapshot's current name. This caused
a 409 response before creating a task, which broke the system test
SnapshotsAPITestCreateUpdate that expects a task to be created and then
fail inside the task.

The fix restores the 'b.Name != name' condition in the pre-task check so
that same-name updates pass through to the task, where the in-task
duplicate check will properly fail them (returning a failed task state
instead of a direct 409).
2026-05-25 22:19:58 +02:00
André Roth 7362e7ee3b fix(snapshot): check duplicate name even when renaming to same name
The SnapshotsAPITestCreateUpdate test expects that PUT /api/snapshots/:name
with the same Name in the body returns a conflict error. The previous fix
added 'b.Name != name' guards to skip the duplicate check when the name
hasn't changed, but this broke the test which expects the old behavior:
any existing name (including the snapshot's own current name) should be
rejected as a duplicate.

Remove the 'b.Name != name' condition from both the pre-task validation
and the in-task duplicate check so the behavior matches the original.
2026-05-25 20:41:33 +02:00
André Roth b8373b0afc fix: capture gin context params before async task closure
The gin context (c) may be recycled after the HTTP handler returns 202
for async tasks. Accessing c.Params.ByName() inside the task closure
returns an empty string, causing 'mirror with name  not found' errors.

Capture the URL :name parameter into a local variable before the
closure so it is safely captured by value.

Affected endpoints:
- PUT /api/mirrors/:name (apiMirrorsUpdate)
- POST/DELETE /api/repos/:name/packages (apiReposPackagesAddDelete)
2026-05-25 19:57:22 +02:00
André Roth 5a75e45ba8 fix(mirror): eliminate race conditions by using fresh factory inside task closures
Affected endpoints: apiMirrorsDrop, apiMirrorsUpdate.

Both endpoints shared the same architectural flaw as the previously fixed
publish, repos, and snapshot endpoints: operations were performed outside
the task lock, with stale DB state used inside the lock.

Issues Fixed:

1. apiMirrorsDrop - Collections created before task lock
   Problem: mirrorCollection and snapshotCollection created before task lock.
   Snapshot dependency check done with stale factory. Concurrent drops both
   load pre-task state, both see same snapshot dependencies. If snapshots
   created after pre-task check, can delete mirror used by snapshots.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of mirror
   after lock acquired, fresh snapshot check with current factory, drop using
   fresh collections.

2. apiMirrorsUpdate - Mirror loaded before task lock
   Problem: remote loaded outside task, rename duplicate check with stale
   factory. Concurrent updates both load pre-task state, long-running update
   uses stale mirror reference. TOCTOU race: rename check passes, another
   creates mirror with same name, update saves with stale data.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of mirror
   after lock acquired, pre-task rename validation, fresh rename check inside
   lock, use fresh mirror and collections for all operations.

Root cause analysis:

The fundamental issue is the split between pre-task work and task-protected
work. Collections and objects were being loaded before lock acquisition, then
stale copies used inside the lock.

Correct pattern (from fixed publish.go, repos.go, and snapshot.go):

1. HTTP Handler (before task lock):
   - Shallow load for 404 check only
   - Extract resource keys
   - Submit task with resources

2. Task Closure (after lock acquired):
   - Create fresh collectionFactory
   - Fresh load of all objects
   - LoadComplete on fresh copies
   - All mutations on fresh state
   - All checks atomic inside lock
   - Save using fresh collections

This ensures:
- Concurrent operations are serialized by task queue
- No stale DB state used for mutations
- No lost updates from concurrent modifications
- No TOCTOU races on duplicate checks
- No loss of mirrors used by snapshots
- No stale data in long-running updates
2026-05-25 19:57:22 +02:00
André Roth 38ba5bbcc6 fix(snapshot): eliminate race conditions by using fresh factory inside task closures
Affected endpoints: apiSnapshotsCreate, apiSnapshotsUpdate, apiSnapshotsDrop,
apiSnapshotsMerge, apiSnapshotsPull.

All five endpoints shared the same architectural flaw as the previously fixed
repos and publish endpoints: operations were performed outside the task lock,
with stale DB state used inside the lock.

Issues Fixed:

1. apiSnapshotsCreate - Source snapshots loaded before task lock
   Problem: snapshotCollection and collectionFactory created before task lock.
   Source snapshots and destination check done with stale factory.
   Concurrent creates both load pre-task state, second overwrites first.

   Fix: Create fresh taskCollectionFactory inside task, fresh loads of all
   sources after lock acquired, pre-task duplicate check for destination,
   use fresh sources and collections for snapshot creation.

2. apiSnapshotsUpdate - Snapshot loaded before task lock
   Problem: snapshot loaded outside task, duplicate check with stale factory.
   Concurrent renames both load pre-task state, both pass check, second
   overwrites first.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of snapshot
   after lock acquired, fresh duplicate check inside lock, pre-task validation
   of new name, atomic rename with fresh copy.

3. apiSnapshotsDrop - Collections created before task lock
   Problem: snapshotCollection and publishedCollection created before task lock.
   Concurrent snapshot/published modifications not detected. Can delete snapshot
   that becomes published between pre-task and task.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of snapshot,
   fresh collections for all checks (published, source dependency), all checks
   inside lock.

4. apiSnapshotsMerge - Source snapshots loaded before task lock
   Problem: snapshotCollection created before task lock. Source snapshots
   loaded outside task, LoadComplete called on stale copies. Concurrent
   merges both load pre-task state, merge result doesn't include source changes.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of all
   sources after lock acquired, LoadComplete on fresh copies, merge using
   fresh RefLists, save using fresh factory.

5. apiSnapshotsPull - Snapshots loaded before task lock
   Problem: toSnapshot and sourceSnapshot loaded outside task,
   collectionFactory created before task. LoadComplete called on stale copies.
   Concurrent pulls load pre-task state, pull doesn't include source changes.

   Fix: Create fresh taskCollectionFactory inside task, fresh load of both
   snapshots after lock acquired, LoadComplete on fresh copies, all filtering
   and pulling on fresh RefLists, save using fresh factory.

Root cause analysis:

The fundamental issue is the split between pre-task work and task-protected
work. Collections and objects were being loaded before lock acquisition, then
stale copies used inside the lock.

Correct pattern (from fixed publish.go and repos.go):

1. HTTP Handler (before task lock):
   - Shallow load for 404 check only
   - Extract resource keys
   - Submit task with resources

2. Task Closure (after lock acquired):
   - Create fresh collectionFactory
   - Fresh load of all objects
   - LoadComplete on fresh copies
   - All mutations on fresh state
   - All checks atomic inside lock
   - Save using fresh collections

This ensures:
- Concurrent operations are serialized by task queue
- No stale DB state used for mutations
- No lost updates from concurrent modifications
- No TOCTOU races on duplicate checks
- No DB handle issues from pre-task factory capture
2026-05-25 19:57:22 +02:00
André Roth 8477274bb0 fix(repos): eliminate race conditions by using fresh factory inside task closures
Affected endpoints: apiReposDrop, apiReposPackagesAddDelete,
apiReposPackageFromDir, apiReposCopyPackage, apiReposIncludePackageFromDir,
apiReposEdit, apiReposCreate.

All seven endpoints shared the same architectural flaw as the previously
fixed publish endpoints: operations were performed outside the task lock,
with stale DB state used inside the lock.

Issues Fixed:

1. apiReposDrop - Collections created before task lock
   Problem: snapshotCollection, publishedCollection captured from pre-task
   factory. Concurrent snapshot/published modifications not detected.

   Fix: Create fresh taskCollectionFactory inside task, re-read repo after
   lock acquired, use fresh collections for checks.

2. apiReposPackagesAddDelete - Repo and factory stale before lock
   Problem: repo loaded outside task, collectionFactory created before lock.
   Concurrent add/delete operations both load same pre-task state, last
   write wins, packages lost.

   Fix: Create fresh taskCollectionFactory inside task, re-read repo after
   lock acquired, use fresh factory for all operations.

3. apiReposPackageFromDir - Repo and factory stale before lock
   Problem: repo loaded outside task, collectionFactory created before lock.
   Concurrent file imports both load same pre-task state, last write wins.

   Fix: Create fresh taskCollectionFactory inside task, re-read repo after
   lock acquired, use fresh factory for imports.

4. apiReposCopyPackage - Both repos and factory stale before lock
   Problem: dstRepo and srcRepo loaded outside task, collectionFactory
   created before lock. Concurrent copy operations race on stale state.

   Fix: Create fresh taskCollectionFactory inside task, re-read both repos
   after lock acquired, use fresh factory for all operations.

5. apiReposIncludePackageFromDir - Repo and factory stale before lock
   Problem: repo loaded outside task, collectionFactory created before lock.
   Concurrent .changes file processing races on stale state.

   Fix: Create fresh taskCollectionFactory inside task, use fresh factory
   for import operations.

6. apiReposEdit - No serialization, concurrent modification race
   Problem: Direct update without task locking. Two concurrent renames can
   both pass duplicate check, second overwrites first.

   Fix: Convert to async task. Duplicate check and update now atomic inside
   lock, after fresh load from DB.

7. apiReposCreate - No serialization, TOCTOU on duplicate check
   Problem: Duplicate check outside task lock, add outside lock. Two
   concurrent creates with same name both pass check, second overwrites first.

   Fix: Convert to async task. Duplicate check and add now atomic inside
   lock, after fresh load from DB.

Root cause analysis:

The fundamental issue is the split between pre-task work and task-protected
work. Collections and objects were being loaded before lock acquisition, then
stale copies used inside the lock.

Correct pattern (now applied consistently across all 7 endpoints):

1. HTTP Handler (before task lock):
   - Shallow load for 404 check only
   - Extract resource keys
   - Submit task with resources

2. Task Closure (after lock acquired):
   - Create fresh collectionFactory
   - Fresh load of all objects
   - LoadComplete on fresh copies
   - All mutations on fresh state
   - All checks atomic inside lock
   - Save using fresh collections

This ensures:
- Concurrent operations are serialized by task queue
- No stale DB state used for mutations
- No lost updates from concurrent modifications
- No TOCTOU races on duplicate checks
- No DB handle issues from pre-task factory capture
2026-05-25 18:36:26 +02:00
4 changed files with 341 additions and 109 deletions
+40 -8
View File
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
mirrorCollection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err := mirrorCollection.ByName(name)
if err != nil {
@@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
// Phase 2: Inside task lock - create fresh collections
taskCollectionFactory := context.NewCollectionFactory()
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
repo, err := taskMirrorCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
err = repo.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
if !force {
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
// Fresh checks with current collections
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
}
}
err = mirrorCollection.Drop(repo)
err = taskMirrorCollection.Drop(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
@@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
remote, err = collection.ByName(c.Params.ByName("name"))
name := c.Params.ByName("name")
remote, err = collection.ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) {
return
}
// Pre-task validation of new name if provided
if b.Name != remote.Name {
_, err = collection.ByName(b.Name)
if err == nil {
@@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) {
resources := []string{string(remote.Key())}
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.RemoteRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
remote, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Fresh rename check inside lock (if renaming)
if b.Name != remote.Name {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
}
}
downloader := context.NewDownloader(out)
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) {
}
log.Info().Msgf("%s: Finalizing download...", b.Name)
_ = remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
_ = remote.FinalizeDownload(taskCollectionFactory, out)
err = taskCollection.Update(remote)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
+165 -68
View File
@@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) {
return
}
repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
// Handler: Pre-task validations (shallow)
collectionFactory := context.NewCollectionFactory()
if b.FromSnapshot != "" {
var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
_, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
return
}
// Just verify it exists - don't load here
}
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
return
// Use generated key resource for repo being created
resources := []string{"LocalRepo:" + b.Name}
if b.FromSnapshot != "" {
resources = append(resources, "Snapshot:"+b.FromSnapshot)
}
taskName := fmt.Sprintf("Create repository %s", b.Name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collection and check/create ATOMIC inside task
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Check duplicate inside lock
if _, err := taskCollection.ByName(b.Name); err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %s already exists", b.Name)
}
repo.UpdateRefList(snapshot.RefList())
}
// Create repo
repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
localRepoCollection := collectionFactory.LocalRepoCollection()
if b.FromSnapshot != "" {
snapshotCollection := taskCollectionFactory.SnapshotCollection()
if _, err := localRepoCollection.ByName(b.Name); err == nil {
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
return
}
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
fmt.Errorf("source snapshot not found: %s", err)
}
err := localRepoCollection.Add(repo)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
fmt.Errorf("unable to load source snapshot: %s", err)
}
c.JSON(http.StatusCreated, repo)
repo.UpdateRefList(snapshot.RefList())
}
err := taskCollection.Add(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil
})
}
type reposEditParams struct {
@@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) {
return
}
// Load shallowly for 404 check and resource key.
// Mutation and duplicate check happen inside the task for atomicity.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
@@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) {
return
}
if b.Name != nil && *b.Name != name {
_, err := collection.ByName(*b.Name)
if err == nil {
// already exists
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
return
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Edit repository %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
err = collection.Update(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}
// Check and update ATOMIC (inside lock)
if b.Name != nil && *b.Name != name {
_, err := taskCollection.ByName(*b.Name)
if err == nil {
// already exists
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %q already exists", *b.Name)
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
c.JSON(200, repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
})
}
// GET /api/repos/:name
@@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) {
force := c.Request.URL.Query().Get("force") == "1"
name := c.Params.ByName("name")
// Load shallowly for 404 check, resource key, and task name.
// Full checks (published/snapshots) happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
repo, err := collection.ByName(name)
if err != nil {
@@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.ByLocalRepo(repo)
// Task: Create fresh collections inside task after lock acquired
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Re-read repo with fresh collection after lock
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
// Check with fresh collections
published := taskPublishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
}
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo)
})
}
@@ -361,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
return
}
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -373,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
resources := []string{string(repo.Key())}
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -388,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
@@ -404,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -511,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
@@ -534,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -555,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) {
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
@@ -571,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) {
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -650,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) {
return
}
// Load shallowly for 404 check and resource keys.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil {
@@ -673,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) {
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
// Task: Create fresh factory and collections inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of both repos after lock acquired
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}
@@ -691,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) {
RemovedLines: []string{},
}
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
}
srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
}
@@ -764,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) {
} else {
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
err = collectionFactory.LocalRepoCollection().Update(dstRepo)
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -867,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
var (
err error
verifier = context.GetVerifier()
@@ -882,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(),
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
+105 -33
View File
@@ -165,6 +165,7 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
var resources []string
@@ -182,8 +183,20 @@ func apiSnapshotsCreate(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
for i := range sources {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPackageCollection := taskCollectionFactory.PackageCollection()
// Fresh load of all sources after lock acquired
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
for i := range b.SourceSnapshots {
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -191,9 +204,9 @@ func apiSnapshotsCreate(c *gin.Context) {
list := deb.NewPackageList()
// verify package refs and build package list
// verify package refs and build package list using fresh factory
for _, ref := range b.PackageRefs {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
@@ -206,9 +219,9 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = snapshotCollection.Add(snapshot)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
@@ -315,6 +328,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
@@ -325,14 +339,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Pre-task validation of new name if provided (skip if renaming to same name)
if b.Name != "" && b.Name != name {
_, err = collection.ByName(b.Name)
if err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
return
}
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
snapshot, err = taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh duplicate check inside lock
if b.Name != "" {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
}
// Update fresh copy
if b.Name != "" {
snapshot.Name = b.Name
}
@@ -341,7 +379,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description
}
err = collectionFactory.SnapshotCollection().Update(snapshot)
err = taskCollection.Update(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -395,9 +433,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name)
if err != nil {
@@ -407,21 +445,35 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.BySnapshot(snapshot)
// Phase 2: Inside task lock - create fresh collections
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Fresh load after lock acquired
snapshot, err := taskSnapshotCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh checks with current collections
published := taskPublishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
// Using fresh collection for dependency check
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
err = snapshotCollection.Drop(snapshot)
err = taskSnapshotCollection.Drop(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -576,6 +628,7 @@ func apiSnapshotsMerge(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
@@ -592,32 +645,43 @@ func apiSnapshotsMerge(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = snapshotCollection.LoadComplete(sources[0])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result := sources[0].RefList()
for i := 1; i < len(sources); i++ {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load of all sources inside task
freshSources := make([]*deb.Snapshot, len(body.Sources))
for i := range body.Sources {
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result = result.Merge(sources[i].RefList(), overrideMatching, false)
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
}
// Merge using fresh sources
result := freshSources[0].RefList()
for i := 1; i < len(freshSources); i++ {
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
}
if latest {
result.FilterLatestRefs()
}
sourceDescription := make([]string, len(sources))
for i, s := range sources {
sourceDescription := make([]string, len(freshSources))
for i, s := range freshSources {
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
}
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
err = collectionFactory.SnapshotCollection().Add(snapshot)
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
}
@@ -701,21 +765,29 @@ func apiSnapshotsPull(c *gin.Context) {
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of snapshots after lock acquired
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -812,10 +884,10 @@ func apiSnapshotsPull(c *gin.Context) {
}
// Create <destination> snapshot
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
+31
View File
@@ -461,3 +461,34 @@ class ReposAPITestCopyPackage(APITest):
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(),
['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378'])
class ReposAPITestCreateEdit(APITest):
"""
POST /api/repos,
"""
def check(self):
repo_name = self.random_name() + ' with space'
repo_desc = {'Comment': 'fun repo',
'DefaultComponent': 'contrib',
'DefaultDistribution': 'bookworm',
'Name': repo_name}
resp = self.post("/api/repos", json=repo_desc)
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 201)
repo_desc = {'Comment': 'modified repo',
'DefaultComponent': 'main',
'DefaultDistribution': 'trixie',
'Name': repo_name + '@renamed'}
resp = self.put(f"/api/repos/{repo_name}", json=repo_desc)
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 200)
resp = self.get("/api/repos/" + repo_name + '@renamed')
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 200)
resp = self.delete("/api/repos/" + repo_name + '@renamed')
self.check_equal(resp.status_code, 200)