mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-01 04:40:38 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c77d788493 | |||
| 5ff552d919 | |||
| 4defa49b7f | |||
| 6fbcbc108c | |||
| 41f5d22637 | |||
| 8179f73bf0 | |||
| f8efb3e9b7 | |||
| 55b2943f44 | |||
| 9280231c1d |
@@ -155,7 +155,7 @@ jobs:
|
|||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 26.04", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"]
|
name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"]
|
||||||
arch: ["amd64", "i386" , "arm64" , "armhf"]
|
arch: ["amd64", "i386" , "arm64" , "armhf"]
|
||||||
include:
|
include:
|
||||||
- name: "Debian 13/trixie"
|
- name: "Debian 13/trixie"
|
||||||
@@ -167,9 +167,6 @@ jobs:
|
|||||||
- name: "Debian 11/bullseye"
|
- name: "Debian 11/bullseye"
|
||||||
suite: bullseye
|
suite: bullseye
|
||||||
image: debian:bullseye-slim
|
image: debian:bullseye-slim
|
||||||
- name: "Ubuntu 26.04"
|
|
||||||
suite: resolute
|
|
||||||
image: ubuntu:26.04
|
|
||||||
- name: "Ubuntu 24.04"
|
- name: "Ubuntu 24.04"
|
||||||
suite: noble
|
suite: noble
|
||||||
image: ubuntu:24.04
|
image: ubuntu:24.04
|
||||||
|
|||||||
+2
-2
@@ -16,7 +16,7 @@ Please report unacceptable behavior on [https://github.com/aptly-dev/aptly/discu
|
|||||||
### List of Repositories
|
### List of Repositories
|
||||||
|
|
||||||
* [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page
|
* [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page
|
||||||
* [aptly-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/)
|
* [apty-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/)
|
||||||
* [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide
|
* [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide
|
||||||
fixtures for aptly functional tests
|
fixtures for aptly functional tests
|
||||||
|
|
||||||
@@ -137,7 +137,7 @@ make docker-unit-tests
|
|||||||
|
|
||||||
In order to run aptly system tests, enter the following:
|
In order to run aptly system tests, enter the following:
|
||||||
```
|
```
|
||||||
make docker-system-test
|
make docker-system-tests
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Running golangci-lint
|
#### Running golangci-lint
|
||||||
|
|||||||
@@ -241,4 +241,4 @@ clean: ## remove local build and module cache
|
|||||||
rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf
|
rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf
|
||||||
find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true
|
find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true
|
||||||
|
|
||||||
.PHONY: help man prepare swagger version binaries build docker-release docker-system-test docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8
|
.PHONY: help man prepare swagger version binaries build docker-release docker-system-tests docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8
|
||||||
|
|||||||
+1
-1
@@ -54,7 +54,7 @@ type gpgDeleteKeyParams struct {
|
|||||||
// @Summary Add GPG Keys
|
// @Summary Add GPG Keys
|
||||||
// @Description **Adds GPG keys to aptly keyring**
|
// @Description **Adds GPG keys to aptly keyring**
|
||||||
// @Description
|
// @Description
|
||||||
// @Description Add GPG public keys for verifying remote repositories for mirroring.
|
// @Description Add GPG public keys for veryfing remote repositories for mirroring.
|
||||||
// @Description
|
// @Description
|
||||||
// @Description Keys can be added in two ways:
|
// @Description Keys can be added in two ways:
|
||||||
// @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty)
|
// @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty)
|
||||||
|
|||||||
+9
-41
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
|
|||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
|
|
||||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
mirrorCollection := collectionFactory.RemoteRepoCollection()
|
||||||
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
repo, err := mirrorCollection.ByName(name)
|
repo, err := mirrorCollection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -228,34 +228,21 @@ func apiMirrorsDrop(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Delete mirror %s", name)
|
taskName := fmt.Sprintf("Delete mirror %s", name)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh collections
|
err := repo.CheckLock()
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
|
|
||||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
|
||||||
|
|
||||||
// Fresh load after lock acquired
|
|
||||||
repo, err := taskMirrorCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = repo.CheckLock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
// Fresh checks with current collections
|
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
|
||||||
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
|
|
||||||
|
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskMirrorCollection.Drop(repo)
|
err = mirrorCollection.Drop(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
|
||||||
}
|
}
|
||||||
@@ -510,7 +497,7 @@ func apiMirrorsEdit(c *gin.Context) {
|
|||||||
type mirrorUpdateParams struct {
|
type mirrorUpdateParams struct {
|
||||||
// Change mirror name to `Name`
|
// Change mirror name to `Name`
|
||||||
Name string ` json:"Name" example:"mirror1"`
|
Name string ` json:"Name" example:"mirror1"`
|
||||||
// Gpg keyring(s) for verifying Release file
|
// Gpg keyring(s) for verifing Release file
|
||||||
Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"`
|
Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"`
|
||||||
// Set "true" to ignore checksum errors
|
// Set "true" to ignore checksum errors
|
||||||
IgnoreChecksums bool ` json:"IgnoreChecksums"`
|
IgnoreChecksums bool ` json:"IgnoreChecksums"`
|
||||||
@@ -548,8 +535,7 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.RemoteRepoCollection()
|
collection := collectionFactory.RemoteRepoCollection()
|
||||||
|
|
||||||
name := c.Params.ByName("name")
|
remote, err = collection.ByName(c.Params.ByName("name"))
|
||||||
remote, err = collection.ByName(name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, 404, err)
|
AbortWithJSONError(c, 404, err)
|
||||||
return
|
return
|
||||||
@@ -564,7 +550,6 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pre-task validation of new name if provided
|
|
||||||
if b.Name != remote.Name {
|
if b.Name != remote.Name {
|
||||||
_, err = collection.ByName(b.Name)
|
_, err = collection.ByName(b.Name)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -581,26 +566,9 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(remote.Key())}
|
resources := []string{string(remote.Key())}
|
||||||
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.RemoteRepoCollection()
|
|
||||||
|
|
||||||
// Fresh load after lock acquired (use captured `name` variable, not gin context)
|
|
||||||
remote, err := taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fresh rename check inside lock (if renaming)
|
|
||||||
if b.Name != remote.Name {
|
|
||||||
_, err := taskCollection.ByName(b.Name)
|
|
||||||
if err == nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
downloader := context.NewDownloader(out)
|
downloader := context.NewDownloader(out)
|
||||||
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -812,8 +780,8 @@ func apiMirrorsUpdate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
log.Info().Msgf("%s: Finalizing download...", b.Name)
|
||||||
_ = remote.FinalizeDownload(taskCollectionFactory, out)
|
_ = remote.FinalizeDownload(collectionFactory, out)
|
||||||
err = taskCollection.Update(remote)
|
err = collectionFactory.RemoteRepoCollection().Update(remote)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
+251
-304
@@ -2,7 +2,6 @@ package api
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@@ -125,7 +124,7 @@ func apiPublishList(c *gin.Context) {
|
|||||||
// @Description See also: `aptly publish show`
|
// @Description See also: `aptly publish show`
|
||||||
// @Tags Publish
|
// @Tags Publish
|
||||||
// @Produce json
|
// @Produce json
|
||||||
// @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambiguous in URLs"
|
// @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambigious in URLs"
|
||||||
// @Param distribution path string true "distribution name"
|
// @Param distribution path string true "distribution name"
|
||||||
// @Success 200 {object} deb.PublishedRepo
|
// @Success 200 {object} deb.PublishedRepo
|
||||||
// @Failure 404 {object} Error "Published repository not found"
|
// @Failure 404 {object} Error "Published repository not found"
|
||||||
@@ -256,13 +255,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
if b.SourceKind == deb.SourceSnapshot {
|
if b.SourceKind == deb.SourceSnapshot {
|
||||||
var snapshot *deb.Snapshot
|
var snapshot *deb.Snapshot
|
||||||
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
tmpCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
for _, source := range b.Sources {
|
for _, source := range b.Sources {
|
||||||
components = append(components, source.Component)
|
components = append(components, source.Component)
|
||||||
names = append(names, source.Name)
|
names = append(names, source.Name)
|
||||||
|
|
||||||
snapshot, err = snapshotCollection.ByName(source.Name)
|
snapshot, err = tmpCollection.ByName(source.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
|
||||||
return
|
return
|
||||||
@@ -274,13 +273,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
} else if b.SourceKind == deb.SourceLocalRepo {
|
} else if b.SourceKind == deb.SourceLocalRepo {
|
||||||
var localRepo *deb.LocalRepo
|
var localRepo *deb.LocalRepo
|
||||||
|
|
||||||
localCollection := collectionFactory.LocalRepoCollection()
|
tmpCollection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
for _, source := range b.Sources {
|
for _, source := range b.Sources {
|
||||||
components = append(components, source.Component)
|
components = append(components, source.Component)
|
||||||
names = append(names, source.Name)
|
names = append(names, source.Name)
|
||||||
|
|
||||||
localRepo, err = localCollection.ByName(source.Name)
|
localRepo, err = tmpCollection.ByName(source.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
|
||||||
return
|
return
|
||||||
@@ -299,25 +298,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
multiDist = *b.MultiDist
|
multiDist = *b.MultiDist
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pre-register the published repo key in resources so that concurrent
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
// POST requests for the same prefix/distribution are serialized by the
|
|
||||||
// task queue rather than racing on CheckDuplicate + Add.
|
|
||||||
if b.Distribution != "" {
|
|
||||||
storagePrefix := prefix
|
|
||||||
if storage != "" {
|
|
||||||
storagePrefix = storage + ":" + prefix
|
|
||||||
}
|
|
||||||
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
|
|
||||||
} else {
|
|
||||||
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
|
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
|
||||||
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
|
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
taskDetail := task.PublishDetail{
|
taskDetail := task.PublishDetail{
|
||||||
Detail: detail,
|
Detail: detail,
|
||||||
}
|
}
|
||||||
@@ -329,10 +314,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
for _, source := range sources {
|
for _, source := range sources {
|
||||||
switch s := source.(type) {
|
switch s := source.(type) {
|
||||||
case *deb.Snapshot:
|
case *deb.Snapshot:
|
||||||
snapshotCollection := taskCollectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
err = snapshotCollection.LoadComplete(s)
|
err = snapshotCollection.LoadComplete(s)
|
||||||
case *deb.LocalRepo:
|
case *deb.LocalRepo:
|
||||||
localCollection := taskCollectionFactory.LocalRepoCollection()
|
localCollection := collectionFactory.LocalRepoCollection()
|
||||||
err = localCollection.LoadComplete(s)
|
err = localCollection.LoadComplete(s)
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("unexpected type for source: %T", source)
|
err = fmt.Errorf("unexpected type for source: %T", source)
|
||||||
@@ -342,7 +327,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist)
|
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
||||||
}
|
}
|
||||||
@@ -380,18 +365,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
published.Version = b.Version
|
published.Version = b.Version
|
||||||
}
|
}
|
||||||
|
|
||||||
duplicate := taskCollection.CheckDuplicate(published)
|
duplicate := collection.CheckDuplicate(published)
|
||||||
if duplicate != nil {
|
if duplicate != nil {
|
||||||
_ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory)
|
_ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
|
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskCollection.Add(published)
|
err = collection.Add(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -400,6 +385,46 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return resources to be locked for a Snapshot name
|
||||||
|
func getSnapshotResources(snapshotCollection *deb.SnapshotCollection, snapshotName string) (resources []string, err error) {
|
||||||
|
snapshot, err := snapshotCollection.ByName(snapshotName)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(snapshot.ResourceKey()))
|
||||||
|
|
||||||
|
for _, sourceID := range snapshot.SourceIDs {
|
||||||
|
if snapshot.SourceKind == deb.SourceSnapshot {
|
||||||
|
snapshot2, err2 := snapshotCollection.ByUUID(sourceID)
|
||||||
|
if err2 != nil {
|
||||||
|
err = err2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
res, err3 := getSnapshotResources(snapshotCollection, snapshot2.Name)
|
||||||
|
if err3 != nil {
|
||||||
|
err = err3
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, res...)
|
||||||
|
} else if snapshot.SourceKind == deb.SourceLocalRepo {
|
||||||
|
var repo *deb.LocalRepo
|
||||||
|
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(repo.Key()))
|
||||||
|
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
|
||||||
|
var mirror *deb.RemoteRepo
|
||||||
|
mirror, err = context.NewCollectionFactory().RemoteRepoCollection().ByUUID(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(mirror.Key()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
type publishedRepoUpdateSwitchParams struct {
|
type publishedRepoUpdateSwitchParams struct {
|
||||||
// when publishing, overwrite files in pool/ directory without notice
|
// when publishing, overwrite files in pool/ directory without notice
|
||||||
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
|
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
|
||||||
@@ -419,12 +444,12 @@ type publishedRepoUpdateSwitchParams struct {
|
|||||||
SignedBy *string ` json:"SignedBy" example:""`
|
SignedBy *string ` json:"SignedBy" example:""`
|
||||||
// Enable multiple packages with the same filename in different distributions
|
// Enable multiple packages with the same filename in different distributions
|
||||||
MultiDist *bool ` json:"MultiDist" example:"false"`
|
MultiDist *bool ` json:"MultiDist" example:"false"`
|
||||||
// Value of Label: field in published repository stanza
|
// Value of Label: field in published repository stanza
|
||||||
Label *string ` json:"Label" example:"Debian"`
|
Label *string ` json:"Label" example:"Debian"`
|
||||||
// Value of Origin: field in published repository stanza
|
// Value of Origin: field in published repository stanza
|
||||||
Origin *string ` json:"Origin" example:"Debian"`
|
Origin *string ` json:"Origin" example:"Debian"`
|
||||||
// Version of the release: Optional
|
// Version of the release: Optional
|
||||||
Version *string ` json:"Version" example:"13.3"`
|
Version *string ` json:"Version" example:"13.3"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Summary Update Published Repository
|
// @Summary Update Published Repository
|
||||||
@@ -471,7 +496,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
localRepoCollection := collectionFactory.LocalRepoCollection()
|
|
||||||
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -486,71 +510,69 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
|||||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
|
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, uuid := range published.Sources {
|
|
||||||
repo, err2 := localRepoCollection.ByUUID(uuid)
|
localCollection := collectionFactory.LocalRepoCollection()
|
||||||
|
for _, sourceID := range published.Sources {
|
||||||
|
localRepo, err2 := localCollection.ByUUID(sourceID)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
AbortWithJSONError(c, http.StatusNotFound, err2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resources = append(resources, string(repo.Key()))
|
resources = append(resources, string(localRepo.Key()))
|
||||||
}
|
}
|
||||||
} else if published.SourceKind == deb.SourceSnapshot {
|
} else if published.SourceKind == deb.SourceSnapshot {
|
||||||
for _, snapshotInfo := range b.Snapshots {
|
for _, snapshotInfo := range b.Snapshots {
|
||||||
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
|
res, err2 := getSnapshotResources(snapshotCollection, snapshotInfo.Name)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
AbortWithJSONError(c, http.StatusNotFound, err2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resources = append(resources, string(snapshot.ResourceKey()))
|
resources = append(resources, res...)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Field mutations and fresh DB load are deferred to inside the task so
|
if b.SkipContents != nil {
|
||||||
// they always operate on a consistent state after the lock is held.
|
published.SkipContents = *b.SkipContents
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.SkipBz2 != nil {
|
||||||
|
published.SkipBz2 = *b.SkipBz2
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.AcquireByHash != nil {
|
||||||
|
published.AcquireByHash = *b.AcquireByHash
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.SignedBy != nil {
|
||||||
|
published.SignedBy = *b.SignedBy
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.MultiDist != nil {
|
||||||
|
published.MultiDist = *b.MultiDist
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Label != nil {
|
||||||
|
published.Label = *b.Label
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Origin != nil {
|
||||||
|
published.Origin = *b.Origin
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Version != nil {
|
||||||
|
published.Version = *b.Version
|
||||||
|
}
|
||||||
|
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply field mutations on the freshly loaded object.
|
|
||||||
if b.SkipContents != nil {
|
|
||||||
published.SkipContents = *b.SkipContents
|
|
||||||
}
|
|
||||||
if b.SkipBz2 != nil {
|
|
||||||
published.SkipBz2 = *b.SkipBz2
|
|
||||||
}
|
|
||||||
if b.AcquireByHash != nil {
|
|
||||||
published.AcquireByHash = *b.AcquireByHash
|
|
||||||
}
|
|
||||||
if b.SignedBy != nil {
|
|
||||||
published.SignedBy = *b.SignedBy
|
|
||||||
}
|
|
||||||
if b.MultiDist != nil {
|
|
||||||
published.MultiDist = *b.MultiDist
|
|
||||||
}
|
|
||||||
if b.Label != nil {
|
|
||||||
published.Label = *b.Label
|
|
||||||
}
|
|
||||||
if b.Origin != nil {
|
|
||||||
published.Origin = *b.Origin
|
|
||||||
}
|
|
||||||
if b.Version != nil {
|
|
||||||
published.Version = *b.Version
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
revision := published.ObtainRevision()
|
||||||
sources := revision.Sources
|
sources := revision.Sources
|
||||||
|
|
||||||
@@ -562,17 +584,17 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := published.Update(taskCollectionFactory, out)
|
result, err := published.Update(collectionFactory, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
|
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
err = collection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -580,7 +602,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
|||||||
if b.SkipCleanup == nil || !*b.SkipCleanup {
|
if b.SkipCleanup == nil || !*b.SkipCleanup {
|
||||||
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
|
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
|
||||||
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
|
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
|
||||||
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
|
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
@@ -630,11 +652,8 @@ func apiPublishDrop(c *gin.Context) {
|
|||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err := collection.Remove(context, storage, prefix, distribution,
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
collectionFactory, out, force, skipCleanup)
|
||||||
|
|
||||||
err := taskCollection.Remove(context, storage, prefix, distribution,
|
|
||||||
taskCollectionFactory, out, force, skipCleanup)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
|
||||||
}
|
}
|
||||||
@@ -670,52 +689,43 @@ func apiPublishAddSource(c *gin.Context) {
|
|||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly (no LoadComplete) to verify existence and obtain the
|
|
||||||
// resource key and task name. The actual mutation is performed inside
|
|
||||||
// the task on a freshly loaded copy to prevent lost-update races.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
component := b.Component
|
||||||
|
name := b.Name
|
||||||
|
|
||||||
|
_, exists := sources[component]
|
||||||
|
if exists {
|
||||||
|
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sources[component] = name
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.Update(published)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
component := b.Component
|
|
||||||
name := b.Name
|
|
||||||
|
|
||||||
_, exists := sources[component]
|
|
||||||
if exists {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
|
|
||||||
}
|
|
||||||
|
|
||||||
sources[component] = name
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -797,48 +807,39 @@ func apiPublishSetSources(c *gin.Context) {
|
|||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full load and mutation happen inside the task.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := make(map[string]string, len(b))
|
||||||
|
revision.Sources = sources
|
||||||
|
|
||||||
|
for _, source := range b {
|
||||||
|
component := source.Component
|
||||||
|
name := source.Name
|
||||||
|
sources[component] = name
|
||||||
|
}
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.Update(published)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := make(map[string]string, len(b))
|
|
||||||
revision.Sources = sources
|
|
||||||
|
|
||||||
for _, source := range b {
|
|
||||||
component := source.Component
|
|
||||||
name := source.Name
|
|
||||||
sources[component] = name
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -871,33 +872,24 @@ func apiPublishDropChanges(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full load and DropRevision happen inside the task.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
published.DropRevision()
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.Update(published)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
published.DropRevision()
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -933,58 +925,51 @@ func apiPublishUpdateSource(c *gin.Context) {
|
|||||||
param := slashEscape(c.Params.ByName("prefix"))
|
param := slashEscape(c.Params.ByName("prefix"))
|
||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
urlComponent := slashEscape(c.Params.ByName("component"))
|
component := slashEscape(c.Params.ByName("component"))
|
||||||
|
|
||||||
// Default component to the URL path segment; the body may rename it.
|
|
||||||
b.Component = urlComponent
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full load and mutation happen inside the task.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
_, exists := sources[component]
|
||||||
|
if !exists {
|
||||||
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Component = component
|
||||||
|
b.Name = revision.Sources[component]
|
||||||
|
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Component != component {
|
||||||
|
delete(sources, component)
|
||||||
|
}
|
||||||
|
|
||||||
|
component = b.Component
|
||||||
|
name := b.Name
|
||||||
|
sources[component] = name
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.Update(published)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
_, exists := sources[urlComponent]
|
|
||||||
if !exists {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
|
|
||||||
}
|
|
||||||
|
|
||||||
if b.Component != urlComponent {
|
|
||||||
delete(sources, urlComponent)
|
|
||||||
}
|
|
||||||
|
|
||||||
newComponent := b.Component
|
|
||||||
name := b.Name
|
|
||||||
sources[newComponent] = name
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -1021,41 +1006,33 @@ func apiPublishRemoveSource(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full load and mutation happen inside the task.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
_, exists := sources[component]
|
||||||
|
if !exists {
|
||||||
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(sources, component)
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = collection.Update(published)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
_, exists := sources[component]
|
|
||||||
if !exists {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(sources, component)
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -1127,94 +1104,64 @@ func apiPublishUpdate(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full load and field mutations happen inside the task.
|
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
err = collection.LoadComplete(published, collectionFactory)
|
||||||
|
if err != nil {
|
||||||
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
|
||||||
// because published.Update() reads from them and concurrent modification
|
return
|
||||||
// would produce an inconsistent view.
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
|
||||||
localRepoCollection := collectionFactory.LocalRepoCollection()
|
|
||||||
|
|
||||||
if published.SourceKind == deb.SourceLocalRepo {
|
|
||||||
for _, uuid := range published.Sources {
|
|
||||||
repo, err2 := localRepoCollection.ByUUID(uuid)
|
|
||||||
if err2 != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resources = append(resources, string(repo.Key()))
|
|
||||||
}
|
|
||||||
} else if published.SourceKind == deb.SourceSnapshot {
|
|
||||||
for _, uuid := range published.Sources {
|
|
||||||
snapshot, err2 := snapshotCollection.ByUUID(uuid)
|
|
||||||
if err2 != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resources = append(resources, string(snapshot.ResourceKey()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.SkipContents != nil {
|
||||||
|
published.SkipContents = *b.SkipContents
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.SkipBz2 != nil {
|
||||||
|
published.SkipBz2 = *b.SkipBz2
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.AcquireByHash != nil {
|
||||||
|
published.AcquireByHash = *b.AcquireByHash
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.SignedBy != nil {
|
||||||
|
published.SignedBy = *b.SignedBy
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.MultiDist != nil {
|
||||||
|
published.MultiDist = *b.MultiDist
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Label != nil {
|
||||||
|
published.Label = *b.Label
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Origin != nil {
|
||||||
|
published.Origin = *b.Origin
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Version != nil {
|
||||||
|
published.Version = *b.Version
|
||||||
|
}
|
||||||
|
|
||||||
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
result, err := published.Update(collectionFactory, out)
|
||||||
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply field mutations on the freshly loaded object.
|
err = collection.Update(published)
|
||||||
if b.SkipContents != nil {
|
|
||||||
published.SkipContents = *b.SkipContents
|
|
||||||
}
|
|
||||||
if b.SkipBz2 != nil {
|
|
||||||
published.SkipBz2 = *b.SkipBz2
|
|
||||||
}
|
|
||||||
if b.AcquireByHash != nil {
|
|
||||||
published.AcquireByHash = *b.AcquireByHash
|
|
||||||
}
|
|
||||||
if b.SignedBy != nil {
|
|
||||||
published.SignedBy = *b.SignedBy
|
|
||||||
}
|
|
||||||
if b.MultiDist != nil {
|
|
||||||
published.MultiDist = *b.MultiDist
|
|
||||||
}
|
|
||||||
if b.Label != nil {
|
|
||||||
published.Label = *b.Label
|
|
||||||
}
|
|
||||||
if b.Origin != nil {
|
|
||||||
published.Origin = *b.Origin
|
|
||||||
}
|
|
||||||
if b.Version != nil {
|
|
||||||
published.Version = *b.Version
|
|
||||||
}
|
|
||||||
|
|
||||||
result, err := published.Update(taskCollectionFactory, out)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.Update(published)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -1222,7 +1169,7 @@ func apiPublishUpdate(c *gin.Context) {
|
|||||||
if b.SkipCleanup == nil || !*b.SkipCleanup {
|
if b.SkipCleanup == nil || !*b.SkipCleanup {
|
||||||
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
|
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
|
||||||
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
|
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
|
||||||
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
|
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
+74
-171
@@ -102,7 +102,7 @@ type repoCreateParams struct {
|
|||||||
DefaultDistribution string ` json:"DefaultDistribution" example:"stable"`
|
DefaultDistribution string ` json:"DefaultDistribution" example:"stable"`
|
||||||
// Default component when publishing from this local repo
|
// Default component when publishing from this local repo
|
||||||
DefaultComponent string ` json:"DefaultComponent" example:"main"`
|
DefaultComponent string ` json:"DefaultComponent" example:"main"`
|
||||||
// Snapshot name to create repository from (optional)
|
// Snapshot name to create repoitory from (optional)
|
||||||
FromSnapshot string ` json:"FromSnapshot" example:""`
|
FromSnapshot string ` json:"FromSnapshot" example:""`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -131,69 +131,46 @@ func apiReposCreate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handler: Pre-task validations (shallow)
|
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
||||||
|
repo.DefaultComponent = b.DefaultComponent
|
||||||
|
repo.DefaultDistribution = b.DefaultDistribution
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
|
|
||||||
if b.FromSnapshot != "" {
|
if b.FromSnapshot != "" {
|
||||||
|
var snapshot *deb.Snapshot
|
||||||
|
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
_, err := snapshotCollection.ByName(b.FromSnapshot)
|
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Just verify it exists - don't load here
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use generated key resource for repo being created
|
err = snapshotCollection.LoadComplete(snapshot)
|
||||||
resources := []string{"LocalRepo:" + b.Name}
|
|
||||||
if b.FromSnapshot != "" {
|
|
||||||
resources = append(resources, "Snapshot:"+b.FromSnapshot)
|
|
||||||
}
|
|
||||||
|
|
||||||
taskName := fmt.Sprintf("Create repository %s", b.Name)
|
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
|
||||||
// Task: Create fresh collection and check/create ATOMIC inside task
|
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
|
||||||
|
|
||||||
// Check duplicate inside lock
|
|
||||||
if _, err := taskCollection.ByName(b.Name); err == nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
|
||||||
fmt.Errorf("local repo with name %s already exists", b.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create repo
|
|
||||||
repo := deb.NewLocalRepo(b.Name, b.Comment)
|
|
||||||
repo.DefaultComponent = b.DefaultComponent
|
|
||||||
repo.DefaultDistribution = b.DefaultDistribution
|
|
||||||
|
|
||||||
if b.FromSnapshot != "" {
|
|
||||||
snapshotCollection := taskCollectionFactory.SnapshotCollection()
|
|
||||||
|
|
||||||
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
|
|
||||||
fmt.Errorf("source snapshot not found: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = snapshotCollection.LoadComplete(snapshot)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
|
|
||||||
fmt.Errorf("unable to load source snapshot: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
repo.UpdateRefList(snapshot.RefList())
|
|
||||||
}
|
|
||||||
|
|
||||||
err := taskCollection.Add(repo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil
|
repo.UpdateRefList(snapshot.RefList())
|
||||||
})
|
}
|
||||||
|
|
||||||
|
localRepoCollection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
|
if _, err := localRepoCollection.ByName(b.Name); err == nil {
|
||||||
|
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err := localRepoCollection.Add(repo)
|
||||||
|
if err != nil {
|
||||||
|
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusCreated, repo)
|
||||||
}
|
}
|
||||||
|
|
||||||
type reposEditParams struct {
|
type reposEditParams struct {
|
||||||
@@ -203,7 +180,7 @@ type reposEditParams struct {
|
|||||||
Comment *string ` json:"Comment" example:"example repo"`
|
Comment *string ` json:"Comment" example:"example repo"`
|
||||||
// Change Default Distribution for publishing
|
// Change Default Distribution for publishing
|
||||||
DefaultDistribution *string ` json:"DefaultDistribution" example:""`
|
DefaultDistribution *string ` json:"DefaultDistribution" example:""`
|
||||||
// Change Default Component for publishing
|
// Change Devault Component for publishing
|
||||||
DefaultComponent *string ` json:"DefaultComponent" example:""`
|
DefaultComponent *string ` json:"DefaultComponent" example:""`
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -224,8 +201,6 @@ func apiReposEdit(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load shallowly for 404 check and resource key.
|
|
||||||
// Mutation and duplicate check happen inside the task for atomicity.
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
@@ -236,47 +211,32 @@ func apiReposEdit(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resources := []string{string(repo.Key())}
|
if b.Name != nil && *b.Name != name {
|
||||||
taskName := fmt.Sprintf("Edit repository %s", name)
|
_, err := collection.ByName(*b.Name)
|
||||||
|
if err == nil {
|
||||||
|
// already exists
|
||||||
|
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
repo.Name = *b.Name
|
||||||
|
}
|
||||||
|
if b.Comment != nil {
|
||||||
|
repo.Comment = *b.Comment
|
||||||
|
}
|
||||||
|
if b.DefaultDistribution != nil {
|
||||||
|
repo.DefaultDistribution = *b.DefaultDistribution
|
||||||
|
}
|
||||||
|
if b.DefaultComponent != nil {
|
||||||
|
repo.DefaultComponent = *b.DefaultComponent
|
||||||
|
}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
err = collection.Update(repo)
|
||||||
// Task: Create fresh collection inside task after lock
|
if err != nil {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
AbortWithJSONError(c, 500, err)
|
||||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Fresh load after lock acquired
|
c.JSON(200, repo)
|
||||||
repo, err := taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check and update ATOMIC (inside lock)
|
|
||||||
if b.Name != nil && *b.Name != name {
|
|
||||||
_, err := taskCollection.ByName(*b.Name)
|
|
||||||
if err == nil {
|
|
||||||
// already exists
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
|
|
||||||
fmt.Errorf("local repo with name %q already exists", *b.Name)
|
|
||||||
}
|
|
||||||
repo.Name = *b.Name
|
|
||||||
}
|
|
||||||
if b.Comment != nil {
|
|
||||||
repo.Comment = *b.Comment
|
|
||||||
}
|
|
||||||
if b.DefaultDistribution != nil {
|
|
||||||
repo.DefaultDistribution = *b.DefaultDistribution
|
|
||||||
}
|
|
||||||
if b.DefaultComponent != nil {
|
|
||||||
repo.DefaultComponent = *b.DefaultComponent
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.Update(repo)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GET /api/repos/:name
|
// GET /api/repos/:name
|
||||||
@@ -318,10 +278,10 @@ func apiReposDrop(c *gin.Context) {
|
|||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
|
|
||||||
// Load shallowly for 404 check, resource key, and task name.
|
|
||||||
// Full checks (published/snapshots) happen inside the task.
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
publishedCollection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
repo, err := collection.ByName(name)
|
repo, err := collection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -332,32 +292,19 @@ func apiReposDrop(c *gin.Context) {
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
taskName := fmt.Sprintf("Delete repo %s", name)
|
taskName := fmt.Sprintf("Delete repo %s", name)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Task: Create fresh collections inside task after lock acquired
|
published := publishedCollection.ByLocalRepo(repo)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
|
||||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
|
||||||
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
// Re-read repo with fresh collection after lock
|
|
||||||
repo, err := taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check with fresh collections
|
|
||||||
published := taskPublishedCollection.ByLocalRepo(repo)
|
|
||||||
if len(published) > 0 {
|
if len(published) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo)
|
snapshots := snapshotCollection.ByLocalRepoSource(repo)
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo)
|
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,13 +361,10 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load shallowly for 404 check and resource key.
|
|
||||||
// Full load and mutations happen inside the task.
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
name := c.Params.ByName("name")
|
repo, err := collection.ByName(c.Params.ByName("name"))
|
||||||
repo, err := collection.ByName(name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, 404, err)
|
AbortWithJSONError(c, 404, err)
|
||||||
return
|
return
|
||||||
@@ -429,23 +373,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Task: Create fresh factory and collection inside task after lock
|
err = collection.LoadComplete(repo)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
|
||||||
|
|
||||||
// Fresh load after lock acquired (use captured `name` variable, not gin context)
|
|
||||||
repo, err := taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(repo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
out.Printf("Loading packages...\n")
|
out.Printf("Loading packages...\n")
|
||||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -454,7 +388,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
for _, ref := range b.PackageRefs {
|
for _, ref := range b.PackageRefs {
|
||||||
var p *deb.Package
|
var p *deb.Package
|
||||||
|
|
||||||
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref))
|
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == database.ErrNotFound {
|
if err == database.ErrNotFound {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
|
||||||
@@ -470,7 +404,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
|||||||
|
|
||||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||||
|
|
||||||
err = taskCollection.Update(repo)
|
err = collectionFactory.LocalRepoCollection().Update(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -577,8 +511,6 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load shallowly for 404 check and resource key.
|
|
||||||
// Full load and mutations happen inside the task.
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.LocalRepoCollection()
|
collection := collectionFactory.LocalRepoCollection()
|
||||||
|
|
||||||
@@ -602,17 +534,7 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
resources := []string{string(repo.Key())}
|
resources := []string{string(repo.Key())}
|
||||||
resources = append(resources, sources...)
|
resources = append(resources, sources...)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Task: Create fresh factory and collection inside task after lock
|
err = collection.LoadComplete(repo)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskCollection := taskCollectionFactory.LocalRepoCollection()
|
|
||||||
|
|
||||||
// Fresh load after lock acquired
|
|
||||||
repo, err := taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollection.LoadComplete(repo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -633,13 +555,13 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
|
|
||||||
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
|
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
|
||||||
|
|
||||||
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
|
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
||||||
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection)
|
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
processedFiles = append(processedFiles, otherFiles...)
|
processedFiles = append(processedFiles, otherFiles...)
|
||||||
|
|
||||||
@@ -649,7 +571,7 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
|
|
||||||
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
|
||||||
|
|
||||||
err = taskCollection.Update(repo)
|
err = collectionFactory.LocalRepoCollection().Update(repo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -728,8 +650,6 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load shallowly for 404 check and resource keys.
|
|
||||||
// Full load and mutations happen inside the task.
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -753,26 +673,12 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Task: Create fresh factory and collections inside task after lock
|
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
|
|
||||||
// Fresh load of both repos after lock acquired
|
|
||||||
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName)
|
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||||
}
|
}
|
||||||
@@ -785,12 +691,12 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
RemovedLines: []string{},
|
RemovedLines: []string{},
|
||||||
}
|
}
|
||||||
|
|
||||||
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress())
|
srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
|
||||||
}
|
}
|
||||||
@@ -858,7 +764,7 @@ func apiReposCopyPackage(c *gin.Context) {
|
|||||||
} else {
|
} else {
|
||||||
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
|
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
|
||||||
|
|
||||||
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo)
|
err = collectionFactory.LocalRepoCollection().Update(dstRepo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
|
||||||
}
|
}
|
||||||
@@ -961,9 +867,6 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
|||||||
resources = append(resources, sources...)
|
resources = append(resources, sources...)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Task: Create fresh factory and collection inside task after lock
|
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
verifier = context.GetVerifier()
|
verifier = context.GetVerifier()
|
||||||
@@ -979,8 +882,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
|||||||
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
|
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
|
||||||
_, failedFiles2, err = deb.ImportChangesFiles(
|
_, failedFiles2, err = deb.ImportChangesFiles(
|
||||||
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
||||||
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(),
|
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
|
||||||
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse)
|
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
+35
-114
@@ -165,7 +165,6 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
var resources []string
|
var resources []string
|
||||||
@@ -182,28 +181,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
resources = append(resources, string(sources[i].ResourceKey()))
|
resources = append(resources, string(sources[i].ResourceKey()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pre-task check for destination snapshot name
|
|
||||||
_, err = snapshotCollection.ByName(b.Name)
|
|
||||||
if err == nil {
|
|
||||||
AbortWithJSONError(c, 409, fmt.Errorf("unable to create: snapshot %s already exists", b.Name))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
for i := range sources {
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
err = snapshotCollection.LoadComplete(sources[i])
|
||||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
|
||||||
taskPackageCollection := taskCollectionFactory.PackageCollection()
|
|
||||||
|
|
||||||
// Fresh load of all sources after lock acquired
|
|
||||||
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
|
|
||||||
for i := range b.SourceSnapshots {
|
|
||||||
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
// LoadComplete on fresh copy
|
|
||||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -211,9 +191,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
|
|
||||||
list := deb.NewPackageList()
|
list := deb.NewPackageList()
|
||||||
|
|
||||||
// verify package refs and build package list using fresh factory
|
// verify package refs and build package list
|
||||||
for _, ref := range b.PackageRefs {
|
for _, ref := range b.PackageRefs {
|
||||||
p, err := taskPackageCollection.ByKey([]byte(ref))
|
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == database.ErrNotFound {
|
if err == database.ErrNotFound {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
|
||||||
@@ -226,9 +206,9 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
|
||||||
|
|
||||||
err = taskSnapshotCollection.Add(snapshot)
|
err = snapshotCollection.Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -335,7 +315,6 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.SnapshotCollection()
|
collection := collectionFactory.SnapshotCollection()
|
||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
@@ -346,38 +325,14 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pre-task validation of new name if provided (skip if renaming to same name)
|
|
||||||
if b.Name != "" && b.Name != name {
|
|
||||||
_, err = collection.ByName(b.Name)
|
|
||||||
if err == nil {
|
|
||||||
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
|
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
|
||||||
taskName := fmt.Sprintf("Update snapshot %s", name)
|
taskName := fmt.Sprintf("Update snapshot %s", name)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
_, err := collection.ByName(b.Name)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
if err == nil {
|
||||||
taskCollection := taskCollectionFactory.SnapshotCollection()
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
||||||
|
|
||||||
// Fresh load after lock acquired
|
|
||||||
snapshot, err = taskCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fresh duplicate check inside lock
|
|
||||||
if b.Name != "" {
|
|
||||||
_, err := taskCollection.ByName(b.Name)
|
|
||||||
if err == nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update fresh copy
|
|
||||||
if b.Name != "" {
|
if b.Name != "" {
|
||||||
snapshot.Name = b.Name
|
snapshot.Name = b.Name
|
||||||
}
|
}
|
||||||
@@ -386,7 +341,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
|
|||||||
snapshot.Description = b.Description
|
snapshot.Description = b.Description
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskCollection.Update(snapshot)
|
err = collectionFactory.SnapshotCollection().Update(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -440,9 +395,9 @@ func apiSnapshotsDrop(c *gin.Context) {
|
|||||||
name := c.Params.ByName("name")
|
name := c.Params.ByName("name")
|
||||||
force := c.Request.URL.Query().Get("force") == "1"
|
force := c.Request.URL.Query().Get("force") == "1"
|
||||||
|
|
||||||
// Phase 1: Pre-task validation (shallow load for 404 check only)
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
publishedCollection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
snapshot, err := snapshotCollection.ByName(name)
|
snapshot, err := snapshotCollection.ByName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -452,35 +407,21 @@ func apiSnapshotsDrop(c *gin.Context) {
|
|||||||
|
|
||||||
resources := []string{string(snapshot.ResourceKey())}
|
resources := []string{string(snapshot.ResourceKey())}
|
||||||
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
taskName := fmt.Sprintf("Delete snapshot %s", name)
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh collections
|
published := publishedCollection.BySnapshot(snapshot)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
|
||||||
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
|
|
||||||
|
|
||||||
// Fresh load after lock acquired
|
|
||||||
snapshot, err := taskSnapshotCollection.ByName(name)
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fresh checks with current collections
|
|
||||||
published := taskPublishedCollection.BySnapshot(snapshot)
|
|
||||||
|
|
||||||
if len(published) > 0 {
|
if len(published) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
|
||||||
}
|
}
|
||||||
|
|
||||||
if !force {
|
if !force {
|
||||||
// Using fresh collection for dependency check
|
snapshots := snapshotCollection.BySnapshotSource(snapshot)
|
||||||
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
|
|
||||||
if len(snapshots) > 0 {
|
if len(snapshots) > 0 {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = taskSnapshotCollection.Drop(snapshot)
|
err = snapshotCollection.Drop(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -635,7 +576,6 @@ func apiSnapshotsMerge(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 1: Pre-task validation (shallow load for 404 checks only)
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||||
|
|
||||||
@@ -652,43 +592,32 @@ func apiSnapshotsMerge(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
err = snapshotCollection.LoadComplete(sources[0])
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
if err != nil {
|
||||||
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
|
||||||
// Fresh load of all sources inside task
|
|
||||||
freshSources := make([]*deb.Snapshot, len(body.Sources))
|
|
||||||
for i := range body.Sources {
|
|
||||||
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
// LoadComplete on fresh copy
|
|
||||||
err = taskSnapshotCollection.LoadComplete(freshSources[i])
|
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
result := sources[0].RefList()
|
||||||
// Merge using fresh sources
|
for i := 1; i < len(sources); i++ {
|
||||||
result := freshSources[0].RefList()
|
err = snapshotCollection.LoadComplete(sources[i])
|
||||||
for i := 1; i < len(freshSources); i++ {
|
if err != nil {
|
||||||
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
|
}
|
||||||
|
result = result.Merge(sources[i].RefList(), overrideMatching, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
if latest {
|
if latest {
|
||||||
result.FilterLatestRefs()
|
result.FilterLatestRefs()
|
||||||
}
|
}
|
||||||
|
|
||||||
sourceDescription := make([]string, len(freshSources))
|
sourceDescription := make([]string, len(sources))
|
||||||
for i, s := range freshSources {
|
for i, s := range sources {
|
||||||
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
|
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
|
||||||
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
|
||||||
|
|
||||||
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
|
err = collectionFactory.SnapshotCollection().Add(snapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
|
||||||
}
|
}
|
||||||
@@ -772,29 +701,21 @@ func apiSnapshotsPull(c *gin.Context) {
|
|||||||
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
||||||
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
|
||||||
|
|
||||||
// Fresh load of snapshots after lock acquired
|
|
||||||
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
|
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
||||||
if err != nil {
|
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
|
||||||
}
|
|
||||||
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert snapshots to package list
|
// convert snapshots to package list
|
||||||
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
|
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
@@ -891,10 +812,10 @@ func apiSnapshotsPull(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create <destination> snapshot
|
// Create <destination> snapshot
|
||||||
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
|
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
|
||||||
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
|
||||||
|
|
||||||
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-2
@@ -168,8 +168,6 @@ func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
|
|||||||
|
|
||||||
// LoadComplete loads additional information for local repo
|
// LoadComplete loads additional information for local repo
|
||||||
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
|
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
|
||||||
repo.packageRefs = &PackageRefList{}
|
|
||||||
|
|
||||||
encoded, err := collection.db.Get(repo.RefKey())
|
encoded, err := collection.db.Get(repo.RefKey())
|
||||||
if err == database.ErrNotFound {
|
if err == database.ErrNotFound {
|
||||||
return nil
|
return nil
|
||||||
@@ -178,6 +176,7 @@ func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
repo.packageRefs = &PackageRefList{}
|
||||||
return repo.packageRefs.Decode(encoded)
|
return repo.packageRefs.Decode(encoded)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -133,18 +133,6 @@ func (s *LocalRepoCollectionSuite) TestByUUID(c *C) {
|
|||||||
c.Assert(r.String(), Equals, repo.String())
|
c.Assert(r.String(), Equals, repo.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *LocalRepoCollectionSuite) TestLoadCompleteNoRefKey(c *C) {
|
|
||||||
repo := NewLocalRepo("local1", "Comment 1")
|
|
||||||
c.Assert(s.collection.Update(repo), IsNil)
|
|
||||||
|
|
||||||
r, err := s.collection.ByName("local1")
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
|
|
||||||
c.Assert(s.collection.LoadComplete(r), IsNil)
|
|
||||||
c.Assert(r.packageRefs, NotNil)
|
|
||||||
c.Assert(r.NumPackages(), Equals, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) {
|
func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) {
|
||||||
repo := NewLocalRepo("local1", "Comment 1")
|
repo := NewLocalRepo("local1", "Comment 1")
|
||||||
c.Assert(s.collection.Update(repo), IsNil)
|
c.Assert(s.collection.Update(repo), IsNil)
|
||||||
|
|||||||
+6
-1
@@ -609,7 +609,12 @@ func (p *PublishedRepo) StoragePrefix() string {
|
|||||||
|
|
||||||
// Key returns unique key identifying PublishedRepo
|
// Key returns unique key identifying PublishedRepo
|
||||||
func (p *PublishedRepo) Key() []byte {
|
func (p *PublishedRepo) Key() []byte {
|
||||||
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
|
if p.MultiDist {
|
||||||
|
// do not lock Distribution in MultiDist
|
||||||
|
return []byte("UM" + p.StoragePrefix())
|
||||||
|
} else {
|
||||||
|
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RefKey is a unique id for package reference list
|
// RefKey is a unique id for package reference list
|
||||||
|
|||||||
@@ -79,9 +79,6 @@ func (l *PackageRefList) Decode(input []byte) error {
|
|||||||
|
|
||||||
// ForEach calls handler for each package ref in list
|
// ForEach calls handler for each package ref in list
|
||||||
func (l *PackageRefList) ForEach(handler func([]byte) error) error {
|
func (l *PackageRefList) ForEach(handler func([]byte) error) error {
|
||||||
if l == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
var err error
|
var err error
|
||||||
for _, p := range l.Refs {
|
for _, p := range l.Refs {
|
||||||
err = handler(p)
|
err = handler(p)
|
||||||
|
|||||||
@@ -130,17 +130,6 @@ func (s *PackageRefListSuite) TestPackageRefListForeach(c *C) {
|
|||||||
c.Check(err, Equals, e)
|
c.Check(err, Equals, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PackageRefListSuite) TestForEachNilList(c *C) {
|
|
||||||
var l *PackageRefList
|
|
||||||
called := false
|
|
||||||
err := l.ForEach(func([]byte) error {
|
|
||||||
called = true
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
c.Assert(err, IsNil)
|
|
||||||
c.Assert(called, Equals, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *PackageRefListSuite) TestHas(c *C) {
|
func (s *PackageRefListSuite) TestHas(c *C) {
|
||||||
_ = s.list.Add(s.p1)
|
_ = s.list.Add(s.p1)
|
||||||
_ = s.list.Add(s.p3)
|
_ = s.list.Add(s.p3)
|
||||||
|
|||||||
+1
-1
@@ -2,7 +2,7 @@
|
|||||||
<div>
|
<div>
|
||||||
|
|
||||||
In order to add debian package files to a local repository, files are first uploaded to a temporary directory.
|
In order to add debian package files to a local repository, files are first uploaded to a temporary directory.
|
||||||
Then the directory (or a specific file within) is added to a repository. After adding to a repository, the directory resp. files are removed bt default.
|
Then the directory (or a specific file within) is added to a repository. After adding to a repositorty, the directory resp. files are removed bt default.
|
||||||
|
|
||||||
All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory.
|
All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory.
|
||||||
|
|
||||||
|
|||||||
+1
-1
@@ -1,5 +1,5 @@
|
|||||||
# Search Package Collection
|
# Search Package Collection
|
||||||
<div>
|
<div>
|
||||||
Perform operations on the whole collection of packages in aptly database.
|
Perform operations on the whole collection of packages in apty database.
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
|||||||
+1
-1
@@ -35,6 +35,6 @@ aptly publish repo my-repo --gpg-key=KEY_ID_a --gpg-key=KEY_ID_b
|
|||||||
#### Parameters
|
#### Parameters
|
||||||
|
|
||||||
Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores
|
Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores
|
||||||
should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambiguous in URLs.
|
should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambigious in URLs.
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
# Manage Local Repositories
|
# Manage Local Repositories
|
||||||
<div>
|
<div>
|
||||||
A local repository is a collection of versioned packages (usually custom packages created internally).
|
A local repository is a collection of versionned packages (usually custom packages created internally).
|
||||||
|
|
||||||
Packages can be added, removed, moved or copied between repos.
|
Packages can be added, removed, moved or copied between repos.
|
||||||
|
|
||||||
|
|||||||
@@ -992,6 +992,232 @@ class PublishSwitchAPITestRepo(APITest):
|
|||||||
self.check_not_exists("public/" + prefix + "dists/")
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
|
class PublishSwitchAPITestMirror(APITest):
|
||||||
|
"""
|
||||||
|
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
||||||
|
"""
|
||||||
|
fixtureGpg = True
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
mirror_name = self.random_name()
|
||||||
|
mirror_desc = {'Name': mirror_name,
|
||||||
|
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Keyrings': ["aptlytest.gpg"],
|
||||||
|
'Architectures': ["amd64"],
|
||||||
|
'Components': ['main']}
|
||||||
|
mirror_desc['IgnoreSignatures'] = True
|
||||||
|
|
||||||
|
# Create Mirror
|
||||||
|
resp = self.post("/api/mirrors", json=mirror_desc)
|
||||||
|
self.check_equal(resp.status_code, 201)
|
||||||
|
|
||||||
|
# Get Mirror
|
||||||
|
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
|
||||||
|
self.check_equal(resp.status_code, 404)
|
||||||
|
|
||||||
|
# Update Mirror
|
||||||
|
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
|
||||||
|
self.check_task(resp)
|
||||||
|
|
||||||
|
# Snapshot Mirror
|
||||||
|
snapshot1_name = self.random_name()
|
||||||
|
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
# Publish Snapshot
|
||||||
|
prefix = self.random_name()
|
||||||
|
task = self.post_task(
|
||||||
|
"/api/publish/" + prefix,
|
||||||
|
json={
|
||||||
|
"Architectures": ["i386", "source"],
|
||||||
|
"SourceKind": "snapshot",
|
||||||
|
"Sources": [{"Name": snapshot1_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': '',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Origin': 'packagecloud.io/varnishcache/varnish30',
|
||||||
|
'Version': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': False,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
# Snapshot Mirror 2
|
||||||
|
snapshot2_name = self.random_name()
|
||||||
|
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
task = self.put_task(
|
||||||
|
"/api/publish/" + prefix + "/wheezy",
|
||||||
|
json={
|
||||||
|
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
"SkipContents": True,
|
||||||
|
"Label": "fun",
|
||||||
|
"Origin": "earth",
|
||||||
|
"Version": "13.3",
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': 'fun',
|
||||||
|
'Origin': 'earth',
|
||||||
|
'Version': '13.3',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': True,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
|
||||||
|
self.check_task(task)
|
||||||
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
|
class PublishSwitchAPITestSnapshot(APITest):
|
||||||
|
"""
|
||||||
|
publish snapshot of snapshot
|
||||||
|
"""
|
||||||
|
fixtureGpg = True
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
repo_name = self.random_name()
|
||||||
|
self.check_equal(self.post(
|
||||||
|
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
|
||||||
|
|
||||||
|
d = self.random_name()
|
||||||
|
self.check_equal(
|
||||||
|
self.upload("/api/files/" + d,
|
||||||
|
"pyspi_0.6.1-1.3.dsc",
|
||||||
|
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
|
||||||
|
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
|
||||||
|
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
snapshot1_name = self.random_name()
|
||||||
|
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
prefix = self.random_name()
|
||||||
|
task = self.post_task(
|
||||||
|
"/api/publish/" + prefix,
|
||||||
|
json={
|
||||||
|
"Architectures": ["i386", "source"],
|
||||||
|
"SourceKind": "snapshot",
|
||||||
|
"Sources": [{"Name": snapshot1_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': '',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Origin': '',
|
||||||
|
'Version': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': False,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
self.check_not_exists(
|
||||||
|
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
|
||||||
|
self.check_exists("public/" + prefix +
|
||||||
|
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
|
||||||
|
|
||||||
|
snapshot2_name = self.random_name()
|
||||||
|
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
task = self.put_task(
|
||||||
|
"/api/publish/" + prefix + "/wheezy",
|
||||||
|
json={
|
||||||
|
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
"SkipContents": True,
|
||||||
|
"Label": "fun",
|
||||||
|
"Origin": "earth",
|
||||||
|
"Version": "13.3",
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': 'fun',
|
||||||
|
'Origin': 'earth',
|
||||||
|
'Version': '13.3',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': True,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
# FIXME: what should exist here ? publish snapshot of snapshot
|
||||||
|
self.check_not_exists(
|
||||||
|
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
|
||||||
|
self.check_not_exists("public/" + prefix +
|
||||||
|
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
|
||||||
|
|
||||||
|
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
|
||||||
|
self.check_task(task)
|
||||||
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
class PublishSwitchAPITestRepoSignedBy(APITest):
|
class PublishSwitchAPITestRepoSignedBy(APITest):
|
||||||
"""
|
"""
|
||||||
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
||||||
|
|||||||
@@ -461,34 +461,3 @@ class ReposAPITestCopyPackage(APITest):
|
|||||||
|
|
||||||
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(),
|
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(),
|
||||||
['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378'])
|
['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378'])
|
||||||
|
|
||||||
|
|
||||||
class ReposAPITestCreateEdit(APITest):
|
|
||||||
"""
|
|
||||||
POST /api/repos,
|
|
||||||
"""
|
|
||||||
def check(self):
|
|
||||||
repo_name = self.random_name() + ' with space'
|
|
||||||
repo_desc = {'Comment': 'fun repo',
|
|
||||||
'DefaultComponent': 'contrib',
|
|
||||||
'DefaultDistribution': 'bookworm',
|
|
||||||
'Name': repo_name}
|
|
||||||
|
|
||||||
resp = self.post("/api/repos", json=repo_desc)
|
|
||||||
self.check_equal(resp.json(), repo_desc)
|
|
||||||
self.check_equal(resp.status_code, 201)
|
|
||||||
|
|
||||||
repo_desc = {'Comment': 'modified repo',
|
|
||||||
'DefaultComponent': 'main',
|
|
||||||
'DefaultDistribution': 'trixie',
|
|
||||||
'Name': repo_name + '@renamed'}
|
|
||||||
resp = self.put(f"/api/repos/{repo_name}", json=repo_desc)
|
|
||||||
self.check_equal(resp.json(), repo_desc)
|
|
||||||
self.check_equal(resp.status_code, 200)
|
|
||||||
|
|
||||||
resp = self.get("/api/repos/" + repo_name + '@renamed')
|
|
||||||
self.check_equal(resp.json(), repo_desc)
|
|
||||||
self.check_equal(resp.status_code, 200)
|
|
||||||
|
|
||||||
resp = self.delete("/api/repos/" + repo_name + '@renamed')
|
|
||||||
self.check_equal(resp.status_code, 200)
|
|
||||||
|
|||||||
+26
-34
@@ -44,29 +44,28 @@ func (list *List) consumer() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case task := <-list.queue:
|
case task := <-list.queue:
|
||||||
// Set task state to RUNNING before processing
|
|
||||||
list.Lock()
|
list.Lock()
|
||||||
task.State = RUNNING
|
{
|
||||||
|
task.State = RUNNING
|
||||||
|
}
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
||||||
|
|
||||||
// Update task completion state and cleanup with list lock held
|
|
||||||
list.Lock()
|
list.Lock()
|
||||||
{
|
{
|
||||||
|
task.processReturnValue = retValue
|
||||||
|
task.err = err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
task.output.Printf("Task failed with error: %v", err)
|
task.output.Printf("Task failed with error: %v", err)
|
||||||
task.State = FAILED
|
task.State = FAILED
|
||||||
task.err = err
|
|
||||||
task.processReturnValue = retValue
|
|
||||||
} else {
|
} else {
|
||||||
task.output.Print("Task succeeded")
|
task.output.Print("Task succeeded")
|
||||||
task.State = SUCCEEDED
|
task.State = SUCCEEDED
|
||||||
task.err = nil
|
|
||||||
task.processReturnValue = retValue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fmt.Printf("RACE DEBUG: Task Done '%s', freeing %s\n", task.Name, task.resources)
|
||||||
list.usedResources.Free(task.resources)
|
list.usedResources.Free(task.resources)
|
||||||
|
|
||||||
task.wgTask.Done()
|
task.wgTask.Done()
|
||||||
@@ -79,6 +78,8 @@ func (list *List) consumer() {
|
|||||||
blockingTasks := list.usedResources.UsedBy(t.resources)
|
blockingTasks := list.usedResources.UsedBy(t.resources)
|
||||||
if len(blockingTasks) == 0 {
|
if len(blockingTasks) == 0 {
|
||||||
list.usedResources.MarkInUse(t.resources, t)
|
list.usedResources.MarkInUse(t.resources, t)
|
||||||
|
|
||||||
|
fmt.Printf("RACE DEBUG: Task Resuming '%s', locking %s\n", t.Name, t.resources)
|
||||||
// unlock list since queueing may block
|
// unlock list since queueing may block
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
unlocked = true
|
unlocked = true
|
||||||
@@ -107,15 +108,13 @@ func (list *List) Stop() {
|
|||||||
|
|
||||||
// GetTasks gets complete list of tasks
|
// GetTasks gets complete list of tasks
|
||||||
func (list *List) GetTasks() []Task {
|
func (list *List) GetTasks() []Task {
|
||||||
list.Lock()
|
|
||||||
defer list.Unlock()
|
|
||||||
|
|
||||||
tasks := []Task{}
|
tasks := []Task{}
|
||||||
|
list.Lock()
|
||||||
for _, task := range list.tasks {
|
for _, task := range list.tasks {
|
||||||
// Copy task while holding list lock
|
|
||||||
tasks = append(tasks, *task)
|
tasks = append(tasks, *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
list.Unlock()
|
||||||
return tasks
|
return tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,11 +142,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) {
|
|||||||
// GetTaskByID returns task with given id
|
// GetTaskByID returns task with given id
|
||||||
func (list *List) GetTaskByID(ID int) (Task, error) {
|
func (list *List) GetTaskByID(ID int) (Task, error) {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
defer list.Unlock()
|
tasks := list.tasks
|
||||||
|
list.Unlock()
|
||||||
|
|
||||||
for _, task := range list.tasks {
|
for _, task := range tasks {
|
||||||
if task.ID == ID {
|
if task.ID == ID {
|
||||||
// Copy task while holding list lock
|
|
||||||
return *task, nil
|
return *task, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -184,16 +183,13 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
|
|||||||
|
|
||||||
// GetTaskReturnValueByID returns process return value of task with given id
|
// GetTaskReturnValueByID returns process return value of task with given id
|
||||||
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
||||||
list.Lock()
|
task, err := list.GetTaskByID(ID)
|
||||||
defer list.Unlock()
|
|
||||||
|
|
||||||
for _, task := range list.tasks {
|
if err != nil {
|
||||||
if task.ID == ID {
|
return nil, err
|
||||||
return task.processReturnValue, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("could not find task with id %v", ID)
|
return task.processReturnValue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
||||||
@@ -211,29 +207,26 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
|||||||
list.wg.Add(1)
|
list.wg.Add(1)
|
||||||
task.wgTask.Add(1)
|
task.wgTask.Add(1)
|
||||||
|
|
||||||
// Copy task while still holding the lock to avoid racing with consumer
|
|
||||||
// setting State=RUNNING after receiving from queue
|
|
||||||
taskCopy := *task
|
|
||||||
|
|
||||||
// add task to queue for processing if resources are available
|
// add task to queue for processing if resources are available
|
||||||
// if not, task will be queued by the consumer once resources are available
|
// if not, task will be queued by the consumer once resources are available
|
||||||
tasks := list.usedResources.UsedBy(resources)
|
tasks := list.usedResources.UsedBy(resources)
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
list.usedResources.MarkInUse(task.resources, task)
|
list.usedResources.MarkInUse(task.resources, task)
|
||||||
|
fmt.Printf("RACE DEBUG: Task Starting '%s', locking %s\n", name, resources)
|
||||||
// queueing task might block if channel not ready, unlock list before queueing
|
// queueing task might block if channel not ready, unlock list before queueing
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
list.queue <- task
|
list.queue <- task
|
||||||
} else {
|
} else {
|
||||||
|
fmt.Printf("RACE DEBUG: Task Queued '%s', waiting on %s\n", name, resources)
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return taskCopy, nil
|
return *task, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear removes finished tasks from list
|
// Clear removes finished tasks from list
|
||||||
func (list *List) Clear() {
|
func (list *List) Clear() {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
defer list.Unlock()
|
|
||||||
|
|
||||||
var tasks []*Task
|
var tasks []*Task
|
||||||
for _, task := range list.tasks {
|
for _, task := range list.tasks {
|
||||||
@@ -242,6 +235,8 @@ func (list *List) Clear() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
list.tasks = tasks
|
list.tasks = tasks
|
||||||
|
|
||||||
|
list.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait waits till all tasks are processed
|
// Wait waits till all tasks are processed
|
||||||
@@ -264,14 +259,11 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) {
|
|||||||
|
|
||||||
// GetTaskErrorByID returns the Task error for a given id
|
// GetTaskErrorByID returns the Task error for a given id
|
||||||
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
||||||
list.Lock()
|
task, err := list.GetTaskByID(ID)
|
||||||
defer list.Unlock()
|
|
||||||
|
|
||||||
for _, task := range list.tasks {
|
if err != nil {
|
||||||
if task.ID == ID {
|
return nil, err
|
||||||
return task.err, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("could not find task with id %v", ID)
|
return task.err, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Task represents as task in a queue encapsulates process code
|
// Task represents as task in a queue encapsulates process code
|
||||||
// All fields are protected by List.Mutex - access task fields only while holding list.Lock()
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
output *Output
|
output *Output
|
||||||
detail *Detail
|
detail *Detail
|
||||||
|
|||||||
Reference in New Issue
Block a user