Configurable background task execution

This commit is contained in:
Lorenzo Bolla
2021-09-08 15:50:54 +02:00
parent bd4c3a246d
commit 9b28d8984f
18 changed files with 427 additions and 214 deletions

1
.gitignore vendored
View File

@@ -41,3 +41,4 @@ build/
pgp/keyrings/aptly2*.gpg
pgp/keyrings/aptly2*.gpg~
pgp/keyrings/.#*

View File

@@ -3,7 +3,9 @@ package api
import (
"fmt"
"log"
"sort"
"strings"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
@@ -97,11 +99,11 @@ func releaseDatabaseConnection() error {
// runs tasks in background. Acquires database connection first.
func runTaskInBackground(name string, resources []string, proc task.Process) (task.Task, *task.ResourceConflictError) {
return context.TaskList().RunTaskInBackground(name, resources, func(out *task.Output, detail *task.Detail) error {
return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := acquireDatabaseConnection()
if err != nil {
return err
return -1, err
}
defer releaseDatabaseConnection()
@@ -109,6 +111,39 @@ func runTaskInBackground(name string, resources []string, proc task.Process) (ta
})
}
func truthy(value string) bool {
switch strings.ToLower(value) {
case "y", "yes", "t", "true":
return true
}
return false
}
func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, proc task.Process) {
// Run this task in background if configured globally or per-request
background := context.Config().AsyncAPI || truthy(c.Query("_async"))
if background {
log.Println("Executing task asynchronously")
task, conflictErr := runTaskInBackground(name, resources, proc)
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, task)
} else {
log.Println("Executing task synchronously")
out := context.Progress()
detail := task.Detail{}
retCode, err := proc(out, &detail)
if err != nil {
c.AbortWithError(retCode, err)
return
}
response := detail.Load()
c.JSON(retCode, response)
}
}
// Common piece of code to show list of packages,
// with searching & details if requested
func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {

93
api/api_test.go Normal file
View File

@@ -0,0 +1,93 @@
package api
import (
"encoding/json"
ctx "github.com/aptly-dev/aptly/context"
"github.com/gin-gonic/gin"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/smira/flag"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) {
TestingT(t)
}
type ApiSuite struct {
context *ctx.AptlyContext
flags *flag.FlagSet
configFile *os.File
router http.Handler
}
var _ = Suite(&ApiSuite{})
func createTestConfig() *os.File {
file, err := ioutil.TempFile("", "aptly")
if err != nil {
return nil
}
jsonString, err := json.Marshal(gin.H{
"architectures": []string{},
})
if err != nil {
return nil
}
file.Write(jsonString)
return file
}
func (s *ApiSuite) SetUpSuite(c *C) {
file := createTestConfig()
c.Assert(file, NotNil)
s.configFile = file
flags := flag.NewFlagSet("fakeFlags", flag.ContinueOnError)
flags.Bool("no-lock", false, "dummy")
flags.Int("db-open-attempts", 3, "dummy")
flags.String("config", s.configFile.Name(), "dummy")
flags.String("architectures", "", "dummy")
s.flags = flags
context, err := ctx.NewContext(s.flags)
c.Assert(err, IsNil)
s.context = context
s.router = Router(context)
}
func (s *ApiSuite) TearDownSuite(c *C) {
os.Remove(s.configFile.Name())
s.context.Shutdown()
}
func (s *ApiSuite) SetUpTest(c *C) {
}
func (s *ApiSuite) TearDownTest(c *C) {
}
func (s *ApiSuite) HTTPRequest(method string, url string, body io.Reader) (*httptest.ResponseRecorder, error) {
w := httptest.NewRecorder()
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")
s.router.ServeHTTP(w, req)
return w, nil
}
func (s *ApiSuite) TestGetVersion(c *C) {
response, err := s.HTTPRequest("GET", "/api/version", nil)
c.Assert(err, IsNil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Matches, ".*Version.*")
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
@@ -14,7 +15,7 @@ import (
func apiDbCleanup(c *gin.Context) {
resources := []string{string(task.AllResourcesKey)}
currTask, conflictErr := runTaskInBackground("Clean up db", resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
var err error
collectionFactory := context.NewCollectionFactory()
@@ -35,7 +36,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return err
return -1, err
}
err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
@@ -51,7 +52,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return err
return -1, err
}
err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
@@ -65,7 +66,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return err
return -1, err
}
err = collectionFactory.PublishedRepoCollection().ForEach(func(published *deb.PublishedRepo) error {
@@ -83,7 +84,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return err
return -1, err
}
// ... and compare it to the list of all packages
@@ -107,7 +108,7 @@ func apiDbCleanup(c *gin.Context) {
err = batch.Write()
if err != nil {
return fmt.Errorf("unable to write to DB: %s", err)
return -1, fmt.Errorf("unable to write to DB: %s", err)
}
}
@@ -130,7 +131,7 @@ func apiDbCleanup(c *gin.Context) {
return nil
})
if err != nil {
return err
return -1, err
}
sort.Strings(referencedFiles)
@@ -139,7 +140,7 @@ func apiDbCleanup(c *gin.Context) {
out.Printf("Building list of files in package pool...")
existingFiles, err := context.PackagePool().FilepathList(out)
if err != nil {
return fmt.Errorf("unable to collect file paths: %s", err)
return -1, fmt.Errorf("unable to collect file paths: %s", err)
}
// find files which are in the pool but not referenced by packages
@@ -162,7 +163,7 @@ func apiDbCleanup(c *gin.Context) {
for _, file := range filesToDelete {
size, err = context.PackagePool().Remove(file)
if err != nil {
return err
return -1, err
}
taskDetail.RemainingNumberOfPackagesToDelete--
@@ -174,13 +175,6 @@ func apiDbCleanup(c *gin.Context) {
}
out.Printf("Compacting database...")
return db.CompactDB()
return -1, db.CompactDB()
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}

View File

@@ -3,6 +3,7 @@ package api
import (
"fmt"
"log"
"net/http"
"sort"
"strings"
"sync"
@@ -146,29 +147,26 @@ func apiMirrorsDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name)
task, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := repo.CheckLock()
if err != nil {
return fmt.Errorf("unable to drop: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err)
}
if !force {
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 {
return fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
return http.StatusInternalServerError, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
}
}
return mirrorCollection.Drop(repo)
err = mirrorCollection.Drop(repo)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %v", err)
}
return http.StatusNoContent, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, task)
}
// GET /api/mirrors/:name
@@ -352,24 +350,24 @@ func apiMirrorsUpdate(c *gin.Context) {
}
resources := []string{string(remote.Key())}
currTask, conflictErr := runTaskInBackground("Update mirror "+b.Name, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
downloader := context.NewDownloader(out)
err := remote.Fetch(downloader, verifier)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
if !b.ForceUpdate {
err = remote.CheckLock()
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
}
err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.SkipComponentCheck)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
if remote.Filter != "" {
@@ -377,19 +375,19 @@ func apiMirrorsUpdate(c *gin.Context) {
filterQuery, err = query.Parse(remote.Filter)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
_, _, err = remote.ApplyFilter(context.DependencyOptions(), filterQuery, out)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
}
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(),
collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
defer func() {
@@ -404,7 +402,7 @@ func apiMirrorsUpdate(c *gin.Context) {
remote.MarkAsUpdating()
err = collection.Update(remote)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
context.GoContextHandleSignals()
@@ -524,7 +522,7 @@ func apiMirrorsUpdate(c *gin.Context) {
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
if err != nil {
return fmt.Errorf("unable to import file: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to import file: %s", err)
}
// update "attached" files if any
@@ -536,30 +534,23 @@ func apiMirrorsUpdate(c *gin.Context) {
select {
case <-context.Done():
return fmt.Errorf("unable to update: interrupted")
return http.StatusInternalServerError, fmt.Errorf("unable to update: interrupted")
default:
}
if len(errors) > 0 {
log.Printf("%s: Unable to update because of previous errors\n", b.Name)
return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
return http.StatusInternalServerError, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
log.Printf("%s: Finalizing download\n", b.Name)
remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
log.Printf("%s: Mirror updated successfully!\n", b.Name)
return nil
return http.StatusNoContent, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}

40
api/mirror_test.go Normal file
View File

@@ -0,0 +1,40 @@
package api
import (
"bytes"
"encoding/json"
"github.com/gin-gonic/gin"
. "gopkg.in/check.v1"
)
type MirrorSuite struct {
ApiSuite
}
var _ = Suite(&MirrorSuite{})
func (s *MirrorSuite) TestGetMirrors(c *C) {
response, _ := s.HTTPRequest("GET", "/api/mirrors", nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "[]")
}
func (s *MirrorSuite) TestDeleteMirrorNonExisting(c *C) {
response, _ := s.HTTPRequest("DELETE", "/api/mirrors/does-not-exist", nil)
c.Check(response.Code, Equals, 404)
c.Check(response.Body.String(), Equals, "{\"error\":\"unable to drop: mirror with name does-not-exist not found\"}")
}
func (s *MirrorSuite) TestCreateMirror(c *C) {
c.ExpectFailure("Need to mock downloads")
body, err := json.Marshal(gin.H{
"Name": "dummy",
"ArchiveURL": "foobar",
})
c.Assert(err, IsNil)
response, err := s.HTTPRequest("POST", "/api/mirrors", bytes.NewReader(body))
c.Assert(err, IsNil)
c.Check(response.Code, Equals, 400)
c.Check(response.Body.String(), Equals, "")
}

View File

@@ -2,8 +2,10 @@ package api
import (
"fmt"
"net/http"
"strings"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/pgp"
"github.com/aptly-dev/aptly/task"
@@ -182,12 +184,12 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
collection := collectionFactory.PublishedRepoCollection()
taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
task, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
taskDetail := task.PublishDetail{
Detail: detail,
}
publishOutput := &task.PublishOutput{
Output: out,
Progress: out,
PublishDetail: taskDetail,
}
@@ -214,28 +216,22 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
return 400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return fmt.Errorf("unable to publish: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to publish: %s", err)
}
err = collection.Add(published)
if err != nil {
return fmt.Errorf("unable to save to DB: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err)
}
return nil
detail.Store(published)
return http.StatusCreated, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, task)
}
// PUT /publish/:prefix/:distribution
@@ -333,34 +329,28 @@ func apiPublishUpdateSwitch(c *gin.Context) {
resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
if err != nil {
return fmt.Errorf("unable to save to DB: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to save to DB: %s", err)
}
if b.SkipCleanup == nil || !*b.SkipCleanup {
err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents,
context.GetPublishedStorage(storage), collectionFactory, out)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)
}
}
return nil
detail.Store(published)
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// DELETE /publish/:prefix/:distribution
@@ -377,27 +367,21 @@ func apiPublishDrop(c *gin.Context) {
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to drop: %s", err))
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("unable to drop: %s", err))
return
}
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s (%s)", prefix, distribution)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := collection.Remove(context, storage, prefix, distribution,
collectionFactory, out, force, skipCleanup)
if err != nil {
return fmt.Errorf("unable to drop: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to drop: %s", err)
}
return nil
detail.Store(gin.H{})
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}

View File

@@ -2,6 +2,7 @@ package api
import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
@@ -81,13 +82,13 @@ func apiReposEdit(c *gin.Context) {
}
if b.Name != nil {
_, err := collection.ByName(*b.Name)
if err == nil {
// already exists
c.AbortWithError(404, err)
return
}
repo.Name = *b.Name
_, err := collection.ByName(*b.Name)
if err == nil {
// already exists
c.AbortWithError(404, err)
return
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
@@ -140,28 +141,22 @@ func apiReposDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name)
task, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
published := publishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
return fmt.Errorf("unable to drop, local repo is published")
return http.StatusConflict, fmt.Errorf("unable to drop, local repo is published")
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
return fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
return http.StatusConflict, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
}
}
return collection.Drop(repo)
detail.Store(gin.H{})
return http.StatusOK, collection.Drop(repo)
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, task)
}
// GET /api/repos/:name/packages
@@ -185,7 +180,7 @@ func apiReposPackagesShow(c *gin.Context) {
}
// Handler for both add and delete
func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(list *deb.PackageList, p *deb.Package, out *task.Output) error) {
func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error) {
var b struct {
PackageRefs []string
}
@@ -210,11 +205,11 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
}
resources := []string{string(repo.Key())}
currTask, conflictErr := runTaskInBackground(taskNamePrefix+repo.Name, resources, func(out *task.Output, detail *task.Detail) error {
out.Print("Loading packages...\n")
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
return err
return http.StatusInternalServerError, err
}
// verify package refs and build package list
@@ -224,33 +219,31 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return fmt.Errorf("packages %s: %s", ref, err)
return http.StatusNotFound, fmt.Errorf("packages %s: %s", ref, err)
}
return err
return http.StatusInternalServerError, err
}
err = cb(list, p, out)
if err != nil {
return err
return http.StatusBadRequest, err
}
}
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
return collectionFactory.LocalRepoCollection().Update(repo)
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err)
}
detail.Store(repo)
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// POST /repos/:name/packages
func apiReposPackagesAdd(c *gin.Context) {
apiReposPackagesAddDelete(c, "Add packages to repo ", func(list *deb.PackageList, p *deb.Package, out *task.Output) error {
apiReposPackagesAddDelete(c, "Add packages to repo ", func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error {
out.Printf("Adding package %s\n", p.Name)
return list.Add(p)
})
@@ -258,7 +251,7 @@ func apiReposPackagesAdd(c *gin.Context) {
// DELETE /repos/:name/packages
func apiReposPackagesDelete(c *gin.Context) {
apiReposPackagesAddDelete(c, "Delete packages from repo ", func(list *deb.PackageList, p *deb.Package, out *task.Output) error {
apiReposPackagesAddDelete(c, "Delete packages from repo ", func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error {
out.Printf("Removing package %s\n", p.Name)
list.Remove(p)
return nil
@@ -315,7 +308,7 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
verifier := context.GetVerifier()
var (
@@ -334,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) {
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
return fmt.Errorf("unable to load packages: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to load packages: %s", err)
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
@@ -343,14 +336,14 @@ func apiReposPackageFromDir(c *gin.Context) {
processedFiles = append(processedFiles, otherFiles...)
if err != nil {
return fmt.Errorf("unable to import package files: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to import package files: %s", err)
}
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
return fmt.Errorf("unable to save: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to save: %s", err)
}
if !noRemove {
@@ -384,15 +377,12 @@ func apiReposPackageFromDir(c *gin.Context) {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
return nil
detail.Store(gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
})
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// POST /repos/:name/include/:dir/:file
@@ -453,7 +443,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
}
resources = append(resources, sources...)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
var (
err error
verifier = context.GetVerifier()
@@ -474,7 +464,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return fmt.Errorf("unable to import changes files: %s", err)
return http.StatusInternalServerError, fmt.Errorf("unable to import changes files: %s", err)
}
if !noRemoveFiles {
@@ -499,13 +489,11 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
return nil
detail.Store(gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
})
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}

View File

@@ -136,6 +136,7 @@ func Router(c *ctx.AptlyContext) http.Handler {
root.GET("/tasks/:id/detail", apiTasksDetailShow)
root.GET("/tasks/:id", apiTasksShow)
root.DELETE("/tasks/:id", apiTasksDelete)
root.POST("/tasks-dummy", apiTasksDummy)
}
return router

View File

@@ -2,7 +2,9 @@ package api
import (
"fmt"
"net/http"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/task"
@@ -60,35 +62,33 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := repo.CheckLock()
if err != nil {
return err
return http.StatusConflict, err
}
err = collection.LoadComplete(repo)
if err != nil {
return err
return http.StatusInternalServerError, err
}
snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo)
if err != nil {
return err
return http.StatusBadRequest, err
}
if b.Description != "" {
snapshot.Description = b.Description
}
return snapshotCollection.Add(snapshot)
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
}
detail.Store(snapshot)
return http.StatusCreated, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// POST /api/snapshots
@@ -137,7 +137,7 @@ func apiSnapshotsCreate(c *gin.Context) {
resources = append(resources, string(sources[i].ResourceKey()))
}
currTask, conflictErr := runTaskInBackground("Create snapshot "+b.Name, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
list := deb.NewPackageList()
// verify package refs and build package list
@@ -145,27 +145,24 @@ func apiSnapshotsCreate(c *gin.Context) {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return fmt.Errorf("package %s: %s", ref, err)
return http.StatusNotFound, fmt.Errorf("package %s: %s", ref, err)
}
return err
return http.StatusInternalServerError, err
}
err = list.Add(p)
if err != nil {
return err
return http.StatusBadRequest, err
}
}
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
return snapshotCollection.Add(snapshot)
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
}
return http.StatusCreated, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// POST /api/repos/:name/snapshots
@@ -199,30 +196,28 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
err := collection.LoadComplete(repo)
if err != nil {
return err
return http.StatusInternalServerError, err
}
snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo)
if err != nil {
return err
return http.StatusNotFound, err
}
if b.Description != "" {
snapshot.Description = b.Description
}
return snapshotCollection.Add(snapshot)
err = snapshotCollection.Add(snapshot)
if err != nil {
return http.StatusBadRequest, err
}
detail.Store(snapshot)
return http.StatusCreated, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// PUT /api/snapshots/:name
@@ -253,10 +248,10 @@ func apiSnapshotsUpdate(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
return http.StatusConflict, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
if b.Name != "" {
@@ -267,15 +262,13 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description
}
return collectionFactory.SnapshotCollection().Update(snapshot)
err = collectionFactory.SnapshotCollection().Update(snapshot)
if err != nil {
return http.StatusInternalServerError, err
}
detail.Store(snapshot)
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// GET /api/snapshots/:name
@@ -315,29 +308,27 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error {
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
published := publishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
return fmt.Errorf("unable to drop: snapshot is published")
return http.StatusConflict, fmt.Errorf("unable to drop: snapshot is published")
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
return http.StatusConflict, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
return snapshotCollection.Drop(snapshot)
err = snapshotCollection.Drop(snapshot)
if err != nil {
return http.StatusInternalServerError, err
}
detail.Store(gin.H{})
return http.StatusOK, nil
})
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, currTask)
}
// GET /api/snapshots/:name/diff/:withSnapshot

View File

@@ -1,8 +1,11 @@
package api
import (
"fmt"
"net/http"
"strconv"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/task"
"github.com/gin-gonic/gin"
)
@@ -13,7 +16,7 @@ func apiTasksList(c *gin.Context) {
c.JSON(200, list.GetTasks())
}
// POST /tasks/clear
// POST /tasks-clear
func apiTasksClear(c *gin.Context) {
list := context.TaskList()
list.Clear()
@@ -120,3 +123,15 @@ func apiTasksDelete(c *gin.Context) {
c.JSON(200, delTask)
}
// POST /tasks-dummy
func apiTasksDummy(c *gin.Context) {
resources := []string{"dummy"}
taskName := fmt.Sprintf("Dummy task")
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (int, error) {
out.Printf("Dummy task started\n")
detail.Store([]int{1, 2, 3})
out.Printf("Dummy task finished\n")
return http.StatusTeapot, nil
})
}

72
api/task_test.go Normal file
View File

@@ -0,0 +1,72 @@
package api
import (
"encoding/json"
"fmt"
"github.com/aptly-dev/aptly/task"
. "gopkg.in/check.v1"
)
type TaskSuite struct {
ApiSuite
}
var _ = Suite(&TaskSuite{})
func (s *TaskSuite) TestTasksDummy(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy", nil)
c.Check(response.Code, Equals, 418)
c.Check(response.Body.String(), Equals, "[1,2,3]")
}
func (s *TaskSuite) TestTasksDummyAsync(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
var t task.Task
err := json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.Name, Equals, "Dummy task")
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/wait", t.ID), nil)
err = json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.State, Equals, task.SUCCEEDED)
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/detail", t.ID), nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "[1,2,3]")
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/output", t.ID), nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Matches, "\"Dummy task started.*")
}
func (s *TaskSuite) TestTaskDelete(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
c.Check(response.Body.String(), Equals, "{\"Name\":\"Dummy task\",\"ID\":1,\"State\":0}")
response, _ = s.HTTPRequest("DELETE", "/api/tasks/1", nil)
c.Check(response.Code, Equals, 200)
}
func (s *TaskSuite) TestTasksClear(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
var t task.Task
err := json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.Name, Equals, "Dummy task")
response, _ = s.HTTPRequest("GET", "/api/tasks-wait", nil)
c.Check(response.Code, Equals, 200)
response, _ = s.HTTPRequest("GET", "/api/tasks", nil)
c.Check(response.Code, Equals, 200)
var ts []task.Task
err = json.Unmarshal(response.Body.Bytes(), &ts)
c.Assert(err, IsNil)
c.Check(len(ts), Equals, 1)
c.Check(ts[0].State, Equals, task.SUCCEEDED)
response, _ = s.HTTPRequest("POST", "/api/tasks-clear", nil)
c.Check(response.Code, Equals, 200)
response, _ = s.HTTPRequest("GET", "/api/tasks", nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "null")
}

View File

@@ -2,6 +2,7 @@ package task
import (
"fmt"
"github.com/aptly-dev/aptly/aptly"
"sync"
)
@@ -138,10 +139,11 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
}
list.Unlock()
err := process(task.output, task.detail)
retCode, err := process(aptly.Progress(task.output), task.detail)
list.Lock()
{
task.processReturnCode = retCode
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED

View File

@@ -2,6 +2,7 @@ package task
import (
"errors"
"github.com/aptly-dev/aptly/aptly"
// need to import as check as otherwise List is redeclared
check "gopkg.in/check.v1"
@@ -15,8 +16,8 @@ func (s *ListSuite) TestList(c *check.C) {
list := NewList()
c.Assert(len(list.GetTasks()), check.Equals, 0)
task, err := list.RunTaskInBackground("Successful task", nil, func(out *Output, detail *Detail) error {
return nil
task, err := list.RunTaskInBackground("Successful task", nil, func(out aptly.Progress, detail *Detail) (int, error) {
return -1, nil
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)
@@ -30,10 +31,10 @@ func (s *ListSuite) TestList(c *check.C) {
detail, _ := list.GetTaskDetailByID(task.ID)
c.Check(detail, check.Equals, struct{}{})
task, err = list.RunTaskInBackground("Faulty task", nil, func(out *Output, detail *Detail) error {
task, err = list.RunTaskInBackground("Faulty task", nil, func(out aptly.Progress, detail *Detail) (int, error) {
detail.Store("Details")
out.WriteString("Test Progress\n")
return errors.New("Task failed")
out.Printf("Test Progress\n")
return -1, errors.New("Task failed")
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)

View File

@@ -17,7 +17,7 @@ type Output struct {
// PublishOutput specific output for publishing api
type PublishOutput struct {
*Output
aptly.Progress
PublishDetail
barType *aptly.BarType
}

View File

@@ -1,6 +1,7 @@
package task
import (
"github.com/aptly-dev/aptly/aptly"
"sync/atomic"
)
@@ -20,7 +21,7 @@ type PublishDetail struct {
}
// Process is a function implementing the actual task logic
type Process func(out *Output, detail *Detail) error
type Process func(out aptly.Progress, detail *Detail) (int, error)
const (
// IDLE when task is waiting
@@ -35,12 +36,13 @@ const (
// Task represents as task in a queue encapsulates process code
type Task struct {
output *Output
detail *Detail
process Process
Name string
ID int
State State
output *Output
detail *Detail
process Process
processReturnCode int
Name string
ID int
State State
}
// NewTask creates new task

View File

@@ -31,6 +31,7 @@ type ConfigStructure struct { // nolint: maligned
S3PublishRoots map[string]S3PublishRoot `json:"S3PublishEndpoints"`
SwiftPublishRoots map[string]SwiftPublishRoot `json:"SwiftPublishEndpoints"`
AzurePublishRoots map[string]AzurePublishRoot `json:"AzurePublishEndpoints"`
AsyncAPI bool `json:"AsyncAPI"`
}
// FileSystemPublishRoot describes single filesystem publishing entry point
@@ -103,6 +104,7 @@ var Config = ConfigStructure{
S3PublishRoots: map[string]S3PublishRoot{},
SwiftPublishRoots: map[string]SwiftPublishRoot{},
AzurePublishRoots: map[string]AzurePublishRoot{},
AsyncAPI: false,
}
// LoadConfig loads configuration from json file

View File

@@ -126,6 +126,7 @@ func (s *ConfigSuite) TestSaveConfig(c *C) {
" \"prefix\": \"\"\n"+
" }\n"+
" }\n"+
" \"AsyncAPI\": false\n"+
"}")
}