mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-11 06:24:04 +00:00
snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the DB and mutated it outside the task closure, before the task lock was acquired. * perform collection.LoadComplete inside maybeRunTaskInBackground * have tasks use a fresh copy of taskCollectionFactory, taskCollection
This commit is contained in:
+134
-48
@@ -83,26 +83,33 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.RemoteRepoCollection()
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
|
|
||||||
repo, err = collection.ByName(name)
|
repo, err = collectionFactory.RemoteRepoCollection().ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, 404, err)
|
AbortWithJSONError(c, 404, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// including snapshot resource key
|
// including snapshot resource key
|
||||||
resources := []string{string(repo.Key()), "S" + b.Name}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
|
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err := repo.CheckLock()
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
|
repo, err := taskMirrorCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = repo.CheckLock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(repo)
|
err = taskMirrorCollection.LoadComplete(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
|
|||||||
snapshot.Description = b.Description
|
snapshot.Description = b.Description
|
||||||
}
|
}
|
||||||
|
|
||||||
err = snapshotCollection.Add(snapshot)
|
err = taskSnapshotCollection.Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
var resources []string
|
var resources []string
|
||||||
@@ -182,8 +190,20 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
for i := range sources {
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
err = snapshotCollection.LoadComplete(sources[i])
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
taskPackageCollection := taskCollectionFactory.PackageCollection()
|
||||||
|
|
||||||
|
// Fresh load of all sources after lock acquired
|
||||||
|
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
|
||||||
|
for i := range b.SourceSnapshots {
|
||||||
|
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
// LoadComplete on fresh copy
|
||||||
|
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -191,9 +211,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
|
|
||||||
list := deb.NewPackageList()
|
list := deb.NewPackageList()
|
||||||
|
|
||||||
// verify package refs and build package list
|
// verify package refs and build package list using fresh factory
|
||||||
for _, ref := range b.PackageRefs {
|
for _, ref := range b.PackageRefs {
|
||||||
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
|
p, err := taskPackageCollection.ByKey([]byte(ref))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == database.ErrNotFound {
|
if err == database.ErrNotFound {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
||||||
@@ -206,9 +226,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||||
|
|
||||||
err = snapshotCollection.Add(snapshot)
|
err = taskSnapshotCollection.Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -249,21 +269,28 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
|
|
||||||
repo, err = collection.ByName(name)
|
repo, err = collectionFactory.LocalRepoCollection().ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, 404, err)
|
AbortWithJSONError(c, 404, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// including snapshot resource key
|
// including snapshot resource key
|
||||||
resources := []string{string(repo.Key()), "S" + b.Name}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
|
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err := collection.LoadComplete(repo)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskRepoCollection := taskCollectionFactory.LocalRepoCollection()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
|
repo, err := taskRepoCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskRepoCollection.LoadComplete(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -277,7 +304,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
|
|||||||
snapshot.Description = b.Description
|
snapshot.Description = b.Description
|
||||||
}
|
}
|
||||||
|
|
||||||
err = snapshotCollection.Add(snapshot)
|
err = taskSnapshotCollection.Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -315,6 +342,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.SnapshotCollection()
|
collection := collectionFactory.SnapshotCollection()
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
@@ -325,14 +353,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
|
// Pre-task validation of new name if provided (skip if renaming to same name)
|
||||||
taskName := fmt.Sprintf("Update snapshot %s", name)
|
if b.Name != "" && b.Name != name {
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
_, err = collection.ByName(b.Name)
|
||||||
_, err := collection.ByName(b.Name)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
resources := []string{string(snapshot.ResourceKey())}
|
||||||
|
taskName := fmt.Sprintf("Update snapshot %s", name)
|
||||||
|
|
||||||
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
snapshot, err = taskCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fresh duplicate check inside lock
|
||||||
|
if b.Name != "" {
|
||||||
|
_, err := taskCollection.ByName(b.Name)
|
||||||
|
if err == nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update fresh copy
|
||||||
if b.Name != "" {
|
if b.Name != "" {
|
||||||
snapshot.Name = b.Name
|
snapshot.Name = b.Name
|
||||||
}
|
}
|
||||||
@@ -341,7 +393,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
snapshot.Description = b.Description
|
snapshot.Description = b.Description
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collectionFactory.SnapshotCollection().Update(snapshot)
|
err = taskCollection.Update(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -395,9 +447,9 @@ func apiSnapshotsDrop(c *gin.Context) {
|
|||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
|
|
||||||
|
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
publishedCollection := collectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
snapshot, err := snapshotCollection.ByName(name)
|
snapshot, err := snapshotCollection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -407,21 +459,35 @@ func apiSnapshotsDrop(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(snapshot.ResourceKey())}
|
resources := []string{string(snapshot.ResourceKey())}
|
||||||
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
published := publishedCollection.BySnapshot(snapshot)
|
// Phase 2: Inside task lock - create fresh collections
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
snapshot, err := taskSnapshotCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fresh checks with current collections
|
||||||
|
published := taskPublishedCollection.BySnapshot(snapshot)
|
||||||
|
|
||||||
if len(published) > 0 {
|
if len(published) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
snapshots := snapshotCollection.BySnapshotSource(snapshot)
|
// Using fresh collection for dependency check
|
||||||
|
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = snapshotCollection.Drop(snapshot)
|
err = taskSnapshotCollection.Drop(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -576,6 +642,7 @@ func apiSnapshotsMerge(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
@@ -592,32 +659,43 @@ func apiSnapshotsMerge(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = snapshotCollection.LoadComplete(sources[0])
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
if err != nil {
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
}
|
|
||||||
result := sources[0].RefList()
|
// Fresh load of all sources inside task
|
||||||
for i := 1; i < len(sources); i++ {
|
freshSources := make([]*deb.Snapshot, len(body.Sources))
|
||||||
err = snapshotCollection.LoadComplete(sources[i])
|
for i := range body.Sources {
|
||||||
|
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
result = result.Merge(sources[i].RefList(), overrideMatching, false)
|
// LoadComplete on fresh copy
|
||||||
|
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge using fresh sources
|
||||||
|
result := freshSources[0].RefList()
|
||||||
|
for i := 1; i < len(freshSources); i++ {
|
||||||
|
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
if latest {
|
if latest {
|
||||||
result.FilterLatestRefs()
|
result.FilterLatestRefs()
|
||||||
}
|
}
|
||||||
|
|
||||||
sourceDescription := make([]string, len(sources))
|
sourceDescription := make([]string, len(freshSources))
|
||||||
for i, s := range sources {
|
for i, s := range freshSources {
|
||||||
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
|
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
|
||||||
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
||||||
|
|
||||||
err = collectionFactory.SnapshotCollection().Add(snapshot)
|
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
||||||
}
|
}
|
||||||
@@ -701,21 +779,29 @@ func apiSnapshotsPull(c *gin.Context) {
|
|||||||
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
||||||
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
|
||||||
|
// Fresh load of snapshots after lock acquired
|
||||||
|
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert snapshots to package list
|
// convert snapshots to package list
|
||||||
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -812,10 +898,10 @@ func apiSnapshotsPull(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create <destination> snapshot
|
// Create <destination> snapshot
|
||||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
|
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
|
||||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||||
|
|
||||||
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user