Compare commits

..

11 Commits

Author SHA1 Message Date
André Roth 174ced253f system-tests: add debug flag 2026-05-14 20:38:59 +02:00
André Roth d27c5856de debug mode 2026-05-14 13:51:36 +02:00
André Roth c77d788493 publish: lock all distributions with MultiDist 2026-05-05 11:04:36 +02:00
André Roth 5ff552d919 publish: fix locking of snapshots
snapshots come in 3 kinds: local, remote and snapshot

find resources to be locked for each kind, recursively for the snapshot kind
2026-05-05 00:24:01 +02:00
André Roth 4defa49b7f publish: lock resources from all SourceKinds 2026-05-04 20:48:05 +02:00
André Roth 6fbcbc108c more debug 2026-05-04 18:41:50 +02:00
André Roth 41f5d22637 publish: remove useless ressource assignment 2026-05-04 17:19:36 +02:00
André Roth 8179f73bf0 publish: cleanup 2026-05-04 17:19:15 +02:00
André Roth f8efb3e9b7 publish update: lock all snapshots and repos as well 2026-05-04 16:12:54 +02:00
André Roth 55b2943f44 more debug 2026-05-04 13:49:24 +02:00
André Roth 9280231c1d publish: debug locking 2026-05-04 12:49:16 +02:00
24 changed files with 636 additions and 703 deletions
+1 -4
View File
@@ -155,7 +155,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 26.04", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"] name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"]
arch: ["amd64", "i386" , "arm64" , "armhf"] arch: ["amd64", "i386" , "arm64" , "armhf"]
include: include:
- name: "Debian 13/trixie" - name: "Debian 13/trixie"
@@ -167,9 +167,6 @@ jobs:
- name: "Debian 11/bullseye" - name: "Debian 11/bullseye"
suite: bullseye suite: bullseye
image: debian:bullseye-slim image: debian:bullseye-slim
- name: "Ubuntu 26.04"
suite: resolute
image: ubuntu:26.04
- name: "Ubuntu 24.04" - name: "Ubuntu 24.04"
suite: noble suite: noble
image: ubuntu:24.04 image: ubuntu:24.04
+2 -2
View File
@@ -16,7 +16,7 @@ Please report unacceptable behavior on [https://github.com/aptly-dev/aptly/discu
### List of Repositories ### List of Repositories
* [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page * [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page
* [aptly-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/) * [apty-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/)
* [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide * [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide
fixtures for aptly functional tests fixtures for aptly functional tests
@@ -137,7 +137,7 @@ make docker-unit-tests
In order to run aptly system tests, enter the following: In order to run aptly system tests, enter the following:
``` ```
make docker-system-test make docker-system-tests
``` ```
#### Running golangci-lint #### Running golangci-lint
+8 -3
View File
@@ -29,6 +29,11 @@ ifeq ($(CAPTURE),1)
CAPTURE_ARG := --capture CAPTURE_ARG := --capture
endif endif
# export DEBUG=1 to enable debug output in system tests
ifeq ($(DEBUG),1)
DEBUG_ARG := --debug
endif
help: ## Print this help help: ## Print this help
@grep -E '^[a-zA-Z][a-zA-Z0-9_-]*:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' @grep -E '^[a-zA-Z][a-zA-Z0-9_-]*:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
@@ -121,7 +126,7 @@ system-test: prepare swagger etcd-install ## Run system tests
if [ ! -e ~/aptly-fixture-pool ]; then git clone https://github.com/aptly-dev/aptly-fixture-pool.git ~/aptly-fixture-pool/; fi if [ ! -e ~/aptly-fixture-pool ]; then git clone https://github.com/aptly-dev/aptly-fixture-pool.git ~/aptly-fixture-pool/; fi
test -f ~/etcd.db || (curl -o ~/etcd.db.xz http://repo.aptly.info/system-tests/etcd.db.xz && xz -d ~/etcd.db.xz) test -f ~/etcd.db || (curl -o ~/etcd.db.xz http://repo.aptly.info/system-tests/etcd.db.xz && xz -d ~/etcd.db.xz)
# Run system tests # Run system tests
PATH=$(BINPATH)/:$(PATH) FORCE_COLOR=1 $(PYTHON) system/run.py --long $(COVERAGE_ARG_TEST) $(CAPTURE_ARG) $(TEST) PATH=$(BINPATH)/:$(PATH) FORCE_COLOR=1 $(PYTHON) system/run.py --long $(COVERAGE_ARG_TEST) $(CAPTURE_ARG) $(DEBUG_ARG) $(TEST)
bench: bench:
@echo "\e[33m\e[1mRunning benchmark ...\e[0m" @echo "\e[33m\e[1mRunning benchmark ...\e[0m"
@@ -211,7 +216,7 @@ docker-system-test: ## Run system tests in docker container (add TEST=t04_mirro
AZURE_STORAGE_ACCESS_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" \ AZURE_STORAGE_ACCESS_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" \
AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) \ AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) \
AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) \ AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) \
system-test TEST=$(TEST) CAPTURE=$(CAPTURE) COVERAGE_SKIP=$(COVERAGE_SKIP) \ system-test TEST=$(TEST) CAPTURE=$(CAPTURE) COVERAGE_SKIP=$(COVERAGE_SKIP) DEBUG=$(DEBUG) \
azurite-stop azurite-stop
docker-serve: ## Run development server (auto recompiling) on http://localhost:3142 docker-serve: ## Run development server (auto recompiling) on http://localhost:3142
@@ -241,4 +246,4 @@ clean: ## remove local build and module cache
rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf
find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true
.PHONY: help man prepare swagger version binaries build docker-release docker-system-test docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8 .PHONY: help man prepare swagger version binaries build docker-release docker-system-tests docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8
+1 -1
View File
@@ -54,7 +54,7 @@ type gpgDeleteKeyParams struct {
// @Summary Add GPG Keys // @Summary Add GPG Keys
// @Description **Adds GPG keys to aptly keyring** // @Description **Adds GPG keys to aptly keyring**
// @Description // @Description
// @Description Add GPG public keys for verifying remote repositories for mirroring. // @Description Add GPG public keys for veryfing remote repositories for mirroring.
// @Description // @Description
// @Description Keys can be added in two ways: // @Description Keys can be added in two ways:
// @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty) // @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty)
+9 -41
View File
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
name := c.Params.ByName("name") name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1" force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
mirrorCollection := collectionFactory.RemoteRepoCollection() mirrorCollection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err := mirrorCollection.ByName(name) repo, err := mirrorCollection.ByName(name)
if err != nil { if err != nil {
@@ -228,34 +228,21 @@ func apiMirrorsDrop(c *gin.Context) {
resources := []string{string(repo.Key())} resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name) taskName := fmt.Sprintf("Delete mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh collections err := repo.CheckLock()
taskCollectionFactory := context.NewCollectionFactory()
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
repo, err := taskMirrorCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
err = repo.CheckLock()
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
} }
if !force { if !force {
// Fresh checks with current collections snapshots := snapshotCollection.ByRemoteRepoSource(repo)
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 { if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
} }
} }
err = taskMirrorCollection.Drop(repo) err = mirrorCollection.Drop(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
} }
@@ -510,7 +497,7 @@ func apiMirrorsEdit(c *gin.Context) {
type mirrorUpdateParams struct { type mirrorUpdateParams struct {
// Change mirror name to `Name` // Change mirror name to `Name`
Name string ` json:"Name" example:"mirror1"` Name string ` json:"Name" example:"mirror1"`
// Gpg keyring(s) for verifying Release file // Gpg keyring(s) for verifing Release file
Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"` Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"`
// Set "true" to ignore checksum errors // Set "true" to ignore checksum errors
IgnoreChecksums bool ` json:"IgnoreChecksums"` IgnoreChecksums bool ` json:"IgnoreChecksums"`
@@ -548,8 +535,7 @@ func apiMirrorsUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection() collection := collectionFactory.RemoteRepoCollection()
name := c.Params.ByName("name") remote, err = collection.ByName(c.Params.ByName("name"))
remote, err = collection.ByName(name)
if err != nil { if err != nil {
AbortWithJSONError(c, 404, err) AbortWithJSONError(c, 404, err)
return return
@@ -564,7 +550,6 @@ func apiMirrorsUpdate(c *gin.Context) {
return return
} }
// Pre-task validation of new name if provided
if b.Name != remote.Name { if b.Name != remote.Name {
_, err = collection.ByName(b.Name) _, err = collection.ByName(b.Name)
if err == nil { if err == nil {
@@ -581,26 +566,9 @@ func apiMirrorsUpdate(c *gin.Context) {
resources := []string{string(remote.Key())} resources := []string{string(remote.Key())}
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.RemoteRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
remote, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Fresh rename check inside lock (if renaming)
if b.Name != remote.Name {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
}
}
downloader := context.NewDownloader(out) downloader := context.NewDownloader(out)
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures) err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
@@ -812,8 +780,8 @@ func apiMirrorsUpdate(c *gin.Context) {
} }
log.Info().Msgf("%s: Finalizing download...", b.Name) log.Info().Msgf("%s: Finalizing download...", b.Name)
_ = remote.FinalizeDownload(taskCollectionFactory, out) _ = remote.FinalizeDownload(collectionFactory, out)
err = taskCollection.Update(remote) err = collectionFactory.RemoteRepoCollection().Update(remote)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
+251 -304
View File
@@ -2,7 +2,6 @@ package api
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
"strings" "strings"
@@ -125,7 +124,7 @@ func apiPublishList(c *gin.Context) {
// @Description See also: `aptly publish show` // @Description See also: `aptly publish show`
// @Tags Publish // @Tags Publish
// @Produce json // @Produce json
// @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambiguous in URLs" // @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambigious in URLs"
// @Param distribution path string true "distribution name" // @Param distribution path string true "distribution name"
// @Success 200 {object} deb.PublishedRepo // @Success 200 {object} deb.PublishedRepo
// @Failure 404 {object} Error "Published repository not found" // @Failure 404 {object} Error "Published repository not found"
@@ -256,13 +255,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.SourceKind == deb.SourceSnapshot { if b.SourceKind == deb.SourceSnapshot {
var snapshot *deb.Snapshot var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection() tmpCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
snapshot, err = snapshotCollection.ByName(source.Name) snapshot, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -274,13 +273,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo { } else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo var localRepo *deb.LocalRepo
localCollection := collectionFactory.LocalRepoCollection() tmpCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
localRepo, err = localCollection.ByName(source.Name) localRepo, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -299,25 +298,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist multiDist = *b.MultiDist
} }
// Pre-register the published repo key in resources so that concurrent collection := collectionFactory.PublishedRepoCollection()
// POST requests for the same prefix/distribution are serialized by the
// task queue rather than racing on CheckDuplicate + Add.
if b.Distribution != "" {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
} else {
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
taskDetail := task.PublishDetail{ taskDetail := task.PublishDetail{
Detail: detail, Detail: detail,
} }
@@ -329,10 +314,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
for _, source := range sources { for _, source := range sources {
switch s := source.(type) { switch s := source.(type) {
case *deb.Snapshot: case *deb.Snapshot:
snapshotCollection := taskCollectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s) err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo: case *deb.LocalRepo:
localCollection := taskCollectionFactory.LocalRepoCollection() localCollection := collectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s) err = localCollection.LoadComplete(s)
default: default:
err = fmt.Errorf("unexpected type for source: %T", source) err = fmt.Errorf("unexpected type for source: %T", source)
@@ -342,7 +327,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} }
} }
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
@@ -380,18 +365,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.Version = b.Version published.Version = b.Version
} }
duplicate := taskCollection.CheckDuplicate(published) duplicate := collection.CheckDuplicate(published)
if duplicate != nil { if duplicate != nil {
_ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
} }
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
err = taskCollection.Add(published) err = collection.Add(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -400,6 +385,46 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}) })
} }
// Return resources to be locked for a Snapshot name
func getSnapshotResources(snapshotCollection *deb.SnapshotCollection, snapshotName string) (resources []string, err error) {
snapshot, err := snapshotCollection.ByName(snapshotName)
if err != nil {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
for _, sourceID := range snapshot.SourceIDs {
if snapshot.SourceKind == deb.SourceSnapshot {
snapshot2, err2 := snapshotCollection.ByUUID(sourceID)
if err2 != nil {
err = err2
return
}
res, err3 := getSnapshotResources(snapshotCollection, snapshot2.Name)
if err3 != nil {
err = err3
return
}
resources = append(resources, res...)
} else if snapshot.SourceKind == deb.SourceLocalRepo {
var repo *deb.LocalRepo
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(repo.Key()))
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
var mirror *deb.RemoteRepo
mirror, err = context.NewCollectionFactory().RemoteRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(mirror.Key()))
}
}
return
}
type publishedRepoUpdateSwitchParams struct { type publishedRepoUpdateSwitchParams struct {
// when publishing, overwrite files in pool/ directory without notice // when publishing, overwrite files in pool/ directory without notice
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"` ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
@@ -419,12 +444,12 @@ type publishedRepoUpdateSwitchParams struct {
SignedBy *string ` json:"SignedBy" example:""` SignedBy *string ` json:"SignedBy" example:""`
// Enable multiple packages with the same filename in different distributions // Enable multiple packages with the same filename in different distributions
MultiDist *bool ` json:"MultiDist" example:"false"` MultiDist *bool ` json:"MultiDist" example:"false"`
// Value of Label: field in published repository stanza // Value of Label: field in published repository stanza
Label *string ` json:"Label" example:"Debian"` Label *string ` json:"Label" example:"Debian"`
// Value of Origin: field in published repository stanza // Value of Origin: field in published repository stanza
Origin *string ` json:"Origin" example:"Debian"` Origin *string ` json:"Origin" example:"Debian"`
// Version of the release: Optional // Version of the release: Optional
Version *string ` json:"Version" example:"13.3"` Version *string ` json:"Version" example:"13.3"`
} }
// @Summary Update Published Repository // @Summary Update Published Repository
@@ -471,7 +496,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
@@ -486,71 +510,69 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return return
} }
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid) localCollection := collectionFactory.LocalRepoCollection()
for _, sourceID := range published.Sources {
localRepo, err2 := localCollection.ByUUID(sourceID)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(repo.Key())) resources = append(resources, string(localRepo.Key()))
} }
} else if published.SourceKind == deb.SourceSnapshot { } else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots { for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) res, err2 := getSnapshotResources(snapshotCollection, snapshotInfo.Name)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(snapshot.ResourceKey())) resources = append(resources, res...)
} }
} else { } else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
return return
} }
// Field mutations and fresh DB load are deferred to inside the task so if b.SkipContents != nil {
// they always operate on a consistent state after the lock is held. published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.LoadComplete(published, collectionFactory)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
revision := published.ObtainRevision() revision := published.ObtainRevision()
sources := revision.Sources sources := revision.Sources
@@ -562,17 +584,17 @@ func apiPublishUpdateSwitch(c *gin.Context) {
} }
} }
result, err := published.Update(taskCollectionFactory, out) result, err := published.Update(collectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = taskCollection.Update(published) err = collection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -580,7 +602,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup { if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
@@ -630,11 +652,8 @@ func apiPublishDrop(c *gin.Context) {
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err := collection.Remove(context, storage, prefix, distribution,
taskCollection := taskCollectionFactory.PublishedRepoCollection() collectionFactory, out, force, skipCleanup)
err := taskCollection.Remove(context, storage, prefix, distribution,
taskCollectionFactory, out, force, skipCleanup)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
} }
@@ -670,52 +689,43 @@ func apiPublishAddSource(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly (no LoadComplete) to verify existence and obtain the
// resource key and task name. The actual mutation is performed inside
// the task on a freshly loaded copy to prevent lost-update races.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
return
}
sources[component] = name
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.Update(published)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
}
sources[component] = name
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -797,48 +807,39 @@ func apiPublishSetSources(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.Update(published)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -871,33 +872,24 @@ func apiPublishDropChanges(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and DropRevision happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
published.DropRevision()
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.Update(published)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
published.DropRevision()
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -933,58 +925,51 @@ func apiPublishUpdateSource(c *gin.Context) {
param := slashEscape(c.Params.ByName("prefix")) param := slashEscape(c.Params.ByName("prefix"))
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
urlComponent := slashEscape(c.Params.ByName("component")) component := slashEscape(c.Params.ByName("component"))
// Default component to the URL path segment; the body may rename it.
b.Component = urlComponent
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
return
}
b.Component = component
b.Name = revision.Sources[component]
if c.Bind(&b) != nil {
return
}
if b.Component != component {
delete(sources, component)
}
component = b.Component
name := b.Name
sources[component] = name
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.Update(published)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[urlComponent]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
}
if b.Component != urlComponent {
delete(sources, urlComponent)
}
newComponent := b.Component
name := b.Name
sources[newComponent] = name
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1021,41 +1006,33 @@ func apiPublishRemoveSource(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
return
}
delete(sources, component)
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() err = collection.Update(published)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
}
delete(sources, component)
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1127,94 +1104,64 @@ func apiPublishUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and field mutations happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
resources := []string{string(published.Key())} err = collection.LoadComplete(published, collectionFactory)
if err != nil {
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does, AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
// because published.Update() reads from them and concurrent modification return
// would produce an inconsistent view.
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
if published.SourceKind == deb.SourceLocalRepo {
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, uuid := range published.Sources {
snapshot, err2 := snapshotCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
} }
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() result, err := published.Update(collectionFactory, out)
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = taskCollection.LoadComplete(published, taskCollectionFactory) err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
// Apply field mutations on the freshly loaded object. err = collection.Update(published)
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1222,7 +1169,7 @@ func apiPublishUpdate(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup { if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
+74 -171
View File
@@ -102,7 +102,7 @@ type repoCreateParams struct {
DefaultDistribution string ` json:"DefaultDistribution" example:"stable"` DefaultDistribution string ` json:"DefaultDistribution" example:"stable"`
// Default component when publishing from this local repo // Default component when publishing from this local repo
DefaultComponent string ` json:"DefaultComponent" example:"main"` DefaultComponent string ` json:"DefaultComponent" example:"main"`
// Snapshot name to create repository from (optional) // Snapshot name to create repoitory from (optional)
FromSnapshot string ` json:"FromSnapshot" example:""` FromSnapshot string ` json:"FromSnapshot" example:""`
} }
@@ -131,69 +131,46 @@ func apiReposCreate(c *gin.Context) {
return return
} }
// Handler: Pre-task validations (shallow) repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
if b.FromSnapshot != "" { if b.FromSnapshot != "" {
var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
_, err := snapshotCollection.ByName(b.FromSnapshot) snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
return return
} }
// Just verify it exists - don't load here
}
// Use generated key resource for repo being created err = snapshotCollection.LoadComplete(snapshot)
resources := []string{"LocalRepo:" + b.Name}
if b.FromSnapshot != "" {
resources = append(resources, "Snapshot:"+b.FromSnapshot)
}
taskName := fmt.Sprintf("Create repository %s", b.Name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collection and check/create ATOMIC inside task
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Check duplicate inside lock
if _, err := taskCollection.ByName(b.Name); err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %s already exists", b.Name)
}
// Create repo
repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
if b.FromSnapshot != "" {
snapshotCollection := taskCollectionFactory.SnapshotCollection()
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
fmt.Errorf("source snapshot not found: %s", err)
}
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
fmt.Errorf("unable to load source snapshot: %s", err)
}
repo.UpdateRefList(snapshot.RefList())
}
err := taskCollection.Add(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
return
} }
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil repo.UpdateRefList(snapshot.RefList())
}) }
localRepoCollection := collectionFactory.LocalRepoCollection()
if _, err := localRepoCollection.ByName(b.Name); err == nil {
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
return
}
err := localRepoCollection.Add(repo)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusCreated, repo)
} }
type reposEditParams struct { type reposEditParams struct {
@@ -203,7 +180,7 @@ type reposEditParams struct {
Comment *string ` json:"Comment" example:"example repo"` Comment *string ` json:"Comment" example:"example repo"`
// Change Default Distribution for publishing // Change Default Distribution for publishing
DefaultDistribution *string ` json:"DefaultDistribution" example:""` DefaultDistribution *string ` json:"DefaultDistribution" example:""`
// Change Default Component for publishing // Change Devault Component for publishing
DefaultComponent *string ` json:"DefaultComponent" example:""` DefaultComponent *string ` json:"DefaultComponent" example:""`
} }
@@ -224,8 +201,6 @@ func apiReposEdit(c *gin.Context) {
return return
} }
// Load shallowly for 404 check and resource key.
// Mutation and duplicate check happen inside the task for atomicity.
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection() collection := collectionFactory.LocalRepoCollection()
@@ -236,47 +211,32 @@ func apiReposEdit(c *gin.Context) {
return return
} }
resources := []string{string(repo.Key())} if b.Name != nil && *b.Name != name {
taskName := fmt.Sprintf("Edit repository %s", name) _, err := collection.ByName(*b.Name)
if err == nil {
// already exists
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
return
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { err = collection.Update(repo)
// Task: Create fresh collection inside task after lock if err != nil {
taskCollectionFactory := context.NewCollectionFactory() AbortWithJSONError(c, 500, err)
taskCollection := taskCollectionFactory.LocalRepoCollection() return
}
// Fresh load after lock acquired c.JSON(200, repo)
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
// Check and update ATOMIC (inside lock)
if b.Name != nil && *b.Name != name {
_, err := taskCollection.ByName(*b.Name)
if err == nil {
// already exists
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %q already exists", *b.Name)
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
})
} }
// GET /api/repos/:name // GET /api/repos/:name
@@ -318,10 +278,10 @@ func apiReposDrop(c *gin.Context) {
force := c.Request.URL.Query().Get("force") == "1" force := c.Request.URL.Query().Get("force") == "1"
name := c.Params.ByName("name") name := c.Params.ByName("name")
// Load shallowly for 404 check, resource key, and task name.
// Full checks (published/snapshots) happen inside the task.
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection() collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
repo, err := collection.ByName(name) repo, err := collection.ByName(name)
if err != nil { if err != nil {
@@ -332,32 +292,19 @@ func apiReposDrop(c *gin.Context) {
resources := []string{string(repo.Key())} resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name) taskName := fmt.Sprintf("Delete repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collections inside task after lock acquired published := publishedCollection.ByLocalRepo(repo)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Re-read repo with fresh collection after lock
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
// Check with fresh collections
published := taskPublishedCollection.ByLocalRepo(repo)
if len(published) > 0 { if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published") return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
} }
if !force { if !force {
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) snapshots := snapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 { if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
} }
} }
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
}) })
} }
@@ -414,13 +361,10 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
return return
} }
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection() collection := collectionFactory.LocalRepoCollection()
name := c.Params.ByName("name") repo, err := collection.ByName(c.Params.ByName("name"))
repo, err := collection.ByName(name)
if err != nil { if err != nil {
AbortWithJSONError(c, 404, err) AbortWithJSONError(c, 404, err)
return return
@@ -429,23 +373,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
resources := []string{string(repo.Key())} resources := []string{string(repo.Key())}
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collection inside task after lock err = collection.LoadComplete(repo)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
out.Printf("Loading packages...\n") out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -454,7 +388,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
for _, ref := range b.PackageRefs { for _, ref := range b.PackageRefs {
var p *deb.Package var p *deb.Package
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil { if err != nil {
if err == database.ErrNotFound { if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
@@ -470,7 +404,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = taskCollection.Update(repo) err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
} }
@@ -577,8 +511,6 @@ func apiReposPackageFromDir(c *gin.Context) {
return return
} }
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection() collection := collectionFactory.LocalRepoCollection()
@@ -602,17 +534,7 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())} resources := []string{string(repo.Key())}
resources = append(resources, sources...) resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collection inside task after lock err = collection.LoadComplete(repo)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -633,13 +555,13 @@ func apiReposPackageFromDir(c *gin.Context) {
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
} }
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...) failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...) processedFiles = append(processedFiles, otherFiles...)
@@ -649,7 +571,7 @@ func apiReposPackageFromDir(c *gin.Context) {
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = taskCollection.Update(repo) err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
} }
@@ -728,8 +650,6 @@ func apiReposCopyPackage(c *gin.Context) {
return return
} }
// Load shallowly for 404 check and resource keys.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil { if err != nil {
@@ -753,26 +673,12 @@ func apiReposCopyPackage(c *gin.Context) {
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collections inside task after lock err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of both repos after lock acquired
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
} }
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
} }
@@ -785,12 +691,12 @@ func apiReposCopyPackage(c *gin.Context) {
RemovedLines: []string{}, RemovedLines: []string{},
} }
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
} }
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
} }
@@ -858,7 +764,7 @@ func apiReposCopyPackage(c *gin.Context) {
} else { } else {
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) err = collectionFactory.LocalRepoCollection().Update(dstRepo)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
} }
@@ -961,9 +867,6 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
resources = append(resources, sources...) resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
var ( var (
err error err error
verifier = context.GetVerifier() verifier = context.GetVerifier()
@@ -979,8 +882,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles( _, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...) failedFiles = append(failedFiles, failedFiles2...)
if err != nil { if err != nil {
+35 -107
View File
@@ -165,7 +165,6 @@ func apiSnapshotsCreate(c *gin.Context) {
} }
} }
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
var resources []string var resources []string
@@ -183,20 +182,8 @@ func apiSnapshotsCreate(c *gin.Context) {
} }
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory for i := range sources {
taskCollectionFactory := context.NewCollectionFactory() err = snapshotCollection.LoadComplete(sources[i])
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPackageCollection := taskCollectionFactory.PackageCollection()
// Fresh load of all sources after lock acquired
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
for i := range b.SourceSnapshots {
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -204,9 +191,9 @@ func apiSnapshotsCreate(c *gin.Context) {
list := deb.NewPackageList() list := deb.NewPackageList()
// verify package refs and build package list using fresh factory // verify package refs and build package list
for _, ref := range b.PackageRefs { for _, ref := range b.PackageRefs {
p, err := taskPackageCollection.ByKey([]byte(ref)) p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil { if err != nil {
if err == database.ErrNotFound { if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err) return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
@@ -219,9 +206,9 @@ func apiSnapshotsCreate(c *gin.Context) {
} }
} }
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description) snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = taskSnapshotCollection.Add(snapshot) err = snapshotCollection.Add(snapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
} }
@@ -328,7 +315,6 @@ func apiSnapshotsUpdate(c *gin.Context) {
return return
} }
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection() collection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name") name := c.Params.ByName("name")
@@ -339,38 +325,14 @@ func apiSnapshotsUpdate(c *gin.Context) {
return return
} }
// Pre-task validation of new name if provided (skip if renaming to same name)
if b.Name != "" && b.Name != name {
_, err = collection.ByName(b.Name)
if err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
return
}
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name) taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory _, err := collection.ByName(b.Name)
taskCollectionFactory := context.NewCollectionFactory() if err == nil {
taskCollection := taskCollectionFactory.SnapshotCollection() return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
// Fresh load after lock acquired
snapshot, err = taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
// Fresh duplicate check inside lock
if b.Name != "" {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
}
// Update fresh copy
if b.Name != "" { if b.Name != "" {
snapshot.Name = b.Name snapshot.Name = b.Name
} }
@@ -379,7 +341,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description snapshot.Description = b.Description
} }
err = taskCollection.Update(snapshot) err = collectionFactory.SnapshotCollection().Update(snapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -433,9 +395,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name") name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1" force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name) snapshot, err := snapshotCollection.ByName(name)
if err != nil { if err != nil {
@@ -445,35 +407,21 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())} resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name) taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh collections published := publishedCollection.BySnapshot(snapshot)
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Fresh load after lock acquired
snapshot, err := taskSnapshotCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh checks with current collections
published := taskPublishedCollection.BySnapshot(snapshot)
if len(published) > 0 { if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published") return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
} }
if !force { if !force {
// Using fresh collection for dependency check snapshots := snapshotCollection.BySnapshotSource(snapshot)
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 { if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
} }
} }
err = taskSnapshotCollection.Drop(snapshot) err = snapshotCollection.Drop(snapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -628,7 +576,6 @@ func apiSnapshotsMerge(c *gin.Context) {
return return
} }
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
@@ -645,43 +592,32 @@ func apiSnapshotsMerge(c *gin.Context) {
} }
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory err = snapshotCollection.LoadComplete(sources[0])
taskCollectionFactory := context.NewCollectionFactory() if err != nil {
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
// Fresh load of all sources inside task
freshSources := make([]*deb.Snapshot, len(body.Sources))
for i := range body.Sources {
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
} }
result := sources[0].RefList()
// Merge using fresh sources for i := 1; i < len(sources); i++ {
result := freshSources[0].RefList() err = snapshotCollection.LoadComplete(sources[i])
for i := 1; i < len(freshSources); i++ { if err != nil {
result = result.Merge(freshSources[i].RefList(), overrideMatching, false) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result = result.Merge(sources[i].RefList(), overrideMatching, false)
} }
if latest { if latest {
result.FilterLatestRefs() result.FilterLatestRefs()
} }
sourceDescription := make([]string, len(freshSources)) sourceDescription := make([]string, len(sources))
for i, s := range freshSources { for i, s := range sources {
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name) sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
} }
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result, snapshot = deb.NewSnapshotFromRefList(name, sources, result,
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", "))) fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
err = taskCollectionFactory.SnapshotCollection().Add(snapshot) err = collectionFactory.SnapshotCollection().Add(snapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
} }
@@ -765,29 +701,21 @@ func apiSnapshotsPull(c *gin.Context) {
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())} resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination) taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of snapshots after lock acquired
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source) err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
// convert snapshots to package list // convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
@@ -884,10 +812,10 @@ func apiSnapshotsPull(c *gin.Context) {
} }
// Create <destination> snapshot // Create <destination> snapshot
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList, destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", "))) fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot) err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
} }
+1 -2
View File
@@ -168,8 +168,6 @@ func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
// LoadComplete loads additional information for local repo // LoadComplete loads additional information for local repo
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error { func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
repo.packageRefs = &PackageRefList{}
encoded, err := collection.db.Get(repo.RefKey()) encoded, err := collection.db.Get(repo.RefKey())
if err == database.ErrNotFound { if err == database.ErrNotFound {
return nil return nil
@@ -178,6 +176,7 @@ func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
return err return err
} }
repo.packageRefs = &PackageRefList{}
return repo.packageRefs.Decode(encoded) return repo.packageRefs.Decode(encoded)
} }
-12
View File
@@ -133,18 +133,6 @@ func (s *LocalRepoCollectionSuite) TestByUUID(c *C) {
c.Assert(r.String(), Equals, repo.String()) c.Assert(r.String(), Equals, repo.String())
} }
func (s *LocalRepoCollectionSuite) TestLoadCompleteNoRefKey(c *C) {
repo := NewLocalRepo("local1", "Comment 1")
c.Assert(s.collection.Update(repo), IsNil)
r, err := s.collection.ByName("local1")
c.Assert(err, IsNil)
c.Assert(s.collection.LoadComplete(r), IsNil)
c.Assert(r.packageRefs, NotNil)
c.Assert(r.NumPackages(), Equals, 0)
}
func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) { func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) {
repo := NewLocalRepo("local1", "Comment 1") repo := NewLocalRepo("local1", "Comment 1")
c.Assert(s.collection.Update(repo), IsNil) c.Assert(s.collection.Update(repo), IsNil)
+6 -1
View File
@@ -609,7 +609,12 @@ func (p *PublishedRepo) StoragePrefix() string {
// Key returns unique key identifying PublishedRepo // Key returns unique key identifying PublishedRepo
func (p *PublishedRepo) Key() []byte { func (p *PublishedRepo) Key() []byte {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) if p.MultiDist {
// do not lock Distribution in MultiDist
return []byte("UM" + p.StoragePrefix())
} else {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
}
} }
// RefKey is a unique id for package reference list // RefKey is a unique id for package reference list
-3
View File
@@ -79,9 +79,6 @@ func (l *PackageRefList) Decode(input []byte) error {
// ForEach calls handler for each package ref in list // ForEach calls handler for each package ref in list
func (l *PackageRefList) ForEach(handler func([]byte) error) error { func (l *PackageRefList) ForEach(handler func([]byte) error) error {
if l == nil {
return nil
}
var err error var err error
for _, p := range l.Refs { for _, p := range l.Refs {
err = handler(p) err = handler(p)
-11
View File
@@ -130,17 +130,6 @@ func (s *PackageRefListSuite) TestPackageRefListForeach(c *C) {
c.Check(err, Equals, e) c.Check(err, Equals, e)
} }
func (s *PackageRefListSuite) TestForEachNilList(c *C) {
var l *PackageRefList
called := false
err := l.ForEach(func([]byte) error {
called = true
return nil
})
c.Assert(err, IsNil)
c.Assert(called, Equals, false)
}
func (s *PackageRefListSuite) TestHas(c *C) { func (s *PackageRefListSuite) TestHas(c *C) {
_ = s.list.Add(s.p1) _ = s.list.Add(s.p1)
_ = s.list.Add(s.p3) _ = s.list.Add(s.p3)
+1 -1
View File
@@ -2,7 +2,7 @@
<div> <div>
In order to add debian package files to a local repository, files are first uploaded to a temporary directory. In order to add debian package files to a local repository, files are first uploaded to a temporary directory.
Then the directory (or a specific file within) is added to a repository. After adding to a repository, the directory resp. files are removed bt default. Then the directory (or a specific file within) is added to a repository. After adding to a repositorty, the directory resp. files are removed bt default.
All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory. All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory.
+1 -1
View File
@@ -1,5 +1,5 @@
# Search Package Collection # Search Package Collection
<div> <div>
Perform operations on the whole collection of packages in aptly database. Perform operations on the whole collection of packages in apty database.
</div> </div>
+1 -1
View File
@@ -35,6 +35,6 @@ aptly publish repo my-repo --gpg-key=KEY_ID_a --gpg-key=KEY_ID_b
#### Parameters #### Parameters
Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores
should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambiguous in URLs. should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambigious in URLs.
</div> </div>
+1 -1
View File
@@ -1,6 +1,6 @@
# Manage Local Repositories # Manage Local Repositories
<div> <div>
A local repository is a collection of versioned packages (usually custom packages created internally). A local repository is a collection of versionned packages (usually custom packages created internally).
Packages can be added, removed, moved or copied between repos. Packages can be added, removed, moved or copied between repos.
+1 -1
View File
@@ -25,7 +25,7 @@ class APITest(BaseTest):
""" """
aptly_server = None aptly_server = None
aptly_out = None aptly_out = None
debugOutput = True debugOutput = False # Controlled by --debug flag in run.py
base_url = "127.0.0.1:8765" base_url = "127.0.0.1:8765"
configOverride = { configOverride = {
"FileSystemPublishEndpoints": { "FileSystemPublishEndpoints": {
+2 -2
View File
@@ -164,10 +164,10 @@ class BaseTest(object):
self.run() self.run()
self.check() self.check()
except Exception as exc: except Exception as exc:
if self.debugOutput:
print(f"API log:\n{self.debug_output()}")
raise exc raise exc
finally: finally:
if self.debugOutput:
print(f"API log:\n{self.debug_output()}")
self.teardown() self.teardown()
def prepare_remove_all(self): def prepare_remove_all(self):
+9 -2
View File
@@ -36,7 +36,7 @@ def natural_key(string_):
return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_)] return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_)]
def run(include_long_tests=False, capture_results=False, tests=None, filters=None, coverage_dir=None, coverage_skip=False): def run(include_long_tests=False, capture_results=False, tests=None, filters=None, coverage_dir=None, coverage_skip=False, debug=False):
""" """
Run system test. Run system test.
""" """
@@ -50,6 +50,9 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
if not coverage_dir and not coverage_skip: if not coverage_dir and not coverage_skip:
coverage_dir = mkdtemp(suffix="aptly-coverage") coverage_dir = mkdtemp(suffix="aptly-coverage")
# Set debug output globally for all test classes
BaseTest.debugOutput = debug
failed = False failed = False
for test in tests: for test in tests:
orig_stdout = sys.stdout orig_stdout = sys.stdout
@@ -155,6 +158,7 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
traceback.print_exception(typ, val, tb, file=orig_stdout) traceback.print_exception(typ, val, tb, file=orig_stdout)
else: else:
orig_stdout.write(colored("\b\b\b\bOK", color="green", attrs=["bold"]) + f" {duration}\n") orig_stdout.write(colored("\b\b\b\bOK", color="green", attrs=["bold"]) + f" {duration}\n")
orig_stdout.write(testout.get_contents())
t.shutdown() t.shutdown()
@@ -214,6 +218,7 @@ if __name__ == "__main__":
capture_results = False capture_results = False
coverage_dir = None coverage_dir = None
coverage_skip = False coverage_skip = False
debug = False
tests = None tests = None
args = sys.argv[1:] args = sys.argv[1:]
@@ -227,6 +232,8 @@ if __name__ == "__main__":
args = args[1:] args = args[1:]
elif args[0] == "--coverage-skip": elif args[0] == "--coverage-skip":
coverage_skip = True coverage_skip = True
elif args[0] == "--debug":
debug = True
args = args[1:] args = args[1:]
@@ -239,4 +246,4 @@ if __name__ == "__main__":
else: else:
filters.append(arg) filters.append(arg)
run(include_long_tests, capture_results, tests, filters, coverage_dir, coverage_skip) run(include_long_tests, capture_results, tests, filters, coverage_dir, coverage_skip, debug)
+226
View File
@@ -992,6 +992,232 @@ class PublishSwitchAPITestRepo(APITest):
self.check_not_exists("public/" + prefix + "dists/") self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestMirror(APITest):
"""
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True
def check(self):
mirror_name = self.random_name()
mirror_desc = {'Name': mirror_name,
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
'Distribution': 'wheezy',
'Keyrings': ["aptlytest.gpg"],
'Architectures': ["amd64"],
'Components': ['main']}
mirror_desc['IgnoreSignatures'] = True
# Create Mirror
resp = self.post("/api/mirrors", json=mirror_desc)
self.check_equal(resp.status_code, 201)
# Get Mirror
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
self.check_equal(resp.status_code, 404)
# Update Mirror
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
self.check_task(resp)
# Snapshot Mirror
snapshot1_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
# Publish Snapshot
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': 'packagecloud.io/varnishcache/varnish30',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# Snapshot Mirror 2
snapshot2_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestSnapshot(APITest):
"""
publish snapshot of snapshot
"""
fixtureGpg = True
def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
self.check_task(task)
snapshot1_name = self.random_name()
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': '',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
snapshot2_name = self.random_name()
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestRepoSignedBy(APITest): class PublishSwitchAPITestRepoSignedBy(APITest):
""" """
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
-31
View File
@@ -461,34 +461,3 @@ class ReposAPITestCopyPackage(APITest):
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(), self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(),
['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']) ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378'])
class ReposAPITestCreateEdit(APITest):
"""
POST /api/repos,
"""
def check(self):
repo_name = self.random_name() + ' with space'
repo_desc = {'Comment': 'fun repo',
'DefaultComponent': 'contrib',
'DefaultDistribution': 'bookworm',
'Name': repo_name}
resp = self.post("/api/repos", json=repo_desc)
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 201)
repo_desc = {'Comment': 'modified repo',
'DefaultComponent': 'main',
'DefaultDistribution': 'trixie',
'Name': repo_name + '@renamed'}
resp = self.put(f"/api/repos/{repo_name}", json=repo_desc)
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 200)
resp = self.get("/api/repos/" + repo_name + '@renamed')
self.check_equal(resp.json(), repo_desc)
self.check_equal(resp.status_code, 200)
resp = self.delete("/api/repos/" + repo_name + '@renamed')
self.check_equal(resp.status_code, 200)
+1 -1
View File
@@ -14,7 +14,7 @@ class UnixSocketAPITest(BaseTest):
socket_path = "/tmp/_aptly_test.sock" socket_path = "/tmp/_aptly_test.sock"
base_url = ("unix://%s" % socket_path) base_url = ("unix://%s" % socket_path)
aptly_out = None aptly_out = None
debugOutput = True debugOutput = False # Controlled by --debug flag in run.py
def prepare(self): def prepare(self):
if self.aptly_server is None: if self.aptly_server is None:
+5
View File
@@ -65,6 +65,7 @@ func (list *List) consumer() {
task.State = SUCCEEDED task.State = SUCCEEDED
} }
fmt.Printf("RACE DEBUG: Task Done '%s', freeing %s\n", task.Name, task.resources)
list.usedResources.Free(task.resources) list.usedResources.Free(task.resources)
task.wgTask.Done() task.wgTask.Done()
@@ -77,6 +78,8 @@ func (list *List) consumer() {
blockingTasks := list.usedResources.UsedBy(t.resources) blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 { if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t) list.usedResources.MarkInUse(t.resources, t)
fmt.Printf("RACE DEBUG: Task Resuming '%s', locking %s\n", t.Name, t.resources)
// unlock list since queueing may block // unlock list since queueing may block
list.Unlock() list.Unlock()
unlocked = true unlocked = true
@@ -209,10 +212,12 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
tasks := list.usedResources.UsedBy(resources) tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 { if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task) list.usedResources.MarkInUse(task.resources, task)
fmt.Printf("RACE DEBUG: Task Starting '%s', locking %s\n", name, resources)
// queueing task might block if channel not ready, unlock list before queueing // queueing task might block if channel not ready, unlock list before queueing
list.Unlock() list.Unlock()
list.queue <- task list.queue <- task
} else { } else {
fmt.Printf("RACE DEBUG: Task Queued '%s', waiting on %s\n", name, resources)
list.Unlock() list.Unlock()
} }