mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-31 04:30:44 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d5f1929dd1 | |||
| 13cf5cf76c | |||
| 7362e7ee3b | |||
| b8373b0afc | |||
| 5a75e45ba8 | |||
| 38ba5bbcc6 | |||
| 8477274bb0 |
+40
-8
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
|
||||
name := c.Params.ByName("name")
|
||||
force := c.Request.URL.Query().Get("force") == "1"
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
repo, err := mirrorCollection.ByName(name)
|
||||
if err != nil {
|
||||
@@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) {
|
||||
|
||||
resources := []string{string(repo.Key())}
|
||||
taskName := fmt.Sprintf("Delete mirror %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err := repo.CheckLock()
|
||||
// Phase 2: Inside task lock - create fresh collections
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
repo, err := taskMirrorCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
|
||||
err = repo.CheckLock()
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
|
||||
if !force {
|
||||
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
|
||||
// Fresh checks with current collections
|
||||
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
|
||||
|
||||
if len(snapshots) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
||||
}
|
||||
}
|
||||
|
||||
err = mirrorCollection.Drop(repo)
|
||||
err = taskMirrorCollection.Drop(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||
}
|
||||
@@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.RemoteRepoCollection()
|
||||
|
||||
remote, err = collection.ByName(c.Params.ByName("name"))
|
||||
name := c.Params.ByName("name")
|
||||
remote, err = collection.ByName(name)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 404, err)
|
||||
return
|
||||
@@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Pre-task validation of new name if provided
|
||||
if b.Name != remote.Name {
|
||||
_, err = collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
@@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
|
||||
resources := []string{string(remote.Key())}
|
||||
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired (use captured `name` variable, not gin context)
|
||||
remote, err := taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
|
||||
// Fresh rename check inside lock (if renaming)
|
||||
if b.Name != remote.Name {
|
||||
_, err := taskCollection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
|
||||
}
|
||||
}
|
||||
|
||||
downloader := context.NewDownloader(out)
|
||||
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
@@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
||||
}
|
||||
|
||||
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
||||
_ = remote.FinalizeDownload(collectionFactory, out)
|
||||
err = collectionFactory.RemoteRepoCollection().Update(remote)
|
||||
_ = remote.FinalizeDownload(taskCollectionFactory, out)
|
||||
err = taskCollection.Update(remote)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
|
||||
+165
-68
@@ -131,46 +131,69 @@ func apiReposCreate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
||||
repo.DefaultComponent = b.DefaultComponent
|
||||
repo.DefaultDistribution = b.DefaultDistribution
|
||||
|
||||
// Handler: Pre-task validations (shallow)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
|
||||
if b.FromSnapshot != "" {
|
||||
var snapshot *deb.Snapshot
|
||||
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||
_, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
|
||||
return
|
||||
}
|
||||
// Just verify it exists - don't load here
|
||||
}
|
||||
|
||||
err = snapshotCollection.LoadComplete(snapshot)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
|
||||
return
|
||||
// Use generated key resource for repo being created
|
||||
resources := []string{"LocalRepo:" + b.Name}
|
||||
if b.FromSnapshot != "" {
|
||||
resources = append(resources, "Snapshot:"+b.FromSnapshot)
|
||||
}
|
||||
|
||||
taskName := fmt.Sprintf("Create repository %s", b.Name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
// Task: Create fresh collection and check/create ATOMIC inside task
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
|
||||
// Check duplicate inside lock
|
||||
if _, err := taskCollection.ByName(b.Name); err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
||||
fmt.Errorf("local repo with name %s already exists", b.Name)
|
||||
}
|
||||
|
||||
repo.UpdateRefList(snapshot.RefList())
|
||||
}
|
||||
// Create repo
|
||||
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
||||
repo.DefaultComponent = b.DefaultComponent
|
||||
repo.DefaultDistribution = b.DefaultDistribution
|
||||
|
||||
localRepoCollection := collectionFactory.LocalRepoCollection()
|
||||
if b.FromSnapshot != "" {
|
||||
snapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
if _, err := localRepoCollection.ByName(b.Name); err == nil {
|
||||
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
|
||||
return
|
||||
}
|
||||
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
|
||||
fmt.Errorf("source snapshot not found: %s", err)
|
||||
}
|
||||
|
||||
err := localRepoCollection.Add(repo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
err = snapshotCollection.LoadComplete(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
|
||||
fmt.Errorf("unable to load source snapshot: %s", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, repo)
|
||||
repo.UpdateRefList(snapshot.RefList())
|
||||
}
|
||||
|
||||
err := taskCollection.Add(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil
|
||||
})
|
||||
}
|
||||
|
||||
type reposEditParams struct {
|
||||
@@ -201,6 +224,8 @@ func apiReposEdit(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Load shallowly for 404 check and resource key.
|
||||
// Mutation and duplicate check happen inside the task for atomicity.
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.LocalRepoCollection()
|
||||
|
||||
@@ -211,32 +236,47 @@ func apiReposEdit(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if b.Name != nil && *b.Name != name {
|
||||
_, err := collection.ByName(*b.Name)
|
||||
if err == nil {
|
||||
// already exists
|
||||
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
|
||||
return
|
||||
resources := []string{string(repo.Key())}
|
||||
taskName := fmt.Sprintf("Edit repository %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
// Task: Create fresh collection inside task after lock
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
repo, err := taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
||||
}
|
||||
repo.Name = *b.Name
|
||||
}
|
||||
if b.Comment != nil {
|
||||
repo.Comment = *b.Comment
|
||||
}
|
||||
if b.DefaultDistribution != nil {
|
||||
repo.DefaultDistribution = *b.DefaultDistribution
|
||||
}
|
||||
if b.DefaultComponent != nil {
|
||||
repo.DefaultComponent = *b.DefaultComponent
|
||||
}
|
||||
|
||||
err = collection.Update(repo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, err)
|
||||
return
|
||||
}
|
||||
// Check and update ATOMIC (inside lock)
|
||||
if b.Name != nil && *b.Name != name {
|
||||
_, err := taskCollection.ByName(*b.Name)
|
||||
if err == nil {
|
||||
// already exists
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
||||
fmt.Errorf("local repo with name %q already exists", *b.Name)
|
||||
}
|
||||
repo.Name = *b.Name
|
||||
}
|
||||
if b.Comment != nil {
|
||||
repo.Comment = *b.Comment
|
||||
}
|
||||
if b.DefaultDistribution != nil {
|
||||
repo.DefaultDistribution = *b.DefaultDistribution
|
||||
}
|
||||
if b.DefaultComponent != nil {
|
||||
repo.DefaultComponent = *b.DefaultComponent
|
||||
}
|
||||
|
||||
c.JSON(200, repo)
|
||||
err = taskCollection.Update(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
|
||||
})
|
||||
}
|
||||
|
||||
// GET /api/repos/:name
|
||||
@@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) {
|
||||
force := c.Request.URL.Query().Get("force") == "1"
|
||||
name := c.Params.ByName("name")
|
||||
|
||||
// Load shallowly for 404 check, resource key, and task name.
|
||||
// Full checks (published/snapshots) happen inside the task.
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.LocalRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
publishedCollection := collectionFactory.PublishedRepoCollection()
|
||||
|
||||
repo, err := collection.ByName(name)
|
||||
if err != nil {
|
||||
@@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) {
|
||||
resources := []string{string(repo.Key())}
|
||||
taskName := fmt.Sprintf("Delete repo %s", name)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
published := publishedCollection.ByLocalRepo(repo)
|
||||
// Task: Create fresh collections inside task after lock acquired
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||
|
||||
// Re-read repo with fresh collection after lock
|
||||
repo, err := taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
|
||||
}
|
||||
|
||||
// Check with fresh collections
|
||||
published := taskPublishedCollection.ByLocalRepo(repo)
|
||||
if len(published) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
|
||||
}
|
||||
|
||||
if !force {
|
||||
snapshots := snapshotCollection.ByLocalRepoSource(repo)
|
||||
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo)
|
||||
if len(snapshots) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
|
||||
}
|
||||
}
|
||||
|
||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
|
||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -361,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
||||
return
|
||||
}
|
||||
|
||||
// Load shallowly for 404 check and resource key.
|
||||
// Full load and mutations happen inside the task.
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.LocalRepoCollection()
|
||||
|
||||
repo, err := collection.ByName(c.Params.ByName("name"))
|
||||
name := c.Params.ByName("name")
|
||||
repo, err := collection.ByName(name)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 404, err)
|
||||
return
|
||||
@@ -373,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
||||
resources := []string{string(repo.Key())}
|
||||
|
||||
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collection.LoadComplete(repo)
|
||||
// Task: Create fresh factory and collection inside task after lock
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired (use captured `name` variable, not gin context)
|
||||
repo, err := taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
||||
}
|
||||
|
||||
err = taskCollection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
out.Printf("Loading packages...\n")
|
||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -388,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
||||
for _, ref := range b.PackageRefs {
|
||||
var p *deb.Package
|
||||
|
||||
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||
if err != nil {
|
||||
if err == database.ErrNotFound {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
|
||||
@@ -404,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
||||
|
||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().Update(repo)
|
||||
err = taskCollection.Update(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||
}
|
||||
@@ -511,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Load shallowly for 404 check and resource key.
|
||||
// Full load and mutations happen inside the task.
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.LocalRepoCollection()
|
||||
|
||||
@@ -534,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
resources := []string{string(repo.Key())}
|
||||
resources = append(resources, sources...)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collection.LoadComplete(repo)
|
||||
// Task: Create fresh factory and collection inside task after lock
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
repo, err := taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
err = taskCollection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -555,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
|
||||
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
|
||||
|
||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
||||
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
|
||||
}
|
||||
|
||||
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
||||
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
|
||||
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection)
|
||||
failedFiles = append(failedFiles, failedFiles2...)
|
||||
processedFiles = append(processedFiles, otherFiles...)
|
||||
|
||||
@@ -571,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
|
||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().Update(repo)
|
||||
err = taskCollection.Update(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||
}
|
||||
@@ -650,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Load shallowly for 404 check and resource keys.
|
||||
// Full load and mutations happen inside the task.
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
||||
if err != nil {
|
||||
@@ -673,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||
// Task: Create fresh factory and collections inside task after lock
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
|
||||
// Fresh load of both repos after lock acquired
|
||||
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||
}
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||
}
|
||||
|
||||
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||
}
|
||||
|
||||
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||
}
|
||||
@@ -691,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
RemovedLines: []string{},
|
||||
}
|
||||
|
||||
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
|
||||
}
|
||||
|
||||
srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
|
||||
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
|
||||
}
|
||||
@@ -764,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
} else {
|
||||
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().Update(dstRepo)
|
||||
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||
}
|
||||
@@ -867,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
||||
resources = append(resources, sources...)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
// Task: Create fresh factory and collection inside task after lock
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
|
||||
var (
|
||||
err error
|
||||
verifier = context.GetVerifier()
|
||||
@@ -882,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
||||
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
|
||||
_, failedFiles2, err = deb.ImportChangesFiles(
|
||||
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
||||
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
|
||||
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
|
||||
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(),
|
||||
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse)
|
||||
failedFiles = append(failedFiles, failedFiles2...)
|
||||
|
||||
if err != nil {
|
||||
|
||||
+105
-33
@@ -165,6 +165,7 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
var resources []string
|
||||
@@ -182,8 +183,20 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
for i := range sources {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
taskPackageCollection := taskCollectionFactory.PackageCollection()
|
||||
|
||||
// Fresh load of all sources after lock acquired
|
||||
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
|
||||
for i := range b.SourceSnapshots {
|
||||
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
// LoadComplete on fresh copy
|
||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -191,9 +204,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
|
||||
list := deb.NewPackageList()
|
||||
|
||||
// verify package refs and build package list
|
||||
// verify package refs and build package list using fresh factory
|
||||
for _, ref := range b.PackageRefs {
|
||||
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||
p, err := taskPackageCollection.ByKey([]byte(ref))
|
||||
if err != nil {
|
||||
if err == database.ErrNotFound {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
||||
@@ -206,9 +219,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||
|
||||
err = snapshotCollection.Add(snapshot)
|
||||
err = taskSnapshotCollection.Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||
}
|
||||
@@ -315,6 +328,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.SnapshotCollection()
|
||||
name := c.Params.ByName("name")
|
||||
@@ -325,14 +339,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Pre-task validation of new name if provided (skip if renaming to same name)
|
||||
if b.Name != "" && b.Name != name {
|
||||
_, err = collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
|
||||
taskName := fmt.Sprintf("Update snapshot %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
_, err := collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
snapshot, err = taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// Fresh duplicate check inside lock
|
||||
if b.Name != "" {
|
||||
_, err := taskCollection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Update fresh copy
|
||||
if b.Name != "" {
|
||||
snapshot.Name = b.Name
|
||||
}
|
||||
@@ -341,7 +379,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
snapshot.Description = b.Description
|
||||
}
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Update(snapshot)
|
||||
err = taskCollection.Update(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -395,9 +433,9 @@ func apiSnapshotsDrop(c *gin.Context) {
|
||||
name := c.Params.ByName("name")
|
||||
force := c.Request.URL.Query().Get("force") == "1"
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
publishedCollection := collectionFactory.PublishedRepoCollection()
|
||||
|
||||
snapshot, err := snapshotCollection.ByName(name)
|
||||
if err != nil {
|
||||
@@ -407,21 +445,35 @@ func apiSnapshotsDrop(c *gin.Context) {
|
||||
|
||||
resources := []string{string(snapshot.ResourceKey())}
|
||||
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
published := publishedCollection.BySnapshot(snapshot)
|
||||
// Phase 2: Inside task lock - create fresh collections
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
snapshot, err := taskSnapshotCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// Fresh checks with current collections
|
||||
published := taskPublishedCollection.BySnapshot(snapshot)
|
||||
|
||||
if len(published) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
||||
}
|
||||
|
||||
if !force {
|
||||
snapshots := snapshotCollection.BySnapshotSource(snapshot)
|
||||
// Using fresh collection for dependency check
|
||||
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
|
||||
if len(snapshots) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
||||
}
|
||||
}
|
||||
|
||||
err = snapshotCollection.Drop(snapshot)
|
||||
err = taskSnapshotCollection.Drop(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -576,6 +628,7 @@ func apiSnapshotsMerge(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
@@ -592,32 +645,43 @@ func apiSnapshotsMerge(c *gin.Context) {
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = snapshotCollection.LoadComplete(sources[0])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result := sources[0].RefList()
|
||||
for i := 1; i < len(sources); i++ {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load of all sources inside task
|
||||
freshSources := make([]*deb.Snapshot, len(body.Sources))
|
||||
for i := range body.Sources {
|
||||
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result = result.Merge(sources[i].RefList(), overrideMatching, false)
|
||||
// LoadComplete on fresh copy
|
||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Merge using fresh sources
|
||||
result := freshSources[0].RefList()
|
||||
for i := 1; i < len(freshSources); i++ {
|
||||
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
|
||||
}
|
||||
|
||||
if latest {
|
||||
result.FilterLatestRefs()
|
||||
}
|
||||
|
||||
sourceDescription := make([]string, len(sources))
|
||||
for i, s := range sources {
|
||||
sourceDescription := make([]string, len(freshSources))
|
||||
for i, s := range freshSources {
|
||||
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
||||
}
|
||||
|
||||
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
|
||||
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
|
||||
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Add(snapshot)
|
||||
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
||||
}
|
||||
@@ -701,21 +765,29 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
||||
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
|
||||
// Fresh load of snapshots after lock acquired
|
||||
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
||||
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// convert snapshots to package list
|
||||
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -812,10 +884,10 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Create <destination> snapshot
|
||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
|
||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
|
||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
@@ -461,3 +461,34 @@ class ReposAPITestCopyPackage(APITest):
|
||||
|
||||
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(),
|
||||
['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378'])
|
||||
|
||||
|
||||
class ReposAPITestCreateEdit(APITest):
|
||||
"""
|
||||
POST /api/repos,
|
||||
"""
|
||||
def check(self):
|
||||
repo_name = self.random_name() + ' with space'
|
||||
repo_desc = {'Comment': 'fun repo',
|
||||
'DefaultComponent': 'contrib',
|
||||
'DefaultDistribution': 'bookworm',
|
||||
'Name': repo_name}
|
||||
|
||||
resp = self.post("/api/repos", json=repo_desc)
|
||||
self.check_equal(resp.json(), repo_desc)
|
||||
self.check_equal(resp.status_code, 201)
|
||||
|
||||
repo_desc = {'Comment': 'modified repo',
|
||||
'DefaultComponent': 'main',
|
||||
'DefaultDistribution': 'trixie',
|
||||
'Name': repo_name + '@renamed'}
|
||||
resp = self.put(f"/api/repos/{repo_name}", json=repo_desc)
|
||||
self.check_equal(resp.json(), repo_desc)
|
||||
self.check_equal(resp.status_code, 200)
|
||||
|
||||
resp = self.get("/api/repos/" + repo_name + '@renamed')
|
||||
self.check_equal(resp.json(), repo_desc)
|
||||
self.check_equal(resp.status_code, 200)
|
||||
|
||||
resp = self.delete("/api/repos/" + repo_name + '@renamed')
|
||||
self.check_equal(resp.status_code, 200)
|
||||
|
||||
Reference in New Issue
Block a user