New upstream version 1.5.0+ds1

This commit is contained in:
Roland Mas
2023-01-02 14:19:29 +01:00
parent 29e4ea6ec0
commit 5c4f97f88e
324 changed files with 15360 additions and 6668 deletions
+98 -42
View File
@@ -3,12 +3,16 @@ package api
import (
"fmt"
"log"
"net/http"
"sort"
"time"
"strconv"
"strings"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/query"
"github.com/aptly-dev/aptly/task"
"github.com/gin-gonic/gin"
)
@@ -35,49 +39,14 @@ type dbRequest struct {
err chan<- error
}
// Flushes all collections which cache in-memory objects
func flushColections() {
// lock everything to eliminate in-progress calls
r := context.CollectionFactory().RemoteRepoCollection()
r.Lock()
defer r.Unlock()
l := context.CollectionFactory().LocalRepoCollection()
l.Lock()
defer l.Unlock()
s := context.CollectionFactory().SnapshotCollection()
s.Lock()
defer s.Unlock()
p := context.CollectionFactory().PublishedRepoCollection()
p.Lock()
defer p.Unlock()
// all collections locked, flush them
context.CollectionFactory().Flush()
}
// Periodically flushes CollectionFactory to free up memory used by
// collections, flushing caches.
//
// Should be run in goroutine!
func cacheFlusher() {
ticker := time.Tick(15 * time.Minute)
for {
<-ticker
flushColections()
}
}
var dbRequests chan dbRequest
// Acquire database lock and release it when not needed anymore.
//
// Should be run in a goroutine!
func acquireDatabase(requests <-chan dbRequest) {
func acquireDatabase() {
clients := 0
for request := range requests {
for request := range dbRequests {
var err error
switch request.kind {
@@ -94,7 +63,6 @@ func acquireDatabase(requests <-chan dbRequest) {
case releasedb:
clients--
if clients == 0 {
flushColections()
err = context.CloseDatabase()
} else {
err = nil
@@ -105,12 +73,100 @@ func acquireDatabase(requests <-chan dbRequest) {
}
}
// Should be called before database access is needed in any api call.
// Happens per default for each api call. It is important that you run
// runTaskInBackground to run a task which accquire database.
// Important do not forget to defer to releaseDatabaseConnection
func acquireDatabaseConnection() error {
if dbRequests == nil {
return nil
}
errCh := make(chan error)
dbRequests <- dbRequest{acquiredb, errCh}
return <-errCh
}
// Release database connection when not needed anymore
func releaseDatabaseConnection() error {
if dbRequests == nil {
return nil
}
errCh := make(chan error)
dbRequests <- dbRequest{releasedb, errCh}
return <-errCh
}
// runs tasks in background. Acquires database connection first.
func runTaskInBackground(name string, resources []string, proc task.Process) (task.Task, *task.ResourceConflictError) {
return context.TaskList().RunTaskInBackground(name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := acquireDatabaseConnection()
if err != nil {
return nil, err
}
defer releaseDatabaseConnection()
return proc(out, detail)
})
}
func truthy(value interface{}) bool {
if value == nil {
return false
}
switch value.(type) {
case string:
switch strings.ToLower(value.(string)) {
case "n", "no", "f", "false", "0", "off":
return false
default:
return true
}
case int:
return !(value.(int) == 0)
case bool:
return value.(bool)
}
return true
}
func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, proc task.Process) {
// Run this task in background if configured globally or per-request
background := truthy(c.DefaultQuery("_async", strconv.FormatBool(context.Config().AsyncAPI)))
if background {
log.Println("Executing task asynchronously")
task, conflictErr := runTaskInBackground(name, resources, proc)
if conflictErr != nil {
c.AbortWithError(409, conflictErr)
return
}
c.JSON(202, task)
} else {
log.Println("Executing task synchronously")
out := context.Progress()
detail := task.Detail{}
retValue, err := proc(out, &detail)
if err != nil {
c.AbortWithError(retValue.Code, err)
return
}
if retValue != nil {
c.JSON(retValue.Code, retValue.Value)
} else {
c.JSON(http.StatusOK, nil)
}
}
}
// Common piece of code to show list of packages,
// with searching & details if requested
func showPackages(c *gin.Context, reflist *deb.PackageRefList) {
func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {
result := []*deb.Package{}
list, err := deb.NewPackageListFromRefList(reflist, context.CollectionFactory().PackageCollection(), nil)
list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)
if err != nil {
c.AbortWithError(404, err)
return
+117
View File
@@ -0,0 +1,117 @@
package api
import (
"encoding/json"
ctx "github.com/aptly-dev/aptly/context"
"github.com/gin-gonic/gin"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/smira/flag"
. "gopkg.in/check.v1"
)
func Test(t *testing.T) {
TestingT(t)
}
type ApiSuite struct {
context *ctx.AptlyContext
flags *flag.FlagSet
configFile *os.File
router http.Handler
}
var _ = Suite(&ApiSuite{})
func createTestConfig() *os.File {
file, err := ioutil.TempFile("", "aptly")
if err != nil {
return nil
}
jsonString, err := json.Marshal(gin.H{
"architectures": []string{},
})
if err != nil {
return nil
}
file.Write(jsonString)
return file
}
func (s *ApiSuite) SetUpSuite(c *C) {
file := createTestConfig()
c.Assert(file, NotNil)
s.configFile = file
flags := flag.NewFlagSet("fakeFlags", flag.ContinueOnError)
flags.Bool("no-lock", false, "dummy")
flags.Int("db-open-attempts", 3, "dummy")
flags.String("config", s.configFile.Name(), "dummy")
flags.String("architectures", "", "dummy")
s.flags = flags
context, err := ctx.NewContext(s.flags)
c.Assert(err, IsNil)
s.context = context
s.router = Router(context)
}
func (s *ApiSuite) TearDownSuite(c *C) {
os.Remove(s.configFile.Name())
s.context.Shutdown()
}
func (s *ApiSuite) SetUpTest(c *C) {
}
func (s *ApiSuite) TearDownTest(c *C) {
}
func (s *ApiSuite) HTTPRequest(method string, url string, body io.Reader) (*httptest.ResponseRecorder, error) {
w := httptest.NewRecorder()
req, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/json")
s.router.ServeHTTP(w, req)
return w, nil
}
func (s *ApiSuite) TestGetVersion(c *C) {
response, err := s.HTTPRequest("GET", "/api/version", nil)
c.Assert(err, IsNil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Matches, ".*Version.*")
}
func (s *ApiSuite) TestTruthy(c *C) {
c.Check(truthy("no"), Equals, false)
c.Check(truthy("n"), Equals, false)
c.Check(truthy("off"), Equals, false)
c.Check(truthy("false"), Equals, false)
c.Check(truthy("0"), Equals, false)
c.Check(truthy(false), Equals, false)
c.Check(truthy(0), Equals, false)
c.Check(truthy("y"), Equals, true)
c.Check(truthy("yes"), Equals, true)
c.Check(truthy("t"), Equals, true)
c.Check(truthy("true"), Equals, true)
c.Check(truthy("1"), Equals, true)
c.Check(truthy(true), Equals, true)
c.Check(truthy(1), Equals, true)
c.Check(truthy(nil), Equals, false)
c.Check(truthy("foobar"), Equals, true)
c.Check(truthy(-1), Equals, true)
c.Check(truthy(gin.H{}), Equals, true)
}
+180
View File
@@ -0,0 +1,180 @@
package api
import (
"fmt"
"sort"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/gin-gonic/gin"
)
// POST /api/db/cleanup
func apiDbCleanup(c *gin.Context) {
resources := []string{string(task.AllResourcesKey)}
maybeRunTaskInBackground(c, "Clean up db", resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
var err error
collectionFactory := context.NewCollectionFactory()
// collect information about referenced packages...
existingPackageRefs := deb.NewPackageRefList()
out.Printf("Loading mirrors, local repos, snapshots and published repos...")
err = collectionFactory.RemoteRepoCollection().ForEach(func(repo *deb.RemoteRepo) error {
e := collectionFactory.RemoteRepoCollection().LoadComplete(repo)
if e != nil {
return e
}
if repo.RefList() != nil {
existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
}
return nil
})
if err != nil {
return nil, err
}
err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
e := collectionFactory.LocalRepoCollection().LoadComplete(repo)
if e != nil {
return e
}
if repo.RefList() != nil {
existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
}
return nil
})
if err != nil {
return nil, err
}
err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
e := collectionFactory.SnapshotCollection().LoadComplete(snapshot)
if e != nil {
return e
}
existingPackageRefs = existingPackageRefs.Merge(snapshot.RefList(), false, true)
return nil
})
if err != nil {
return nil, err
}
err = collectionFactory.PublishedRepoCollection().ForEach(func(published *deb.PublishedRepo) error {
if published.SourceKind != deb.SourceLocalRepo {
return nil
}
e := collectionFactory.PublishedRepoCollection().LoadComplete(published, collectionFactory)
if e != nil {
return e
}
for _, component := range published.Components() {
existingPackageRefs = existingPackageRefs.Merge(published.RefList(component), false, true)
}
return nil
})
if err != nil {
return nil, err
}
// ... and compare it to the list of all packages
out.Printf("Loading list of all packages...")
allPackageRefs := collectionFactory.PackageCollection().AllPackageRefs()
toDelete := allPackageRefs.Subtract(existingPackageRefs)
// delete packages that are no longer referenced
out.Printf("Deleting unreferenced packages (%d)...", toDelete.Len())
// database can't err as collection factory already constructed
db, _ := context.Database()
if toDelete.Len() > 0 {
batch := db.CreateBatch()
toDelete.ForEach(func(ref []byte) error {
collectionFactory.PackageCollection().DeleteByKey(ref, batch)
return nil
})
err = batch.Write()
if err != nil {
return nil, fmt.Errorf("unable to write to DB: %s", err)
}
}
// now, build a list of files that should be present in Repository (package pool)
out.Printf("Building list of files referenced by packages...")
referencedFiles := make([]string, 0, existingPackageRefs.Len())
err = existingPackageRefs.ForEach(func(key []byte) error {
pkg, err2 := collectionFactory.PackageCollection().ByKey(key)
if err2 != nil {
tail := ""
return fmt.Errorf("unable to load package %s: %s%s", string(key), err2, tail)
}
paths, err2 := pkg.FilepathList(context.PackagePool())
if err2 != nil {
return err2
}
referencedFiles = append(referencedFiles, paths...)
return nil
})
if err != nil {
return nil, err
}
sort.Strings(referencedFiles)
// build a list of files in the package pool
out.Printf("Building list of files in package pool...")
existingFiles, err := context.PackagePool().FilepathList(out)
if err != nil {
return nil, fmt.Errorf("unable to collect file paths: %s", err)
}
// find files which are in the pool but not referenced by packages
filesToDelete := utils.StrSlicesSubstract(existingFiles, referencedFiles)
// delete files that are no longer referenced
out.Printf("Deleting unreferenced files (%d)...", len(filesToDelete))
countFilesToDelete := len(filesToDelete)
taskDetail := struct {
TotalNumberOfPackagesToDelete int
RemainingNumberOfPackagesToDelete int
}{
countFilesToDelete, countFilesToDelete,
}
detail.Store(taskDetail)
if countFilesToDelete > 0 {
var size, totalSize int64
for _, file := range filesToDelete {
size, err = context.PackagePool().Remove(file)
if err != nil {
return nil, err
}
taskDetail.RemainingNumberOfPackagesToDelete--
detail.Store(taskDetail)
totalSize += size
}
out.Printf("Disk space freed: %s...", utils.HumanBytes(totalSize))
}
out.Printf("Compacting database...")
return nil, db.CompactDB()
})
}
+84
View File
@@ -0,0 +1,84 @@
package api
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/aptly-dev/aptly/pgp"
"github.com/gin-gonic/gin"
)
// POST /api/gpg
func apiGPGAddKey(c *gin.Context) {
var b struct {
Keyserver string
GpgKeyID string
GpgKeyArmor string
Keyring string
}
if c.Bind(&b) != nil {
return
}
var err error
args := []string{"--no-default-keyring"}
keyring := "trustedkeys.gpg"
if len(b.Keyring) > 0 {
keyring = b.Keyring
}
args = append(args, "--keyring", keyring)
if len(b.Keyserver) > 0 {
args = append(args, "--keyserver", b.Keyserver)
}
if len(b.GpgKeyArmor) > 0 {
var tempdir string
tempdir, err = ioutil.TempDir(os.TempDir(), "aptly")
if err != nil {
c.AbortWithError(400, err)
return
}
defer os.RemoveAll(tempdir)
keypath := filepath.Join(tempdir, "key")
keyfile, e := os.Create(keypath)
if e != nil {
c.AbortWithError(400, e)
return
}
if _, e = keyfile.WriteString(b.GpgKeyArmor); e != nil {
c.AbortWithError(400, e)
}
args = append(args, "--import", keypath)
}
if len(b.GpgKeyID) > 0 {
keys := strings.Fields(b.GpgKeyID)
args = append(args, "--recv-keys")
args = append(args, keys...)
}
finder := pgp.GPG1Finder()
gpg, _, err := finder.FindGPG()
if err != nil {
c.AbortWithError(400, err)
return
}
// it might happened that we have a situation with an erroneous
// gpg command (e.g. when GpgKeyID and GpgKeyArmor is set).
// there is no error handling for such as gpg will do this for us
cmd := exec.Command(gpg, args...)
fmt.Printf("running %s %s\n", gpg, strings.Join(args, " "))
cmd.Stdout = os.Stdout
if err = cmd.Run(); err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(200, gin.H{})
}
+1 -11
View File
@@ -21,17 +21,7 @@ func apiGraph(c *gin.Context) {
ext := c.Params.ByName("ext")
layout := c.Request.URL.Query().Get("layout")
factory := context.CollectionFactory()
factory.RemoteRepoCollection().Lock()
defer factory.RemoteRepoCollection().Unlock()
factory.LocalRepoCollection().Lock()
defer factory.LocalRepoCollection().Unlock()
factory.SnapshotCollection().Lock()
defer factory.SnapshotCollection().Unlock()
factory.PublishedRepoCollection().Lock()
defer factory.PublishedRepoCollection().Unlock()
factory := context.NewCollectionFactory()
graph, err := deb.BuildGraph(factory, layout)
if err != nil {
+103
View File
@@ -0,0 +1,103 @@
package api
import (
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
apiRequestsInFlightGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "aptly_api_http_requests_in_flight",
Help: "Number of concurrent HTTP api requests currently handled.",
},
[]string{"method", "path"},
)
apiRequestsTotalCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "aptly_api_http_requests_total",
Help: "Total number of api requests.",
},
[]string{"code", "method", "path"},
)
apiRequestSizeSummary = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "aptly_api_http_request_size_bytes",
Help: "Api HTTP request size in bytes.",
},
[]string{"code", "method", "path"},
)
apiResponseSizeSummary = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "aptly_api_http_response_size_bytes",
Help: "Api HTTP response size in bytes.",
},
[]string{"code", "method", "path"},
)
apiRequestsDurationSummary = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "aptly_api_http_request_duration_seconds",
Help: "Duration of api requests in seconds.",
},
[]string{"code", "method", "path"},
)
)
// Only use base path as label value (e.g.: /api/repos) because of time series cardinality
// See https://prometheus.io/docs/practices/naming/#labels
func getBasePath(c *gin.Context) string {
return fmt.Sprintf("%s%s", getURLSegment(c.Request.URL.Path, 0), getURLSegment(c.Request.URL.Path, 1))
}
func getURLSegment(url string, idx int) string {
var urlSegments = strings.Split(url, "/")
// Remove segment at index 0 because it's an empty string
var segmentAtIndex = urlSegments[1:cap(urlSegments)][idx]
return fmt.Sprintf("/%s", segmentAtIndex)
}
func instrumentHandlerInFlight(g *prometheus.GaugeVec, pathFunc func(*gin.Context) string) func(*gin.Context) {
return func(c *gin.Context) {
g.WithLabelValues(c.Request.Method, pathFunc(c)).Inc()
defer g.WithLabelValues(c.Request.Method, pathFunc(c)).Dec()
c.Next()
}
}
func instrumentHandlerCounter(counter *prometheus.CounterVec, pathFunc func(*gin.Context) string) func(*gin.Context) {
return func(c *gin.Context) {
c.Next()
counter.WithLabelValues(strconv.Itoa(c.Writer.Status()), c.Request.Method, pathFunc(c)).Inc()
}
}
func instrumentHandlerRequestSize(obs prometheus.ObserverVec, pathFunc func(*gin.Context) string) func(*gin.Context) {
return func(c *gin.Context) {
c.Next()
obs.WithLabelValues(strconv.Itoa(c.Writer.Status()), c.Request.Method, pathFunc(c)).Observe(float64(c.Request.ContentLength))
}
}
func instrumentHandlerResponseSize(obs prometheus.ObserverVec, pathFunc func(*gin.Context) string) func(*gin.Context) {
return func(c *gin.Context) {
c.Next()
var responseSize = math.Max(float64(c.Writer.Size()), 0)
obs.WithLabelValues(strconv.Itoa(c.Writer.Status()), c.Request.Method, pathFunc(c)).Observe(responseSize)
}
}
func instrumentHandlerDuration(obs prometheus.ObserverVec, pathFunc func(*gin.Context) string) func(*gin.Context) {
return func(c *gin.Context) {
now := time.Now()
c.Next()
obs.WithLabelValues(strconv.Itoa(c.Writer.Status()), c.Request.Method, pathFunc(c)).Observe(time.Since(now).Seconds())
}
}
+556
View File
@@ -0,0 +1,556 @@
package api
import (
"fmt"
"log"
"net/http"
"sort"
"strings"
"sync"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/pgp"
"github.com/aptly-dev/aptly/query"
"github.com/aptly-dev/aptly/task"
"github.com/gin-gonic/gin"
)
func getVerifier(ignoreSignatures bool, keyRings []string) (pgp.Verifier, error) {
if ignoreSignatures {
return nil, nil
}
verifier := context.GetVerifier()
for _, keyRing := range keyRings {
verifier.AddKeyring(keyRing)
}
err := verifier.InitKeyring()
if err != nil {
return nil, err
}
return verifier, nil
}
// GET /api/mirrors
func apiMirrorsList(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
result := []*deb.RemoteRepo{}
collection.ForEach(func(repo *deb.RemoteRepo) error {
result = append(result, repo)
return nil
})
c.JSON(200, result)
}
// POST /api/mirrors
func apiMirrorsCreate(c *gin.Context) {
var err error
var b struct {
Name string `binding:"required"`
ArchiveURL string `binding:"required"`
Distribution string
Filter string
Components []string
Architectures []string
Keyrings []string
DownloadSources bool
DownloadUdebs bool
DownloadInstaller bool
FilterWithDeps bool
SkipComponentCheck bool
IgnoreSignatures bool
}
b.DownloadSources = context.Config().DownloadSourcePackages
b.IgnoreSignatures = context.Config().GpgDisableVerify
b.Architectures = context.ArchitecturesList()
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
if strings.HasPrefix(b.ArchiveURL, "ppa:") {
b.ArchiveURL, b.Distribution, b.Components, err = deb.ParsePPA(b.ArchiveURL, context.Config())
if err != nil {
c.AbortWithError(400, err)
return
}
}
if b.Filter != "" {
_, err = query.Parse(b.Filter)
if err != nil {
c.AbortWithError(400, fmt.Errorf("unable to create mirror: %s", err))
return
}
}
repo, err := deb.NewRemoteRepo(b.Name, b.ArchiveURL, b.Distribution, b.Components, b.Architectures,
b.DownloadSources, b.DownloadUdebs, b.DownloadInstaller)
if err != nil {
c.AbortWithError(400, fmt.Errorf("unable to create mirror: %s", err))
return
}
repo.Filter = b.Filter
repo.FilterWithDeps = b.FilterWithDeps
repo.SkipComponentCheck = b.SkipComponentCheck
repo.DownloadSources = b.DownloadSources
repo.DownloadUdebs = b.DownloadUdebs
verifier, err := getVerifier(b.IgnoreSignatures, b.Keyrings)
if err != nil {
c.AbortWithError(400, fmt.Errorf("unable to initialize GPG verifier: %s", err))
return
}
downloader := context.NewDownloader(nil)
err = repo.Fetch(downloader, verifier)
if err != nil {
c.AbortWithError(400, fmt.Errorf("unable to fetch mirror: %s", err))
return
}
err = collection.Add(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to add mirror: %s", err))
return
}
c.JSON(201, repo)
}
// DELETE /api/mirrors/:name
func apiMirrorsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
collectionFactory := context.NewCollectionFactory()
mirrorCollection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err := mirrorCollection.ByName(name)
if err != nil {
c.AbortWithError(404, fmt.Errorf("unable to drop: %s", err))
return
}
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
if !force {
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
}
}
err = mirrorCollection.Drop(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
return &task.ProcessReturnValue{Code: http.StatusNoContent, Value: nil}, nil
})
}
// GET /api/mirrors/:name
func apiMirrorsShow(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
c.AbortWithError(404, fmt.Errorf("unable to show: %s", err))
return
}
err = collection.LoadComplete(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to show: %s", err))
}
c.JSON(200, repo)
}
// GET /api/mirrors/:name/packages
func apiMirrorsPackages(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
c.AbortWithError(404, fmt.Errorf("unable to show: %s", err))
return
}
err = collection.LoadComplete(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to show: %s", err))
}
if repo.LastDownloadDate.IsZero() {
c.AbortWithError(404, fmt.Errorf("unable to show package list, mirror hasn't been downloaded yet"))
return
}
reflist := repo.RefList()
result := []*deb.Package{}
list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)
if err != nil {
c.AbortWithError(404, err)
return
}
queryS := c.Request.URL.Query().Get("q")
if queryS != "" {
q, err := query.Parse(c.Request.URL.Query().Get("q"))
if err != nil {
c.AbortWithError(400, err)
return
}
withDeps := c.Request.URL.Query().Get("withDeps") == "1"
architecturesList := []string{}
if withDeps {
if len(context.ArchitecturesList()) > 0 {
architecturesList = context.ArchitecturesList()
} else {
architecturesList = list.Architectures(false)
}
sort.Strings(architecturesList)
if len(architecturesList) == 0 {
c.AbortWithError(400, fmt.Errorf("unable to determine list of architectures, please specify explicitly"))
return
}
}
list.PrepareIndex()
list, err = list.Filter([]deb.PackageQuery{q}, withDeps,
nil, context.DependencyOptions(), architecturesList)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to search: %s", err))
}
}
if c.Request.URL.Query().Get("format") == "details" {
list.ForEach(func(p *deb.Package) error {
result = append(result, p)
return nil
})
c.JSON(200, result)
} else {
c.JSON(200, list.Strings())
}
}
// PUT /api/mirrors/:name
func apiMirrorsUpdate(c *gin.Context) {
var (
err error
remote *deb.RemoteRepo
)
var b struct {
Name string
ArchiveURL string
Filter string
Architectures []string
Components []string
Keyrings []string
FilterWithDeps bool
DownloadSources bool
DownloadUdebs bool
SkipComponentCheck bool
IgnoreChecksums bool
IgnoreSignatures bool
ForceUpdate bool
SkipExistingPackages bool
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
remote, err = collection.ByName(c.Params.ByName("name"))
if err != nil {
c.AbortWithError(404, err)
return
}
b.Name = remote.Name
b.DownloadUdebs = remote.DownloadUdebs
b.DownloadSources = remote.DownloadSources
b.SkipComponentCheck = remote.SkipComponentCheck
b.FilterWithDeps = remote.FilterWithDeps
b.Filter = remote.Filter
b.Architectures = remote.Architectures
b.Components = remote.Components
log.Printf("%s: Starting mirror update\n", b.Name)
if c.Bind(&b) != nil {
return
}
if b.Name != remote.Name {
_, err = collection.ByName(b.Name)
if err == nil {
c.AbortWithError(409, fmt.Errorf("unable to rename: mirror %s already exists", b.Name))
return
}
}
if b.DownloadUdebs != remote.DownloadUdebs {
if remote.IsFlat() && b.DownloadUdebs {
c.AbortWithError(400, fmt.Errorf("unable to update: flat mirrors don't support udebs"))
return
}
}
if b.ArchiveURL != "" {
remote.SetArchiveRoot(b.ArchiveURL)
}
remote.Name = b.Name
remote.DownloadUdebs = b.DownloadUdebs
remote.DownloadSources = b.DownloadSources
remote.SkipComponentCheck = b.SkipComponentCheck
remote.FilterWithDeps = b.FilterWithDeps
remote.Filter = b.Filter
remote.Architectures = b.Architectures
remote.Components = b.Components
verifier, err := getVerifier(b.IgnoreSignatures, b.Keyrings)
if err != nil {
c.AbortWithError(400, fmt.Errorf("unable to initialize GPG verifier: %s", err))
return
}
resources := []string{string(remote.Key())}
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
downloader := context.NewDownloader(out)
err := remote.Fetch(downloader, verifier)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
if !b.ForceUpdate {
err = remote.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
}
err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.SkipComponentCheck)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
if remote.Filter != "" {
var filterQuery deb.PackageQuery
filterQuery, err = query.Parse(remote.Filter)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
_, _, err = remote.ApplyFilter(context.DependencyOptions(), filterQuery, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
}
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(),
collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
defer func() {
// on any interruption, unlock the mirror
e := context.ReOpenDatabase()
if e == nil {
remote.MarkAsIdle()
collection.Update(remote)
}
}()
remote.MarkAsUpdating()
err = collection.Update(remote)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
context.GoContextHandleSignals()
count := len(queue)
taskDetail := struct {
TotalDownloadSize int64
RemainingDownloadSize int64
TotalNumberOfPackages int
RemainingNumberOfPackages int
}{
downloadSize, downloadSize, count, count,
}
detail.Store(taskDetail)
downloadQueue := make(chan int)
taskFinished := make(chan *deb.PackageDownloadTask)
var (
errors []string
errLock sync.Mutex
)
pushError := func(err error) {
errLock.Lock()
errors = append(errors, err.Error())
errLock.Unlock()
}
go func() {
for idx := range queue {
select {
case downloadQueue <- idx:
case <-context.Done():
return
}
}
close(downloadQueue)
}()
// update of task details need to be done in order
go func() {
for {
task, ok := <-taskFinished
if !ok {
return
}
taskDetail.RemainingDownloadSize -= task.File.Checksums.Size
taskDetail.RemainingNumberOfPackages--
detail.Store(taskDetail)
}
}()
log.Printf("%s: Spawning background processes...\n", b.Name)
var wg sync.WaitGroup
for i := 0; i < context.Config().DownloadConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case idx, ok := <-downloadQueue:
if !ok {
return
}
task := &queue[idx]
var e error
// provision download location
task.TempDownPath, e = context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename)
if e != nil {
pushError(e)
continue
}
// download file...
e = context.Downloader().DownloadWithChecksum(
context,
remote.PackageURL(task.File.DownloadURL()).String(),
task.TempDownPath,
&task.File.Checksums,
b.IgnoreChecksums)
if e != nil {
pushError(e)
continue
}
task.Done = true
taskFinished <- task
case <-context.Done():
return
}
}
}()
}
// Wait for all download goroutines to finish
log.Printf("%s: Waiting for background processes to finish...\n", b.Name)
wg.Wait()
log.Printf("%s: Background processes finished\n", b.Name)
close(taskFinished)
for idx := range queue {
atask := &queue[idx]
if !atask.Done {
// download not finished yet
continue
}
// and import it back to the pool
atask.File.PoolPath, err = context.PackagePool().Import(atask.TempDownPath, atask.File.Filename, &atask.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err)
}
// update "attached" files if any
for _, additionalAtask := range atask.Additional {
additionalAtask.File.PoolPath = atask.File.PoolPath
additionalAtask.File.Checksums = atask.File.Checksums
}
}
select {
case <-context.Done():
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: interrupted")
default:
}
if len(errors) > 0 {
log.Printf("%s: Unable to update because of previous errors\n", b.Name)
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
log.Printf("%s: Finalizing download\n", b.Name)
remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
log.Printf("%s: Mirror updated successfully!\n", b.Name)
return &task.ProcessReturnValue{Code: http.StatusNoContent, Value: nil}, nil
})
}
+40
View File
@@ -0,0 +1,40 @@
package api
import (
"bytes"
"encoding/json"
"github.com/gin-gonic/gin"
. "gopkg.in/check.v1"
)
type MirrorSuite struct {
ApiSuite
}
var _ = Suite(&MirrorSuite{})
func (s *MirrorSuite) TestGetMirrors(c *C) {
response, _ := s.HTTPRequest("GET", "/api/mirrors", nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "[]")
}
func (s *MirrorSuite) TestDeleteMirrorNonExisting(c *C) {
response, _ := s.HTTPRequest("DELETE", "/api/mirrors/does-not-exist", nil)
c.Check(response.Code, Equals, 404)
c.Check(response.Body.String(), Equals, "{\"error\":\"unable to drop: mirror with name does-not-exist not found\"}")
}
func (s *MirrorSuite) TestCreateMirror(c *C) {
c.ExpectFailure("Need to mock downloads")
body, err := json.Marshal(gin.H{
"Name": "dummy",
"ArchiveURL": "foobar",
})
c.Assert(err, IsNil)
response, err := s.HTTPRequest("POST", "/api/mirrors", bytes.NewReader(body))
c.Assert(err, IsNil)
c.Check(response.Code, Equals, 400)
c.Check(response.Body.String(), Equals, "")
}
+2 -1
View File
@@ -6,7 +6,8 @@ import (
// GET /api/packages/:key
func apiPackagesShow(c *gin.Context) {
p, err := context.CollectionFactory().PackageCollection().ByKey([]byte(c.Params.ByName("key")))
collectionFactory := context.NewCollectionFactory()
p, err := collectionFactory.PackageCollection().ByKey([]byte(c.Params.ByName("key")))
if err != nil {
c.AbortWithError(404, err)
return
+120 -105
View File
@@ -2,10 +2,13 @@ package api
import (
"fmt"
"net/http"
"strings"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/pgp"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/gin-gonic/gin"
)
@@ -51,22 +54,13 @@ func parseEscapedPath(path string) string {
// GET /publish
func apiPublishList(c *gin.Context) {
localCollection := context.CollectionFactory().LocalRepoCollection()
localCollection.RLock()
defer localCollection.RUnlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.RLock()
defer snapshotCollection.RUnlock()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
result := make([]*deb.PublishedRepo, 0, collection.Len())
err := collection.ForEach(func(repo *deb.PublishedRepo) error {
err := collection.LoadComplete(repo, context.CollectionFactory())
err := collection.LoadComplete(repo, collectionFactory)
if err != nil {
return err
}
@@ -102,6 +96,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
ButAutomaticUpgrades string
ForceOverwrite bool
SkipContents *bool
SkipBz2 *bool
Architectures []string
Signing SigningOptions
AcquireByHash *bool
@@ -123,17 +118,19 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}
var components []string
var names []string
var sources []interface{}
var resources []string
collectionFactory := context.NewCollectionFactory()
if b.SourceKind == "snapshot" {
var snapshot *deb.Snapshot
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
snapshotCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources {
components = append(components, source.Component)
names = append(names, source.Name)
snapshot, err = snapshotCollection.ByName(source.Name)
if err != nil {
@@ -141,6 +138,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
@@ -152,12 +150,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo
localCollection := context.CollectionFactory().LocalRepoCollection()
localCollection.Lock()
defer localCollection.Unlock()
localCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources {
components = append(components, source.Component)
names = append(names, source.Name)
localRepo, err = localCollection.ByName(source.Name)
if err != nil {
@@ -165,6 +162,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
@@ -177,55 +175,68 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, context.CollectionFactory())
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
return
}
if b.Origin != "" {
published.Origin = b.Origin
}
if b.NotAutomatic != "" {
published.NotAutomatic = b.NotAutomatic
}
if b.ButAutomaticUpgrades != "" {
published.ButAutomaticUpgrades = b.ButAutomaticUpgrades
}
published.Label = b.Label
published.SkipContents = context.Config().SkipContentsPublishing
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
context.CollectionFactory().PublishedRepoCollection().LoadComplete(duplicate, context.CollectionFactory())
c.AbortWithError(400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate))
return
}
err = published.Publish(context.PackagePool(), context, context.CollectionFactory(), signer, nil, b.ForceOverwrite)
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
return
}
err = collection.Add(published)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save to DB: %s", err))
return
}
resources = append(resources, string(published.Key()))
collection := collectionFactory.PublishedRepoCollection()
c.JSON(201, published)
taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskDetail := task.PublishDetail{
Detail: detail,
}
publishOutput := &task.PublishOutput{
Progress: out,
PublishDetail: taskDetail,
}
if b.Origin != "" {
published.Origin = b.Origin
}
if b.NotAutomatic != "" {
published.NotAutomatic = b.NotAutomatic
}
if b.ButAutomaticUpgrades != "" {
published.ButAutomaticUpgrades = b.ButAutomaticUpgrades
}
published.Label = b.Label
published.SkipContents = context.Config().SkipContentsPublishing
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
published.SkipBz2 = context.Config().SkipBz2Publishing
if b.SkipContents != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
err = collection.Add(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: published}, nil
})
}
// PUT /publish/:prefix/:distribution
@@ -238,6 +249,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
ForceOverwrite bool
Signing SigningOptions
SkipContents *bool
SkipBz2 *bool
SkipCleanup *bool
Snapshots []struct {
Component string `binding:"required"`
@@ -256,31 +268,23 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
// published.LoadComplete would touch local repo collection
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
c.AbortWithError(404, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, context.CollectionFactory())
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
}
var updatedComponents []string
var updatedSnapshots []string
var resources []string
if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
@@ -299,6 +303,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
c.AbortWithError(404, err2)
@@ -313,6 +318,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.UpdateSnapshot(snapshotInfo.Component, snapshot)
updatedComponents = append(updatedComponents, snapshotInfo.Component)
updatedSnapshots = append(updatedSnapshots, snapshot.Name)
}
} else {
c.AbortWithError(500, fmt.Errorf("unknown published repository type"))
@@ -323,32 +329,37 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
err = published.Publish(context.PackagePool(), context, context.CollectionFactory(), signer, nil, b.ForceOverwrite)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.Update(published)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save to DB: %s", err))
return
}
if b.SkipCleanup == nil || !*b.SkipCleanup {
err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents,
context.GetPublishedStorage(storage), context.CollectionFactory(), nil)
resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
}
c.JSON(200, published)
err = collection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
if b.SkipCleanup == nil || !*b.SkipCleanup {
err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents,
context.GetPublishedStorage(storage), collectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: published}, nil
})
}
// DELETE /publish/:prefix/:distribution
@@ -360,21 +371,25 @@ func apiPublishDrop(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := c.Params.ByName("distribution")
// published.LoadComplete would touch local repo collection
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
err := collection.Remove(context, storage, prefix, distribution,
context.CollectionFactory(), context.Progress(), force, skipCleanup)
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to drop: %s", err))
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("unable to drop: %s", err))
return
}
c.JSON(200, gin.H{})
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s (%s)", prefix, distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution,
collectionFactory, out, force, skipCleanup)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, nil
})
}
+236 -185
View File
@@ -2,13 +2,17 @@ package api
import (
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"text/template"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/query"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/gin-gonic/gin"
)
@@ -17,11 +21,9 @@ import (
func apiReposList(c *gin.Context) {
result := []*deb.LocalRepo{}
collection := context.CollectionFactory().LocalRepoCollection()
collection.RLock()
defer collection.RUnlock()
context.CollectionFactory().LocalRepoCollection().ForEach(func(r *deb.LocalRepo) error {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
collection.ForEach(func(r *deb.LocalRepo) error {
result = append(result, r)
return nil
})
@@ -46,11 +48,9 @@ func apiReposCreate(c *gin.Context) {
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
err := context.CollectionFactory().LocalRepoCollection().Add(repo)
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
err := collection.Add(repo)
if err != nil {
c.AbortWithError(400, err)
return
@@ -62,6 +62,7 @@ func apiReposCreate(c *gin.Context) {
// PUT /api/repos/:name
func apiReposEdit(c *gin.Context) {
var b struct {
Name *string
Comment *string
DefaultDistribution *string
DefaultComponent *string
@@ -71,9 +72,8 @@ func apiReposEdit(c *gin.Context) {
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -81,6 +81,15 @@ func apiReposEdit(c *gin.Context) {
return
}
if b.Name != nil {
_, err := collection.ByName(*b.Name)
if err == nil {
// already exists
c.AbortWithError(404, err)
return
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
@@ -102,9 +111,8 @@ func apiReposEdit(c *gin.Context) {
// GET /api/repos/:name
func apiReposShow(c *gin.Context) {
collection := context.CollectionFactory().LocalRepoCollection()
collection.RLock()
defer collection.RUnlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -118,53 +126,42 @@ func apiReposShow(c *gin.Context) {
// DELETE /api/repos/:name
func apiReposDrop(c *gin.Context) {
force := c.Request.URL.Query().Get("force") == "1"
name := c.Params.ByName("name")
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.RLock()
defer snapshotCollection.RUnlock()
publishedCollection := context.CollectionFactory().PublishedRepoCollection()
publishedCollection.RLock()
defer publishedCollection.RUnlock()
repo, err := collection.ByName(c.Params.ByName("name"))
repo, err := collection.ByName(name)
if err != nil {
c.AbortWithError(404, err)
return
}
published := publishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
c.AbortWithError(409, fmt.Errorf("unable to drop, local repo is published"))
return
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
c.AbortWithError(409, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override"))
return
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
}
}
err = collection.Drop(repo)
if err != nil {
c.AbortWithError(500, err)
return
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
}
}
c.JSON(200, gin.H{})
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
})
}
// GET /api/repos/:name/packages
func apiReposPackagesShow(c *gin.Context) {
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -178,11 +175,11 @@ func apiReposPackagesShow(c *gin.Context) {
return
}
showPackages(c, repo.RefList())
showPackages(c, repo.RefList(), collectionFactory)
}
// Handler for both add and delete
func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p *deb.Package) error) {
func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error) {
var b struct {
PackageRefs []string
}
@@ -191,9 +188,8 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -207,54 +203,54 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
return
}
list, err := deb.NewPackageListFromRefList(repo.RefList(), context.CollectionFactory().PackageCollection(), nil)
if err != nil {
c.AbortWithError(500, err)
return
}
// verify package refs and build package list
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = context.CollectionFactory().PackageCollection().ByKey([]byte(ref))
resources := []string{string(repo.Key())}
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
if err == database.ErrNotFound {
c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err))
} else {
c.AbortWithError(500, err)
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// verify package refs and build package list
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = cb(list, p, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
return
}
err = cb(list, p)
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
c.AbortWithError(400, err)
return
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
}
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = context.CollectionFactory().LocalRepoCollection().Update(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save: %s", err))
return
}
c.JSON(200, repo)
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
})
}
// POST /repos/:name/packages
func apiReposPackagesAdd(c *gin.Context) {
apiReposPackagesAddDelete(c, func(list *deb.PackageList, p *deb.Package) error {
apiReposPackagesAddDelete(c, "Add packages to repo ", func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error {
out.Printf("Adding package %s\n", p.Name)
return list.Add(p)
})
}
// DELETE /repos/:name/packages
func apiReposPackagesDelete(c *gin.Context) {
apiReposPackagesAddDelete(c, func(list *deb.PackageList, p *deb.Package) error {
apiReposPackagesAddDelete(c, "Delete packages from repo ", func(list *deb.PackageList, p *deb.Package, out aptly.Progress) error {
out.Printf("Removing package %s\n", p.Name)
list.Remove(p)
return nil
})
@@ -275,17 +271,18 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}
dirParam := c.Params.ByName("dir")
fileParam := c.Params.ByName("file")
if fileParam != "" && !verifyPath(fileParam) {
c.AbortWithError(400, fmt.Errorf("wrong file"))
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
c.AbortWithError(404, err)
return
@@ -297,75 +294,91 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}
verifier := context.GetVerifier()
var (
sources []string
packageFiles, failedFiles []string
otherFiles []string
processedFiles, failedFiles2 []string
reporter = &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
RemovedLines: []string{},
}
list *deb.PackageList
)
var taskName string
var sources []string
if fileParam == "" {
sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"))}
taskName = fmt.Sprintf("Add packages from dir %s to repo %s", dirParam, name)
sources = []string{filepath.Join(context.UploadPath(), dirParam)}
} else {
sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("file"))}
sources = []string{filepath.Join(context.UploadPath(), dirParam, fileParam)}
taskName = fmt.Sprintf("Add package %s from dir %s to repo %s", fileParam, dirParam, name)
}
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
verifier := context.GetVerifier()
list, err = deb.NewPackageListFromRefList(repo.RefList(), context.CollectionFactory().PackageCollection(), nil)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to load packages: %s", err))
return
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection())
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to import package files: %s", err))
return
}
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = context.CollectionFactory().LocalRepoCollection().Update(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save: %s", err))
return
}
if !noRemove {
processedFiles = utils.StrSliceDeduplicate(processedFiles)
for _, file := range processedFiles {
err := os.Remove(file)
if err != nil {
reporter.Warning("unable to remove file %s: %s", file, err)
var (
packageFiles, failedFiles []string
otherFiles []string
processedFiles, failedFiles2 []string
reporter = &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
RemovedLines: []string{},
}
list *deb.PackageList
)
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
}
// atempt to remove dir, if it fails, that's fine: probably it's not empty
os.Remove(filepath.Join(context.UploadPath(), c.Params.ByName("dir")))
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
if failedFiles == nil {
failedFiles = []string{}
}
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import package files: %s", err)
}
c.JSON(200, gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
if !noRemove {
processedFiles = utils.StrSliceDeduplicate(processedFiles)
for _, file := range processedFiles {
err := os.Remove(file)
if err != nil {
reporter.Warning("unable to remove file %s: %s", file, err)
}
}
// atempt to remove dir, if it fails, that's fine: probably it's not empty
os.Remove(filepath.Join(context.UploadPath(), dirParam))
}
if failedFiles == nil {
failedFiles = []string{}
}
if len(reporter.AddedLines) > 0 {
out.Printf("Added: %s\n", strings.Join(reporter.AddedLines, ", "))
}
if len(reporter.RemovedLines) > 0 {
out.Printf("Removed: %s\n", strings.Join(reporter.RemovedLines, ", "))
}
if len(reporter.Warnings) > 0 {
out.Printf("Warnings: %s\n", strings.Join(reporter.Warnings, ", "))
}
if len(failedFiles) > 0 {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
}}, nil
})
}
@@ -383,62 +396,100 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
ignoreSignature := c.Request.URL.Query().Get("ignoreSignature") == "1"
repoTemplateString := c.Params.ByName("name")
collectionFactory := context.NewCollectionFactory()
if !verifyDir(c) {
return
}
var sources []string
var taskName string
dirParam := c.Params.ByName("dir")
fileParam := c.Params.ByName("file")
if fileParam != "" && !verifyPath(fileParam) {
c.AbortWithError(400, fmt.Errorf("wrong file"))
return
}
var (
err error
verifier = context.GetVerifier()
sources, changesFiles []string
failedFiles, failedFiles2 []string
reporter = &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
RemovedLines: []string{},
}
)
if fileParam == "" {
sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"))}
taskName = fmt.Sprintf("Include packages from changes files in dir %s to repo matching template %s", dirParam, repoTemplateString)
sources = []string{filepath.Join(context.UploadPath(), dirParam)}
} else {
sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("file"))}
taskName = fmt.Sprintf("Include packages from changes file %s from dir %s to repo matching template %s", fileParam, dirParam, repoTemplateString)
sources = []string{filepath.Join(context.UploadPath(), dirParam, fileParam)}
}
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
repoTemplate, err := template.New("repo").Parse(repoTemplateString)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to import changes files: %s", err))
c.AbortWithError(400, fmt.Errorf("error parsing repo template: %s", err))
return
}
if !noRemoveFiles {
// atempt to remove dir, if it fails, that's fine: probably it's not empty
os.Remove(filepath.Join(context.UploadPath(), c.Params.ByName("dir")))
}
var resources []string
if len(repoTemplate.Tree.Root.Nodes) > 1 {
resources = append(resources, task.AllLocalReposResourcesKey)
} else {
// repo template string is simple text so only use resource key of specific repository
repo, err := collectionFactory.LocalRepoCollection().ByName(repoTemplateString)
if err != nil {
c.AbortWithError(404, err)
return
}
if failedFiles == nil {
failedFiles = []string{}
resources = append(resources, string(repo.Key()))
}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
var (
err error
verifier = context.GetVerifier()
changesFiles []string
failedFiles, failedFiles2 []string
reporter = &aptly.RecordingResultReporter{
Warnings: []string{},
AddedLines: []string{},
RemovedLines: []string{},
}
)
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import changes files: %s", err)
}
if !noRemoveFiles {
// atempt to remove dir, if it fails, that's fine: probably it's not empty
os.Remove(filepath.Join(context.UploadPath(), dirParam))
}
if failedFiles == nil {
failedFiles = []string{}
}
if len(reporter.AddedLines) > 0 {
out.Printf("Added: %s\n", strings.Join(reporter.AddedLines, ", "))
}
if len(reporter.RemovedLines) > 0 {
out.Printf("Removed: %s\n", strings.Join(reporter.RemovedLines, ", "))
}
if len(reporter.Warnings) > 0 {
out.Printf("Warnings: %s\n", strings.Join(reporter.Warnings, ", "))
}
if len(failedFiles) > 0 {
out.Printf("Failed files: %s\n", strings.Join(failedFiles, ", "))
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
}}, nil
c.JSON(200, gin.H{
"Report": reporter,
"FailedFiles": failedFiles,
})
}
+51 -7
View File
@@ -5,30 +5,46 @@ import (
ctx "github.com/aptly-dev/aptly/context"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var context *ctx.AptlyContext
func apiMetricsGet() gin.HandlerFunc {
return func(c *gin.Context) {
promhttp.Handler().ServeHTTP(c.Writer, c.Request)
}
}
// Router returns prebuilt with routes http.Handler
func Router(c *ctx.AptlyContext) http.Handler {
context = c
router := gin.Default()
router.UseRawPath = true
router.Use(gin.ErrorLogger())
if c.Config().EnableMetricsEndpoint {
router.Use(instrumentHandlerInFlight(apiRequestsInFlightGauge, getBasePath))
router.Use(instrumentHandlerCounter(apiRequestsTotalCounter, getBasePath))
router.Use(instrumentHandlerRequestSize(apiRequestSizeSummary, getBasePath))
router.Use(instrumentHandlerResponseSize(apiResponseSizeSummary, getBasePath))
router.Use(instrumentHandlerDuration(apiRequestsDurationSummary, getBasePath))
}
if context.Flags().Lookup("no-lock").Value.Get().(bool) {
// We use a goroutine to count the number of
// concurrent requests. When no more requests are
// running, we close the database to free the lock.
requests := make(chan dbRequest)
dbRequests = make(chan dbRequest)
go acquireDatabase(requests)
go acquireDatabase()
router.Use(func(c *gin.Context) {
var err error
errCh := make(chan error)
requests <- dbRequest{acquiredb, errCh}
dbRequests <- dbRequest{acquiredb, errCh}
err = <-errCh
if err != nil {
@@ -37,7 +53,7 @@ func Router(c *ctx.AptlyContext) http.Handler {
}
defer func() {
requests <- dbRequest{releasedb, errCh}
dbRequests <- dbRequest{releasedb, errCh}
err = <-errCh
if err != nil {
c.AbortWithError(500, err)
@@ -46,14 +62,14 @@ func Router(c *ctx.AptlyContext) http.Handler {
c.Next()
})
} else {
go cacheFlusher()
}
root := router.Group("/api")
{
if c.Config().EnableMetricsEndpoint {
root.GET("/metrics", apiMetricsGet())
}
root.GET("/version", apiVersion)
}
@@ -81,6 +97,19 @@ func Router(c *ctx.AptlyContext) http.Handler {
root.POST("/mirrors/:name/snapshots", apiSnapshotsCreateFromMirror)
}
{
root.GET("/mirrors", apiMirrorsList)
root.GET("/mirrors/:name", apiMirrorsShow)
root.GET("/mirrors/:name/packages", apiMirrorsPackages)
root.POST("/mirrors", apiMirrorsCreate)
root.PUT("/mirrors/:name", apiMirrorsUpdate)
root.DELETE("/mirrors/:name", apiMirrorsDrop)
}
{
root.POST("/gpg/key", apiGPGAddKey)
}
{
root.GET("/files", apiFilesListDirs)
root.POST("/files/:dir", apiFilesUpload)
@@ -114,6 +143,21 @@ func Router(c *ctx.AptlyContext) http.Handler {
{
root.GET("/graph.:ext", apiGraph)
}
{
root.POST("/db/cleanup", apiDbCleanup)
}
{
root.GET("/tasks", apiTasksList)
root.POST("/tasks-clear", apiTasksClear)
root.GET("/tasks-wait", apiTasksWait)
root.GET("/tasks/:id/wait", apiTasksWaitForTaskByID)
root.GET("/tasks/:id/output", apiTasksOutputShow)
root.GET("/tasks/:id/detail", apiTasksDetailShow)
root.GET("/tasks/:id/return_value", apiTasksReturnValueShow)
root.GET("/tasks/:id", apiTasksShow)
root.DELETE("/tasks/:id", apiTasksDelete)
root.POST("/tasks-dummy", apiTasksDummy)
}
return router
}
+139 -151
View File
@@ -2,9 +2,12 @@ package api
import (
"fmt"
"net/http"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/task"
"github.com/gin-gonic/gin"
)
@@ -12,9 +15,8 @@ import (
func apiSnapshotsList(c *gin.Context) {
SortMethodString := c.Request.URL.Query().Get("sort")
collection := context.CollectionFactory().SnapshotCollection()
collection.RLock()
defer collection.RUnlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
if SortMethodString == "" {
SortMethodString = "name"
@@ -46,49 +48,46 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
return
}
collection := context.CollectionFactory().RemoteRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
repo, err = collection.ByName(c.Params.ByName("name"))
repo, err = collection.ByName(name)
if err != nil {
c.AbortWithError(404, err)
return
}
err = repo.CheckLock()
if err != nil {
c.AbortWithError(409, err)
return
}
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err
}
err = collection.LoadComplete(repo)
if err != nil {
c.AbortWithError(500, err)
return
}
err = collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo)
if err != nil {
c.AbortWithError(400, err)
return
}
snapshot, err = deb.NewSnapshotFromRepository(b.Name, repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
if b.Description != "" {
snapshot.Description = b.Description
}
if b.Description != "" {
snapshot.Description = b.Description
}
err = snapshotCollection.Add(snapshot)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(201, snapshot)
err = snapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: snapshot}, nil
})
}
// POST /api/snapshots
@@ -115,9 +114,9 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
var resources []string
sources := make([]*deb.Snapshot, len(b.SourceSnapshots))
@@ -133,39 +132,36 @@ func apiSnapshotsCreate(c *gin.Context) {
c.AbortWithError(500, err)
return
}
resources = append(resources, string(sources[i].ResourceKey()))
}
list := deb.NewPackageList()
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
list := deb.NewPackageList()
// verify package refs and build package list
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = context.CollectionFactory().PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err))
} else {
c.AbortWithError(500, err)
// verify package refs and build package list
for _, ref := range b.PackageRefs {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
return
}
err = list.Add(p)
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = snapshotCollection.Add(snapshot)
if err != nil {
c.AbortWithError(400, err)
return
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
}
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = snapshotCollection.Add(snapshot)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(201, snapshot)
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: nil}, nil
})
}
// POST /api/repos/:name/snapshots
@@ -185,43 +181,41 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
repo, err = collection.ByName(c.Params.ByName("name"))
repo, err = collection.ByName(name)
if err != nil {
c.AbortWithError(404, err)
return
}
err = collection.LoadComplete(repo)
if err != nil {
c.AbortWithError(500, err)
return
}
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo)
if err != nil {
c.AbortWithError(400, err)
return
}
snapshot, err = deb.NewSnapshotFromLocalRepo(b.Name, repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
if b.Description != "" {
snapshot.Description = b.Description
}
if b.Description != "" {
snapshot.Description = b.Description
}
err = snapshotCollection.Add(snapshot)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(201, snapshot)
err = snapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: snapshot}, nil
})
}
// PUT /api/snapshots/:name
@@ -240,44 +234,44 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
snapshot, err = collection.ByName(c.Params.ByName("name"))
snapshot, err = collection.ByName(name)
if err != nil {
c.AbortWithError(404, err)
return
}
_, err = collection.ByName(b.Name)
if err == nil {
c.AbortWithError(409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
return
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
if b.Name != "" {
snapshot.Name = b.Name
}
if b.Name != "" {
snapshot.Name = b.Name
}
if b.Description != "" {
snapshot.Description = b.Description
}
if b.Description != "" {
snapshot.Description = b.Description
}
err = context.CollectionFactory().SnapshotCollection().Update(snapshot)
if err != nil {
c.AbortWithError(500, err)
return
}
c.JSON(200, snapshot)
err = collectionFactory.SnapshotCollection().Update(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: snapshot}, nil
})
}
// GET /api/snapshots/:name
func apiSnapshotsShow(c *gin.Context) {
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshot, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -299,13 +293,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
publishedCollection := context.CollectionFactory().PublishedRepoCollection()
publishedCollection.RLock()
defer publishedCollection.RUnlock()
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name)
if err != nil {
@@ -313,37 +303,36 @@ func apiSnapshotsDrop(c *gin.Context) {
return
}
published := publishedCollection.BySnapshot(snapshot)
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
c.AbortWithError(409, fmt.Errorf("unable to drop: snapshot is published"))
return
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
c.AbortWithError(409, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override"))
return
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
}
}
err = snapshotCollection.Drop(snapshot)
if err != nil {
c.AbortWithError(500, err)
return
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
c.JSON(200, gin.H{})
err = snapshotCollection.Drop(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, nil
})
}
// GET /api/snapshots/:name/diff/:withSnapshot
func apiSnapshotsDiff(c *gin.Context) {
onlyMatching := c.Request.URL.Query().Get("onlyMatching") == "1"
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshotA, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -370,7 +359,7 @@ func apiSnapshotsDiff(c *gin.Context) {
}
// Calculate diff
diff, err := snapshotA.RefList().Diff(snapshotB.RefList(), context.CollectionFactory().PackageCollection())
diff, err := snapshotA.RefList().Diff(snapshotB.RefList(), collectionFactory.PackageCollection())
if err != nil {
c.AbortWithError(500, err)
return
@@ -391,9 +380,8 @@ func apiSnapshotsDiff(c *gin.Context) {
// GET /api/snapshots/:name/packages
func apiSnapshotsSearchPackages(c *gin.Context) {
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshot, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -407,5 +395,5 @@ func apiSnapshotsSearchPackages(c *gin.Context) {
return
}
showPackages(c, snapshot.RefList())
showPackages(c, snapshot.RefList(), collectionFactory)
}
+155
View File
@@ -0,0 +1,155 @@
package api
import (
"fmt"
"net/http"
"strconv"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/task"
"github.com/gin-gonic/gin"
)
// GET /tasks
func apiTasksList(c *gin.Context) {
list := context.TaskList()
c.JSON(200, list.GetTasks())
}
// POST /tasks-clear
func apiTasksClear(c *gin.Context) {
list := context.TaskList()
list.Clear()
c.JSON(200, gin.H{})
}
// GET /tasks-wait
func apiTasksWait(c *gin.Context) {
list := context.TaskList()
list.Wait()
c.JSON(200, gin.H{})
}
// GET /tasks/:id/wait
func apiTasksWaitForTaskByID(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
task, err := list.WaitForTaskByID(int(id))
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(200, task)
}
// GET /tasks/:id
func apiTasksShow(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
var task task.Task
task, err = list.GetTaskByID(int(id))
if err != nil {
c.AbortWithError(404, err)
return
}
c.JSON(200, task)
}
// GET /tasks/:id/output
func apiTasksOutputShow(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
var output string
output, err = list.GetTaskOutputByID(int(id))
if err != nil {
c.AbortWithError(404, err)
return
}
c.JSON(200, output)
}
// GET /tasks/:id/detail
func apiTasksDetailShow(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
var detail interface{}
detail, err = list.GetTaskDetailByID(int(id))
if err != nil {
c.AbortWithError(404, err)
return
}
c.JSON(200, detail)
}
// GET /tasks/:id/return_value
func apiTasksReturnValueShow(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
output, err := list.GetTaskReturnValueByID(int(id))
if err != nil {
c.AbortWithError(404, err)
return
}
c.JSON(200, output)
}
// DELETE /tasks/:id
func apiTasksDelete(c *gin.Context) {
list := context.TaskList()
id, err := strconv.ParseInt(c.Params.ByName("id"), 10, 0)
if err != nil {
c.AbortWithError(500, err)
return
}
var delTask task.Task
delTask, err = list.DeleteTaskByID(int(id))
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(200, delTask)
}
// POST /tasks-dummy
func apiTasksDummy(c *gin.Context) {
resources := []string{"dummy"}
taskName := fmt.Sprintf("Dummy task")
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
out.Printf("Dummy task started\n")
detail.Store([]int{1, 2, 3})
out.Printf("Dummy task finished\n")
return &task.ProcessReturnValue{Code: http.StatusTeapot, Value: []int{1, 2, 3}}, nil
})
}
+75
View File
@@ -0,0 +1,75 @@
package api
import (
"encoding/json"
"fmt"
"time"
"github.com/aptly-dev/aptly/task"
. "gopkg.in/check.v1"
)
type TaskSuite struct {
ApiSuite
}
var _ = Suite(&TaskSuite{})
func (s *TaskSuite) TestTasksDummy(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy", nil)
c.Check(response.Code, Equals, 418)
c.Check(response.Body.String(), Equals, "[1,2,3]")
}
func (s *TaskSuite) TestTasksDummyAsync(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
var t task.Task
err := json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.Name, Equals, "Dummy task")
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/wait", t.ID), nil)
err = json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.State, Equals, task.SUCCEEDED)
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/detail", t.ID), nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "[1,2,3]")
response, _ = s.HTTPRequest("GET", fmt.Sprintf("/api/tasks/%d/output", t.ID), nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Matches, "\"Dummy task started.*")
}
func (s *TaskSuite) TestTaskDelete(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
c.Check(response.Body.String(), Equals, "{\"Name\":\"Dummy task\",\"ID\":1,\"State\":0}")
// Give the task time to start
time.Sleep(time.Second)
response, _ = s.HTTPRequest("DELETE", "/api/tasks/1", nil)
c.Check(response.Code, Equals, 200)
}
func (s *TaskSuite) TestTasksClear(c *C) {
response, _ := s.HTTPRequest("POST", "/api/tasks-dummy?_async=true", nil)
c.Check(response.Code, Equals, 202)
var t task.Task
err := json.Unmarshal(response.Body.Bytes(), &t)
c.Assert(err, IsNil)
c.Check(t.Name, Equals, "Dummy task")
response, _ = s.HTTPRequest("GET", "/api/tasks-wait", nil)
c.Check(response.Code, Equals, 200)
response, _ = s.HTTPRequest("GET", "/api/tasks", nil)
c.Check(response.Code, Equals, 200)
var ts []task.Task
err = json.Unmarshal(response.Body.Bytes(), &ts)
c.Assert(err, IsNil)
c.Check(len(ts), Equals, 1)
c.Check(ts[0].State, Equals, task.SUCCEEDED)
response, _ = s.HTTPRequest("POST", "/api/tasks-clear", nil)
c.Check(response.Code, Equals, 200)
response, _ = s.HTTPRequest("GET", "/api/tasks", nil)
c.Check(response.Code, Equals, 200)
c.Check(response.Body.String(), Equals, "null")
}