More informative return value for task.Process

This commit is contained in:
Lorenzo Bolla
2021-09-20 09:43:44 +02:00
parent 0914cd16af
commit ff51c46915
11 changed files with 156 additions and 127 deletions

View File

@@ -4,6 +4,7 @@ package api
import (
"fmt"
"log"
"net/http"
"sort"
"strconv"
"strings"
@@ -100,11 +101,11 @@ func releaseDatabaseConnection() error {
// runs tasks in background. Acquires database connection first.
func runTaskInBackground(name string, resources []string, proc task.Process) (task.Task, *task.ResourceConflictError) {
return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := acquireDatabaseConnection()
if err != nil {
return -1, err
return nil, err
}
defer releaseDatabaseConnection()
@@ -142,13 +143,16 @@ func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, p
log.Println("Executing task synchronously")
out := context.Progress()
detail := task.Detail{}
retCode, err := proc(out, &detail)
retValue, err := proc(out, &detail)
if err != nil {
c.AbortWithError(retCode, err)
c.AbortWithError(retValue.Code, err)
return
}
response := detail.Load()
c.JSON(retCode, response)
if retValue != nil {
c.JSON(retValue.Code, retValue.Value)
} else {
c.JSON(http.StatusOK, nil)
}
}
}

View File

@@ -15,7 +15,7 @@ import (
func apiDbCleanup(c *gin.Context) {
resources := []string{string(task.AllResourcesKey)}
maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
var err error
collectionFactory := context.NewCollectionFactory()
@@ -36,7 +36,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return -1, err
return nil, err
}
err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
@@ -52,7 +52,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return -1, err
return nil, err
}
err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
@@ -66,7 +66,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return -1, err
return nil, err
}
err = collectionFactory.PublishedRepoCollection().ForEach(func(published *deb.PublishedRepo) error {
@@ -84,7 +84,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return -1, err
return nil, err
}
// ... and compare it to the list of all packages
@@ -108,7 +108,7 @@ func apiDbCleanup(c *gin.Context) {
err = batch.Write()
if err != nil {
return -1, fmt.Errorf("unable to write to DB: %s", err)
return nil, fmt.Errorf("unable to write to DB: %s", err)
}
}
@@ -131,7 +131,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return -1, err
return nil, err
}
sort.Strings(referencedFiles)
@@ -140,7 +140,7 @@ func apiDbCleanup(c *gin.Context) {
out.Printf("Building list of files in package pool...")
existingFiles, err := context.PackagePool().FilepathList(out)
if err != nil {
return -1, fmt.Errorf("unable to collect file paths: %s", err)
return nil, fmt.Errorf("unable to collect file paths: %s", err)
}
// find files which are in the pool but not referenced by packages
@@ -163,7 +163,7 @@ func apiDbCleanup(c *gin.Context) {
for _, file := range filesToDelete {
size, err = context.PackagePool().Remove(file)
if err != nil {
return -1, err
return nil, err
}
taskDetail.RemainingNumberOfPackagesToDelete--
@@ -175,6 +175,6 @@ func apiDbCleanup(c *gin.Context) {
}
out.Printf("Compacting database...")
return -1, db.CompactDB()
return nil, db.CompactDB()
})
}

View File

@@ -147,25 +147,25 @@ func apiMirrorsDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %v", err)
}
if !force {
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 {
return http.StatusInternalServerError, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
}
}
err = mirrorCollection.Drop(repo)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %v", err)
}
return http.StatusNoContent, nil
return &task.ProcessReturnValue{http.StatusNoContent, nil}, nil
})
}
@@ -350,24 +350,24 @@ func apiMirrorsUpdate(c *gin.Context) {
}
resources := []string{string(remote.Key())}
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
downloader := context.NewDownloader(out)
err := remote.Fetch(downloader, verifier)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
if !b.ForceUpdate {
err = remote.CheckLock()
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
}
err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.SkipComponentCheck)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
if remote.Filter != "" {
@@ -375,19 +375,19 @@ func apiMirrorsUpdate(c *gin.Context) {
filterQuery, err = query.Parse(remote.Filter)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
_, _, err = remote.ApplyFilter(context.DependencyOptions(), filterQuery, out)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
}
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(),
collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
defer func() {
@@ -402,7 +402,7 @@ func apiMirrorsUpdate(c *gin.Context) {
remote.MarkAsUpdating()
err = collection.Update(remote)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
context.GoContextHandleSignals()
@@ -512,45 +512,45 @@ func apiMirrorsUpdate(c *gin.Context) {
for idx := range queue {
task := &queue[idx]
atask := &queue[idx]
if !task.Done {
if !atask.Done {
// download not finished yet
continue
}
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
atask.File.PoolPath, err = context.PackagePool().Import(atask.TempDownPath, atask.File.Filename, &atask.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to import file: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import file: %s", err)
}
// update "attached" files if any
for _, additionalTask := range task.Additional {
additionalTask.File.PoolPath = task.File.PoolPath
additionalTask.File.Checksums = task.File.Checksums
for _, additionalAtask := range atask.Additional {
additionalAtask.File.PoolPath = atask.File.PoolPath
additionalAtask.File.Checksums = atask.File.Checksums
}
}
select {
case <-context.Done():
return http.StatusInternalServerError, fmt.Errorf("unable to update: interrupted")
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: interrupted")
default:
}
if len(errors) > 0 {
log.Printf("%s: Unable to update because of previous errors\n", b.Name)
return http.StatusInternalServerError, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
log.Printf("%s: Finalizing download\n", b.Name)
remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
log.Printf("%s: Mirror updated successfully!\n", b.Name)
return http.StatusNoContent, nil
return &task.ProcessReturnValue{http.StatusNoContent, nil}, nil
})
}

View File

@@ -184,7 +184,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
collection := collectionFactory.PublishedRepoCollection()
taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskDetail := task.PublishDetail{
Detail: detail,
}
@@ -216,21 +216,20 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return 400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to publish: %s", err)
}
err = collection.Add(published)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save to DB: %s", err)
}
detail.Store(published)
return http.StatusCreated, nil
return &task.ProcessReturnValue{http.StatusCreated, published}, nil
})
}
@@ -329,27 +328,26 @@ func apiPublishUpdateSwitch(c *gin.Context) {
resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save to DB: %s", err)
}
if b.SkipCleanup == nil || !*b.SkipCleanup {
err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents,
context.GetPublishedStorage(storage), collectionFactory, out)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err)
}
}
detail.Store(published)
return http.StatusOK, nil
return &task.ProcessReturnValue{http.StatusOK, published}, nil
})
}
@@ -374,14 +372,13 @@ func apiPublishDrop(c *gin.Context) {
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s (%s)", prefix, distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution,
collectionFactory, out, force, skipCleanup)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %s", err)
}
detail.Store(gin.H{})
return http.StatusOK, nil
return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, nil
})
}

View File

@@ -141,21 +141,20 @@ func apiReposDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
return http.StatusConflict, fmt.Errorf("unable to drop, local repo is published")
return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop, local repo is published")
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
return http.StatusConflict, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
}
}
detail.Store(gin.H{})
return http.StatusOK, collection.Drop(repo)
return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, collection.Drop(repo)
})
}
@@ -205,11 +204,11 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
}
resources := []string{string(repo.Key())}
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
// verify package refs and build package list
@@ -219,14 +218,14 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return http.StatusNotFound, fmt.Errorf("packages %s: %s", ref, err)
return &task.ProcessReturnValue{http.StatusNotFound, nil}, fmt.Errorf("packages %s: %s", ref, err)
}
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
err = cb(list, p, out)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
}
@@ -234,10 +233,9 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save: %s", err)
}
detail.Store(repo)
return http.StatusOK, nil
return &task.ProcessReturnValue{http.StatusOK, repo}, nil
})
}
@@ -308,7 +306,7 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
verifier := context.GetVerifier()
var (
@@ -327,7 +325,7 @@ func apiReposPackageFromDir(c *gin.Context) {
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to load packages: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to load packages: %s", err)
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
@@ -336,14 +334,14 @@ func apiReposPackageFromDir(c *gin.Context) {
processedFiles = append(processedFiles, otherFiles...)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to import package files: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import package files: %s", err)
}
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save: %s", err)
}
if !noRemove {
@@ -377,11 +375,10 @@ func apiReposPackageFromDir(c *gin.Context) {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
detail.Store(gin.H{
return &task.ProcessReturnValue{http.StatusOK, gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
})
return http.StatusOK, nil
}}, nil
})
}
@@ -443,7 +440,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
var (
err error
verifier = context.GetVerifier()
@@ -464,7 +461,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to import changes files: %s", err)
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import changes files: %s", err)
}
if !noRemoveFiles {
@@ -489,11 +486,10 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
detail.Store(gin.H{
return &task.ProcessReturnValue{http.StatusOK, gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
})
return http.StatusOK, nil
}}, nil
})
}

View File

@@ -134,6 +134,7 @@ func Router(c *ctx.AptlyContext) http.Handler {
root.GET("/tasks/:id/wait", apiTasksWaitForTaskByID)
root.GET("/tasks/:id/output", apiTasksOutputShow)
root.GET("/tasks/:id/detail", apiTasksDetailShow)
root.GET("/tasks/:id/return_value", apiTasksReturnValueShow)
root.GET("/tasks/:id", apiTasksShow)
root.DELETE("/tasks/:id", apiTasksDelete)
root.POST("/tasks-dummy", apiTasksDummy)

View File

@@ -62,20 +62,20 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
if err != nil {
return http.StatusConflict, err
return &task.ProcessReturnValue{http.StatusConflict, nil}, err
}
err = collection.LoadComplete(repo)
if err != nil {
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
if b.Description != "" {
@@ -84,10 +84,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
detail.Store(snapshot)
return http.StatusCreated, nil
return &task.ProcessReturnValue{http.StatusCreated, snapshot}, nil
})
}
@@ -137,7 +136,7 @@ func apiSnapshotsCreate(c *gin.Context) {
resources = append(resources, string(sources[i].ResourceKey()))
}
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
list := deb.NewPackageList()
// verify package refs and build package list
@@ -145,13 +144,13 @@ func apiSnapshotsCreate(c *gin.Context) {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return http.StatusNotFound, fmt.Errorf("package %s: %s", ref, err)
return &task.ProcessReturnValue{http.StatusNotFound, nil}, fmt.Errorf("package %s: %s", ref, err)
}
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
err = list.Add(p)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
}
@@ -159,9 +158,9 @@ func apiSnapshotsCreate(c *gin.Context) {
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
return http.StatusCreated, nil
return &task.ProcessReturnValue{http.StatusCreated, nil}, nil
})
}
@@ -196,15 +195,15 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.LoadComplete(repo)
if err != nil {
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo)
if err != nil {
return http.StatusNotFound, err
return &task.ProcessReturnValue{http.StatusNotFound, nil}, err
}
if b.Description != "" {
@@ -213,10 +212,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err
}
detail.Store(snapshot)
return http.StatusCreated, nil
return &task.ProcessReturnValue{http.StatusCreated, snapshot}, nil
})
}
@@ -248,10 +246,10 @@ func apiSnapshotsUpdate(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return http.StatusConflict, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
if b.Name != "" {
@@ -264,10 +262,9 @@ func apiSnapshotsUpdate(c *gin.Context) {
err = collectionFactory.SnapshotCollection().Update(snapshot)
if err != nil {
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
detail.Store(snapshot)
return http.StatusOK, nil
return &task.ProcessReturnValue{http.StatusOK, snapshot}, nil
})
}
@@ -308,26 +305,25 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
return http.StatusConflict, fmt.Errorf("unable to drop: snapshot is published")
return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop: snapshot is published")
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return http.StatusConflict, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
err = snapshotCollection.Drop(snapshot)
if err != nil {
return http.StatusInternalServerError, err
return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err
}
detail.Store(gin.H{})
return http.StatusOK, nil
return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, nil
})
}

View File

@@ -105,6 +105,24 @@ func apiTasksDetailShow(c *gin.Context) {
c.JSON(200, detail)
}
// GET /tasks/:id/return_value
func apiTasksReturnValueShow(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
output, err := list.GetTaskReturnValueByID(int(id))
if err != nil {
c.AbortWithError(404, err)
return
}
c.JSON(200, output)
}
// DELETE /tasks/:id
func apiTasksDelete(c *gin.Context) {
list := context.TaskList()
@@ -128,10 +146,10 @@ func apiTasksDelete(c *gin.Context) {
func apiTasksDummy(c *gin.Context) {
resources := []string{"dummy"}
taskName := fmt.Sprintf("Dummy task")
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
out.Printf("Dummy task started\n")
detail.Store([]int{1, 2, 3})
out.Printf("Dummy task finished\n")
return http.StatusTeapot, nil
return &task.ProcessReturnValue{Code: http.StatusTeapot, Value: []int{1, 2, 3}}, nil
})
}

View File

@@ -105,6 +105,17 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
return detail, nil
}
// GetTaskReturnValueByID returns process return value of task with given id
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
task, err := list.GetTaskByID(ID)
if err != nil {
return nil, err
}
return task.processReturnValue, nil
}
// RunTaskInBackground creates task and runs it in background. It won't be run and an error
// returned if there are running tasks which are using needed resources already.
func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) {
@@ -139,11 +150,11 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
}
list.Unlock()
retCode, err := process(aptly.Progress(task.output), task.detail)
retValue, err := process(aptly.Progress(task.output), task.detail)
list.Lock()
{
task.processReturnCode = retCode
task.processReturnValue = retValue
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED

View File

@@ -2,6 +2,7 @@ package task
import (
"errors"
"github.com/aptly-dev/aptly/aptly"
// need to import as check as otherwise List is redeclared
@@ -16,8 +17,8 @@ func (s *ListSuite) TestList(c *check.C) {
list := NewList()
c.Assert(len(list.GetTasks()), check.Equals, 0)
task, err := list.RunTaskInBackground("Successful task", nil, func(out aptly.Progress, detail *Detail) (int, error) {
return -1, nil
task, err := list.RunTaskInBackground("Successful task", nil, func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error) {
return nil, nil
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)
@@ -31,10 +32,10 @@ func (s *ListSuite) TestList(c *check.C) {
detail, _ := list.GetTaskDetailByID(task.ID)
c.Check(detail, check.Equals, struct{}{})
task, err = list.RunTaskInBackground("Faulty task", nil, func(out aptly.Progress, detail *Detail) (int, error) {
task, err = list.RunTaskInBackground("Faulty task", nil, func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error) {
detail.Store("Details")
out.Printf("Test Progress\n")
return -1, errors.New("Task failed")
return nil, errors.New("Task failed")
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)

View File

@@ -20,8 +20,13 @@ type PublishDetail struct {
RemainingNumberOfPackages int64
}
type ProcessReturnValue struct {
Code int
Value interface{}
}
// Process is a function implementing the actual task logic
type Process func(out aptly.Progress, detail *Detail) (int, error)
type Process func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error)
const (
// IDLE when task is waiting
@@ -36,13 +41,13 @@ const (
// Task represents as task in a queue encapsulates process code
type Task struct {
output *Output
detail *Detail
process Process
processReturnCode int
Name string
ID int
State State
output *Output
detail *Detail
process Process
processReturnValue *ProcessReturnValue
Name string
ID int
State State
}
// NewTask creates new task