diff --git a/api/api.go b/api/api.go index 70a02abe..cb33dcaf 100644 --- a/api/api.go +++ b/api/api.go @@ -8,6 +8,7 @@ import ( "github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/query" + "github.com/aptly-dev/aptly/task" "github.com/gin-gonic/gin" ) @@ -34,12 +35,14 @@ type dbRequest struct { err chan<- error } +var dbRequests chan dbRequest + // Acquire database lock and release it when not needed anymore. // // Should be run in a goroutine! -func acquireDatabase(requests <-chan dbRequest) { +func acquireDatabase() { clients := 0 - for request := range requests { + for request := range dbRequests { var err error switch request.kind { @@ -66,6 +69,46 @@ func acquireDatabase(requests <-chan dbRequest) { } } +// Should be called before database access is needed in any api call. +// Happens per default for each api call. It is important that you run +// runTaskInBackground to run a task which accquire database. +// Important do not forget to defer to releaseDatabaseConnection +func acquireDatabaseConnection() error { + if dbRequests == nil { + return nil + } + + errCh := make(chan error) + dbRequests <- dbRequest{acquiredb, errCh} + + return <-errCh +} + +// Release database connection when not needed anymore +func releaseDatabaseConnection() error { + if dbRequests == nil { + return nil + } + + errCh := make(chan error) + dbRequests <- dbRequest{releasedb, errCh} + return <-errCh +} + +// runs tasks in background. Acquires database connection first. +func runTaskInBackground(name string, resources []string, proc task.Process) (task.Task, *task.ResourceConflictError) { + return context.TaskList().RunTaskInBackground(name, resources, func(out *task.Output, detail *task.Detail) error { + err := acquireDatabaseConnection() + + if err != nil { + return err + } + + defer releaseDatabaseConnection() + return proc(out, detail) + }) +} + // Common piece of code to show list of packages, // with searching & details if requested func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) { diff --git a/api/publish.go b/api/publish.go index 771613c8..557f40db 100644 --- a/api/publish.go +++ b/api/publish.go @@ -6,6 +6,7 @@ import ( "github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/pgp" + "github.com/aptly-dev/aptly/task" "github.com/aptly-dev/aptly/utils" "github.com/gin-gonic/gin" ) @@ -114,7 +115,9 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } var components []string + var names []string var sources []interface{} + var resources []string collectionFactory := context.NewCollectionFactory() if b.SourceKind == "snapshot" { @@ -124,6 +127,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range b.Sources { components = append(components, source.Component) + names = append(names, source.Name) snapshot, err = snapshotCollection.ByName(source.Name) if err != nil { @@ -131,6 +135,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } + resources = append(resources, string(snapshot.ResourceKey())) err = snapshotCollection.LoadComplete(snapshot) if err != nil { c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err)) @@ -146,6 +151,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range b.Sources { components = append(components, source.Component) + names = append(names, source.Name) localRepo, err = localCollection.ByName(source.Name) if err != nil { @@ -153,6 +159,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } + resources = append(resources, string(localRepo.Key())) err = localCollection.LoadComplete(localRepo) if err != nil { c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err)) @@ -165,53 +172,62 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } - collection := collectionFactory.PublishedRepoCollection() - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory) if err != nil { c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err)) return } - if b.Origin != "" { - published.Origin = b.Origin - } - if b.NotAutomatic != "" { - published.NotAutomatic = b.NotAutomatic - } - if b.ButAutomaticUpgrades != "" { - published.ButAutomaticUpgrades = b.ButAutomaticUpgrades - } - published.Label = b.Label - published.SkipContents = context.Config().SkipContentsPublishing - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } + resources = append(resources, string(published.Key())) + collection := collectionFactory.PublishedRepoCollection() - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } + taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", ")) + task, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + if b.Origin != "" { + published.Origin = b.Origin + } + if b.NotAutomatic != "" { + published.NotAutomatic = b.NotAutomatic + } + if b.ButAutomaticUpgrades != "" { + published.ButAutomaticUpgrades = b.ButAutomaticUpgrades + } + published.Label = b.Label - duplicate := collection.CheckDuplicate(published) - if duplicate != nil { - collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) - c.AbortWithError(400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)) + published.SkipContents = context.Config().SkipContentsPublishing + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + + duplicate := collection.CheckDuplicate(published) + if duplicate != nil { + collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + return fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) + } + + err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite) + if err != nil { + return fmt.Errorf("unable to publish: %s", err) + } + + err = collection.Add(published) + if err != nil { + return fmt.Errorf("unable to save to DB: %s", err) + } + + return nil + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, nil, b.ForceOverwrite) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err)) - return - } - - err = collection.Add(published) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to save to DB: %s", err)) - return - } - - c.JSON(201, published) + c.JSON(202, task) } // PUT /publish/:prefix/:distribution @@ -257,6 +273,8 @@ func apiPublishUpdateSwitch(c *gin.Context) { } var updatedComponents []string + var updatedSnapshots []string + var resources []string if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { @@ -290,6 +308,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { published.UpdateSnapshot(snapshotInfo.Component, snapshot) updatedComponents = append(updatedComponents, snapshotInfo.Component) + updatedSnapshots = append(updatedSnapshots, snapshot.Name) } } else { c.AbortWithError(500, fmt.Errorf("unknown published repository type")) @@ -304,28 +323,36 @@ func apiPublishUpdateSwitch(c *gin.Context) { published.AcquireByHash = *b.AcquireByHash } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, nil, b.ForceOverwrite) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to update: %s", err)) - return - } - - err = collection.Update(published) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to save to DB: %s", err)) - return - } - - if b.SkipCleanup == nil || !*b.SkipCleanup { - err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents, - context.GetPublishedStorage(storage), collectionFactory, nil) + resources = append(resources, string(published.Key())) + taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", ")) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite) if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to update: %s", err)) - return + return fmt.Errorf("unable to update: %s", err) } + + err = collection.Update(published) + if err != nil { + return fmt.Errorf("unable to save to DB: %s", err) + } + + if b.SkipCleanup == nil || !*b.SkipCleanup { + err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents, + context.GetPublishedStorage(storage), collectionFactory, out) + if err != nil { + return fmt.Errorf("unable to update: %s", err) + } + } + + return nil + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) + return } - c.JSON(200, published) + c.JSON(202, currTask) } // DELETE /publish/:prefix/:distribution @@ -340,12 +367,29 @@ func apiPublishDrop(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, context.Progress(), force, skipCleanup) + published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { c.AbortWithError(500, fmt.Errorf("unable to drop: %s", err)) return } - c.JSON(200, gin.H{}) + resources := []string{string(published.Key())} + + taskName := fmt.Sprintf("Delete published %s (%s)", prefix, distribution) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + err := collection.Remove(context, storage, prefix, distribution, + collectionFactory, out, force, skipCleanup) + if err != nil { + return fmt.Errorf("unable to drop: %s", err) + } + + return nil + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) + return + } + + c.JSON(202, currTask) } diff --git a/api/repos.go b/api/repos.go index e570ac4b..b0792f49 100644 --- a/api/repos.go +++ b/api/repos.go @@ -4,11 +4,14 @@ import ( "fmt" "os" "path/filepath" + "strings" + "text/template" "github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/database" "github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/query" + "github.com/aptly-dev/aptly/task" "github.com/aptly-dev/aptly/utils" "github.com/gin-gonic/gin" ) @@ -112,39 +115,43 @@ func apiReposShow(c *gin.Context) { // DELETE /api/repos/:name func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" + name := c.Params.ByName("name") collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() publishedCollection := collectionFactory.PublishedRepoCollection() - repo, err := collection.ByName(c.Params.ByName("name")) + repo, err := collection.ByName(name) if err != nil { c.AbortWithError(404, err) return } - published := publishedCollection.ByLocalRepo(repo) - if len(published) > 0 { - c.AbortWithError(409, fmt.Errorf("unable to drop, local repo is published")) - return - } - - if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) - if len(snapshots) > 0 { - c.AbortWithError(409, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")) - return + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Delete repo %s", name) + task, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + published := publishedCollection.ByLocalRepo(repo) + if len(published) > 0 { + return fmt.Errorf("unable to drop, local repo is published") } - } - err = collection.Drop(repo) - if err != nil { - c.AbortWithError(500, err) + if !force { + snapshots := snapshotCollection.ByLocalRepoSource(repo) + if len(snapshots) > 0 { + return fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") + } + } + + return collection.Drop(repo) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - c.JSON(200, gin.H{}) + c.JSON(202, task) } // GET /api/repos/:name/packages @@ -168,7 +175,7 @@ func apiReposPackagesShow(c *gin.Context) { } // Handler for both add and delete -func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p *deb.Package) error) { +func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(list *deb.PackageList, p *deb.Package, out *task.Output) error) { var b struct { PackageRefs []string } @@ -192,54 +199,57 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p return } - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) - if err != nil { - c.AbortWithError(500, err) - return - } - - // verify package refs and build package list - for _, ref := range b.PackageRefs { - var p *deb.Package - - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + resources := []string{string(repo.Key())} + currTask, conflictErr := runTaskInBackground(taskNamePrefix+repo.Name, resources, func(out *task.Output, detail *task.Detail) error { + out.Print("Loading packages...\n") + list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) if err != nil { - if err == database.ErrNotFound { - c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err)) - } else { - c.AbortWithError(500, err) + return err + } + + // verify package refs and build package list + for _, ref := range b.PackageRefs { + var p *deb.Package + + p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + if err != nil { + if err == database.ErrNotFound { + return fmt.Errorf("packages %s: %s", ref, err) + } + + return err + } + err = cb(list, p, out) + if err != nil { + return err } - return } - err = cb(list, p) - if err != nil { - c.AbortWithError(400, err) - return - } - } - repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) + repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to save: %s", err)) + return collectionFactory.LocalRepoCollection().Update(repo) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - c.JSON(200, repo) - + c.JSON(202, currTask) } // POST /repos/:name/packages func apiReposPackagesAdd(c *gin.Context) { - apiReposPackagesAddDelete(c, func(list *deb.PackageList, p *deb.Package) error { + apiReposPackagesAddDelete(c, "Add packages to repo ", func(list *deb.PackageList, p *deb.Package, out *task.Output) error { + out.Printf("Adding package %s\n", p.Name) return list.Add(p) }) } // DELETE /repos/:name/packages func apiReposPackagesDelete(c *gin.Context) { - apiReposPackagesAddDelete(c, func(list *deb.PackageList, p *deb.Package) error { + apiReposPackagesAddDelete(c, "Delete packages from repo ", func(list *deb.PackageList, p *deb.Package, out *task.Output) error { + out.Printf("Removing package %s\n", p.Name) list.Remove(p) return nil }) @@ -260,6 +270,7 @@ func apiReposPackageFromDir(c *gin.Context) { return } + dirParam := c.Params.ByName("dir") fileParam := c.Params.ByName("file") if fileParam != "" && !verifyPath(fileParam) { c.AbortWithError(400, fmt.Errorf("wrong file")) @@ -269,7 +280,8 @@ func apiReposPackageFromDir(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - repo, err := collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + repo, err := collection.ByName(name) if err != nil { c.AbortWithError(404, err) return @@ -281,76 +293,96 @@ func apiReposPackageFromDir(c *gin.Context) { return } - verifier := context.GetVerifier() - - var ( - sources []string - packageFiles, failedFiles []string - otherFiles []string - processedFiles, failedFiles2 []string - reporter = &aptly.RecordingResultReporter{ - Warnings: []string{}, - AddedLines: []string{}, - RemovedLines: []string{}, - } - list *deb.PackageList - ) - + var taskName string + var sources []string if fileParam == "" { - sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"))} + taskName = fmt.Sprintf("Add packages from dir %s to repo %s", dirParam, name) + sources = []string{filepath.Join(context.UploadPath(), dirParam)} } else { - sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("file"))} + sources = []string{filepath.Join(context.UploadPath(), dirParam, fileParam)} + taskName = fmt.Sprintf("Add package %s from dir %s to repo %s", fileParam, dirParam, name) } - packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) + resources := []string{string(repo.Key())} + resources = append(resources, sources...) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + verifier := context.GetVerifier() - list, err = deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to load packages: %s", err)) - return - } - - processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) - failedFiles = append(failedFiles, failedFiles2...) - - processedFiles = append(processedFiles, otherFiles...) - - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to import package files: %s", err)) - return - } - - repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - - err = collectionFactory.LocalRepoCollection().Update(repo) - if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to save: %s", err)) - return - } - - if !noRemove { - processedFiles = utils.StrSliceDeduplicate(processedFiles) - - for _, file := range processedFiles { - err := os.Remove(file) - if err != nil { - reporter.Warning("unable to remove file %s: %s", file, err) + var ( + packageFiles, failedFiles []string + otherFiles []string + processedFiles, failedFiles2 []string + reporter = &aptly.RecordingResultReporter{ + Warnings: []string{}, + AddedLines: []string{}, + RemovedLines: []string{}, } + list *deb.PackageList + ) + + packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) + + list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + if err != nil { + return fmt.Errorf("unable to load packages: %s", err) } - // atempt to remove dir, if it fails, that's fine: probably it's not empty - os.Remove(filepath.Join(context.UploadPath(), c.Params.ByName("dir"))) - } + processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), + collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + failedFiles = append(failedFiles, failedFiles2...) + processedFiles = append(processedFiles, otherFiles...) - if failedFiles == nil { - failedFiles = []string{} - } + if err != nil { + return fmt.Errorf("unable to import package files: %s", err) + } - c.JSON(200, gin.H{ - "Report": reporter, - "FailedFiles": failedFiles, + repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) + + err = collectionFactory.LocalRepoCollection().Update(repo) + if err != nil { + return fmt.Errorf("unable to save: %s", err) + } + + if !noRemove { + processedFiles = utils.StrSliceDeduplicate(processedFiles) + + for _, file := range processedFiles { + err := os.Remove(file) + if err != nil { + reporter.Warning("unable to remove file %s: %s", file, err) + } + } + + // atempt to remove dir, if it fails, that's fine: probably it's not empty + os.Remove(filepath.Join(context.UploadPath(), dirParam)) + } + + if failedFiles == nil { + failedFiles = []string{} + } + + if len(reporter.AddedLines) > 0 { + out.Printf("Added: %s\n", strings.Join(reporter.AddedLines, ", ")) + } + if len(reporter.RemovedLines) > 0 { + out.Printf("Removed: %s\n", strings.Join(reporter.RemovedLines, ", ")) + } + if len(reporter.Warnings) > 0 { + out.Printf("Warnings: %s\n", strings.Join(reporter.Warnings, ", ")) + } + if len(failedFiles) > 0 { + out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", ")) + } + + return nil }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) + return + } + + c.JSON(202, currTask) } // POST /repos/:name/include/:dir/:file @@ -367,59 +399,103 @@ func apiReposIncludePackageFromDir(c *gin.Context) { ignoreSignature := c.Request.URL.Query().Get("ignoreSignature") == "1" repoTemplateString := c.Params.ByName("name") + collectionFactory := context.NewCollectionFactory() if !verifyDir(c) { return } + var sources []string + var taskName string + dirParam := c.Params.ByName("dir") fileParam := c.Params.ByName("file") if fileParam != "" && !verifyPath(fileParam) { c.AbortWithError(400, fmt.Errorf("wrong file")) return } - var ( - err error - verifier = context.GetVerifier() - sources, changesFiles []string - failedFiles, failedFiles2 []string - reporter = &aptly.RecordingResultReporter{ - Warnings: []string{}, - AddedLines: []string{}, - RemovedLines: []string{}, - } - ) - if fileParam == "" { - sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"))} + taskName = fmt.Sprintf("Include packages from changes files in dir %s to repo matching template %s", dirParam, repoTemplateString) + sources = []string{filepath.Join(context.UploadPath(), dirParam)} } else { - sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("file"))} + taskName = fmt.Sprintf("Include packages from changes file %s from dir %s to repo matching template %s", fileParam, dirParam, repoTemplateString) + sources = []string{filepath.Join(context.UploadPath(), dirParam, fileParam)} } - collectionFactory := context.NewCollectionFactory() - changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) - _, failedFiles2, err = deb.ImportChangesFiles( - changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplateString, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) - failedFiles = append(failedFiles, failedFiles2...) - + repoTemplate, err := template.New("repo").Parse(repoTemplateString) if err != nil { - c.AbortWithError(500, fmt.Errorf("unable to import changes files: %s", err)) + c.AbortWithError(400, fmt.Errorf("error parsing repo template: %s", err)) return } - if !noRemoveFiles { - // atempt to remove dir, if it fails, that's fine: probably it's not empty - os.Remove(filepath.Join(context.UploadPath(), c.Params.ByName("dir"))) - } + var resources []string + if len(repoTemplate.Tree.Root.Nodes) > 1 { + resources = append(resources, task.AllLocalReposResourcesKey) + } else { + // repo template string is simple text so only use resource key of specific repository + repo, err := collectionFactory.LocalRepoCollection().ByName(repoTemplateString) + if err != nil { + c.AbortWithError(404, err) + return + } - if failedFiles == nil { - failedFiles = []string{} + resources = append(resources, string(repo.Key())) } + resources = append(resources, sources...) - c.JSON(200, gin.H{ - "Report": reporter, - "FailedFiles": failedFiles, + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + var ( + err error + verifier = context.GetVerifier() + changesFiles []string + failedFiles, failedFiles2 []string + reporter = &aptly.RecordingResultReporter{ + Warnings: []string{}, + AddedLines: []string{}, + RemovedLines: []string{}, + } + ) + + changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) + _, failedFiles2, err = deb.ImportChangesFiles( + changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, + repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), + context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + failedFiles = append(failedFiles, failedFiles2...) + + if err != nil { + return fmt.Errorf("unable to import changes files: %s", err) + } + + if !noRemoveFiles { + // atempt to remove dir, if it fails, that's fine: probably it's not empty + os.Remove(filepath.Join(context.UploadPath(), dirParam)) + } + + if failedFiles == nil { + failedFiles = []string{} + } + + if len(reporter.AddedLines) > 0 { + out.Printf("Added: %s\n", strings.Join(reporter.AddedLines, ", ")) + } + if len(reporter.RemovedLines) > 0 { + out.Printf("Removed: %s\n", strings.Join(reporter.RemovedLines, ", ")) + } + if len(reporter.Warnings) > 0 { + out.Printf("Warnings: %s\n", strings.Join(reporter.Warnings, ", ")) + } + if len(failedFiles) > 0 { + out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", ")) + } + + return nil }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) + return + } + + c.JSON(202, currTask) } diff --git a/api/router.go b/api/router.go index 0661e5ba..c71fc3d5 100644 --- a/api/router.go +++ b/api/router.go @@ -20,15 +20,15 @@ func Router(c *ctx.AptlyContext) http.Handler { // We use a goroutine to count the number of // concurrent requests. When no more requests are // running, we close the database to free the lock. - requests := make(chan dbRequest) + dbRequests = make(chan dbRequest) - go acquireDatabase(requests) + go acquireDatabase() router.Use(func(c *gin.Context) { var err error errCh := make(chan error) - requests <- dbRequest{acquiredb, errCh} + dbRequests <- dbRequest{acquiredb, errCh} err = <-errCh if err != nil { @@ -37,7 +37,7 @@ func Router(c *ctx.AptlyContext) http.Handler { } defer func() { - requests <- dbRequest{releasedb, errCh} + dbRequests <- dbRequest{releasedb, errCh} err = <-errCh if err != nil { c.AbortWithError(500, err) @@ -111,6 +111,16 @@ func Router(c *ctx.AptlyContext) http.Handler { { root.GET("/graph.:ext", apiGraph) } + { + root.GET("/tasks", apiTasksList) + root.POST("/tasks-clear", apiTasksClear) + root.GET("/tasks-wait", apiTasksWait) + root.GET("/tasks/:id/wait", apiTasksWaitForTaskByID) + root.GET("/tasks/:id/output", apiTasksOutputShow) + root.GET("/tasks/:id/detail", apiTasksDetailShow) + root.GET("/tasks/:id", apiTasksShow) + root.DELETE("/tasks/:id", apiTasksDelete) + } return router } diff --git a/api/snapshot.go b/api/snapshot.go index b03bbe11..e94dd5ab 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -5,6 +5,7 @@ import ( "github.com/aptly-dev/aptly/database" "github.com/aptly-dev/aptly/deb" + "github.com/aptly-dev/aptly/task" "github.com/gin-gonic/gin" ) @@ -48,42 +49,46 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.RemoteRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + name := c.Params.ByName("name") - repo, err = collection.ByName(c.Params.ByName("name")) + repo, err = collection.ByName(name) if err != nil { c.AbortWithError(404, err) return } - err = repo.CheckLock() - if err != nil { - c.AbortWithError(409, err) + // including snapshot resource key + resources := []string{string(repo.Key()), "S" + b.Name} + taskName := fmt.Sprintf("Create snapshot of mirror %s", name) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + err := repo.CheckLock() + if err != nil { + return err + } + + err = collection.LoadComplete(repo) + if err != nil { + return err + } + + snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo) + if err != nil { + return err + } + + if b.Description != "" { + snapshot.Description = b.Description + } + + return snapshotCollection.Add(snapshot) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - err = collection.LoadComplete(repo) - if err != nil { - c.AbortWithError(500, err) - return - } - - snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo) - if err != nil { - c.AbortWithError(400, err) - return - } - - if b.Description != "" { - snapshot.Description = b.Description - } - - err = snapshotCollection.Add(snapshot) - if err != nil { - c.AbortWithError(400, err) - return - } - - c.JSON(201, snapshot) + c.JSON(202, currTask) } // POST /api/snapshots @@ -112,6 +117,7 @@ func apiSnapshotsCreate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() + var resources []string sources := make([]*deb.Snapshot, len(b.SourceSnapshots)) @@ -127,39 +133,39 @@ func apiSnapshotsCreate(c *gin.Context) { c.AbortWithError(500, err) return } + + resources = append(resources, string(sources[i].ResourceKey())) } - list := deb.NewPackageList() + currTask, conflictErr := runTaskInBackground("Create snapshot "+b.Name, resources, func(out *task.Output, detail *task.Detail) error { + list := deb.NewPackageList() - // verify package refs and build package list - for _, ref := range b.PackageRefs { - var p *deb.Package - - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) - if err != nil { - if err == database.ErrNotFound { - c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err)) - } else { - c.AbortWithError(500, err) + // verify package refs and build package list + for _, ref := range b.PackageRefs { + p, err := collectionFactory.PackageCollection().ByKey([]byte(ref)) + if err != nil { + if err == database.ErrNotFound { + return fmt.Errorf("package %s: %s", ref, err) + } + return err + } + err = list.Add(p) + if err != nil { + return err } - return } - err = list.Add(p) - if err != nil { - c.AbortWithError(400, err) - return - } - } - snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description) + snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description) - err = snapshotCollection.Add(snapshot) - if err != nil { - c.AbortWithError(400, err) + return snapshotCollection.Add(snapshot) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - c.JSON(201, snapshot) + c.JSON(202, currTask) } // POST /api/repos/:name/snapshots @@ -182,36 +188,41 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + name := c.Params.ByName("name") - repo, err = collection.ByName(c.Params.ByName("name")) + repo, err = collection.ByName(name) if err != nil { c.AbortWithError(404, err) return } - err = collection.LoadComplete(repo) - if err != nil { - c.AbortWithError(500, err) + // including snapshot resource key + resources := []string{string(repo.Key()), "S" + b.Name} + taskName := fmt.Sprintf("Create snapshot of repo %s", name) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + err := collection.LoadComplete(repo) + if err != nil { + return err + } + + snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo) + if err != nil { + return err + } + + if b.Description != "" { + snapshot.Description = b.Description + } + + return snapshotCollection.Add(snapshot) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo) - if err != nil { - c.AbortWithError(400, err) - return - } - - if b.Description != "" { - snapshot.Description = b.Description - } - - err = snapshotCollection.Add(snapshot) - if err != nil { - c.AbortWithError(400, err) - return - } - - c.JSON(201, snapshot) + c.JSON(202, currTask) } // PUT /api/snapshots/:name @@ -232,34 +243,39 @@ func apiSnapshotsUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.SnapshotCollection() + name := c.Params.ByName("name") - snapshot, err = collection.ByName(c.Params.ByName("name")) + snapshot, err = collection.ByName(name) if err != nil { c.AbortWithError(404, err) return } - _, err = collection.ByName(b.Name) - if err == nil { - c.AbortWithError(409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)) + resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} + taskName := fmt.Sprintf("Update snapshot %s", name) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + _, err := collection.ByName(b.Name) + if err == nil { + return fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + } + + if b.Name != "" { + snapshot.Name = b.Name + } + + if b.Description != "" { + snapshot.Description = b.Description + } + + return collectionFactory.SnapshotCollection().Update(snapshot) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - if b.Name != "" { - snapshot.Name = b.Name - } - - if b.Description != "" { - snapshot.Description = b.Description - } - - err = collectionFactory.SnapshotCollection().Update(snapshot) - if err != nil { - c.AbortWithError(500, err) - return - } - - c.JSON(200, snapshot) + c.JSON(202, currTask) } // GET /api/snapshots/:name @@ -297,28 +313,31 @@ func apiSnapshotsDrop(c *gin.Context) { return } - published := publishedCollection.BySnapshot(snapshot) + resources := []string{string(snapshot.ResourceKey())} + taskName := fmt.Sprintf("Delete snapshot %s", name) + currTask, conflictErr := runTaskInBackground(taskName, resources, func(out *task.Output, detail *task.Detail) error { + published := publishedCollection.BySnapshot(snapshot) - if len(published) > 0 { - c.AbortWithError(409, fmt.Errorf("unable to drop: snapshot is published")) - return - } - - if !force { - snapshots := snapshotCollection.BySnapshotSource(snapshot) - if len(snapshots) > 0 { - c.AbortWithError(409, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")) - return + if len(published) > 0 { + return fmt.Errorf("unable to drop: snapshot is published") } - } - err = snapshotCollection.Drop(snapshot) - if err != nil { - c.AbortWithError(500, err) + if !force { + snapshots := snapshotCollection.BySnapshotSource(snapshot) + if len(snapshots) > 0 { + return fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") + } + } + + return snapshotCollection.Drop(snapshot) + }) + + if conflictErr != nil { + c.AbortWithError(409, conflictErr) return } - c.JSON(200, gin.H{}) + c.JSON(202, currTask) } // GET /api/snapshots/:name/diff/:withSnapshot diff --git a/api/task.go b/api/task.go new file mode 100644 index 00000000..1a25f113 --- /dev/null +++ b/api/task.go @@ -0,0 +1,122 @@ +package api + +import ( + "strconv" + + "github.com/aptly-dev/aptly/task" + "github.com/gin-gonic/gin" +) + +// GET /tasks +func apiTasksList(c *gin.Context) { + list := context.TaskList() + c.JSON(200, list.GetTasks()) +} + +// POST /tasks/clear +func apiTasksClear(c *gin.Context) { + list := context.TaskList() + list.Clear() + c.JSON(200, gin.H{}) +} + +// GET /tasks-wait +func apiTasksWait(c *gin.Context) { + list := context.TaskList() + list.Wait() + c.JSON(200, gin.H{}) +} + +// GET /tasks/:id/wait +func apiTasksWaitForTaskByID(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + task, err := list.WaitForTaskByID(int(id)) + if err != nil { + c.AbortWithError(400, err) + return + } + + c.JSON(200, task) +} + +// GET /tasks/:id +func apiTasksShow(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + var task task.Task + task, err = list.GetTaskByID(int(id)) + if err != nil { + c.AbortWithError(404, err) + return + } + + c.JSON(200, task) +} + +// GET /tasks/:id/output +func apiTasksOutputShow(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + var output string + output, err = list.GetTaskOutputByID(int(id)) + if err != nil { + c.AbortWithError(404, err) + return + } + + c.JSON(200, output) +} + +// GET /tasks/:id/detail +func apiTasksDetailShow(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + var detail interface{} + detail, err = list.GetTaskDetailByID(int(id)) + if err != nil { + c.AbortWithError(404, err) + return + } + + c.JSON(200, detail) +} + +// DELETE /tasks/:id +func apiTasksDelete(c *gin.Context) { + list := context.TaskList() + id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0) + if err != nil { + c.AbortWithError(500, err) + return + } + + var delTask task.Task + delTask, err = list.DeleteTaskByID(int(id)) + if err != nil { + c.AbortWithError(400, err) + return + } + + c.JSON(200, delTask) +} diff --git a/cmd/repo_include.go b/cmd/repo_include.go index b449e253..b84b96a3 100644 --- a/cmd/repo_include.go +++ b/cmd/repo_include.go @@ -2,6 +2,7 @@ package cmd import ( "fmt" + "text/template" "github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/deb" @@ -33,6 +34,12 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error { repoTemplateString := context.Flags().Lookup("repo").Value.Get().(string) collectionFactory := context.NewCollectionFactory() + var repoTemplate *template.Template + repoTemplate, err = template.New("repo").Parse(repoTemplateString) + if err != nil { + return fmt.Errorf("error parsing -repo template: %s", err) + } + uploaders := (*deb.Uploaders)(nil) uploadersFile := context.Flags().Lookup("uploaders-file").Value.Get().(string) if uploadersFile != "" { @@ -55,7 +62,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error { changesFiles, failedFiles = deb.CollectChangesFiles(args, reporter) _, failedFiles2, err = deb.ImportChangesFiles( - changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString, + changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), context.PackagePool(), collectionFactory.ChecksumCollection, uploaders, query.Parse) diff --git a/context/context.go b/context/context.go index 38514836..51667119 100644 --- a/context/context.go +++ b/context/context.go @@ -25,6 +25,7 @@ import ( "github.com/aptly-dev/aptly/pgp" "github.com/aptly-dev/aptly/s3" "github.com/aptly-dev/aptly/swift" + "github.com/aptly-dev/aptly/task" "github.com/aptly-dev/aptly/utils" "github.com/smira/commander" "github.com/smira/flag" @@ -41,6 +42,7 @@ type AptlyContext struct { progress aptly.Progress downloader aptly.Downloader + taskList *task.List database database.Storage packagePool aptly.PackagePool publishedStorages map[string]aptly.PublishedStorage @@ -200,34 +202,59 @@ func (context *AptlyContext) _progress() aptly.Progress { return context.progress } +// NewDownloader returns instance of new downloader with given progress +func (context *AptlyContext) NewDownloader(progress aptly.Progress) aptly.Downloader { + context.Lock() + defer context.Unlock() + + return context.newDownloader(progress) +} + +// NewDownloader returns instance of new downloader with given progress without locking +// so it can be used for internal usage. +func (context *AptlyContext) newDownloader(progress aptly.Progress) aptly.Downloader { + var downloadLimit int64 + limitFlag := context.flags.Lookup("download-limit") + if limitFlag != nil { + downloadLimit = limitFlag.Value.Get().(int64) + } + if downloadLimit == 0 { + downloadLimit = context.config().DownloadLimit + } + maxTries := context.config().DownloadRetries + 1 + maxTriesFlag := context.flags.Lookup("max-tries") + if maxTriesFlag != nil { + maxTriesFlagValue := maxTriesFlag.Value.Get().(int) + if maxTriesFlagValue > maxTries { + maxTries = maxTriesFlagValue + } + } + return http.NewDownloader(downloadLimit*1024, maxTries, progress) +} + // Downloader returns instance of current downloader func (context *AptlyContext) Downloader() aptly.Downloader { context.Lock() defer context.Unlock() if context.downloader == nil { - var downloadLimit int64 - limitFlag := context.flags.Lookup("download-limit") - if limitFlag != nil { - downloadLimit = limitFlag.Value.Get().(int64) - } - if downloadLimit == 0 { - downloadLimit = context.config().DownloadLimit - } - maxTries := context.config().DownloadRetries + 1 - maxTriesFlag := context.flags.Lookup("max-tries") - if maxTriesFlag != nil { - maxTriesFlagValue := maxTriesFlag.Value.Get().(int) - if maxTriesFlagValue > maxTries { - maxTries = maxTriesFlagValue - } - } - context.downloader = http.NewDownloader(downloadLimit*1024, maxTries, context._progress()) + context.downloader = context.newDownloader(context._progress()) } return context.downloader } +// TaskList returns instance of current task list +func (context *AptlyContext) TaskList() *task.List { + context.Lock() + defer context.Unlock() + + if context.taskList == nil { + context.taskList = task.NewList() + } + return context.taskList +} + // DBPath builds path to database func (context *AptlyContext) DBPath() string { context.Lock() diff --git a/deb/changes.go b/deb/changes.go index 10fc83eb..48b217e6 100644 --- a/deb/changes.go +++ b/deb/changes.go @@ -293,14 +293,9 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha // ImportChangesFiles imports referenced files in changes files into local repository func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool, - verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection, + verifier pgp.Verifier, repoTemplate *template.Template, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection, pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) { - var repoTemplate *template.Template - repoTemplate, err = template.New("repo").Parse(repoTemplateString) - if err != nil { - return nil, nil, fmt.Errorf("error parsing -repo template: %s", err) - } for _, path := range changesFiles { var changes *Changes diff --git a/deb/changes_test.go b/deb/changes_test.go index af56d1c2..a536d619 100644 --- a/deb/changes_test.go +++ b/deb/changes_test.go @@ -3,6 +3,7 @@ package deb import ( "os" "path/filepath" + "text/template" "github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/console" @@ -123,7 +124,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) { processedFiles, failedFiles, err := ImportChangesFiles( append(changesFiles, "testdata/changes/notexistent.changes"), s.Reporter, true, true, false, false, &NullVerifier{}, - "test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage }, + template.Must(template.New("test").Parse("test")), s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage }, nil, nil) c.Assert(err, IsNil) c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes")) diff --git a/deb/snapshot.go b/deb/snapshot.go index 6ff4582f..657999db 100644 --- a/deb/snapshot.go +++ b/deb/snapshot.go @@ -125,6 +125,13 @@ func (s *Snapshot) Key() []byte { return []byte("S" + s.UUID) } +// ResourceKey is a unique identifier of the resource +// this snapshot uses. Instead of uuid it uses name +// which needs to be unique as well. +func (s *Snapshot) ResourceKey() []byte { + return []byte("S" + s.Name) +} + // RefKey is a unique id for package reference list func (s *Snapshot) RefKey() []byte { return []byte("E" + s.UUID) diff --git a/http/download.go b/http/download.go index 2b237b84..4ae26707 100644 --- a/http/download.go +++ b/http/download.go @@ -266,7 +266,9 @@ func (downloader *downloaderImpl) download(req *http.Request, url, destination s if err != nil { if ignoreMismatch { - downloader.progress.Printf("WARNING: %s\n", err.Error()) + if downloader.progress != nil { + downloader.progress.Printf("WARNING: %s\n", err.Error()) + } } else { os.Remove(temppath) return "", err diff --git a/system/api_lib.py b/system/api_lib.py index bb54ef0f..aada2f64 100644 --- a/system/api_lib.py +++ b/system/api_lib.py @@ -47,6 +47,17 @@ class APITest(BaseTest): kwargs["headers"]["Content-Type"] = "application/json" return requests.post("http://%s%s" % (self.base_url, uri), *args, **kwargs) + def post_task(self, uri, *args, **kwargs): + resp = self.post(uri, *args, **kwargs) + if resp.status_code != 202: + return resp + + _id = resp.json()['ID'] + resp = self.get("/api/tasks/" + str(_id) + "/wait") + self.check_equal(resp.status_code, 200) + + return self.get("/api/tasks/" + str(_id)) + def put(self, uri, *args, **kwargs): if "json" in kwargs: kwargs["data"] = json.dumps(kwargs.pop("json")) @@ -55,6 +66,17 @@ class APITest(BaseTest): kwargs["headers"]["Content-Type"] = "application/json" return requests.put("http://%s%s" % (self.base_url, uri), *args, **kwargs) + def put_task(self, uri, *args, **kwargs): + resp = self.put(uri, *args, **kwargs) + if resp.status_code != 202: + return resp + + _id = resp.json()['ID'] + resp = self.get("/api/tasks/" + str(_id) + "/wait") + self.check_equal(resp.status_code, 200) + + return self.get("/api/tasks/" + str(_id)) + def delete(self, uri, *args, **kwargs): if "json" in kwargs: kwargs["data"] = json.dumps(kwargs.pop("json")) @@ -63,6 +85,17 @@ class APITest(BaseTest): kwargs["headers"]["Content-Type"] = "application/json" return requests.delete("http://%s%s" % (self.base_url, uri), *args, **kwargs) + def delete_task(self, uri, *args, **kwargs): + resp = self.delete(uri, *args, **kwargs) + if resp.status_code != 202: + return resp + + _id = resp.json()['ID'] + resp = self.get("/api/tasks/" + str(_id) + "/wait") + self.check_equal(resp.status_code, 200) + + return self.get("/api/tasks/" + str(_id)) + def upload(self, uri, *filenames, **kwargs): upload_name = kwargs.pop("upload_name", None) directory = kwargs.pop("directory", "files") diff --git a/system/t12_api/graph.py b/system/t12_api/graph.py index 020a8ef2..670ada83 100644 --- a/system/t12_api/graph.py +++ b/system/t12_api/graph.py @@ -44,4 +44,6 @@ class GraphAPITest(APITest): # remove the repos again for repo in tempRepos: - self.check_equal(self.delete("/api/repos/" + repo, params={"force": "1"}).status_code, 200) + self.check_equal(self.delete_task( + "/api/repos/" + repo, params={"force": "1"}).json()['State'], 2 + ) diff --git a/system/t12_api/packages.py b/system/t12_api/packages.py index 27dae270..836d939d 100644 --- a/system/t12_api/packages.py +++ b/system/t12_api/packages.py @@ -15,8 +15,8 @@ class PackagesAPITestShow(APITest): self.check_equal(self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz").status_code, 200) - resp = self.post("/api/repos/" + repo_name + "/file/" + d) - self.check_equal(resp.status_code, 200) + resp = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_equal(resp.json()['State'], 2) # get information about package resp = self.get("/api/packages/" + urllib.quote('Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e')) diff --git a/system/t12_api/publish.py b/system/t12_api/publish.py index 865326ae..98874561 100644 --- a/system/t12_api/publish.py +++ b/system/t12_api/publish.py @@ -26,17 +26,18 @@ class PublishAPITestRepo(APITest): "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) # publishing under prefix, default distribution prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "SourceKind": "local", - "Sources": [{"Name": repo_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task( + "/api/publish/" + prefix, + json={ + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + } + ) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -52,8 +53,7 @@ class PublishAPITestRepo(APITest): 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 201) - self.check_equal(resp.json(), repo_expected) + self.check_equal(resp.json()['State'], 2) all_repos = self.get("/api/publish") self.check_equal(all_repos.status_code, 200) @@ -71,14 +71,17 @@ class PublishAPITestRepo(APITest): # publishing under root, custom distribution, architectures distribution = self.random_name() - resp = self.post("/api/publish/:.", - json={ - "SourceKind": "local", - "Sources": [{"Name": repo_name}], - "Signing": DefaultSigningOptions, - "Distribution": distribution, - "Architectures": ["i386", "amd64"], - }) + resp = self.post_task( + "/api/publish/:.", + json={ + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + "Distribution": distribution, + "Architectures": ["i386", "amd64"], + } + ) + self.check_equal(resp.json()['State'], 2) repo2_expected = { 'AcquireByHash': False, 'Architectures': ['amd64', 'i386'], @@ -93,8 +96,9 @@ class PublishAPITestRepo(APITest): 'Sources': [{'Component': 'main', 'Name': repo_name}], 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 201) - self.check_equal(resp.json(), repo2_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_exists("public/dists/" + distribution + "/Release") self.check_exists("public/dists/" + distribution + @@ -129,27 +133,27 @@ class PublishSnapshotAPITest(APITest): self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - self.check_equal(self.post("/api/repos/" + repo_name + - '/snapshots', json={'Name': snapshot_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}).json()['State'], 2) prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "AcquireByHash": True, - "SourceKind": "snapshot", - "Sources": [{"Name": snapshot_name}], - "Signing": DefaultSigningOptions, - "Distribution": "squeeze", - "NotAutomatic": "yes", - "ButAutomaticUpgrades": "yes", - "Origin": "earth", - "Label": "fun", - }) - self.check_equal(resp.status_code, 201) - self.check_equal(resp.json(), { + resp = self.post_task( + "/api/publish/" + prefix, + json={ + "AcquireByHash": True, + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot_name}], + "Signing": DefaultSigningOptions, + "Distribution": "squeeze", + "NotAutomatic": "yes", + "ButAutomaticUpgrades": "yes", + "Origin": "earth", + "Label": "fun", + } + ) + self.check_equal(resp.json()['State'], 2) + repo_expected = { 'AcquireByHash': True, 'Architectures': ['i386'], 'Distribution': 'squeeze', @@ -163,6 +167,9 @@ class PublishSnapshotAPITest(APITest): 'Sources': [{'Component': 'main', 'Name': snapshot_name}], 'Storage': '', 'Suite': ''}) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_exists("public/" + prefix + "/dists/squeeze/Release") self.check_exists("public/" + prefix + @@ -192,19 +199,20 @@ class PublishUpdateAPITestRepo(APITest): "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "SourceKind": "local", - "Sources": [{"Name": repo_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + } + ) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) self.check_not_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") @@ -213,19 +221,21 @@ class PublishUpdateAPITestRepo(APITest): d = self.random_name() self.check_equal(self.upload("/api/files/" + d, - "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - self.check_equal(self.delete("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) # Update and switch AcquireByHash on. - resp = self.put("/api/publish/" + prefix + "/wheezy", - json={ - "AcquireByHash": True, - "Signing": DefaultSigningOptions, - }) + resp = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "AcquireByHash": True, + "Signing": DefaultSigningOptions, + } + ) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': True, 'Architectures': ['i386', 'source'], @@ -241,8 +251,9 @@ class PublishUpdateAPITestRepo(APITest): 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_exists("public/" + prefix + "/dists/wheezy/main/binary-i386/by-hash") @@ -252,8 +263,7 @@ class PublishUpdateAPITestRepo(APITest): self.check_not_exists( "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") - self.check_equal(self.delete("/api/publish/" + - prefix + "/wheezy").status_code, 200) + self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy").json()['State'], 2) self.check_not_exists("public/" + prefix + "dists/") @@ -274,19 +284,18 @@ class PublishUpdateSkipCleanupAPITestRepo(APITest): "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "SourceKind": "local", - "Sources": [{"Name": repo_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task("/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + }) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) self.check_not_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") @@ -295,31 +304,31 @@ class PublishUpdateSkipCleanupAPITestRepo(APITest): # Publish two repos, so that deleting one while skipping cleanup will # not delete the whole prefix. - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "Distribution": "otherdist", - "SourceKind": "local", - "Sources": [{"Name": repo_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task("/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "Distribution": "otherdist", + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + }) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) d = self.random_name() self.check_equal(self.upload("/api/files/" + d, - "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - self.check_equal(self.delete("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) - resp = self.put("/api/publish/" + prefix + "/wheezy", - json={ - "Signing": DefaultSigningOptions, - "SkipCleanup": True, - }) + resp = self.put_task("/api/publish/" + prefix + "/wheezy", + json={ + "Signing": DefaultSigningOptions, + "SkipCleanup": True, + }) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -335,20 +344,18 @@ class PublishUpdateSkipCleanupAPITestRepo(APITest): 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") self.check_exists("public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") - self.check_equal(self.delete("/api/publish/" + prefix + - "/wheezy", params={"SkipCleanup": "1"}).status_code, 200) - self.check_exists( - "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") - self.check_exists("public/" + prefix + - "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy", params={"SkipCleanup": "1"}).json()['State'], 2) + self.check_exists("public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") class PublishSwitchAPITestRepo(APITest): @@ -368,23 +375,22 @@ class PublishSwitchAPITestRepo(APITest): "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) snapshot1_name = self.random_name() - self.check_equal(self.post("/api/repos/" + repo_name + - '/snapshots', json={'Name': snapshot1_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name}).json()['State'], 2) prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "SourceKind": "snapshot", - "Sources": [{"Name": snapshot1_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -399,7 +405,9 @@ class PublishSwitchAPITestRepo(APITest): 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], 'Storage': '', 'Suite': ''} - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_not_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") @@ -408,23 +416,23 @@ class PublishSwitchAPITestRepo(APITest): d = self.random_name() self.check_equal(self.upload("/api/files/" + d, - "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - self.check_equal(self.delete("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) snapshot2_name = self.random_name() - self.check_equal(self.post("/api/repos/" + repo_name + - '/snapshots', json={'Name': snapshot2_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot2_name}).json()['State'], 2) - resp = self.put("/api/publish/" + prefix + "/wheezy", - json={ - "Snapshots": [{"Component": "main", "Name": snapshot2_name}], - "Signing": DefaultSigningOptions, - "SkipContents": True, - }) + resp = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + }) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -440,16 +448,16 @@ class PublishSwitchAPITestRepo(APITest): 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") self.check_not_exists( "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") - self.check_equal(self.delete("/api/publish/" + - prefix + "/wheezy").status_code, 200) + self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy").json()['State'], 2) self.check_not_exists("public/" + prefix + "dists/") @@ -470,23 +478,21 @@ class PublishSwitchAPISkipCleanupTestRepo(APITest): "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) snapshot1_name = self.random_name() - self.check_equal(self.post("/api/repos/" + repo_name + - '/snapshots', json={'Name': snapshot1_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name}).json()['State'], 2) prefix = self.random_name() - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "SourceKind": "snapshot", - "Sources": [{"Name": snapshot1_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task("/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -501,7 +507,9 @@ class PublishSwitchAPISkipCleanupTestRepo(APITest): 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], 'Storage': '', 'Suite': ''} - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) self.check_not_exists( "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") @@ -510,16 +518,16 @@ class PublishSwitchAPISkipCleanupTestRepo(APITest): # Publish two snapshots, so that deleting one while skipping cleanup will # not delete the whole prefix. - resp = self.post("/api/publish/" + prefix, - json={ - "Architectures": ["i386", "source"], - "Distribution": "otherdist", - "SourceKind": "snapshot", - "Sources": [{"Name": snapshot1_name}], - "Signing": DefaultSigningOptions, - }) + resp = self.post_task("/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "Distribution": "otherdist", + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) - self.check_equal(resp.status_code, 201) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -534,28 +542,29 @@ class PublishSwitchAPISkipCleanupTestRepo(APITest): 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], 'Storage': '', 'Suite': ''} - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) d = self.random_name() self.check_equal(self.upload("/api/files/" + d, - "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal( - self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - self.check_equal(self.delete("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) snapshot2_name = self.random_name() - self.check_equal(self.post("/api/repos/" + repo_name + - '/snapshots', json={'Name': snapshot2_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot2_name}).json()['State'], 2) - resp = self.put("/api/publish/" + prefix + "/wheezy", - json={ - "Snapshots": [{"Component": "main", "Name": snapshot2_name}], - "Signing": DefaultSigningOptions, - "SkipCleanup": True, - "SkipContents": True, - }) + resp = self.put_task("/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipCleanup": True, + "SkipContents": True, + }) + self.check_equal(resp.json()['State'], 2) repo_expected = { 'AcquireByHash': False, 'Architectures': ['i386', 'source'], @@ -571,17 +580,13 @@ class PublishSwitchAPISkipCleanupTestRepo(APITest): 'Storage': '', 'Suite': ''} - self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), repo_expected) + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) - self.check_exists( - "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") - self.check_exists("public/" + prefix + - "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + self.check_exists("public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") - self.check_equal(self.delete("/api/publish/" + prefix + - "/wheezy", params={"SkipCleanup": "1"}).status_code, 200) - self.check_exists( - "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") - self.check_exists("public/" + prefix + - "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + self.check_equal(self.delete_task("/api/publish/" + prefix + "/wheezy", params={"SkipCleanup": "1"}).json()['State'], 2) + self.check_exists("public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") diff --git a/system/t12_api/repos.py b/system/t12_api/repos.py index f327df2a..8851c71d 100644 --- a/system/t12_api/repos.py +++ b/system/t12_api/repos.py @@ -40,7 +40,7 @@ class ReposAPITestCreateIndexDelete(APITest): names = [repo["Name"] for repo in repos] assert repo_name in names - self.check_equal(self.delete("/api/repos/" + repo_name).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name).json()['State'], 2) self.check_equal(self.delete("/api/repos/" + repo_name).status_code, 404) self.check_equal(self.get("/api/repos/" + repo_name).status_code, 404) @@ -59,26 +59,26 @@ class ReposAPITestCreateIndexDelete(APITest): self.check_equal(self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz").status_code, 200) - resp = self.post("/api/repos/" + repo_name + "/file/" + d) - self.check_equal(resp.status_code, 200) + resp = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_equal(resp.json()['State'], 2) - self.check_equal(self.post("/api/repos/" + repo_name + "/snapshots", json={"Name": repo_name}).status_code, 201) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/snapshots", json={"Name": repo_name}).json()['State'], 2) - self.check_equal(self.post("/api/publish", + self.check_equal(self.post_task("/api/publish", json={ "SourceKind": "local", "Sources": [{"Name": repo_name}], "Signing": DefaultSigningOptions, - }).status_code, 201) + }).json()['State'], 2) # repo is not deletable while it is published - self.check_equal(self.delete("/api/repos/" + repo_name).status_code, 409) - self.check_equal(self.delete("/api/repos/" + repo_name, params={"force": "1"}).status_code, 409) + self.check_equal(self.delete_task("/api/repos/" + repo_name).json()['State'], 3) + self.check_equal(self.delete_task("/api/repos/" + repo_name, params={"force": "1"}).json()['State'], 3) # drop published - self.check_equal(self.delete("/api/publish//" + distribution).status_code, 200) - self.check_equal(self.delete("/api/repos/" + repo_name).status_code, 409) - self.check_equal(self.delete("/api/repos/" + repo_name, params={"force": "1"}).status_code, 200) + self.check_equal(self.delete_task("/api/publish//" + distribution).json()['State'], 2) + self.check_equal(self.delete_task("/api/repos/" + repo_name).json()['State'], 3) + self.check_equal(self.delete_task("/api/repos/" + repo_name, params={"force": "1"}).json()['State'], 2) self.check_equal(self.get("/api/repos/" + repo_name).status_code, 404) @@ -95,14 +95,16 @@ class ReposAPITestAdd(APITest): self.check_equal(self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz").status_code, 200) - resp = self.post("/api/repos/" + repo_name + "/file/" + d) + resp = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_equal(resp.json()['State'], 2) + + resp = self.get("/api/tasks/" + str(resp.json()['ID']) + "/output") self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), { - u'FailedFiles': [], - u'Report': { - u'Added': [u'pyspi_0.6.1-1.3_source added'], - u'Removed': [], - u'Warnings': []}}) + + self.check_in("Added: pyspi_0.6.1-1.3_source added", resp.content) + self.check_equal("Removed: " in resp.content, False) + self.check_equal("Failed files: " in resp.content, False) + self.check_equal("Warnings: " in resp.content, False) self.check_equal(self.get("/api/repos/" + repo_name + "/packages").json(), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e']) @@ -122,7 +124,7 @@ class ReposAPITestAddNotFullRemove(APITest): self.check_equal(self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "aptly.pub").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) self.check_equal(self.get("/api/repos/" + repo_name + "/packages").json(), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e']) self.check_exists("upload/" + d + "/aptly.pub") @@ -142,7 +144,7 @@ class ReposAPITestAddNoRemove(APITest): self.check_equal(self.upload("/api/files/" + d, "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d, params={"noRemove": 1}).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d, params={"noRemove": 1}).json()['State'], 2) self.check_equal(self.get("/api/repos/" + repo_name + "/packages").json(), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e']) self.check_exists("upload/" + d + "/pyspi_0.6.1-1.3.dsc") @@ -161,14 +163,16 @@ class ReposAPITestAddFile(APITest): self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - resp = self.post("/api/repos/" + repo_name + "/file/" + d + "/libboost-program-options-dev_1.49.0.1_i386.deb") + resp = self.post_task("/api/repos/" + repo_name + "/file/" + d + "/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_equal(resp.json()['State'], 2) + + resp = self.get("/api/tasks/" + str(resp.json()['ID']) + "/output") self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), { - u'FailedFiles': [], - u'Report': { - u'Added': [u'libboost-program-options-dev_1.49.0.1_i386 added'], - u'Removed': [], - u'Warnings': []}}) + + self.check_in("Added: libboost-program-options-dev_1.49.0.1_i386 added", resp.content) + self.check_equal("Removed: " in resp.content, False) + self.check_equal("Failed files: " in resp.content, False) + self.check_equal("Warnings: " in resp.content, False) self.check_equal(self.get("/api/repos/" + repo_name + "/packages").json(), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']) @@ -191,15 +195,12 @@ class ReposAPITestInclude(APITest): "hardlink_0.2.1_amd64.deb", directory='changes') self.check_equal(resp.status_code, 200) - resp = self.post("/api/repos/" + repo_name + "/include/" + d, params={"ignoreSignature": 1}) - self.check_equal(resp.status_code, 200) - self.check_equal(resp.json(), { - u'FailedFiles': [], - u'Report': { - u'Added': [u'hardlink_0.2.1_source added', 'hardlink_0.2.1_amd64 added'], - u'Removed': [], - u'Warnings': []}}) + resp = self.post_task("/api/repos/" + repo_name + "/include/" + d, params={"ignoreSignature": 1}) + self.check_equal(resp.json()['State'], 2) + resp = self.get("/api/tasks/" + str(resp.json()['ID']) + "/output") + self.check_equal(resp.status_code, 200) + self.check_in("Added: hardlink_0.2.1_source added, hardlink_0.2.1_amd64 added", resp.content) self.check_equal( sorted(self.get("/api/repos/" + repo_name + "/packages").json()), [u'Pamd64 hardlink 0.2.1 daf8fcecbf8210ad', u'Psource hardlink 0.2.1 8f72df429d7166e5'] @@ -223,7 +224,7 @@ class ReposAPITestShowQuery(APITest): "libboost-program-options-dev_1.49.0.1_i386.deb", "pyspi_0.6.1-1.3.dsc", "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages", params={"q": "pyspi"}).json()), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e', 'Psource pyspi 0.6.1-1.4 f8f1daa806004e89']) @@ -256,13 +257,13 @@ class ReposAPITestAddMultiple(APITest): "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d + "/pyspi_0.6.1-1.3.dsc", - params={"noRemove": 1}).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d + "/pyspi_0.6.1-1.3.dsc", + params={"noRemove": 1}).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e']) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d + "/pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d + "/pyspi-0.6.1-1.3.stripped.dsc").json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e', 'Psource pyspi 0.6.1-1.4 f8f1daa806004e89']) @@ -284,34 +285,34 @@ class ReposAPITestPackagesAddDelete(APITest): "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378', 'Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e', 'Psource pyspi 0.6.1-1.4 f8f1daa806004e89']) - self.check_equal(self.post("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378', 'Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e', 'Psource pyspi 0.6.1-1.4 f8f1daa806004e89']) - self.check_equal(self.post("/api/repos/" + repo_name + "/packages/", + self.check_equal(self.post_task("/api/repos/" + repo_name + "/packages/", json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89', - 'Psource no-such-package 0.6.1-1.4 f8f1daa806004e89']}).status_code, 404) + 'Psource no-such-package 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 3) - self.check_equal(self.delete("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.delete_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378', 'Psource pyspi 0.6.1-1.3 3a8b37cbd9a3559e']) - self.check_equal(self.post("/api/repos/" + repo_name + "/packages/", - json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/packages/", + json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89']}).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name + "/packages").json()), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378', @@ -322,9 +323,9 @@ class ReposAPITestPackagesAddDelete(APITest): self.check_equal(self.post("/api/repos", json={"Name": repo_name2, "Comment": "fun repo"}).status_code, 201) - self.check_equal(self.post("/api/repos/" + repo_name2 + "/packages/", + self.check_equal(self.post_task("/api/repos/" + repo_name2 + "/packages/", json={"PackageRefs": ['Psource pyspi 0.6.1-1.4 f8f1daa806004e89', - 'Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']}).status_code, 200) + 'Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']}).json()['State'], 2) self.check_equal(sorted(self.get("/api/repos/" + repo_name2 + "/packages").json()), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378', diff --git a/system/t12_api/snapshots.py b/system/t12_api/snapshots.py index da139c83..626011d7 100644 --- a/system/t12_api/snapshots.py +++ b/system/t12_api/snapshots.py @@ -12,9 +12,8 @@ class SnapshotsAPITestCreateShowEmpty(APITest): u'Name': snapshot_name} # create empty snapshot - resp = self.post("/api/snapshots", json=snapshot_desc) - self.check_subset(snapshot_desc, resp.json()) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/snapshots", json=snapshot_desc) + self.check_equal(resp.json()['State'], 2) self.check_subset(snapshot_desc, self.get("/api/snapshots/" + snapshot_name).json()) self.check_equal(self.get("/api/snapshots/" + snapshot_name).status_code, 200) @@ -26,8 +25,8 @@ class SnapshotsAPITestCreateShowEmpty(APITest): self.check_equal(self.get("/api/snapshots/" + self.random_name()).status_code, 404) # create snapshot with duplicate name - resp = self.post("/api/snapshots", json=snapshot_desc) - self.check_equal(resp.status_code, 400) + resp = self.post_task("/api/snapshots", json=snapshot_desc) + self.check_equal(resp.json()['State'], 3) class SnapshotsAPITestCreateFromRefs(APITest): @@ -47,9 +46,11 @@ class SnapshotsAPITestCreateFromRefs(APITest): # create empty snapshot empty_snapshot_name = self.random_name() - resp = self.post("/api/snapshots", json={"Name": empty_snapshot_name}) - self.check_equal(resp.status_code, 201) - self.check_equal(resp.json()['Description'], 'Created as empty') + resp = self.post_task("/api/snapshots", json={"Name": empty_snapshot_name}) + self.check_equal(resp.json()['State'], 2) + self.check_equal( + self.get("/api/snapshots/" + empty_snapshot_name).json()['Description'], "Created as empty" + ) # create and upload package to repo to register package in DB repo_name = self.random_name() @@ -57,16 +58,18 @@ class SnapshotsAPITestCreateFromRefs(APITest): d = self.random_name() self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) # create snapshot with empty snapshot as source and package snapshot = snapshot_desc.copy() snapshot['PackageRefs'] = ["Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378"] snapshot['SourceSnapshots'] = [empty_snapshot_name] - resp = self.post("/api/snapshots", json=snapshot) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/snapshots", json=snapshot) + self.check_equal(resp.json()['State'], 2) snapshot.pop('SourceSnapshots') snapshot.pop('PackageRefs') + resp = self.get("/api/snapshots/" + snapshot_name) + self.check_equal(resp.status_code, 200) self.check_subset(snapshot, resp.json()) self.check_subset(snapshot, self.get("/api/snapshots/" + snapshot_name).json()) @@ -75,10 +78,10 @@ class SnapshotsAPITestCreateFromRefs(APITest): self.check_equal(resp.json(), ["Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378"]) # create snapshot with unreferenced package - resp = self.post("/api/snapshots", json={ + resp = self.post_task("/api/snapshots", json={ "Name": self.random_name(), "PackageRefs": ["Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378", "Pamd64 no-such-package 1.2 91"]}) - self.check_equal(resp.status_code, 404) + self.check_equal(resp.json()['State'], 3) # list snapshots resp = self.get("/api/snapshots", params={"sort": "time"}) @@ -96,8 +99,8 @@ class SnapshotsAPITestCreateFromRepo(APITest): snapshot_name = self.random_name() self.check_equal(self.post("/api/repos", json={"Name": repo_name}).status_code, 201) - resp = self.post("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) + self.check_equal(resp.json()['State'], 2) self.check_equal([], self.get("/api/snapshots/" + snapshot_name + "/packages", params={"format": "details"}).json()) @@ -106,10 +109,10 @@ class SnapshotsAPITestCreateFromRepo(APITest): self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - resp = self.post("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) + self.check_equal(resp.json()['State'], 2) self.check_equal(self.get("/api/snapshots/" + snapshot_name).status_code, 200) self.check_subset({u'Architecture': 'i386', @@ -126,8 +129,8 @@ class SnapshotsAPITestCreateFromRepo(APITest): params={"format": "details", "q": "Version (> 0.6.1-1.4)"}).json()[0]) # duplicate snapshot name - resp = self.post("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) - self.check_equal(resp.status_code, 400) + resp = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) + self.check_equal(resp.json()['State'], 3) class SnapshotsAPITestCreateUpdate(APITest): @@ -139,13 +142,13 @@ class SnapshotsAPITestCreateUpdate(APITest): snapshot_desc = {u'Description': u'fun snapshot', u'Name': snapshot_name} - resp = self.post("/api/snapshots", json=snapshot_desc) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/snapshots", json=snapshot_desc) + self.check_equal(resp.json()['State'], 2) new_snapshot_name = self.random_name() - resp = self.put("/api/snapshots/" + snapshot_name, json={'Name': new_snapshot_name, - 'Description': 'New description'}) - self.check_equal(resp.status_code, 200) + resp = self.put_task("/api/snapshots/" + snapshot_name, json={'Name': new_snapshot_name, + 'Description': 'New description'}) + self.check_equal(resp.json()['State'], 2) resp = self.get("/api/snapshots/" + new_snapshot_name) self.check_equal(resp.status_code, 200) @@ -153,9 +156,9 @@ class SnapshotsAPITestCreateUpdate(APITest): "Description": "New description"}, resp.json()) # duplicate name - resp = self.put("/api/snapshots/" + new_snapshot_name, json={'Name': new_snapshot_name, - 'Description': 'New description'}) - self.check_equal(resp.status_code, 409) + resp = self.put_task("/api/snapshots/" + new_snapshot_name, json={'Name': new_snapshot_name, + 'Description': 'New description'}) + self.check_equal(resp.json()['State'], 3) # missing snapshot resp = self.put("/api/snapshots/" + snapshot_name, json={}) @@ -172,36 +175,42 @@ class SnapshotsAPITestCreateDelete(APITest): u'Name': snapshot_name} # deleting unreferenced snapshot - resp = self.post("/api/snapshots", json=snapshot_desc) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/snapshots", json=snapshot_desc) + self.check_equal(resp.json()['State'], 2) - self.check_equal(self.delete("/api/snapshots/" + snapshot_name).status_code, 200) + self.check_equal(self.delete_task("/api/snapshots/" + snapshot_name).json()['State'], 2) self.check_equal(self.get("/api/snapshots/" + snapshot_name).status_code, 404) # deleting referenced snapshot snap1, snap2 = self.random_name(), self.random_name() - self.check_equal(self.post("/api/snapshots", json={"Name": snap1}).status_code, 201) - self.check_equal(self.post("/api/snapshots", json={"Name": snap2, "SourceSnapshots": [snap1]}).status_code, 201) + self.check_equal(self.post_task("/api/snapshots", json={"Name": snap1}).json()['State'], 2) + self.check_equal( + self.post_task( + "/api/snapshots", json={"Name": snap2, "SourceSnapshots": [snap1]} + ).json()['State'], 2 + ) - self.check_equal(self.delete("/api/snapshots/" + snap1).status_code, 409) + self.check_equal(self.delete_task("/api/snapshots/" + snap1).json()['State'], 3) self.check_equal(self.get("/api/snapshots/" + snap1).status_code, 200) - self.check_equal(self.delete("/api/snapshots/" + snap1, params={"force": "1"}).status_code, 200) + self.check_equal(self.delete_task("/api/snapshots/" + snap1, params={"force": "1"}).json()['State'], 2) self.check_equal(self.get("/api/snapshots/" + snap1).status_code, 404) # deleting published snapshot - resp = self.post("/api/publish", - json={ - "SourceKind": "snapshot", - "Distribution": "trusty", - "Architectures": ["i386"], - "Sources": [{"Name": snap2}], - "Signing": DefaultSigningOptions, - }) - self.check_equal(resp.status_code, 201) + resp = self.post_task( + "/api/publish", + json={ + "SourceKind": "snapshot", + "Distribution": "trusty", + "Architectures": ["i386"], + "Sources": [{"Name": snap2}], + "Signing": DefaultSigningOptions, + } + ) + self.check_equal(resp.json()['State'], 2) - self.check_equal(self.delete("/api/snapshots/" + snap2).status_code, 409) - self.check_equal(self.delete("/api/snapshots/" + snap2, params={"force": "1"}).status_code, 409) + self.check_equal(self.delete_task("/api/snapshots/" + snap2).json()['State'], 3) + self.check_equal(self.delete_task("/api/snapshots/" + snap2, params={"force": "1"}).json()['State'], 3) class SnapshotsAPITestSearch(APITest): @@ -218,10 +227,10 @@ class SnapshotsAPITestSearch(APITest): self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - resp = self.post("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot_name}) + self.check_equal(resp.json()['State'], 2) resp = self.get("/api/snapshots/" + snapshot_name + "/packages", params={"q": "libboost-program-options-dev", "format": "details"}) @@ -252,13 +261,13 @@ class SnapshotsAPITestDiff(APITest): self.check_equal(self.upload("/api/files/" + d, "libboost-program-options-dev_1.49.0.1_i386.deb").status_code, 200) - self.check_equal(self.post("/api/repos/" + repo_name + "/file/" + d).status_code, 200) + self.check_equal(self.post_task("/api/repos/" + repo_name + "/file/" + d).json()['State'], 2) - resp = self.post("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshots[0]}) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshots[0]}) + self.check_equal(resp.json()['State'], 2) - resp = self.post("/api/snapshots", json={'Name': snapshots[1]}) - self.check_equal(resp.status_code, 201) + resp = self.post_task("/api/snapshots", json={'Name': snapshots[1]}) + self.check_equal(resp.json()['State'], 2) resp = self.get("/api/snapshots/" + snapshots[0] + "/diff/" + snapshots[1]) self.check_equal(resp.status_code, 200) diff --git a/task/list.go b/task/list.go new file mode 100644 index 00000000..c2844658 --- /dev/null +++ b/task/list.go @@ -0,0 +1,195 @@ +package task + +import ( + "fmt" + "sync" +) + +// List is handling list of processes and makes sure +// only one process is executed at the time +type List struct { + *sync.Mutex + tasks []*Task + wgTasks map[int]*sync.WaitGroup + wg *sync.WaitGroup + // resources currently used by running tasks + usedResources *ResourcesSet + idCounter int +} + +// NewList creates empty task list +func NewList() *List { + list := &List{ + Mutex: &sync.Mutex{}, + tasks: make([]*Task, 0), + wgTasks: make(map[int]*sync.WaitGroup), + wg: &sync.WaitGroup{}, + usedResources: NewResourcesSet(), + } + return list +} + +// GetTasks gets complete list of tasks +func (list *List) GetTasks() []Task { + var tasks []Task + list.Lock() + for _, task := range list.tasks { + tasks = append(tasks, *task) + } + + list.Unlock() + return tasks +} + +// DeleteTaskByID deletes given task from list. Only finished +// tasks can be deleted. +func (list *List) DeleteTaskByID(ID int) (Task, error) { + list.Lock() + defer list.Unlock() + + tasks := list.tasks + for i, task := range tasks { + if task.ID == ID { + if task.State == SUCCEEDED || task.State == FAILED { + list.tasks = append(tasks[:i], tasks[i+1:]...) + return *task, nil + } + + return *task, fmt.Errorf("Task with id %v is still running", ID) + } + } + + return Task{}, fmt.Errorf("Could not find task with id %v", ID) +} + +// GetTaskByID returns task with given id +func (list *List) GetTaskByID(ID int) (Task, error) { + list.Lock() + tasks := list.tasks + list.Unlock() + + for _, task := range tasks { + if task.ID == ID { + return *task, nil + } + } + + return Task{}, fmt.Errorf("Could not find task with id %v", ID) +} + +// GetTaskOutputByID returns standard output of task with given id +func (list *List) GetTaskOutputByID(ID int) (string, error) { + task, err := list.GetTaskByID(ID) + + if err != nil { + return "", err + } + + return task.output.String(), nil +} + +// GetTaskDetailByID returns detail of task with given id +func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { + task, err := list.GetTaskByID(ID) + + if err != nil { + return nil, err + } + + detail := task.detail.Load() + if detail == nil { + return struct{}{}, nil + } + + return detail, nil +} + +// RunTaskInBackground creates task and runs it in background. It won't be run and an error +// returned if there are running tasks which are using needed resources already. +func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) { + list.Lock() + defer list.Unlock() + + tasks := list.usedResources.UsedBy(resources) + if len(tasks) > 0 { + conflictError := &ResourceConflictError{ + Tasks: tasks, + Message: "Needed resources are used by other tasks.", + } + return Task{}, conflictError + } + + list.idCounter++ + wgTask := &sync.WaitGroup{} + task := NewTask(process, name, list.idCounter) + + list.tasks = append(list.tasks, task) + list.wgTasks[task.ID] = wgTask + list.usedResources.MarkInUse(resources, task) + + list.wg.Add(1) + wgTask.Add(1) + + go func() { + + list.Lock() + { + task.State = RUNNING + } + list.Unlock() + + err := process(task.output, task.detail) + + list.Lock() + { + if err != nil { + task.output.Printf("Task failed with error: %v", err) + task.State = FAILED + } else { + task.output.Print("Task succeeded") + task.State = SUCCEEDED + } + + list.usedResources.Free(resources) + + wgTask.Done() + list.wg.Done() + } + list.Unlock() + }() + + return *task, nil +} + +// Clear removes finished tasks from list +func (list *List) Clear() { + list.Lock() + + var tasks []*Task + for _, task := range list.tasks { + if task.State == IDLE || task.State == RUNNING { + tasks = append(tasks, task) + } + } + list.tasks = tasks + + list.Unlock() +} + +// Wait waits till all tasks are processed +func (list *List) Wait() { + list.wg.Wait() +} + +// WaitForTaskByID waits for task with given id to be processed +func (list *List) WaitForTaskByID(ID int) (Task, error) { + list.Lock() + wgTask, ok := list.wgTasks[ID] + list.Unlock() + if !ok { + return Task{}, fmt.Errorf("Could not find task with id %v", ID) + } + + wgTask.Wait() + return list.GetTaskByID(ID) +} diff --git a/task/list_test.go b/task/list_test.go new file mode 100644 index 00000000..dd0ac207 --- /dev/null +++ b/task/list_test.go @@ -0,0 +1,51 @@ +package task + +import ( + "errors" + + // need to import as check as otherwise List is redeclared + check "gopkg.in/check.v1" +) + +type ListSuite struct{} + +var _ = check.Suite(&ListSuite{}) + +func (s *ListSuite) TestList(c *check.C) { + list := NewList() + c.Assert(len(list.GetTasks()), check.Equals, 0) + + task, err := list.RunTaskInBackground("Successful task", nil, func(out *Output, detail *Detail) error { + return nil + }) + c.Assert(err, check.IsNil) + list.WaitForTaskByID(task.ID) + + tasks := list.GetTasks() + c.Assert(len(tasks), check.Equals, 1) + task, _ = list.GetTaskByID(task.ID) + c.Check(task.State, check.Equals, SUCCEEDED) + output, _ := list.GetTaskOutputByID(task.ID) + c.Check(output, check.Equals, "Task succeeded") + detail, _ := list.GetTaskDetailByID(task.ID) + c.Check(detail, check.Equals, struct{}{}) + + task, err = list.RunTaskInBackground("Faulty task", nil, func(out *Output, detail *Detail) error { + detail.Store("Details") + out.WriteString("Test Progress\n") + return errors.New("Task failed") + }) + c.Assert(err, check.IsNil) + list.WaitForTaskByID(task.ID) + + tasks = list.GetTasks() + c.Assert(len(tasks), check.Equals, 2) + task, _ = list.GetTaskByID(task.ID) + c.Check(task.State, check.Equals, FAILED) + output, _ = list.GetTaskOutputByID(task.ID) + c.Check(output, check.Equals, "Test Progress\nTask failed with error: Task failed") + detail, _ = list.GetTaskDetailByID(task.ID) + c.Check(detail, check.Equals, "Details") + _, deleteErr := list.DeleteTaskByID(task.ID) + c.Check(deleteErr, check.IsNil) +} diff --git a/task/output.go b/task/output.go new file mode 100644 index 00000000..4f451df3 --- /dev/null +++ b/task/output.go @@ -0,0 +1,95 @@ +package task + +import ( + "bytes" + "fmt" + "sync" +) + +// Output represents a safe standard output of task +// which is compatbile to AptlyProgress. +type Output struct { + mu *sync.Mutex + output *bytes.Buffer +} + +// NewOutput creates new output +func NewOutput() *Output { + return &Output{mu: &sync.Mutex{}, output: &bytes.Buffer{}} +} + +func (t *Output) String() string { + t.mu.Lock() + defer t.mu.Unlock() + + return t.output.String() +} + +// Write is used to determine how many bytes have been written +// not needed in our case. +func (t *Output) Write(p []byte) (n int, err error) { + return len(p), err +} + +// WriteString writes string to output +func (t *Output) WriteString(s string) (n int, err error) { + t.mu.Lock() + defer t.mu.Unlock() + return t.output.WriteString(s) +} + +// Start is needed for progress compatibility +func (t *Output) Start() { + // Not implemented +} + +// Shutdown is needed for progress compatibility +func (t *Output) Shutdown() { + // Not implemented +} + +// Flush is needed for progress compatibility +func (t *Output) Flush() { + // Not implemented +} + +// InitBar is needed for progress compatibility +func (t *Output) InitBar(count int64, isBytes bool) { + // Not implemented +} + +// ShutdownBar is needed for progress compatibility +func (t *Output) ShutdownBar() { + // Not implemented +} + +// AddBar is needed for progress compatibility +func (t *Output) AddBar(count int) { + // Not implemented +} + +// SetBar sets current position for progress bar +func (t *Output) SetBar(count int) { + // Not implemented +} + +// Printf does printf in a safe manner +func (t *Output) Printf(msg string, a ...interface{}) { + t.WriteString(fmt.Sprintf(msg, a...)) +} + +// Print does printf in a safe manner +func (t *Output) Print(msg string) { + t.WriteString(msg) +} + +// ColoredPrintf does printf in a safe manner + newline +// currently are no colors supported. +func (t *Output) ColoredPrintf(msg string, a ...interface{}) { + t.WriteString(fmt.Sprintf(msg+"\n", a...)) +} + +// PrintfStdErr does printf but in safe manner to output +func (t *Output) PrintfStdErr(msg string, a ...interface{}) { + t.WriteString(msg) +} diff --git a/task/resources.go b/task/resources.go new file mode 100644 index 00000000..703111d6 --- /dev/null +++ b/task/resources.go @@ -0,0 +1,93 @@ +package task + +import ( + "strings" +) + +// AllLocalReposResourcesKey to be used as resource key when all local repos are needed +const AllLocalReposResourcesKey = "__alllocalrepos__" + +// ResourceConflictError represents a list tasks +// using conflicitng resources +type ResourceConflictError struct { + Tasks []Task + Message string +} + +func (e *ResourceConflictError) Error() string { + return e.Message +} + +// ResourcesSet represents a set of task resources. +// A resource is represented by its unique key +type ResourcesSet struct { + set map[string]*Task +} + +// NewResourcesSet creates new instance of resources set +func NewResourcesSet() *ResourcesSet { + return &ResourcesSet{make(map[string]*Task)} +} + +// MarkInUse given resources as used by given task +func (r *ResourcesSet) MarkInUse(resources []string, task *Task) { + for _, resource := range resources { + r.set[resource] = task + } +} + +// UsedBy checks whether one of given resources +// is used by a task and if yes returns slice of such task +func (r *ResourcesSet) UsedBy(resources []string) []Task { + var tasks []Task + var task *Task + var found bool + + for _, resource := range resources { + + if resource == AllLocalReposResourcesKey { + for taskResource, task := range r.set { + if strings.HasPrefix(taskResource, "L") { + tasks = appendTask(tasks, task) + } + } + } + + task, found = r.set[resource] + if found { + tasks = appendTask(tasks, task) + } + } + + task, found = r.set[AllLocalReposResourcesKey] + if found { + tasks = appendTask(tasks, task) + } + + return tasks +} + +// appendTask only appends task to tasks slice if not already +// on slice +func appendTask(tasks []Task, task *Task) []Task { + needsAppending := true + for _, givenTask := range tasks { + if givenTask.ID == task.ID { + needsAppending = false + break + } + } + + if needsAppending { + return append(tasks, *task) + } + + return tasks +} + +// Free removes given resources from dependency set +func (r *ResourcesSet) Free(resources []string) { + for _, resource := range resources { + delete(r.set, resource) + } +} diff --git a/task/task.go b/task/task.go new file mode 100644 index 00000000..362d1351 --- /dev/null +++ b/task/task.go @@ -0,0 +1,50 @@ +package task + +import ( + "sync/atomic" +) + +// State task is in +type State int + +// Detail represents custom task details +type Detail struct { + atomic.Value +} + +// Process is a function implementing the actual task logic +type Process func(out *Output, detail *Detail) error + +const ( + // IDLE when task is waiting + IDLE State = iota + // RUNNING when task is running + RUNNING + // SUCCEEDED when task is successfully finished + SUCCEEDED + // FAILED when task failed + FAILED +) + +// Task represents as task in a queue encapsulates process code +type Task struct { + output *Output + detail *Detail + process Process + Name string + ID int + State State +} + +// NewTask creates new task +func NewTask(process Process, name string, ID int) *Task { + task := &Task{ + output: NewOutput(), + detail: &Detail{}, + process: process, + Name: name, + ID: ID, + State: IDLE, + } + return task +} diff --git a/task/task_test.go b/task/task_test.go new file mode 100644 index 00000000..a51aa87e --- /dev/null +++ b/task/task_test.go @@ -0,0 +1,12 @@ +package task + +import ( + "testing" + + check "gopkg.in/check.v1" +) + +// Launch gocheck tests +func Test(t *testing.T) { + check.TestingT(t) +}