mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-12 06:30:35 +00:00
mirror: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the DB and mutated it outside the task closure, before the task lock was acquired. * perform collection.LoadComplete inside maybeRunTaskInBackground * have tasks use a fresh copy of taskCollectionFactory, taskCollection
This commit is contained in:
+47
-15
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
|
|||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
|
|
||||||
|
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
|
||||||
|
|
||||||
repo, err := mirrorCollection.ByName(name)
|
repo, err := mirrorCollection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Delete mirror %s", name)
|
taskName := fmt.Sprintf("Delete mirror %s", name)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err := repo.CheckLock()
|
// Phase 2: Inside task lock - create fresh collections
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||||
|
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired
|
||||||
|
repo, err := taskMirrorCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = repo.CheckLock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
|
// Fresh checks with current collections
|
||||||
|
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
|
||||||
|
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = mirrorCollection.Drop(repo)
|
err = taskMirrorCollection.Drop(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||||
}
|
}
|
||||||
@@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.RemoteRepoCollection()
|
collection := collectionFactory.RemoteRepoCollection()
|
||||||
|
|
||||||
remote, err = collection.ByName(c.Params.ByName("name"))
|
name := c.Params.ByName("name")
|
||||||
|
remote, err = collection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, 404, err)
|
AbortWithJSONError(c, 404, err)
|
||||||
return
|
return
|
||||||
@@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pre-task validation of new name if provided
|
||||||
if b.Name != remote.Name {
|
if b.Name != remote.Name {
|
||||||
_, err = collection.ByName(b.Name)
|
_, err = collection.ByName(b.Name)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(remote.Key())}
|
resources := []string{string(remote.Key())}
|
||||||
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.RemoteRepoCollection()
|
||||||
|
|
||||||
|
// Fresh load after lock acquired (use captured `name` variable, not gin context)
|
||||||
|
remote, err := taskCollection.ByName(name)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fresh rename check inside lock (if renaming)
|
||||||
|
if b.Name != remote.Name {
|
||||||
|
_, err := taskCollection.ByName(b.Name)
|
||||||
|
if err == nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
downloader := context.NewDownloader(out)
|
downloader := context.NewDownloader(out)
|
||||||
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -580,14 +612,14 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck)
|
err = remote.DownloadPackageIndexes(out, downloader, verifier, taskCollectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if remote.DownloadAppStream && !remote.IsFlat() {
|
if remote.DownloadAppStream && !remote.IsFlat() {
|
||||||
err = remote.DownloadAppStreamFiles(out, downloader,
|
err = remote.DownloadAppStreamFiles(out, downloader,
|
||||||
context.PackagePool(), collectionFactory.ChecksumCollection(nil), b.IgnoreChecksums)
|
context.PackagePool(), taskCollectionFactory.ChecksumCollection(nil), b.IgnoreChecksums)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -607,8 +639,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(),
|
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), taskCollectionFactory.PackageCollection(),
|
||||||
collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly)
|
taskCollectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -618,12 +650,12 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
e := context.ReOpenDatabase()
|
e := context.ReOpenDatabase()
|
||||||
if e == nil {
|
if e == nil {
|
||||||
remote.MarkAsIdle()
|
remote.MarkAsIdle()
|
||||||
_ = collection.Update(remote)
|
_ = taskCollection.Update(remote)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
remote.MarkAsUpdating()
|
remote.MarkAsUpdating()
|
||||||
err = collection.Update(remote)
|
err = taskCollection.Update(remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -727,7 +759,7 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// and import it back to the pool
|
// and import it back to the pool
|
||||||
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
|
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, taskCollectionFactory.ChecksumCollection(nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err)
|
//return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err)
|
||||||
pushError(err)
|
pushError(err)
|
||||||
@@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
||||||
_ = remote.FinalizeDownload(collectionFactory, out)
|
_ = remote.FinalizeDownload(taskCollectionFactory, out)
|
||||||
err = collectionFactory.RemoteRepoCollection().Update(remote)
|
err = taskCollection.Update(remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user