Compare commits

..

15 Commits

Author SHA1 Message Date
André Roth b9521660d3 locking: use uuids, since names can be renamed 2026-06-10 19:11:58 +02:00
André Roth d5b1482ab5 tasks: show resources 2026-06-10 19:01:17 +02:00
André Roth 5f5104389a snapshots: fix snapshot of snapshot
### api/snapshot.go — fix apiSnapshotsCreate

 The function was building the new snapshot's ref list only from b.PackageRefs, completely ignoring SourceSnapshots package contents (they were stored as provenance metadata only). The fix
 mirrors the approach from apiSnapshotsMerge:

 1. Start with source snapshots: merge all freshSources[i].RefList() together using Merge(overrideMatching=true)
 2. Layer explicit PackageRefs on top: only enter the package-loading loop if b.PackageRefs is non-empty, then merge the result into refList
 3. Pass the combined refList to NewSnapshotFromRefList

 This means an empty snapshot (SourceSnapshots: [], PackageRefs: []) still correctly produces an empty ref list, single-source and multi-source snapshot-of-snapshot cases are now handled,
 and PackageRefs can still augment or override on top of the merged sources.

 ### system/t12_api/publish.py — fix PublishSwitchAPITestSnapshot

 - Removed the # FIXME comment
 - Changed check_not_exists → check_exists for pyspi-0.6.1-1.3.stripped.dsc after the publish-switch to snapshot2, which is now the correct expectation since snapshot2 inherits all packages
   from snapshot1
2026-06-10 19:01:17 +02:00
André Roth b5343b627c tests: cleanup 2026-06-10 19:01:17 +02:00
André Roth ddd041e26a system tests: add repo and publish tests 2026-06-10 19:01:17 +02:00
André Roth 853e548235 publish: lock pool on non MultiDist publish
* revert mutex on LinkFromPool
* add tests
2026-06-10 19:01:17 +02:00
André Roth 1946840e0f tasks: fix task state locking
Race condition iexisted where task State, err, and processReturnValue fields
were written by consumer goroutine and read by concurrent accessors without
proper synchronization, causing torn reads and data races.

Implemented single-lock model with optimal lock scope:

- Removed 8 accessor methods (direct field access is simpler)
- Lock only during brief state transitions (IDLE→RUNNING, RUNNING→SUCCEEDED/FAILED)
- Release lock during task.process() execution to enable full concurrency
- Readers hold list.Lock() only during atomic struct copy
- Moved State = RUNNING before goroutine spawn for clearer semantics
- task/list.go: RunTaskInBackground() copies *task before unlock,
    returns the pre-made copy instead of dereferencing after unlock
2026-06-10 19:01:17 +02:00
André Roth f573688fbb mirror: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth e4ba2fc00e snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth 9116b68db5 repos: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

    * perform collection.LoadComplete inside maybeRunTaskInBackground
    * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth 2266eba019 publish: lock source repos/snapshots
Concurrent tasks were not properly locking their resources, leading to
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

This changes introduces resource locking for publishing:
* SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources
* SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.
2026-06-07 23:47:09 +02:00
André Roth 826c6a19fd publish: load data inside background tasks
This fixes a flaw in async apis, which loaded the published repo from the
DB and mutated it outside the task closure, before the task lock was acquired.

* perform collection.LoadComplete inside maybeRunTaskInBackground
* have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth 25a0318d27 publish: remove useless resource lock
resource locks need to be before the background task. creating same publish endpoint at the same time is unlikely...
2026-06-07 23:47:09 +02:00
André Roth 2974558aa7 cleanup 2026-06-07 23:46:43 +02:00
André Roth 00773f9840 ci: update codecov-action to 7.0.0 2026-06-07 22:53:09 +02:00
12 changed files with 71 additions and 113 deletions
+3 -3
View File
@@ -1,3 +1,4 @@
---
name: CI
on:
@@ -10,7 +11,6 @@ on:
defaults:
run:
# see: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#using-a-specific-shell
shell: bash --noprofile --norc -eo pipefail {0}
env:
@@ -33,7 +33,7 @@ jobs:
make docker-image
- name: "Unit Tests"
run: |
make docker-unit-tests
make docker-unit-test
mkdir -p out/coverage
mv unit.out out/coverage/
- uses: actions/upload-artifact@v4
@@ -140,7 +140,7 @@ jobs:
- name: "Upload Code Coverage"
if: github.actor != 'dependabot[bot]'
uses: codecov/codecov-action@v5
uses: codecov/codecov-action@v7.0.0
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.txt
+1 -1
View File
@@ -130,7 +130,7 @@ aptly version: 1.5.0+189+g0fc90dff
In order to run aptly unit tests, enter the following:
```
make docker-unit-tests
make docker-unit-test
```
#### Running system tests
+1 -4
View File
@@ -182,9 +182,6 @@ binaries: prepare swagger ## Build binary releases (FreeBSD, macOS, Linux gener
docker-image: ## Build aptly-dev docker image
@docker build -f system/Dockerfile . -t aptly-dev
docker-image-test: # Build aptly-test docker image for testing
@docker build -f docker/test.Dockerfile . -t aptly-test
docker-image-no-cache: ## Build aptly-dev docker image (no cache)
@docker build --no-cache -f system/Dockerfile . -t aptly-dev
@@ -197,7 +194,7 @@ docker-shell: ## Run aptly and other commands in docker container
docker-deb: ## Build debian packages in docker container
@$(DOCKER_RUN) -t aptly-dev /work/src/system/docker-wrapper dpkg DEBARCH=amd64
docker-unit-tests: ## Run unit tests in docker container (add TEST=regex to specify which tests to run)
docker-unit-test: ## Run unit tests in docker container (add TEST=regex to specify which tests to run)
$(DOCKER_RUN) -t --tmpfs /smallfs:rw,size=1m aptly-dev /work/src/system/docker-wrapper \
azurite-start \
AZURE_STORAGE_ENDPOINT=http://127.0.0.1:10000/devstoreaccount1 \
+9 -17
View File
@@ -2,7 +2,6 @@ package api
import (
"fmt"
"log"
"net/http"
"strings"
@@ -268,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
resources = append(resources, string(snapshot.Key()))
sources = append(sources, snapshot)
}
} else if b.SourceKind == deb.SourceLocalRepo {
@@ -299,23 +298,16 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist
}
// Pre-register the published repo key in resources so that concurrent
// POST requests for the same prefix/distribution are serialized by the
// task queue rather than racing on CheckDuplicate + Add.
if b.Distribution != "" {
// Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
// Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
} else {
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
@@ -507,7 +499,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
resources = append(resources, string(snapshot.Key()))
}
} else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
@@ -1185,7 +1177,7 @@ func apiPublishUpdate(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
resources = append(resources, string(snapshot.Key()))
}
}
+4 -34
View File
@@ -429,48 +429,18 @@ func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) {
go func() {
defer wg.Done()
//time.Sleep(5 * time.Millisecond)
c.Logf("[iter %d] Import A", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A complete: %d", iter, resp.Code)
s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish A", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
}()
go func() {
defer wg.Done()
//time.Sleep(7 * time.Millisecond)
c.Logf("[iter %d] Import B", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B complete: %d", iter, resp.Code)
s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish B", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
}()
//go func() {
//defer wg.Done()
//time.Sleep(15 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish A", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
//c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
//}()
//go func() {
//defer wg.Done()
//time.Sleep(18 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish B", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
//c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
//}()
wg.Wait()
time.Sleep(200 * time.Millisecond)
c.Logf("[iter %d] All operations complete", iter)
+10 -10
View File
@@ -134,21 +134,14 @@ func apiReposCreate(c *gin.Context) {
// Handler: Pre-task validations (shallow)
collectionFactory := context.NewCollectionFactory()
var resources []string
if b.FromSnapshot != "" {
snapshotCollection := collectionFactory.SnapshotCollection()
_, err := snapshotCollection.ByName(b.FromSnapshot)
snapshot, err := collectionFactory.SnapshotCollection().ByName(b.FromSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
return
}
// Just verify it exists - don't load here
}
// Use generated key resource for repo being created
resources := []string{"LocalRepo:" + b.Name}
if b.FromSnapshot != "" {
resources = append(resources, "Snapshot:"+b.FromSnapshot)
resources = append(resources, string(snapshot.Key()))
}
taskName := fmt.Sprintf("Create repository %s", b.Name)
@@ -236,6 +229,13 @@ func apiReposEdit(c *gin.Context) {
return
}
if b.Name != nil && *b.Name != name {
if _, err = collection.ByName(*b.Name); err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: local repo %q already exists", *b.Name))
return
}
}
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Edit repository %s", name)
+35 -22
View File
@@ -92,7 +92,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
}
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
@@ -186,7 +186,7 @@ func apiSnapshotsCreate(c *gin.Context) {
return
}
resources = append(resources, string(sources[i].ResourceKey()))
resources = append(resources, string(sources[i].Key()))
}
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -209,24 +209,37 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
list := deb.NewPackageList()
// verify package refs and build package list using fresh factory
for _, ref := range b.PackageRefs {
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
// Merge packages from all source snapshots
var refList *deb.PackageRefList
if len(freshSources) > 0 {
refList = freshSources[0].RefList()
for i := 1; i < len(freshSources); i++ {
refList = refList.Merge(freshSources[i].RefList(), true, false)
}
} else {
refList = deb.NewPackageRefList()
}
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
// Add any explicitly specified package refs on top
if len(b.PackageRefs) > 0 {
list := deb.NewPackageList()
for _, ref := range b.PackageRefs {
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
}
refList = refList.Merge(deb.NewPackageRefListFromPackageList(list), true, false)
}
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, refList, b.Description)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
@@ -278,7 +291,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
}
// including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name}
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
@@ -362,7 +375,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
}
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
resources := []string{string(snapshot.Key())}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -457,7 +470,7 @@ func apiSnapshotsDrop(c *gin.Context) {
return
}
resources := []string{string(snapshot.ResourceKey())}
resources := []string{string(snapshot.Key())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -655,7 +668,7 @@ func apiSnapshotsMerge(c *gin.Context) {
return
}
resources[i] = string(sources[i].ResourceKey())
resources[i] = string(sources[i].Key())
}
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -776,7 +789,7 @@ func apiSnapshotsPull(c *gin.Context) {
return
}
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
resources := []string{string(sourceSnapshot.Key()), string(toSnapshot.Key())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory
-6
View File
@@ -143,12 +143,6 @@ func (s *Snapshot) Key() []byte {
return []byte("S" + s.UUID)
}
// ResourceKey is a unique identifier of the resource
// this snapshot uses. Instead of uuid it uses name
// which needs to be unique as well.
func (s *Snapshot) ResourceKey() []byte {
return []byte("S" + s.Name)
}
// RefKey is a unique id for package reference list
func (s *Snapshot) RefKey() []byte {
-7
View File
@@ -1,7 +0,0 @@
FROM aptly-dev
ADD --chown=aptly:aptly . /work/src/
# Pre-populate the Go module cache so go mod verify works offline
RUN chown aptly /work/src && mkdir -p /work/src/.go && chown aptly /work/src/.go && \
cd /work/src && sudo -u aptly GOPATH=/work/src/.go GOCACHE=/work/src/.go/cache go mod download
+2 -3
View File
@@ -1207,11 +1207,10 @@ class PublishSwitchAPITestSnapshot(APITest):
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
+4 -4
View File
@@ -67,7 +67,7 @@ func (list *List) consumer() {
task.processReturnValue = retValue
}
list.usedResources.Free(task.resources)
list.usedResources.Free(task.Resources)
task.wgTask.Done()
list.wg.Done()
@@ -76,9 +76,9 @@ func (list *List) consumer() {
for _, t := range list.tasks {
if t.State == IDLE {
// check resources
blockingTasks := list.usedResources.UsedBy(t.resources)
blockingTasks := list.usedResources.UsedBy(t.Resources)
if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t)
list.usedResources.MarkInUse(t.Resources, t)
// unlock list since queueing may block
list.Unlock()
unlocked = true
@@ -219,7 +219,7 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
// if not, task will be queued by the consumer once resources are available
tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task)
list.usedResources.MarkInUse(task.Resources, task)
// queueing task might block if channel not ready, unlock list before queueing
list.Unlock()
list.queue <- task
+2 -2
View File
@@ -52,7 +52,7 @@ type Task struct {
Name string
ID int
State State
resources []string
Resources []string
wgTask *sync.WaitGroup
}
@@ -65,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s
Name: name,
ID: ID,
State: IDLE,
resources: resources,
Resources: resources,
wgTask: wgTask,
}
return task