mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-31 04:30:44 +00:00
fix(repos): eliminate race conditions by using fresh factory inside task closures
Affected endpoints: apiReposDrop, apiReposPackagesAddDelete, apiReposPackageFromDir, apiReposCopyPackage, apiReposIncludePackageFromDir, apiReposEdit, apiReposCreate. All seven endpoints shared the same architectural flaw as the previously fixed publish endpoints: operations were performed outside the task lock, with stale DB state used inside the lock. Issues Fixed: 1. apiReposDrop - Collections created before task lock Problem: snapshotCollection, publishedCollection captured from pre-task factory. Concurrent snapshot/published modifications not detected. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh collections for checks. 2. apiReposPackagesAddDelete - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent add/delete operations both load same pre-task state, last write wins, packages lost. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh factory for all operations. 3. apiReposPackageFromDir - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent file imports both load same pre-task state, last write wins. Fix: Create fresh taskCollectionFactory inside task, re-read repo after lock acquired, use fresh factory for imports. 4. apiReposCopyPackage - Both repos and factory stale before lock Problem: dstRepo and srcRepo loaded outside task, collectionFactory created before lock. Concurrent copy operations race on stale state. Fix: Create fresh taskCollectionFactory inside task, re-read both repos after lock acquired, use fresh factory for all operations. 5. apiReposIncludePackageFromDir - Repo and factory stale before lock Problem: repo loaded outside task, collectionFactory created before lock. Concurrent .changes file processing races on stale state. Fix: Create fresh taskCollectionFactory inside task, use fresh factory for import operations. 6. apiReposEdit - No serialization, concurrent modification race Problem: Direct update without task locking. Two concurrent renames can both pass duplicate check, second overwrites first. Fix: Convert to async task. Duplicate check and update now atomic inside lock, after fresh load from DB. 7. apiReposCreate - No serialization, TOCTOU on duplicate check Problem: Duplicate check outside task lock, add outside lock. Two concurrent creates with same name both pass check, second overwrites first. Fix: Convert to async task. Duplicate check and add now atomic inside lock, after fresh load from DB. Root cause analysis: The fundamental issue is the split between pre-task work and task-protected work. Collections and objects were being loaded before lock acquisition, then stale copies used inside the lock. Correct pattern (now applied consistently across all 7 endpoints): 1. HTTP Handler (before task lock): - Shallow load for 404 check only - Extract resource keys - Submit task with resources 2. Task Closure (after lock acquired): - Create fresh collectionFactory - Fresh load of all objects - LoadComplete on fresh copies - All mutations on fresh state - All checks atomic inside lock - Save using fresh collections This ensures: - Concurrent operations are serialized by task queue - No stale DB state used for mutations - No lost updates from concurrent modifications - No TOCTOU races on duplicate checks - No DB handle issues from pre-task factory capture
This commit is contained in:
+163
-67
@@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
// Handler: Pre-task validations (shallow)
|
||||||
repo.DefaultComponent = b.DefaultComponent
|
|
||||||
repo.DefaultDistribution = b.DefaultDistribution
|
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
|
|
||||||
if b.FromSnapshot != "" {
|
if b.FromSnapshot != "" {
|
||||||
var snapshot *deb.Snapshot
|
|
||||||
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
_, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Just verify it exists - don't load here
|
||||||
|
}
|
||||||
|
|
||||||
err = snapshotCollection.LoadComplete(snapshot)
|
// Use generated key resource for repo being created
|
||||||
if err != nil {
|
resources := []string{"LocalRepo:" + b.Name}
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
|
if b.FromSnapshot != "" {
|
||||||
return
|
resources = append(resources, "Snapshot:"+b.FromSnapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskName := fmt.Sprintf("Create repository %s", b.Name)
|
||||||
|
|
||||||
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
|
// Task: Create fresh collection and check/create ATOMIC inside task
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
|
// Check duplicate inside lock
|
||||||
|
if _, err := taskCollection.ByName(b.Name); err == nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
||||||
|
fmt.Errorf("local repo with name %s already exists", b.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
repo.UpdateRefList(snapshot.RefList())
|
// Create repo
|
||||||
}
|
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
||||||
|
repo.DefaultComponent = b.DefaultComponent
|
||||||
|
repo.DefaultDistribution = b.DefaultDistribution
|
||||||
|
|
||||||
localRepoCollection := collectionFactory.LocalRepoCollection()
|
if b.FromSnapshot != "" {
|
||||||
|
snapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
if _, err := localRepoCollection.ByName(b.Name); err == nil {
|
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||||
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
|
if err != nil {
|
||||||
return
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
|
||||||
}
|
fmt.Errorf("source snapshot not found: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
err := localRepoCollection.Add(repo)
|
err = snapshotCollection.LoadComplete(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
|
||||||
return
|
fmt.Errorf("unable to load source snapshot: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusCreated, repo)
|
repo.UpdateRefList(snapshot.RefList())
|
||||||
|
}
|
||||||
|
|
||||||
|
err := taskCollection.Add(repo)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type reposEditParams struct {
|
type reposEditParams struct {
|
||||||
@@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load shallowly for 404 check and resource key.
|
||||||
|
// Mutation and duplicate check happen inside the task for atomicity.
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
@@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.Name != nil && *b.Name != name {
|
resources := []string{string(repo.Key())}
|
||||||
_, err := collection.ByName(*b.Name)
|
taskName := fmt.Sprintf("Edit repository %s", name)
|
||||||
if err == nil {
|
|
||||||
// already exists
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
|
// Task: Create fresh collection inside task after lock
|
||||||
return
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
repo, err := taskCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
||||||
}
|
}
|
||||||
repo.Name = *b.Name
|
|
||||||
}
|
|
||||||
if b.Comment != nil {
|
|
||||||
repo.Comment = *b.Comment
|
|
||||||
}
|
|
||||||
if b.DefaultDistribution != nil {
|
|
||||||
repo.DefaultDistribution = *b.DefaultDistribution
|
|
||||||
}
|
|
||||||
if b.DefaultComponent != nil {
|
|
||||||
repo.DefaultComponent = *b.DefaultComponent
|
|
||||||
}
|
|
||||||
|
|
||||||
err = collection.Update(repo)
|
// Check and update ATOMIC (inside lock)
|
||||||
if err != nil {
|
if b.Name != nil && *b.Name != name {
|
||||||
AbortWithJSONError(c, 500, err)
|
_, err := taskCollection.ByName(*b.Name)
|
||||||
return
|
if err == nil {
|
||||||
}
|
// already exists
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
||||||
|
fmt.Errorf("local repo with name %q already exists", *b.Name)
|
||||||
|
}
|
||||||
|
repo.Name = *b.Name
|
||||||
|
}
|
||||||
|
if b.Comment != nil {
|
||||||
|
repo.Comment = *b.Comment
|
||||||
|
}
|
||||||
|
if b.DefaultDistribution != nil {
|
||||||
|
repo.DefaultDistribution = *b.DefaultDistribution
|
||||||
|
}
|
||||||
|
if b.DefaultComponent != nil {
|
||||||
|
repo.DefaultComponent = *b.DefaultComponent
|
||||||
|
}
|
||||||
|
|
||||||
c.JSON(200, repo)
|
err = taskCollection.Update(repo)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GET /api/repos/:name
|
// GET /api/repos/:name
|
||||||
@@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) {
|
|||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
|
|
||||||
|
// Load shallowly for 404 check, resource key, and task name.
|
||||||
|
// Full checks (published/snapshots) happen inside the task.
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
|
||||||
publishedCollection := collectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
repo, err := collection.ByName(name)
|
repo, err := collection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) {
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Delete repo %s", name)
|
taskName := fmt.Sprintf("Delete repo %s", name)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
published := publishedCollection.ByLocalRepo(repo)
|
// Task: Create fresh collections inside task after lock acquired
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Re-read repo with fresh collection after lock
|
||||||
|
repo, err := taskCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check with fresh collections
|
||||||
|
published := taskPublishedCollection.ByLocalRepo(repo)
|
||||||
if len(published) > 0 {
|
if len(published) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
snapshots := snapshotCollection.ByLocalRepoSource(repo)
|
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo)
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
|
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -361,6 +414,8 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load shallowly for 404 check and resource key.
|
||||||
|
// Full load and mutations happen inside the task.
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
@@ -373,13 +428,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.LoadComplete(repo)
|
// Task: Create fresh factory and collection inside task after lock
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
repo, err := taskCollection.ByName(c.Params.ByName("name"))
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out.Printf("Loading packages...\n")
|
out.Printf("Loading packages...\n")
|
||||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -388,7 +453,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
for _, ref := range b.PackageRefs {
|
for _, ref := range b.PackageRefs {
|
||||||
var p *deb.Package
|
var p *deb.Package
|
||||||
|
|
||||||
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
|
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == database.ErrNotFound {
|
if err == database.ErrNotFound {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
|
||||||
@@ -404,7 +469,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
|
|
||||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||||
|
|
||||||
err = collectionFactory.LocalRepoCollection().Update(repo)
|
err = taskCollection.Update(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -511,6 +576,8 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load shallowly for 404 check and resource key.
|
||||||
|
// Full load and mutations happen inside the task.
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
@@ -534,7 +601,17 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
resources = append(resources, sources...)
|
resources = append(resources, sources...)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.LoadComplete(repo)
|
// Task: Create fresh factory and collection inside task after lock
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
repo, err := taskCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -555,13 +632,13 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
|
|
||||||
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
|
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
|
||||||
|
|
||||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
||||||
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
|
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
processedFiles = append(processedFiles, otherFiles...)
|
processedFiles = append(processedFiles, otherFiles...)
|
||||||
|
|
||||||
@@ -571,7 +648,7 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
|
|
||||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||||
|
|
||||||
err = collectionFactory.LocalRepoCollection().Update(repo)
|
err = taskCollection.Update(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -650,6 +727,8 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load shallowly for 404 check and resource keys.
|
||||||
|
// Full load and mutations happen inside the task.
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -673,12 +752,26 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
// Task: Create fresh factory and collections inside task after lock
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
|
||||||
|
// Fresh load of both repos after lock acquired
|
||||||
|
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||||
}
|
}
|
||||||
@@ -691,12 +784,12 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
RemovedLines: []string{},
|
RemovedLines: []string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
|
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
|
||||||
}
|
}
|
||||||
@@ -764,7 +857,7 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
} else {
|
} else {
|
||||||
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
|
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
|
||||||
|
|
||||||
err = collectionFactory.LocalRepoCollection().Update(dstRepo)
|
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -867,6 +960,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
|||||||
resources = append(resources, sources...)
|
resources = append(resources, sources...)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
|
// Task: Create fresh factory and collection inside task after lock
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
verifier = context.GetVerifier()
|
verifier = context.GetVerifier()
|
||||||
@@ -882,8 +978,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
|||||||
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
|
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
|
||||||
_, failedFiles2, err = deb.ImportChangesFiles(
|
_, failedFiles2, err = deb.ImportChangesFiles(
|
||||||
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
||||||
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
|
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(),
|
||||||
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
|
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user