diff --git a/api/repos.go b/api/repos.go index 1d6dfae3..05452890 100644 --- a/api/repos.go +++ b/api/repos.go @@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) { return } - repo := deb.NewLocalRepo(b.Name, b.Comment) - repo.DefaultComponent = b.DefaultComponent - repo.DefaultDistribution = b.DefaultDistribution - + // Handler: Pre-task validations (shallow) collectionFactory := context.NewCollectionFactory() if b.FromSnapshot != "" { - var snapshot *deb.Snapshot - snapshotCollection := collectionFactory.SnapshotCollection() - snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + _, err := snapshotCollection.ByName(b.FromSnapshot) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) return } + // Just verify it exists - don't load here + } - err = snapshotCollection.LoadComplete(snapshot) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err)) - return + // Use generated key resource for repo being created + resources := []string{"LocalRepo:" + b.Name} + if b.FromSnapshot != "" { + resources = append(resources, "Snapshot:"+b.FromSnapshot) + } + + taskName := fmt.Sprintf("Create repository %s", b.Name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection and check/create ATOMIC inside task + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Check duplicate inside lock + if _, err := taskCollection.ByName(b.Name); err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %s already exists", b.Name) } - repo.UpdateRefList(snapshot.RefList()) - } + // Create repo + repo := deb.NewLocalRepo(b.Name, b.Comment) + repo.DefaultComponent = b.DefaultComponent + repo.DefaultDistribution = b.DefaultDistribution - localRepoCollection := collectionFactory.LocalRepoCollection() + if b.FromSnapshot != "" { + snapshotCollection := taskCollectionFactory.SnapshotCollection() - if _, err := localRepoCollection.ByName(b.Name); err == nil { - AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name)) - return - } + snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, + fmt.Errorf("source snapshot not found: %s", err) + } - err := localRepoCollection.Add(repo) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, err) - return - } + err = snapshotCollection.LoadComplete(snapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, + fmt.Errorf("unable to load source snapshot: %s", err) + } - c.JSON(http.StatusCreated, repo) + repo.UpdateRefList(snapshot.RefList()) + } + + err := taskCollection.Add(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil + }) } type reposEditParams struct { @@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Mutation and duplicate check happen inside the task for atomicity. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) { return } - if b.Name != nil && *b.Name != name { - _, err := collection.ByName(*b.Name) - if err == nil { - // already exists - AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name)) - return + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Edit repository %s", name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err } - repo.Name = *b.Name - } - if b.Comment != nil { - repo.Comment = *b.Comment - } - if b.DefaultDistribution != nil { - repo.DefaultDistribution = *b.DefaultDistribution - } - if b.DefaultComponent != nil { - repo.DefaultComponent = *b.DefaultComponent - } - err = collection.Update(repo) - if err != nil { - AbortWithJSONError(c, 500, err) - return - } + // Check and update ATOMIC (inside lock) + if b.Name != nil && *b.Name != name { + _, err := taskCollection.ByName(*b.Name) + if err == nil { + // already exists + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %q already exists", *b.Name) + } + repo.Name = *b.Name + } + if b.Comment != nil { + repo.Comment = *b.Comment + } + if b.DefaultDistribution != nil { + repo.DefaultDistribution = *b.DefaultDistribution + } + if b.DefaultComponent != nil { + repo.DefaultComponent = *b.DefaultComponent + } - c.JSON(200, repo) + err = taskCollection.Update(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil + }) } // GET /api/repos/:name @@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" name := c.Params.ByName("name") + // Load shallowly for 404 check, resource key, and task name. + // Full checks (published/snapshots) happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() repo, err := collection.ByName(name) if err != nil { @@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.ByLocalRepo(repo) + // Task: Create fresh collections inside task after lock acquired + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Re-read repo with fresh collection after lock + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err) + } + + // Check with fresh collections + published := taskPublishedCollection.ByLocalRepo(repo) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published") } if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) + snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo) + return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) }) } @@ -361,6 +414,8 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -373,13 +428,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li resources := []string{string(repo.Key())} maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(c.Params.ByName("name")) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } out.Printf("Loading packages...\n") - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -388,7 +453,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li for _, ref := range b.PackageRefs { var p *deb.Package - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) @@ -404,7 +469,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -511,6 +576,8 @@ func apiReposPackageFromDir(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -534,7 +601,17 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -555,13 +632,13 @@ func apiReposPackageFromDir(c *gin.Context) { packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) @@ -571,7 +648,7 @@ func apiReposPackageFromDir(c *gin.Context) { repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -650,6 +727,8 @@ func apiReposCopyPackage(c *gin.Context) { return } + // Load shallowly for 404 check and resource keys. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { @@ -673,12 +752,26 @@ func apiReposCopyPackage(c *gin.Context) { resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + // Task: Create fresh factory and collections inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of both repos after lock acquired + dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) } - err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo) + srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) } @@ -691,12 +784,12 @@ func apiReposCopyPackage(c *gin.Context) { RemovedLines: []string{}, } - dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress()) + dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) } - srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress()) + srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) } @@ -764,7 +857,7 @@ func apiReposCopyPackage(c *gin.Context) { } else { dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) - err = collectionFactory.LocalRepoCollection().Update(dstRepo) + err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -867,6 +960,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) { resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + var ( err error verifier = context.GetVerifier() @@ -882,8 +978,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) { changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), + context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) if err != nil {