diff --git a/api/api.go b/api/api.go index c683dfbf..dc3f2463 100644 --- a/api/api.go +++ b/api/api.go @@ -4,6 +4,7 @@ package api import ( "fmt" "log" + "net/http" "sort" "strconv" "strings" @@ -100,11 +101,11 @@ func releaseDatabaseConnection() error { // runs tasks in background. Acquires database connection first. func runTaskInBackground(name string, resources []string, proc task.Process) (task.Task, *task.ResourceConflictError) { - return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := acquireDatabaseConnection() if err != nil { - return -1, err + return nil, err } defer releaseDatabaseConnection() @@ -142,13 +143,16 @@ func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, p log.Println("Executing task synchronously") out := context.Progress() detail := task.Detail{} - retCode, err := proc(out, &detail) + retValue, err := proc(out, &detail) if err != nil { - c.AbortWithError(retCode, err) + c.AbortWithError(retValue.Code, err) return } - response := detail.Load() - c.JSON(retCode, response) + if retValue != nil { + c.JSON(retValue.Code, retValue.Value) + } else { + c.JSON(http.StatusOK, nil) + } } } diff --git a/api/db.go b/api/db.go index bfdf920f..3f8b826d 100644 --- a/api/db.go +++ b/api/db.go @@ -15,7 +15,7 @@ import ( func apiDbCleanup(c *gin.Context) { resources := []string{string(task.AllResourcesKey)} - maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { var err error collectionFactory := context.NewCollectionFactory() @@ -36,7 +36,7 @@ func apiDbCleanup(c *gin.Context) { return nil }) if err != nil { - return -1, err + return nil, err } err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error { @@ -52,7 +52,7 @@ func apiDbCleanup(c *gin.Context) { return nil }) if err != nil { - return -1, err + return nil, err } err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error { @@ -66,7 +66,7 @@ func apiDbCleanup(c *gin.Context) { return nil }) if err != nil { - return -1, err + return nil, err } err = collectionFactory.PublishedRepoCollection().ForEach(func(published *deb.PublishedRepo) error { @@ -84,7 +84,7 @@ func apiDbCleanup(c *gin.Context) { return nil }) if err != nil { - return -1, err + return nil, err } // ... and compare it to the list of all packages @@ -108,7 +108,7 @@ func apiDbCleanup(c *gin.Context) { err = batch.Write() if err != nil { - return -1, fmt.Errorf("unable to write to DB: %s", err) + return nil, fmt.Errorf("unable to write to DB: %s", err) } } @@ -131,7 +131,7 @@ func apiDbCleanup(c *gin.Context) { return nil }) if err != nil { - return -1, err + return nil, err } sort.Strings(referencedFiles) @@ -140,7 +140,7 @@ func apiDbCleanup(c *gin.Context) { out.Printf("Building list of files in package pool...") existingFiles, err := context.PackagePool().FilepathList(out) if err != nil { - return -1, fmt.Errorf("unable to collect file paths: %s", err) + return nil, fmt.Errorf("unable to collect file paths: %s", err) } // find files which are in the pool but not referenced by packages @@ -163,7 +163,7 @@ func apiDbCleanup(c *gin.Context) { for _, file := range filesToDelete { size, err = context.PackagePool().Remove(file) if err != nil { - return -1, err + return nil, err } taskDetail.RemainingNumberOfPackagesToDelete-- @@ -175,6 +175,6 @@ func apiDbCleanup(c *gin.Context) { } out.Printf("Compacting database...") - return -1, db.CompactDB() + return nil, db.CompactDB() }) } diff --git a/api/mirror.go b/api/mirror.go index a08f627c..711c5846 100644 --- a/api/mirror.go +++ b/api/mirror.go @@ -147,25 +147,25 @@ func apiMirrorsDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete mirror %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := repo.CheckLock() if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %v", err) } if !force { snapshots := snapshotCollection.ByRemoteRepoSource(repo) if len(snapshots) > 0 { - return http.StatusInternalServerError, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") } } err = mirrorCollection.Drop(repo) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %v", err) } - return http.StatusNoContent, nil + return &task.ProcessReturnValue{http.StatusNoContent, nil}, nil }) } @@ -350,24 +350,24 @@ func apiMirrorsUpdate(c *gin.Context) { } resources := []string{string(remote.Key())} - maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { downloader := context.NewDownloader(out) err := remote.Fetch(downloader, verifier) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } if !b.ForceUpdate { err = remote.CheckLock() if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } } err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.SkipComponentCheck) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } if remote.Filter != "" { @@ -375,19 +375,19 @@ func apiMirrorsUpdate(c *gin.Context) { filterQuery, err = query.Parse(remote.Filter) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } _, _, err = remote.ApplyFilter(context.DependencyOptions(), filterQuery, out) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } } queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(), collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } defer func() { @@ -402,7 +402,7 @@ func apiMirrorsUpdate(c *gin.Context) { remote.MarkAsUpdating() err = collection.Update(remote) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } context.GoContextHandleSignals() @@ -512,45 +512,45 @@ func apiMirrorsUpdate(c *gin.Context) { for idx := range queue { - task := &queue[idx] + atask := &queue[idx] - if !task.Done { + if !atask.Done { // download not finished yet continue } // and import it back to the pool - task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil)) + atask.File.PoolPath, err = context.PackagePool().Import(atask.TempDownPath, atask.File.Filename, &atask.File.Checksums, true, collectionFactory.ChecksumCollection(nil)) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to import file: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import file: %s", err) } // update "attached" files if any - for _, additionalTask := range task.Additional { - additionalTask.File.PoolPath = task.File.PoolPath - additionalTask.File.Checksums = task.File.Checksums + for _, additionalAtask := range atask.Additional { + additionalAtask.File.PoolPath = atask.File.PoolPath + additionalAtask.File.Checksums = atask.File.Checksums } } select { case <-context.Done(): - return http.StatusInternalServerError, fmt.Errorf("unable to update: interrupted") + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: interrupted") default: } if len(errors) > 0 { log.Printf("%s: Unable to update because of previous errors\n", b.Name) - return http.StatusInternalServerError, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n ")) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n ")) } log.Printf("%s: Finalizing download\n", b.Name) remote.FinalizeDownload(collectionFactory, out) err = collectionFactory.RemoteRepoCollection().Update(remote) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } log.Printf("%s: Mirror updated successfully!\n", b.Name) - return http.StatusNoContent, nil + return &task.ProcessReturnValue{http.StatusNoContent, nil}, nil }) } diff --git a/api/publish.go b/api/publish.go index 33c137d2..5f3545c0 100644 --- a/api/publish.go +++ b/api/publish.go @@ -184,7 +184,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { collection := collectionFactory.PublishedRepoCollection() taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", ")) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { taskDetail := task.PublishDetail{ Detail: detail, } @@ -216,21 +216,20 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { duplicate := collection.CheckDuplicate(published) if duplicate != nil { collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) - return 400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to publish: %s", err) } err = collection.Add(published) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save to DB: %s", err) } - detail.Store(published) - return http.StatusCreated, nil + return &task.ProcessReturnValue{http.StatusCreated, published}, nil }) } @@ -329,27 +328,26 @@ func apiPublishUpdateSwitch(c *gin.Context) { resources = append(resources, string(published.Key())) taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", ")) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } err = collection.Update(published) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save to DB: %s", err) } if b.SkipCleanup == nil || !*b.SkipCleanup { err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents, context.GetPublishedStorage(storage), collectionFactory, out) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to update: %s", err) } } - detail.Store(published) - return http.StatusOK, nil + return &task.ProcessReturnValue{http.StatusOK, published}, nil }) } @@ -374,14 +372,13 @@ func apiPublishDrop(c *gin.Context) { resources := []string{string(published.Key())} taskName := fmt.Sprintf("Delete published %s (%s)", prefix, distribution) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := collection.Remove(context, storage, prefix, distribution, collectionFactory, out, force, skipCleanup) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to drop: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to drop: %s", err) } - detail.Store(gin.H{}) - return http.StatusOK, nil + return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, nil }) } diff --git a/api/repos.go b/api/repos.go index 94db4332..2ee65fa8 100644 --- a/api/repos.go +++ b/api/repos.go @@ -141,21 +141,20 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { published := publishedCollection.ByLocalRepo(repo) if len(published) > 0 { - return http.StatusConflict, fmt.Errorf("unable to drop, local repo is published") + return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop, local repo is published") } if !force { snapshots := snapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { - return http.StatusConflict, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") + return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - detail.Store(gin.H{}) - return http.StatusOK, collection.Drop(repo) + return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, collection.Drop(repo) }) } @@ -205,11 +204,11 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li } resources := []string{string(repo.Key())} - maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { out.Printf("Loading packages...\n") list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) if err != nil { - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } // verify package refs and build package list @@ -219,14 +218,14 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { - return http.StatusNotFound, fmt.Errorf("packages %s: %s", ref, err) + return &task.ProcessReturnValue{http.StatusNotFound, nil}, fmt.Errorf("packages %s: %s", ref, err) } - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } err = cb(list, p, out) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } } @@ -234,10 +233,9 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li err = collectionFactory.LocalRepoCollection().Update(repo) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save: %s", err) } - detail.Store(repo) - return http.StatusOK, nil + return &task.ProcessReturnValue{http.StatusOK, repo}, nil }) } @@ -308,7 +306,7 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { verifier := context.GetVerifier() var ( @@ -327,7 +325,7 @@ func apiReposPackageFromDir(c *gin.Context) { list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to load packages: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), @@ -336,14 +334,14 @@ func apiReposPackageFromDir(c *gin.Context) { processedFiles = append(processedFiles, otherFiles...) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to import package files: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import package files: %s", err) } repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) err = collectionFactory.LocalRepoCollection().Update(repo) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to save: %s", err) } if !noRemove { @@ -377,11 +375,10 @@ func apiReposPackageFromDir(c *gin.Context) { out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", ")) } - detail.Store(gin.H{ + return &task.ProcessReturnValue{http.StatusOK, gin.H{ "Report": reporter, "FailedFiles": failedFiles, - }) - return http.StatusOK, nil + }}, nil }) } @@ -443,7 +440,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) { } resources = append(resources, sources...) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { var ( err error verifier = context.GetVerifier() @@ -464,7 +461,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) { failedFiles = append(failedFiles, failedFiles2...) if err != nil { - return http.StatusInternalServerError, fmt.Errorf("unable to import changes files: %s", err) + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, fmt.Errorf("unable to import changes files: %s", err) } if !noRemoveFiles { @@ -489,11 +486,10 @@ func apiReposIncludePackageFromDir(c *gin.Context) { out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", ")) } - detail.Store(gin.H{ + return &task.ProcessReturnValue{http.StatusOK, gin.H{ "Report": reporter, "FailedFiles": failedFiles, - }) - return http.StatusOK, nil + }}, nil }) } diff --git a/api/router.go b/api/router.go index fb5d6f34..fdfbb940 100644 --- a/api/router.go +++ b/api/router.go @@ -134,6 +134,7 @@ func Router(c *ctx.AptlyContext) http.Handler { root.GET("/tasks/:id/wait", apiTasksWaitForTaskByID) root.GET("/tasks/:id/output", apiTasksOutputShow) root.GET("/tasks/:id/detail", apiTasksDetailShow) + root.GET("/tasks/:id/return_value", apiTasksReturnValueShow) root.GET("/tasks/:id", apiTasksShow) root.DELETE("/tasks/:id", apiTasksDelete) root.POST("/tasks-dummy", apiTasksDummy) diff --git a/api/snapshot.go b/api/snapshot.go index 7cb7112a..423233ad 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -62,20 +62,20 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { // including snapshot resource key resources := []string{string(repo.Key()), "S" + b.Name} taskName := fmt.Sprintf("Create snapshot of mirror %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := repo.CheckLock() if err != nil { - return http.StatusConflict, err + return &task.ProcessReturnValue{http.StatusConflict, nil}, err } err = collection.LoadComplete(repo) if err != nil { - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } if b.Description != "" { @@ -84,10 +84,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { err = snapshotCollection.Add(snapshot) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } - detail.Store(snapshot) - return http.StatusCreated, nil + return &task.ProcessReturnValue{http.StatusCreated, snapshot}, nil }) } @@ -137,7 +136,7 @@ func apiSnapshotsCreate(c *gin.Context) { resources = append(resources, string(sources[i].ResourceKey())) } - maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { list := deb.NewPackageList() // verify package refs and build package list @@ -145,13 +144,13 @@ func apiSnapshotsCreate(c *gin.Context) { p, err := collectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { - return http.StatusNotFound, fmt.Errorf("package %s: %s", ref, err) + return &task.ProcessReturnValue{http.StatusNotFound, nil}, fmt.Errorf("package %s: %s", ref, err) } - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } err = list.Add(p) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } } @@ -159,9 +158,9 @@ func apiSnapshotsCreate(c *gin.Context) { err = snapshotCollection.Add(snapshot) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } - return http.StatusCreated, nil + return &task.ProcessReturnValue{http.StatusCreated, nil}, nil }) } @@ -196,15 +195,15 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { // including snapshot resource key resources := []string{string(repo.Key()), "S" + b.Name} taskName := fmt.Sprintf("Create snapshot of repo %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { err := collection.LoadComplete(repo) if err != nil { - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo) if err != nil { - return http.StatusNotFound, err + return &task.ProcessReturnValue{http.StatusNotFound, nil}, err } if b.Description != "" { @@ -213,10 +212,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { err = snapshotCollection.Add(snapshot) if err != nil { - return http.StatusBadRequest, err + return &task.ProcessReturnValue{http.StatusBadRequest, nil}, err } - detail.Store(snapshot) - return http.StatusCreated, nil + return &task.ProcessReturnValue{http.StatusCreated, snapshot}, nil }) } @@ -248,10 +246,10 @@ func apiSnapshotsUpdate(c *gin.Context) { resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} taskName := fmt.Sprintf("Update snapshot %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { _, err := collection.ByName(b.Name) if err == nil { - return http.StatusConflict, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) } if b.Name != "" { @@ -264,10 +262,9 @@ func apiSnapshotsUpdate(c *gin.Context) { err = collectionFactory.SnapshotCollection().Update(snapshot) if err != nil { - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } - detail.Store(snapshot) - return http.StatusOK, nil + return &task.ProcessReturnValue{http.StatusOK, snapshot}, nil }) } @@ -308,26 +305,25 @@ func apiSnapshotsDrop(c *gin.Context) { resources := []string{string(snapshot.ResourceKey())} taskName := fmt.Sprintf("Delete snapshot %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { published := publishedCollection.BySnapshot(snapshot) if len(published) > 0 { - return http.StatusConflict, fmt.Errorf("unable to drop: snapshot is published") + return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("unable to drop: snapshot is published") } if !force { snapshots := snapshotCollection.BySnapshotSource(snapshot) if len(snapshots) > 0 { - return http.StatusConflict, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") + return &task.ProcessReturnValue{http.StatusConflict, nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") } } err = snapshotCollection.Drop(snapshot) if err != nil { - return http.StatusInternalServerError, err + return &task.ProcessReturnValue{http.StatusInternalServerError, nil}, err } - detail.Store(gin.H{}) - return http.StatusOK, nil + return &task.ProcessReturnValue{http.StatusOK, gin.H{}}, nil }) } diff --git a/api/task.go b/api/task.go index a20d99ce..19cb0614 100644 --- a/api/task.go +++ b/api/task.go @@ -105,6 +105,24 @@ func apiTasksDetailShow(c *gin.Context) { c.JSON(200, detail) } +// GET /tasks/:id/return_value +func apiTasksReturnValueShow(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + output, err := list.GetTaskReturnValueByID(int(id)) + if err != nil { + c.AbortWithError(404, err) + return + } + + c.JSON(200, output) +} + // DELETE /tasks/:id func apiTasksDelete(c *gin.Context) { list := context.TaskList() @@ -128,10 +146,10 @@ func apiTasksDelete(c *gin.Context) { func apiTasksDummy(c *gin.Context) { resources := []string{"dummy"} taskName := fmt.Sprintf("Dummy task") - maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) { + maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { out.Printf("Dummy task started\n") detail.Store([]int{1, 2, 3}) out.Printf("Dummy task finished\n") - return http.StatusTeapot, nil + return &task.ProcessReturnValue{Code: http.StatusTeapot, Value: []int{1, 2, 3}}, nil }) } diff --git a/task/list.go b/task/list.go index d4dd34bb..d9a62c4a 100644 --- a/task/list.go +++ b/task/list.go @@ -105,6 +105,17 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { return detail, nil } +// GetTaskReturnValueByID returns process return value of task with given id +func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { + task, err := list.GetTaskByID(ID) + + if err != nil { + return nil, err + } + + return task.processReturnValue, nil +} + // RunTaskInBackground creates task and runs it in background. It won't be run and an error // returned if there are running tasks which are using needed resources already. func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) { @@ -139,11 +150,11 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P } list.Unlock() - retCode, err := process(aptly.Progress(task.output), task.detail) + retValue, err := process(aptly.Progress(task.output), task.detail) list.Lock() { - task.processReturnCode = retCode + task.processReturnValue = retValue if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED diff --git a/task/list_test.go b/task/list_test.go index a9c4ccca..297e97e2 100644 --- a/task/list_test.go +++ b/task/list_test.go @@ -2,6 +2,7 @@ package task import ( "errors" + "github.com/aptly-dev/aptly/aptly" // need to import as check as otherwise List is redeclared @@ -16,8 +17,8 @@ func (s *ListSuite) TestList(c *check.C) { list := NewList() c.Assert(len(list.GetTasks()), check.Equals, 0) - task, err := list.RunTaskInBackground("Successful task", nil, func(out aptly.Progress, detail *Detail) (int, error) { - return -1, nil + task, err := list.RunTaskInBackground("Successful task", nil, func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error) { + return nil, nil }) c.Assert(err, check.IsNil) list.WaitForTaskByID(task.ID) @@ -31,10 +32,10 @@ func (s *ListSuite) TestList(c *check.C) { detail, _ := list.GetTaskDetailByID(task.ID) c.Check(detail, check.Equals, struct{}{}) - task, err = list.RunTaskInBackground("Faulty task", nil, func(out aptly.Progress, detail *Detail) (int, error) { + task, err = list.RunTaskInBackground("Faulty task", nil, func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error) { detail.Store("Details") out.Printf("Test Progress\n") - return -1, errors.New("Task failed") + return nil, errors.New("Task failed") }) c.Assert(err, check.IsNil) list.WaitForTaskByID(task.ID) diff --git a/task/task.go b/task/task.go index 2bb764f3..59e6a4bb 100644 --- a/task/task.go +++ b/task/task.go @@ -20,8 +20,13 @@ type PublishDetail struct { RemainingNumberOfPackages int64 } +type ProcessReturnValue struct { + Code int + Value interface{} +} + // Process is a function implementing the actual task logic -type Process func(out aptly.Progress, detail *Detail) (int, error) +type Process func(out aptly.Progress, detail *Detail) (*ProcessReturnValue, error) const ( // IDLE when task is waiting @@ -36,13 +41,13 @@ const ( // Task represents as task in a queue encapsulates process code type Task struct { - output *Output - detail *Detail - process Process - processReturnCode int - Name string - ID int - State State + output *Output + detail *Detail + process Process + processReturnValue *ProcessReturnValue + Name string + ID int + State State } // NewTask creates new task