Compare commits

..

16 Commits

Author SHA1 Message Date
André Roth 68814ff1f0 docs: fix typo 2026-05-25 11:57:47 +02:00
André Roth d44ae522ac fix(publish): lock source repos/snapshots on publish update endpoint 2026-05-25 09:47:22 +00:00
André Roth 8f2b335409 fix(publish): lock source repos/snapshots on publish update switch
Affected endpoint: apiPublishUpdateSwitch (PUT /api/publish/{prefix}/{distribution}).

The handler registered only the published repo key as a task resource.
The underlying source repos (for local) or snapshots (for snapshot-based
published repos) were not locked.  Concurrent updates to a source repo
or snapshot while a publish-update/switch task was running could produce
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

Fix: for SourceLocalRepo, iterate published.Sources (component -> source
UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources.  For SourceSnapshot, iterate b.Snapshots,
look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.  Task queue now serialises
against both the published repo and all its sources.
2026-05-25 10:36:21 +02:00
André Roth 9ecbc844e7 fix(publish): warn when distribution missing and resource key cannot be pre-registered
When b.Distribution is empty, the pre-registered resource key
U<storage>:<prefix>>><distribution> cannot be constructed, so
concurrent POST requests to the same prefix are not serialized by the
task queue.  Add a log warning so operators are aware of the gap.
2026-05-25 10:36:21 +02:00
André Roth 9e91ee4c4a fix(publish): reload published inside task for create/drop endpoints
Affected endpoints: apiPublishRepoOrSnapshot (POST /api/publish/{prefix}),
apiPublishDrop (DELETE /api/publish/{prefix}/{distribution}).

Both handlers used the outer-scoped collectionFactory and collection
variables inside the task closure.  These were captured before the task
lock was acquired, so under concurrent load each task operated on a
stale DB view:

  apiPublishRepoOrSnapshot:  snapshot/localRepo LoadComplete,
  NewPublishedRepo, CheckDuplicate, Publish, and collection.Add all
  used the pre-lock collectionFactory/collection.  Two concurrent
  POST to same prefix could both pass CheckDuplicate (neither sees
  the other in the stale DB view) and race on disk writes.

  apiPublishDrop:  collection.Remove used pre-lock collection,
  potentially racing with concurrent updates/other drops.

Fix: inside the task closure create a fresh taskCollectionFactory and
taskCollection.  All DB reads (LoadComplete) and writes
(CheckDuplicate, Add, Remove, Publish) now run against the authoritative
DB state after the lock is held.
2026-05-25 10:36:16 +02:00
André Roth b7969c7a2d fix(publish): reload published inside task for update/switch endpoints
Affected endpoints: apiPublishUpdateSwitch (PUT), apiPublishUpdate (POST).

Both handlers loaded the published repo and mutated scalar fields
(Label, Origin, SkipContents, SkipBz2, AcquireByHash, SignedBy,
MultiDist, Version) outside the task closure, before the lock was
acquired.  Inside the task, LoadComplete only refreshed sourceItems —
it did not reload scalar fields or the Revision.  Two concurrent
requests therefore each operated on a stale base:

  Request A loads published (Label="old"), sets Label="A"
  Request B loads published (Label="old"), sets Label="B"
  Task A runs: Update() + Publish() + collection.Update() -> saves Label="A"
  Task B runs: Update() on B's stale copy -> saves Label="B",
               silently discarding A's Label change and potentially
               reconciling a Revision built against the pre-A state.

Fix: remove all field mutations and the LoadComplete call from the HTTP
handler.  Inside the task, a fresh taskCollectionFactory is created, the
published repo is re-read via ByStoragePrefixDistribution + LoadComplete
(obtaining the current DB state after the lock is held), and then all
field mutations are applied before Update / Publish / collection.Update.
2026-05-23 13:54:50 +02:00
André Roth 2a5992c74e fix(publish): reload published inside task for source-management endpoints
Affected endpoints: apiPublishAddSource, apiPublishSetSources,
apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges.

All five handlers shared the same flawed pattern: they loaded the
published repo from the DB and mutated it (ObtainRevision / DropRevision)
outside the task closure, before the task lock was acquired.  Each task
closure then just wrote back the already-mutated, pre-lock object.

Because the task queue serialises tasks that share a resource key, two
concurrent requests appear safe — but each task closure holds a stale
copy of the object captured before the lock was taken:

  Request A loads published: revision = {}
  Request B loads published: revision = {}   <- same DB state
  A mutates: revision = {main: snap1}
  B mutates: revision = {contrib: snap2}
  Task A runs: saves {main: snap1}           OK
  Task B runs: saves {contrib: snap2}        <- clobbers A's change

Fix: perform only a shallow ByStoragePrefixDistribution outside the task
(for the early 404 response, resource key, and task name).  Inside the
task closure a dedicated taskCollectionFactory is created, the published
repo is re-read fresh from the DB (after the lock is acquired), and
LoadComplete + all mutations + Update are executed against that
authoritative copy.
2026-05-23 13:54:50 +02:00
André Roth 2827620cfe fix(publish): pre-register published repo key before task submission
apiPublishRepoOrSnapshot appended published.Key() to resources inside
the task closure, after maybeRunTaskInBackground had already been called.
The task's locked-resource set is fixed at submission time, so that append
had no effect — the published repo key was never registered as a resource.

Two concurrent POST /api/publish/{prefix} requests for the same
prefix/distribution therefore did not conflict in the task queue: both
ran in parallel, each loaded an empty PublishedRepoCollection from the DB,
both passed CheckDuplicate, and the second Add silently overwrote the first.

Fix: compute the published repo key ("U{storagePrefix}>>{distribution}")
from the already-known storage/prefix/distribution values and append it to
resources before calling maybeRunTaskInBackground, so concurrent creates
for the same destination are serialised by the task queue.  The now-dead
append inside the closure is removed.
2026-05-23 13:54:50 +02:00
André Roth 8dc61cf362 ci: use correct ubuntu 26.04 codename 2026-05-17 10:16:01 +02:00
André Roth 4a9ddbdc34 Merge pull request #1565 from muresan/fix/aptly-crash-db-recover
Crash in aptly db recover
2026-05-15 16:51:51 +02:00
André Roth c316ea9b73 Merge pull request #1567 from aptly-dev/fix/doc-typos
docs: fix typos
2026-05-15 16:49:14 +02:00
André Roth d027a251ba Merge pull request #1571 from aptly-dev/feature/ubu26.04
ci: build for ubuntu 26.04
2026-05-15 16:48:55 +02:00
André Roth 16b6348710 ci: build for ubuntu 26.04 2026-05-15 00:05:35 +02:00
Catalin Muresan 1c1abe6b10 Added tests to please codeconv 2026-05-14 23:33:27 +02:00
Catalin Muresan c4bfbe52ca Fix crash in aptly db recover 2026-05-14 23:33:27 +02:00
André Roth c723fea807 docs: fix typos 2026-05-04 11:35:55 +02:00
22 changed files with 357 additions and 522 deletions
+4 -1
View File
@@ -155,7 +155,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"] name: ["Debian 13/trixie", "Debian 12/bookworm", "Debian 11/bullseye", "Ubuntu 26.04", "Ubuntu 24.04", "Ubuntu 22.04", "Ubuntu 20.04"]
arch: ["amd64", "i386" , "arm64" , "armhf"] arch: ["amd64", "i386" , "arm64" , "armhf"]
include: include:
- name: "Debian 13/trixie" - name: "Debian 13/trixie"
@@ -167,6 +167,9 @@ jobs:
- name: "Debian 11/bullseye" - name: "Debian 11/bullseye"
suite: bullseye suite: bullseye
image: debian:bullseye-slim image: debian:bullseye-slim
- name: "Ubuntu 26.04"
suite: resolute
image: ubuntu:26.04
- name: "Ubuntu 24.04" - name: "Ubuntu 24.04"
suite: noble suite: noble
image: ubuntu:24.04 image: ubuntu:24.04
+2 -2
View File
@@ -16,7 +16,7 @@ Please report unacceptable behavior on [https://github.com/aptly-dev/aptly/discu
### List of Repositories ### List of Repositories
* [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page * [aptly-dev/aptly](https://github.com/aptly-dev/aptly) - aptly source code, functional tests, man page
* [apty-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/) * [aptly-dev/aptly-dev.github.io](https://github.com/aptly-dev/aptly-dev.github.io) - aptly website (https://www.aptly.info/)
* [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide * [aptly-dev/aptly-fixture-db](https://github.com/aptly-dev/aptly-fixture-db) & [aptly-dev/aptly-fixture-pool](https://github.com/aptly-dev/aptly-fixture-pool) provide
fixtures for aptly functional tests fixtures for aptly functional tests
@@ -137,7 +137,7 @@ make docker-unit-tests
In order to run aptly system tests, enter the following: In order to run aptly system tests, enter the following:
``` ```
make docker-system-tests make docker-system-test
``` ```
#### Running golangci-lint #### Running golangci-lint
+3 -8
View File
@@ -29,11 +29,6 @@ ifeq ($(CAPTURE),1)
CAPTURE_ARG := --capture CAPTURE_ARG := --capture
endif endif
# export DEBUG=1 to enable debug output in system tests
ifeq ($(DEBUG),1)
DEBUG_ARG := --debug
endif
help: ## Print this help help: ## Print this help
@grep -E '^[a-zA-Z][a-zA-Z0-9_-]*:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' @grep -E '^[a-zA-Z][a-zA-Z0-9_-]*:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
@@ -126,7 +121,7 @@ system-test: prepare swagger etcd-install ## Run system tests
if [ ! -e ~/aptly-fixture-pool ]; then git clone https://github.com/aptly-dev/aptly-fixture-pool.git ~/aptly-fixture-pool/; fi if [ ! -e ~/aptly-fixture-pool ]; then git clone https://github.com/aptly-dev/aptly-fixture-pool.git ~/aptly-fixture-pool/; fi
test -f ~/etcd.db || (curl -o ~/etcd.db.xz http://repo.aptly.info/system-tests/etcd.db.xz && xz -d ~/etcd.db.xz) test -f ~/etcd.db || (curl -o ~/etcd.db.xz http://repo.aptly.info/system-tests/etcd.db.xz && xz -d ~/etcd.db.xz)
# Run system tests # Run system tests
PATH=$(BINPATH)/:$(PATH) FORCE_COLOR=1 $(PYTHON) system/run.py --long $(COVERAGE_ARG_TEST) $(CAPTURE_ARG) $(DEBUG_ARG) $(TEST) PATH=$(BINPATH)/:$(PATH) FORCE_COLOR=1 $(PYTHON) system/run.py --long $(COVERAGE_ARG_TEST) $(CAPTURE_ARG) $(TEST)
bench: bench:
@echo "\e[33m\e[1mRunning benchmark ...\e[0m" @echo "\e[33m\e[1mRunning benchmark ...\e[0m"
@@ -216,7 +211,7 @@ docker-system-test: ## Run system tests in docker container (add TEST=t04_mirro
AZURE_STORAGE_ACCESS_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" \ AZURE_STORAGE_ACCESS_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" \
AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) \ AWS_ACCESS_KEY_ID=$(AWS_ACCESS_KEY_ID) \
AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) \ AWS_SECRET_ACCESS_KEY=$(AWS_SECRET_ACCESS_KEY) \
system-test TEST=$(TEST) CAPTURE=$(CAPTURE) COVERAGE_SKIP=$(COVERAGE_SKIP) DEBUG=$(DEBUG) \ system-test TEST=$(TEST) CAPTURE=$(CAPTURE) COVERAGE_SKIP=$(COVERAGE_SKIP) \
azurite-stop azurite-stop
docker-serve: ## Run development server (auto recompiling) on http://localhost:3142 docker-serve: ## Run development server (auto recompiling) on http://localhost:3142
@@ -246,4 +241,4 @@ clean: ## remove local build and module cache
rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf rm -f unit.out aptly.test VERSION docs/docs.go docs/swagger.json docs/swagger.yaml docs/swagger.conf
find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true find system/ -type d -name __pycache__ -exec rm -rf {} \; 2>/dev/null || true
.PHONY: help man prepare swagger version binaries build docker-release docker-system-tests docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8 .PHONY: help man prepare swagger version binaries build docker-release docker-system-test docker-unit-test docker-lint docker-build docker-image docker-man docker-shell docker-serve clean releasetype dpkg serve flake8
+1 -1
View File
@@ -54,7 +54,7 @@ type gpgDeleteKeyParams struct {
// @Summary Add GPG Keys // @Summary Add GPG Keys
// @Description **Adds GPG keys to aptly keyring** // @Description **Adds GPG keys to aptly keyring**
// @Description // @Description
// @Description Add GPG public keys for veryfing remote repositories for mirroring. // @Description Add GPG public keys for verifying remote repositories for mirroring.
// @Description // @Description
// @Description Keys can be added in two ways: // @Description Keys can be added in two ways:
// @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty) // @Description * By providing the ASCII armord key in `GpgKeyArmor` (leave Keyserver and GpgKeyID empty)
+1 -1
View File
@@ -497,7 +497,7 @@ func apiMirrorsEdit(c *gin.Context) {
type mirrorUpdateParams struct { type mirrorUpdateParams struct {
// Change mirror name to `Name` // Change mirror name to `Name`
Name string ` json:"Name" example:"mirror1"` Name string ` json:"Name" example:"mirror1"`
// Gpg keyring(s) for verifing Release file // Gpg keyring(s) for verifying Release file
Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"` Keyrings []string ` json:"Keyrings" example:"trustedkeys.gpg"`
// Set "true" to ignore checksum errors // Set "true" to ignore checksum errors
IgnoreChecksums bool ` json:"IgnoreChecksums"` IgnoreChecksums bool ` json:"IgnoreChecksums"`
+305 -252
View File
@@ -2,6 +2,7 @@ package api
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
"strings" "strings"
@@ -124,7 +125,7 @@ func apiPublishList(c *gin.Context) {
// @Description See also: `aptly publish show` // @Description See also: `aptly publish show`
// @Tags Publish // @Tags Publish
// @Produce json // @Produce json
// @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambigious in URLs" // @Param prefix path string true "publishing prefix, use `:.` instead of `.` because it is ambiguous in URLs"
// @Param distribution path string true "distribution name" // @Param distribution path string true "distribution name"
// @Success 200 {object} deb.PublishedRepo // @Success 200 {object} deb.PublishedRepo
// @Failure 404 {object} Error "Published repository not found" // @Failure 404 {object} Error "Published repository not found"
@@ -255,13 +256,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.SourceKind == deb.SourceSnapshot { if b.SourceKind == deb.SourceSnapshot {
var snapshot *deb.Snapshot var snapshot *deb.Snapshot
tmpCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
snapshot, err = tmpCollection.ByName(source.Name) snapshot, err = snapshotCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -273,13 +274,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo { } else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo var localRepo *deb.LocalRepo
tmpCollection := collectionFactory.LocalRepoCollection() localCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
localRepo, err = tmpCollection.ByName(source.Name) localRepo, err = localCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -298,11 +299,25 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist multiDist = *b.MultiDist
} }
collection := collectionFactory.PublishedRepoCollection() // Pre-register the published repo key in resources so that concurrent
// POST requests for the same prefix/distribution are serialized by the
// task queue rather than racing on CheckDuplicate + Add.
if b.Distribution != "" {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
} else {
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
taskDetail := task.PublishDetail{ taskDetail := task.PublishDetail{
Detail: detail, Detail: detail,
} }
@@ -314,10 +329,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
for _, source := range sources { for _, source := range sources {
switch s := source.(type) { switch s := source.(type) {
case *deb.Snapshot: case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := taskCollectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s) err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo: case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection() localCollection := taskCollectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s) err = localCollection.LoadComplete(s)
default: default:
err = fmt.Errorf("unexpected type for source: %T", source) err = fmt.Errorf("unexpected type for source: %T", source)
@@ -327,7 +342,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} }
} }
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
@@ -365,18 +380,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.Version = b.Version published.Version = b.Version
} }
duplicate := collection.CheckDuplicate(published) duplicate := taskCollection.CheckDuplicate(published)
if duplicate != nil { if duplicate != nil {
_ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
} }
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
err = collection.Add(published) err = taskCollection.Add(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -385,46 +400,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}) })
} }
// Return resources to be locked for a Snapshot name
func getSnapshotResources(snapshotCollection *deb.SnapshotCollection, snapshotName string) (resources []string, err error) {
snapshot, err := snapshotCollection.ByName(snapshotName)
if err != nil {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
for _, sourceID := range snapshot.SourceIDs {
if snapshot.SourceKind == deb.SourceSnapshot {
snapshot2, err2 := snapshotCollection.ByUUID(sourceID)
if err2 != nil {
err = err2
return
}
res, err3 := getSnapshotResources(snapshotCollection, snapshot2.Name)
if err3 != nil {
err = err3
return
}
resources = append(resources, res...)
} else if snapshot.SourceKind == deb.SourceLocalRepo {
var repo *deb.LocalRepo
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(repo.Key()))
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
var mirror *deb.RemoteRepo
mirror, err = context.NewCollectionFactory().RemoteRepoCollection().ByUUID(sourceID)
if err != nil {
return
}
resources = append(resources, string(mirror.Key()))
}
}
return
}
type publishedRepoUpdateSwitchParams struct { type publishedRepoUpdateSwitchParams struct {
// when publishing, overwrite files in pool/ directory without notice // when publishing, overwrite files in pool/ directory without notice
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"` ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
@@ -444,12 +419,12 @@ type publishedRepoUpdateSwitchParams struct {
SignedBy *string ` json:"SignedBy" example:""` SignedBy *string ` json:"SignedBy" example:""`
// Enable multiple packages with the same filename in different distributions // Enable multiple packages with the same filename in different distributions
MultiDist *bool ` json:"MultiDist" example:"false"` MultiDist *bool ` json:"MultiDist" example:"false"`
// Value of Label: field in published repository stanza // Value of Label: field in published repository stanza
Label *string ` json:"Label" example:"Debian"` Label *string ` json:"Label" example:"Debian"`
// Value of Origin: field in published repository stanza // Value of Origin: field in published repository stanza
Origin *string ` json:"Origin" example:"Debian"` Origin *string ` json:"Origin" example:"Debian"`
// Version of the release: Optional // Version of the release: Optional
Version *string ` json:"Version" example:"13.3"` Version *string ` json:"Version" example:"13.3"`
} }
// @Summary Update Published Repository // @Summary Update Published Repository
@@ -496,6 +471,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection() snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
@@ -510,69 +486,71 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return return
} }
for _, uuid := range published.Sources {
localCollection := collectionFactory.LocalRepoCollection() repo, err2 := localRepoCollection.ByUUID(uuid)
for _, sourceID := range published.Sources {
localRepo, err2 := localCollection.ByUUID(sourceID)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(localRepo.Key())) resources = append(resources, string(repo.Key()))
} }
} else if published.SourceKind == deb.SourceSnapshot { } else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots { for _, snapshotInfo := range b.Snapshots {
res, err2 := getSnapshotResources(snapshotCollection, snapshotInfo.Name) snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, res...) resources = append(resources, string(snapshot.ResourceKey()))
} }
} else { } else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
return return
} }
if b.SkipContents != nil { // Field mutations and fresh DB load are deferred to inside the task so
published.SkipContents = *b.SkipContents // they always operate on a consistent state after the lock is held.
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
revision := published.ObtainRevision() revision := published.ObtainRevision()
sources := revision.Sources sources := revision.Sources
@@ -584,17 +562,17 @@ func apiPublishUpdateSwitch(c *gin.Context) {
} }
} }
result, err := published.Update(collectionFactory, out) result, err := published.Update(taskCollectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = collection.Update(published) err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -602,7 +580,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup { if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
@@ -652,8 +630,11 @@ func apiPublishDrop(c *gin.Context) {
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution, taskCollectionFactory := context.NewCollectionFactory()
collectionFactory, out, force, skipCleanup) taskCollection := taskCollectionFactory.PublishedRepoCollection()
err := taskCollection.Remove(context, storage, prefix, distribution,
taskCollectionFactory, out, force, skipCleanup)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
} }
@@ -689,43 +670,52 @@ func apiPublishAddSource(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly (no LoadComplete) to verify existence and obtain the
// resource key and task name. The actual mutation is performed inside
// the task on a freshly loaded copy to prevent lost-update races.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
return
}
sources[component] = name
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
}
sources[component] = name
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -807,39 +797,48 @@ func apiPublishSetSources(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -872,24 +871,33 @@ func apiPublishDropChanges(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and DropRevision happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
published.DropRevision()
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
published.DropRevision()
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -925,51 +933,58 @@ func apiPublishUpdateSource(c *gin.Context) {
param := slashEscape(c.Params.ByName("prefix")) param := slashEscape(c.Params.ByName("prefix"))
storage, prefix := deb.ParsePrefix(param) storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution")) distribution := slashEscape(c.Params.ByName("distribution"))
component := slashEscape(c.Params.ByName("component")) urlComponent := slashEscape(c.Params.ByName("component"))
// Default component to the URL path segment; the body may rename it.
b.Component = urlComponent
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
return
}
b.Component = component
b.Name = revision.Sources[component]
if c.Bind(&b) != nil {
return
}
if b.Component != component {
delete(sources, component)
}
component = b.Component
name := b.Name
sources[component] = name
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[urlComponent]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
}
if b.Component != urlComponent {
delete(sources, urlComponent)
}
newComponent := b.Component
name := b.Name
sources[newComponent] = name
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1006,33 +1021,41 @@ func apiPublishRemoveSource(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
return
}
delete(sources, component)
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
}
delete(sources, component)
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1104,64 +1127,94 @@ func apiPublishUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection() collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and field mutations happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return return
} }
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())} resources := []string{string(published.Key())}
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
// because published.Update() reads from them and concurrent modification
// would produce an inconsistent view.
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
if published.SourceKind == deb.SourceLocalRepo {
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, uuid := range published.Sources {
snapshot, err2 := snapshotCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
result, err := published.Update(collectionFactory, out) taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
err = collection.Update(published) // Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.Update(published)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
} }
@@ -1169,7 +1222,7 @@ func apiPublishUpdate(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup { if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil { if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
} }
+2 -2
View File
@@ -102,7 +102,7 @@ type repoCreateParams struct {
DefaultDistribution string ` json:"DefaultDistribution" example:"stable"` DefaultDistribution string ` json:"DefaultDistribution" example:"stable"`
// Default component when publishing from this local repo // Default component when publishing from this local repo
DefaultComponent string ` json:"DefaultComponent" example:"main"` DefaultComponent string ` json:"DefaultComponent" example:"main"`
// Snapshot name to create repoitory from (optional) // Snapshot name to create repository from (optional)
FromSnapshot string ` json:"FromSnapshot" example:""` FromSnapshot string ` json:"FromSnapshot" example:""`
} }
@@ -180,7 +180,7 @@ type reposEditParams struct {
Comment *string ` json:"Comment" example:"example repo"` Comment *string ` json:"Comment" example:"example repo"`
// Change Default Distribution for publishing // Change Default Distribution for publishing
DefaultDistribution *string ` json:"DefaultDistribution" example:""` DefaultDistribution *string ` json:"DefaultDistribution" example:""`
// Change Devault Component for publishing // Change Default Component for publishing
DefaultComponent *string ` json:"DefaultComponent" example:""` DefaultComponent *string ` json:"DefaultComponent" example:""`
} }
+2 -1
View File
@@ -168,6 +168,8 @@ func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
// LoadComplete loads additional information for local repo // LoadComplete loads additional information for local repo
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error { func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
repo.packageRefs = &PackageRefList{}
encoded, err := collection.db.Get(repo.RefKey()) encoded, err := collection.db.Get(repo.RefKey())
if err == database.ErrNotFound { if err == database.ErrNotFound {
return nil return nil
@@ -176,7 +178,6 @@ func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
return err return err
} }
repo.packageRefs = &PackageRefList{}
return repo.packageRefs.Decode(encoded) return repo.packageRefs.Decode(encoded)
} }
+12
View File
@@ -133,6 +133,18 @@ func (s *LocalRepoCollectionSuite) TestByUUID(c *C) {
c.Assert(r.String(), Equals, repo.String()) c.Assert(r.String(), Equals, repo.String())
} }
func (s *LocalRepoCollectionSuite) TestLoadCompleteNoRefKey(c *C) {
repo := NewLocalRepo("local1", "Comment 1")
c.Assert(s.collection.Update(repo), IsNil)
r, err := s.collection.ByName("local1")
c.Assert(err, IsNil)
c.Assert(s.collection.LoadComplete(r), IsNil)
c.Assert(r.packageRefs, NotNil)
c.Assert(r.NumPackages(), Equals, 0)
}
func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) { func (s *LocalRepoCollectionSuite) TestUpdateLoadComplete(c *C) {
repo := NewLocalRepo("local1", "Comment 1") repo := NewLocalRepo("local1", "Comment 1")
c.Assert(s.collection.Update(repo), IsNil) c.Assert(s.collection.Update(repo), IsNil)
+1 -6
View File
@@ -609,12 +609,7 @@ func (p *PublishedRepo) StoragePrefix() string {
// Key returns unique key identifying PublishedRepo // Key returns unique key identifying PublishedRepo
func (p *PublishedRepo) Key() []byte { func (p *PublishedRepo) Key() []byte {
if p.MultiDist { return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
// do not lock Distribution in MultiDist
return []byte("UM" + p.StoragePrefix())
} else {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
}
} }
// RefKey is a unique id for package reference list // RefKey is a unique id for package reference list
+3
View File
@@ -79,6 +79,9 @@ func (l *PackageRefList) Decode(input []byte) error {
// ForEach calls handler for each package ref in list // ForEach calls handler for each package ref in list
func (l *PackageRefList) ForEach(handler func([]byte) error) error { func (l *PackageRefList) ForEach(handler func([]byte) error) error {
if l == nil {
return nil
}
var err error var err error
for _, p := range l.Refs { for _, p := range l.Refs {
err = handler(p) err = handler(p)
+11
View File
@@ -130,6 +130,17 @@ func (s *PackageRefListSuite) TestPackageRefListForeach(c *C) {
c.Check(err, Equals, e) c.Check(err, Equals, e)
} }
func (s *PackageRefListSuite) TestForEachNilList(c *C) {
var l *PackageRefList
called := false
err := l.ForEach(func([]byte) error {
called = true
return nil
})
c.Assert(err, IsNil)
c.Assert(called, Equals, false)
}
func (s *PackageRefListSuite) TestHas(c *C) { func (s *PackageRefListSuite) TestHas(c *C) {
_ = s.list.Add(s.p1) _ = s.list.Add(s.p1)
_ = s.list.Add(s.p3) _ = s.list.Add(s.p3)
+1 -1
View File
@@ -2,7 +2,7 @@
<div> <div>
In order to add debian package files to a local repository, files are first uploaded to a temporary directory. In order to add debian package files to a local repository, files are first uploaded to a temporary directory.
Then the directory (or a specific file within) is added to a repository. After adding to a repositorty, the directory resp. files are removed bt default. Then the directory (or a specific file within) is added to a repository. After adding to a repository, the directory resp. files are removed bt default.
All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory. All uploaded files are stored under `<rootDir>/upload/<tempdir>` directory.
+1 -1
View File
@@ -1,5 +1,5 @@
# Search Package Collection # Search Package Collection
<div> <div>
Perform operations on the whole collection of packages in apty database. Perform operations on the whole collection of packages in aptly database.
</div> </div>
+1 -1
View File
@@ -35,6 +35,6 @@ aptly publish repo my-repo --gpg-key=KEY_ID_a --gpg-key=KEY_ID_b
#### Parameters #### Parameters
Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores Publish APIs use following convention to identify published repositories: `/api/publish/:prefix/:distribution`. `:distribution` is distribution name, while `:prefix` is `[<storage>:]<prefix>` (storage is optional, it defaults to empty string), if publishing prefix contains slashes `/`, they should be replaced with underscores (`_`) and underscores
should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambigious in URLs. should be replaced with double underscore (`__`). To specify root `:prefix`, use `:.`, as `.` is ambiguous in URLs.
</div> </div>
+1 -1
View File
@@ -1,6 +1,6 @@
# Manage Local Repositories # Manage Local Repositories
<div> <div>
A local repository is a collection of versionned packages (usually custom packages created internally). A local repository is a collection of versioned packages (usually custom packages created internally).
Packages can be added, removed, moved or copied between repos. Packages can be added, removed, moved or copied between repos.
+1 -1
View File
@@ -25,7 +25,7 @@ class APITest(BaseTest):
""" """
aptly_server = None aptly_server = None
aptly_out = None aptly_out = None
debugOutput = False # Controlled by --debug flag in run.py debugOutput = True
base_url = "127.0.0.1:8765" base_url = "127.0.0.1:8765"
configOverride = { configOverride = {
"FileSystemPublishEndpoints": { "FileSystemPublishEndpoints": {
+2 -2
View File
@@ -164,10 +164,10 @@ class BaseTest(object):
self.run() self.run()
self.check() self.check()
except Exception as exc: except Exception as exc:
raise exc
finally:
if self.debugOutput: if self.debugOutput:
print(f"API log:\n{self.debug_output()}") print(f"API log:\n{self.debug_output()}")
raise exc
finally:
self.teardown() self.teardown()
def prepare_remove_all(self): def prepare_remove_all(self):
+2 -9
View File
@@ -36,7 +36,7 @@ def natural_key(string_):
return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_)] return [int(s) if s.isdigit() else s for s in re.split(r'(\d+)', string_)]
def run(include_long_tests=False, capture_results=False, tests=None, filters=None, coverage_dir=None, coverage_skip=False, debug=False): def run(include_long_tests=False, capture_results=False, tests=None, filters=None, coverage_dir=None, coverage_skip=False):
""" """
Run system test. Run system test.
""" """
@@ -50,9 +50,6 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
if not coverage_dir and not coverage_skip: if not coverage_dir and not coverage_skip:
coverage_dir = mkdtemp(suffix="aptly-coverage") coverage_dir = mkdtemp(suffix="aptly-coverage")
# Set debug output globally for all test classes
BaseTest.debugOutput = debug
failed = False failed = False
for test in tests: for test in tests:
orig_stdout = sys.stdout orig_stdout = sys.stdout
@@ -158,7 +155,6 @@ def run(include_long_tests=False, capture_results=False, tests=None, filters=Non
traceback.print_exception(typ, val, tb, file=orig_stdout) traceback.print_exception(typ, val, tb, file=orig_stdout)
else: else:
orig_stdout.write(colored("\b\b\b\bOK", color="green", attrs=["bold"]) + f" {duration}\n") orig_stdout.write(colored("\b\b\b\bOK", color="green", attrs=["bold"]) + f" {duration}\n")
orig_stdout.write(testout.get_contents())
t.shutdown() t.shutdown()
@@ -218,7 +214,6 @@ if __name__ == "__main__":
capture_results = False capture_results = False
coverage_dir = None coverage_dir = None
coverage_skip = False coverage_skip = False
debug = False
tests = None tests = None
args = sys.argv[1:] args = sys.argv[1:]
@@ -232,8 +227,6 @@ if __name__ == "__main__":
args = args[1:] args = args[1:]
elif args[0] == "--coverage-skip": elif args[0] == "--coverage-skip":
coverage_skip = True coverage_skip = True
elif args[0] == "--debug":
debug = True
args = args[1:] args = args[1:]
@@ -246,4 +239,4 @@ if __name__ == "__main__":
else: else:
filters.append(arg) filters.append(arg)
run(include_long_tests, capture_results, tests, filters, coverage_dir, coverage_skip, debug) run(include_long_tests, capture_results, tests, filters, coverage_dir, coverage_skip)
-226
View File
@@ -992,232 +992,6 @@ class PublishSwitchAPITestRepo(APITest):
self.check_not_exists("public/" + prefix + "dists/") self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestMirror(APITest):
"""
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True
def check(self):
mirror_name = self.random_name()
mirror_desc = {'Name': mirror_name,
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
'Distribution': 'wheezy',
'Keyrings': ["aptlytest.gpg"],
'Architectures': ["amd64"],
'Components': ['main']}
mirror_desc['IgnoreSignatures'] = True
# Create Mirror
resp = self.post("/api/mirrors", json=mirror_desc)
self.check_equal(resp.status_code, 201)
# Get Mirror
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
self.check_equal(resp.status_code, 404)
# Update Mirror
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
self.check_task(resp)
# Snapshot Mirror
snapshot1_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
# Publish Snapshot
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': 'packagecloud.io/varnishcache/varnish30',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# Snapshot Mirror 2
snapshot2_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestSnapshot(APITest):
"""
publish snapshot of snapshot
"""
fixtureGpg = True
def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
self.check_task(task)
snapshot1_name = self.random_name()
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': '',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
snapshot2_name = self.random_name()
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestRepoSignedBy(APITest): class PublishSwitchAPITestRepoSignedBy(APITest):
""" """
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
+1 -1
View File
@@ -14,7 +14,7 @@ class UnixSocketAPITest(BaseTest):
socket_path = "/tmp/_aptly_test.sock" socket_path = "/tmp/_aptly_test.sock"
base_url = ("unix://%s" % socket_path) base_url = ("unix://%s" % socket_path)
aptly_out = None aptly_out = None
debugOutput = False # Controlled by --debug flag in run.py debugOutput = True
def prepare(self): def prepare(self):
if self.aptly_server is None: if self.aptly_server is None:
-5
View File
@@ -65,7 +65,6 @@ func (list *List) consumer() {
task.State = SUCCEEDED task.State = SUCCEEDED
} }
fmt.Printf("RACE DEBUG: Task Done '%s', freeing %s\n", task.Name, task.resources)
list.usedResources.Free(task.resources) list.usedResources.Free(task.resources)
task.wgTask.Done() task.wgTask.Done()
@@ -78,8 +77,6 @@ func (list *List) consumer() {
blockingTasks := list.usedResources.UsedBy(t.resources) blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 { if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t) list.usedResources.MarkInUse(t.resources, t)
fmt.Printf("RACE DEBUG: Task Resuming '%s', locking %s\n", t.Name, t.resources)
// unlock list since queueing may block // unlock list since queueing may block
list.Unlock() list.Unlock()
unlocked = true unlocked = true
@@ -212,12 +209,10 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
tasks := list.usedResources.UsedBy(resources) tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 { if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task) list.usedResources.MarkInUse(task.resources, task)
fmt.Printf("RACE DEBUG: Task Starting '%s', locking %s\n", name, resources)
// queueing task might block if channel not ready, unlock list before queueing // queueing task might block if channel not ready, unlock list before queueing
list.Unlock() list.Unlock()
list.queue <- task list.queue <- task
} else { } else {
fmt.Printf("RACE DEBUG: Task Queued '%s', waiting on %s\n", name, resources)
list.Unlock() list.Unlock()
} }