Compare commits

..

12 Commits

Author SHA1 Message Date
André Roth 8d6215f3ae docker: provide test image with source 2026-06-07 23:03:38 +02:00
André Roth 9c82a7036b system tests: add repo and publish tests 2026-06-07 22:45:33 +02:00
André Roth 784e215d78 publish: lock pool on non MultiDist publish
* revert mutex on LinkFromPool
* add tests
2026-06-07 22:45:33 +02:00
André Roth 3849750cf3 tasks: fix task state locking
Race condition iexisted where task State, err, and processReturnValue fields
were written by consumer goroutine and read by concurrent accessors without
proper synchronization, causing torn reads and data races.

Implemented single-lock model with optimal lock scope:

- Removed 8 accessor methods (direct field access is simpler)
- Lock only during brief state transitions (IDLE→RUNNING, RUNNING→SUCCEEDED/FAILED)
- Release lock during task.process() execution to enable full concurrency
- Readers hold list.Lock() only during atomic struct copy
- Moved State = RUNNING before goroutine spawn for clearer semantics
- task/list.go: RunTaskInBackground() copies *task before unlock,
    returns the pre-made copy instead of dereferencing after unlock
2026-06-07 22:45:33 +02:00
André Roth 9e0215457f mirror: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 16:20:23 +02:00
André Roth 2bd474adc2 snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 16:20:23 +02:00
André Roth 14b1936f58 repos: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

    * perform collection.LoadComplete inside maybeRunTaskInBackground
    * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 16:08:02 +02:00
André Roth 3b7272a872 docs: fix typo 2026-06-07 15:49:32 +02:00
André Roth a58894a757 publish: lock source repos/snapshots
Concurrent tasks were not properly locking their resources, leading to
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

This changes introduces resource locking for publishing:
* SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources
* SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.
2026-06-07 15:49:15 +02:00
André Roth 461e4a545b fix(publish): warn when distribution missing and resource key cannot be pre-registered
When b.Distribution is empty, the pre-registered resource key
U<storage>:<prefix>>><distribution> cannot be constructed, so
concurrent POST requests to the same prefix are not serialized by the
task queue.  Add a log warning so operators are aware of the gap.
2026-06-07 15:45:20 +02:00
André Roth 28fea864b8 publish: load data inside background tasks
This fixes a flaw in async apis, which loaded the published repo from the
DB and mutated it outside the task closure, before the task lock was acquired.

* perform collection.LoadComplete inside maybeRunTaskInBackground
* have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 15:44:35 +02:00
André Roth 2827620cfe fix(publish): pre-register published repo key before task submission
apiPublishRepoOrSnapshot appended published.Key() to resources inside
the task closure, after maybeRunTaskInBackground had already been called.
The task's locked-resource set is fixed at submission time, so that append
had no effect — the published repo key was never registered as a resource.

Two concurrent POST /api/publish/{prefix} requests for the same
prefix/distribution therefore did not conflict in the task queue: both
ran in parallel, each loaded an empty PublishedRepoCollection from the DB,
both passed CheckDuplicate, and the second Add silently overwrote the first.

Fix: compute the published repo key ("U{storagePrefix}>>{distribution}")
from the already-known storage/prefix/distribution values and append it to
resources before calling maybeRunTaskInBackground, so concurrent creates
for the same destination are serialised by the task queue.  The now-dead
append inside the closure is removed.
2026-05-23 13:54:50 +02:00
8 changed files with 81 additions and 45 deletions
+3 -3
View File
@@ -1,4 +1,3 @@
---
name: CI name: CI
on: on:
@@ -11,6 +10,7 @@ on:
defaults: defaults:
run: run:
# see: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#using-a-specific-shell
shell: bash --noprofile --norc -eo pipefail {0} shell: bash --noprofile --norc -eo pipefail {0}
env: env:
@@ -33,7 +33,7 @@ jobs:
make docker-image make docker-image
- name: "Unit Tests" - name: "Unit Tests"
run: | run: |
make docker-unit-test make docker-unit-tests
mkdir -p out/coverage mkdir -p out/coverage
mv unit.out out/coverage/ mv unit.out out/coverage/
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v4
@@ -140,7 +140,7 @@ jobs:
- name: "Upload Code Coverage" - name: "Upload Code Coverage"
if: github.actor != 'dependabot[bot]' if: github.actor != 'dependabot[bot]'
uses: codecov/codecov-action@v7.0.0 uses: codecov/codecov-action@v5
with: with:
token: ${{ secrets.CODECOV_TOKEN }} token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.txt files: coverage.txt
+1 -1
View File
@@ -130,7 +130,7 @@ aptly version: 1.5.0+189+g0fc90dff
In order to run aptly unit tests, enter the following: In order to run aptly unit tests, enter the following:
``` ```
make docker-unit-test make docker-unit-tests
``` ```
#### Running system tests #### Running system tests
+4 -1
View File
@@ -182,6 +182,9 @@ binaries: prepare swagger ## Build binary releases (FreeBSD, macOS, Linux gener
docker-image: ## Build aptly-dev docker image docker-image: ## Build aptly-dev docker image
@docker build -f system/Dockerfile . -t aptly-dev @docker build -f system/Dockerfile . -t aptly-dev
docker-image-test: # Build aptly-test docker image for testing
@docker build -f docker/test.Dockerfile . -t aptly-test
docker-image-no-cache: ## Build aptly-dev docker image (no cache) docker-image-no-cache: ## Build aptly-dev docker image (no cache)
@docker build --no-cache -f system/Dockerfile . -t aptly-dev @docker build --no-cache -f system/Dockerfile . -t aptly-dev
@@ -194,7 +197,7 @@ docker-shell: ## Run aptly and other commands in docker container
docker-deb: ## Build debian packages in docker container docker-deb: ## Build debian packages in docker container
@$(DOCKER_RUN) -t aptly-dev /work/src/system/docker-wrapper dpkg DEBARCH=amd64 @$(DOCKER_RUN) -t aptly-dev /work/src/system/docker-wrapper dpkg DEBARCH=amd64
docker-unit-test: ## Run unit tests in docker container (add TEST=regex to specify which tests to run) docker-unit-tests: ## Run unit tests in docker container (add TEST=regex to specify which tests to run)
$(DOCKER_RUN) -t --tmpfs /smallfs:rw,size=1m aptly-dev /work/src/system/docker-wrapper \ $(DOCKER_RUN) -t --tmpfs /smallfs:rw,size=1m aptly-dev /work/src/system/docker-wrapper \
azurite-start \ azurite-start \
AZURE_STORAGE_ENDPOINT=http://127.0.0.1:10000/devstoreaccount1 \ AZURE_STORAGE_ENDPOINT=http://127.0.0.1:10000/devstoreaccount1 \
+14 -6
View File
@@ -2,6 +2,7 @@ package api
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
"strings" "strings"
@@ -298,16 +299,23 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist multiDist = *b.MultiDist
} }
// Non-MultiDist publishes share a single pool/ directory under the // Pre-register the published repo key in resources so that concurrent
// prefix. Lock at the prefix level so that concurrent publish/drop // POST requests for the same prefix/distribution are serialized by the
// operations on sibling distributions cannot race during cleanup. // task queue rather than racing on CheckDuplicate + Add.
if !multiDist { if b.Distribution != "" {
storagePrefix := prefix storagePrefix := prefix
if storage != "" { if storage != "" {
storagePrefix = storage + ":" + prefix storagePrefix = storage + ":" + prefix
} }
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix)) // Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
} else {
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
} }
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
+34 -4
View File
@@ -429,18 +429,48 @@ func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) {
go func() { go func() {
defer wg.Done() defer wg.Done()
s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil) //time.Sleep(5 * time.Millisecond)
c.Logf("[iter %d] Import A", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) c.Logf("[iter %d] Publish A", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil) //time.Sleep(7 * time.Millisecond)
c.Logf("[iter %d] Import B", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) c.Logf("[iter %d] Publish B", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
}() }()
//go func() {
//defer wg.Done()
//time.Sleep(15 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish A", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
//c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
//}()
//go func() {
//defer wg.Done()
//time.Sleep(18 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish B", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
//c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
//}()
wg.Wait() wg.Wait()
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
c.Logf("[iter %d] All operations complete", iter) c.Logf("[iter %d] All operations complete", iter)
+15 -28
View File
@@ -209,37 +209,24 @@ func apiSnapshotsCreate(c *gin.Context) {
} }
} }
// Merge packages from all source snapshots list := deb.NewPackageList()
var refList *deb.PackageRefList
if len(freshSources) > 0 { // verify package refs and build package list using fresh factory
refList = freshSources[0].RefList() for _, ref := range b.PackageRefs {
for i := 1; i < len(freshSources); i++ { p, err := taskPackageCollection.ByKey([]byte(ref))
refList = refList.Merge(freshSources[i].RefList(), true, false) if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
} }
} else {
refList = deb.NewPackageRefList()
} }
// Add any explicitly specified package refs on top snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
if len(b.PackageRefs) > 0 {
list := deb.NewPackageList()
for _, ref := range b.PackageRefs {
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
}
refList = refList.Merge(deb.NewPackageRefListFromPackageList(list), true, false)
}
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, refList, b.Description)
err = taskSnapshotCollection.Add(snapshot) err = taskSnapshotCollection.Add(snapshot)
if err != nil { if err != nil {
+7
View File
@@ -0,0 +1,7 @@
FROM aptly-dev
ADD --chown=aptly:aptly . /work/src/
# Pre-populate the Go module cache so go mod verify works offline
RUN chown aptly /work/src && mkdir -p /work/src/.go && chown aptly /work/src/.go && \
cd /work/src && sudo -u aptly GOPATH=/work/src/.go GOCACHE=/work/src/.go/cache go mod download
+3 -2
View File
@@ -1207,10 +1207,11 @@ class PublishSwitchAPITestSnapshot(APITest):
self.check_equal(all_repos.status_code, 200) self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json()) self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists( self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix + self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy") task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task) self.check_task(task)