mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-08 05:50:47 +00:00
snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the DB and mutated it outside the task closure, before the task lock was acquired. * perform collection.LoadComplete inside maybeRunTaskInBackground * have tasks use a fresh copy of taskCollectionFactory, taskCollection
This commit is contained in:
+130
-44
@@ -83,11 +83,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
|
||||
}
|
||||
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.RemoteRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
name := c.Params.ByName("name")
|
||||
|
||||
repo, err = collection.ByName(name)
|
||||
repo, err = collectionFactory.RemoteRepoCollection().ByName(name)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 404, err)
|
||||
return
|
||||
@@ -97,12 +95,21 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
|
||||
resources := []string{string(repo.Key()), "S" + b.Name}
|
||||
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err := repo.CheckLock()
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
repo, err := taskMirrorCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
err = repo.CheckLock()
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err
|
||||
}
|
||||
|
||||
err = collection.LoadComplete(repo)
|
||||
err = taskMirrorCollection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
|
||||
snapshot.Description = b.Description
|
||||
}
|
||||
|
||||
err = snapshotCollection.Add(snapshot)
|
||||
err = taskSnapshotCollection.Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||
}
|
||||
@@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
var resources []string
|
||||
@@ -182,8 +190,20 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
for i := range sources {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
taskPackageCollection := taskCollectionFactory.PackageCollection()
|
||||
|
||||
// Fresh load of all sources after lock acquired
|
||||
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
|
||||
for i := range b.SourceSnapshots {
|
||||
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
// LoadComplete on fresh copy
|
||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -191,9 +211,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
|
||||
list := deb.NewPackageList()
|
||||
|
||||
// verify package refs and build package list
|
||||
// verify package refs and build package list using fresh factory
|
||||
for _, ref := range b.PackageRefs {
|
||||
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||
p, err := taskPackageCollection.ByKey([]byte(ref))
|
||||
if err != nil {
|
||||
if err == database.ErrNotFound {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
||||
@@ -206,9 +226,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||
|
||||
err = snapshotCollection.Add(snapshot)
|
||||
err = taskSnapshotCollection.Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||
}
|
||||
@@ -249,11 +269,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
|
||||
}
|
||||
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.LocalRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
name := c.Params.ByName("name")
|
||||
|
||||
repo, err = collection.ByName(name)
|
||||
repo, err = collectionFactory.LocalRepoCollection().ByName(name)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 404, err)
|
||||
return
|
||||
@@ -263,7 +281,16 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
|
||||
resources := []string{string(repo.Key()), "S" + b.Name}
|
||||
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err := collection.LoadComplete(repo)
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskRepoCollection := taskCollectionFactory.LocalRepoCollection()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
repo, err := taskRepoCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
err = taskRepoCollection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -277,7 +304,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
|
||||
snapshot.Description = b.Description
|
||||
}
|
||||
|
||||
err = snapshotCollection.Add(snapshot)
|
||||
err = taskSnapshotCollection.Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||
}
|
||||
@@ -315,6 +342,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.SnapshotCollection()
|
||||
name := c.Params.ByName("name")
|
||||
@@ -325,14 +353,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Pre-task validation of new name if provided (skip if renaming to same name)
|
||||
if b.Name != "" && b.Name != name {
|
||||
_, err = collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
|
||||
taskName := fmt.Sprintf("Update snapshot %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
_, err := collection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
snapshot, err = taskCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// Fresh duplicate check inside lock
|
||||
if b.Name != "" {
|
||||
_, err := taskCollection.ByName(b.Name)
|
||||
if err == nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Update fresh copy
|
||||
if b.Name != "" {
|
||||
snapshot.Name = b.Name
|
||||
}
|
||||
@@ -341,7 +393,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
||||
snapshot.Description = b.Description
|
||||
}
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Update(snapshot)
|
||||
err = taskCollection.Update(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -395,9 +447,9 @@ func apiSnapshotsDrop(c *gin.Context) {
|
||||
name := c.Params.ByName("name")
|
||||
force := c.Request.URL.Query().Get("force") == "1"
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
publishedCollection := collectionFactory.PublishedRepoCollection()
|
||||
|
||||
snapshot, err := snapshotCollection.ByName(name)
|
||||
if err != nil {
|
||||
@@ -407,21 +459,35 @@ func apiSnapshotsDrop(c *gin.Context) {
|
||||
|
||||
resources := []string{string(snapshot.ResourceKey())}
|
||||
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
published := publishedCollection.BySnapshot(snapshot)
|
||||
// Phase 2: Inside task lock - create fresh collections
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||
|
||||
// Fresh load after lock acquired
|
||||
snapshot, err := taskSnapshotCollection.ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// Fresh checks with current collections
|
||||
published := taskPublishedCollection.BySnapshot(snapshot)
|
||||
|
||||
if len(published) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
||||
}
|
||||
|
||||
if !force {
|
||||
snapshots := snapshotCollection.BySnapshotSource(snapshot)
|
||||
// Using fresh collection for dependency check
|
||||
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
|
||||
if len(snapshots) > 0 {
|
||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
||||
}
|
||||
}
|
||||
|
||||
err = snapshotCollection.Drop(snapshot)
|
||||
err = taskSnapshotCollection.Drop(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -576,6 +642,7 @@ func apiSnapshotsMerge(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
@@ -592,32 +659,43 @@ func apiSnapshotsMerge(c *gin.Context) {
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = snapshotCollection.LoadComplete(sources[0])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result := sources[0].RefList()
|
||||
for i := 1; i < len(sources); i++ {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||
|
||||
// Fresh load of all sources inside task
|
||||
freshSources := make([]*deb.Snapshot, len(body.Sources))
|
||||
for i := range body.Sources {
|
||||
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result = result.Merge(sources[i].RefList(), overrideMatching, false)
|
||||
// LoadComplete on fresh copy
|
||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Merge using fresh sources
|
||||
result := freshSources[0].RefList()
|
||||
for i := 1; i < len(freshSources); i++ {
|
||||
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
|
||||
}
|
||||
|
||||
if latest {
|
||||
result.FilterLatestRefs()
|
||||
}
|
||||
|
||||
sourceDescription := make([]string, len(sources))
|
||||
for i, s := range sources {
|
||||
sourceDescription := make([]string, len(freshSources))
|
||||
for i, s := range freshSources {
|
||||
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
||||
}
|
||||
|
||||
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
|
||||
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
|
||||
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Add(snapshot)
|
||||
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
||||
}
|
||||
@@ -701,21 +779,29 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
||||
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
||||
// Phase 2: Inside task lock - create fresh factory
|
||||
taskCollectionFactory := context.NewCollectionFactory()
|
||||
|
||||
// Fresh load of snapshots after lock acquired
|
||||
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
||||
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// convert snapshots to package list
|
||||
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
@@ -812,10 +898,10 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
}
|
||||
|
||||
// Create <destination> snapshot
|
||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
|
||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
|
||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||
|
||||
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user