Compare commits

..

15 Commits

Author SHA1 Message Date
André Roth b9521660d3 locking: use uuids, since names can be renamed 2026-06-10 19:11:58 +02:00
André Roth d5b1482ab5 tasks: show resources 2026-06-10 19:01:17 +02:00
André Roth 5f5104389a snapshots: fix snapshot of snapshot
### api/snapshot.go — fix apiSnapshotsCreate

 The function was building the new snapshot's ref list only from b.PackageRefs, completely ignoring SourceSnapshots package contents (they were stored as provenance metadata only). The fix
 mirrors the approach from apiSnapshotsMerge:

 1. Start with source snapshots: merge all freshSources[i].RefList() together using Merge(overrideMatching=true)
 2. Layer explicit PackageRefs on top: only enter the package-loading loop if b.PackageRefs is non-empty, then merge the result into refList
 3. Pass the combined refList to NewSnapshotFromRefList

 This means an empty snapshot (SourceSnapshots: [], PackageRefs: []) still correctly produces an empty ref list, single-source and multi-source snapshot-of-snapshot cases are now handled,
 and PackageRefs can still augment or override on top of the merged sources.

 ### system/t12_api/publish.py — fix PublishSwitchAPITestSnapshot

 - Removed the # FIXME comment
 - Changed check_not_exists → check_exists for pyspi-0.6.1-1.3.stripped.dsc after the publish-switch to snapshot2, which is now the correct expectation since snapshot2 inherits all packages
   from snapshot1
2026-06-10 19:01:17 +02:00
André Roth b5343b627c tests: cleanup 2026-06-10 19:01:17 +02:00
André Roth ddd041e26a system tests: add repo and publish tests 2026-06-10 19:01:17 +02:00
André Roth 853e548235 publish: lock pool on non MultiDist publish
* revert mutex on LinkFromPool
* add tests
2026-06-10 19:01:17 +02:00
André Roth 1946840e0f tasks: fix task state locking
Race condition iexisted where task State, err, and processReturnValue fields
were written by consumer goroutine and read by concurrent accessors without
proper synchronization, causing torn reads and data races.

Implemented single-lock model with optimal lock scope:

- Removed 8 accessor methods (direct field access is simpler)
- Lock only during brief state transitions (IDLE→RUNNING, RUNNING→SUCCEEDED/FAILED)
- Release lock during task.process() execution to enable full concurrency
- Readers hold list.Lock() only during atomic struct copy
- Moved State = RUNNING before goroutine spawn for clearer semantics
- task/list.go: RunTaskInBackground() copies *task before unlock,
    returns the pre-made copy instead of dereferencing after unlock
2026-06-10 19:01:17 +02:00
André Roth f573688fbb mirror: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth e4ba2fc00e snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth 9116b68db5 repos: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

    * perform collection.LoadComplete inside maybeRunTaskInBackground
    * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-10 19:01:17 +02:00
André Roth 2266eba019 publish: lock source repos/snapshots
Concurrent tasks were not properly locking their resources, leading to
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

This changes introduces resource locking for publishing:
* SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources
* SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.
2026-06-07 23:47:09 +02:00
André Roth 826c6a19fd publish: load data inside background tasks
This fixes a flaw in async apis, which loaded the published repo from the
DB and mutated it outside the task closure, before the task lock was acquired.

* perform collection.LoadComplete inside maybeRunTaskInBackground
* have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth 25a0318d27 publish: remove useless resource lock
resource locks need to be before the background task. creating same publish endpoint at the same time is unlikely...
2026-06-07 23:47:09 +02:00
André Roth 2974558aa7 cleanup 2026-06-07 23:46:43 +02:00
André Roth 00773f9840 ci: update codecov-action to 7.0.0 2026-06-07 22:53:09 +02:00
12 changed files with 71 additions and 113 deletions
+3 -3
View File
@@ -1,3 +1,4 @@
---
name: CI name: CI
on: on:
@@ -10,7 +11,6 @@ on:
defaults: defaults:
run: run:
# see: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#using-a-specific-shell
shell: bash --noprofile --norc -eo pipefail {0} shell: bash --noprofile --norc -eo pipefail {0}
env: env:
@@ -33,7 +33,7 @@ jobs:
make docker-image make docker-image
- name: "Unit Tests" - name: "Unit Tests"
run: | run: |
make docker-unit-tests make docker-unit-test
mkdir -p out/coverage mkdir -p out/coverage
mv unit.out out/coverage/ mv unit.out out/coverage/
- uses: actions/upload-artifact@v4 - uses: actions/upload-artifact@v4
@@ -140,7 +140,7 @@ jobs:
- name: "Upload Code Coverage" - name: "Upload Code Coverage"
if: github.actor != 'dependabot[bot]' if: github.actor != 'dependabot[bot]'
uses: codecov/codecov-action@v5 uses: codecov/codecov-action@v7.0.0
with: with:
token: ${{ secrets.CODECOV_TOKEN }} token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.txt files: coverage.txt
+1 -1
View File
@@ -130,7 +130,7 @@ aptly version: 1.5.0+189+g0fc90dff
In order to run aptly unit tests, enter the following: In order to run aptly unit tests, enter the following:
``` ```
make docker-unit-tests make docker-unit-test
``` ```
#### Running system tests #### Running system tests
+1 -4
View File
@@ -182,9 +182,6 @@ binaries: prepare swagger ## Build binary releases (FreeBSD, macOS, Linux gener
docker-image: ## Build aptly-dev docker image docker-image: ## Build aptly-dev docker image
@docker build -f system/Dockerfile . -t aptly-dev @docker build -f system/Dockerfile . -t aptly-dev
docker-image-test: # Build aptly-test docker image for testing
@docker build -f docker/test.Dockerfile . -t aptly-test
docker-image-no-cache: ## Build aptly-dev docker image (no cache) docker-image-no-cache: ## Build aptly-dev docker image (no cache)
@docker build --no-cache -f system/Dockerfile . -t aptly-dev @docker build --no-cache -f system/Dockerfile . -t aptly-dev
@@ -197,7 +194,7 @@ docker-shell: ## Run aptly and other commands in docker container
docker-deb: ## Build debian packages in docker container docker-deb: ## Build debian packages in docker container
@$(DOCKER_RUN) -t aptly-dev /work/src/system/docker-wrapper dpkg DEBARCH=amd64 @$(DOCKER_RUN) -t aptly-dev /work/src/system/docker-wrapper dpkg DEBARCH=amd64
docker-unit-tests: ## Run unit tests in docker container (add TEST=regex to specify which tests to run) docker-unit-test: ## Run unit tests in docker container (add TEST=regex to specify which tests to run)
$(DOCKER_RUN) -t --tmpfs /smallfs:rw,size=1m aptly-dev /work/src/system/docker-wrapper \ $(DOCKER_RUN) -t --tmpfs /smallfs:rw,size=1m aptly-dev /work/src/system/docker-wrapper \
azurite-start \ azurite-start \
AZURE_STORAGE_ENDPOINT=http://127.0.0.1:10000/devstoreaccount1 \ AZURE_STORAGE_ENDPOINT=http://127.0.0.1:10000/devstoreaccount1 \
+9 -17
View File
@@ -2,7 +2,6 @@ package api
import ( import (
"fmt" "fmt"
"log"
"net/http" "net/http"
"strings" "strings"
@@ -268,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return return
} }
resources = append(resources, string(snapshot.ResourceKey())) resources = append(resources, string(snapshot.Key()))
sources = append(sources, snapshot) sources = append(sources, snapshot)
} }
} else if b.SourceKind == deb.SourceLocalRepo { } else if b.SourceKind == deb.SourceLocalRepo {
@@ -299,23 +298,16 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist multiDist = *b.MultiDist
} }
// Pre-register the published repo key in resources so that concurrent // Non-MultiDist publishes share a single pool/ directory under the
// POST requests for the same prefix/distribution are serialized by the // prefix. Lock at the prefix level so that concurrent publish/drop
// task queue rather than racing on CheckDuplicate + Add. // operations on sibling distributions cannot race during cleanup.
if b.Distribution != "" { if !multiDist {
storagePrefix := prefix storagePrefix := prefix
if storage != "" { if storage != "" {
storagePrefix = storage + ":" + prefix storagePrefix = storage + ":" + prefix
} }
resources = append(resources, "U"+storagePrefix+">>"+b.Distribution)
// Non-MultiDist publishes share a single pool/ directory under the resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
} else {
log.Printf("distribution not specified for publish to prefix '%s' - unable to lock ", prefix)
} }
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
@@ -507,7 +499,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(snapshot.ResourceKey())) resources = append(resources, string(snapshot.Key()))
} }
} else { } else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
@@ -1185,7 +1177,7 @@ func apiPublishUpdate(c *gin.Context) {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(snapshot.ResourceKey())) resources = append(resources, string(snapshot.Key()))
} }
} }
+4 -34
View File
@@ -429,48 +429,18 @@ func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) {
go func() { go func() {
defer wg.Done() defer wg.Done()
//time.Sleep(5 * time.Millisecond) s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish A", iter) s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
//time.Sleep(7 * time.Millisecond) s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish B", iter) s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
}() }()
//go func() {
//defer wg.Done()
//time.Sleep(15 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish A", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
//c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
//}()
//go func() {
//defer wg.Done()
//time.Sleep(18 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish B", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
//c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
//}()
wg.Wait() wg.Wait()
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
c.Logf("[iter %d] All operations complete", iter) c.Logf("[iter %d] All operations complete", iter)
+10 -10
View File
@@ -134,21 +134,14 @@ func apiReposCreate(c *gin.Context) {
// Handler: Pre-task validations (shallow) // Handler: Pre-task validations (shallow)
collectionFactory := context.NewCollectionFactory() collectionFactory := context.NewCollectionFactory()
var resources []string
if b.FromSnapshot != "" { if b.FromSnapshot != "" {
snapshotCollection := collectionFactory.SnapshotCollection() snapshot, err := collectionFactory.SnapshotCollection().ByName(b.FromSnapshot)
_, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
return return
} }
// Just verify it exists - don't load here resources = append(resources, string(snapshot.Key()))
}
// Use generated key resource for repo being created
resources := []string{"LocalRepo:" + b.Name}
if b.FromSnapshot != "" {
resources = append(resources, "Snapshot:"+b.FromSnapshot)
} }
taskName := fmt.Sprintf("Create repository %s", b.Name) taskName := fmt.Sprintf("Create repository %s", b.Name)
@@ -236,6 +229,13 @@ func apiReposEdit(c *gin.Context) {
return return
} }
if b.Name != nil && *b.Name != name {
if _, err = collection.ByName(*b.Name); err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: local repo %q already exists", *b.Name))
return
}
}
resources := []string{string(repo.Key())} resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Edit repository %s", name) taskName := fmt.Sprintf("Edit repository %s", name)
+35 -22
View File
@@ -92,7 +92,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
} }
// including snapshot resource key // including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name} resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name) taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() taskCollectionFactory := context.NewCollectionFactory()
@@ -186,7 +186,7 @@ func apiSnapshotsCreate(c *gin.Context) {
return return
} }
resources = append(resources, string(sources[i].ResourceKey())) resources = append(resources, string(sources[i].Key()))
} }
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -209,24 +209,37 @@ func apiSnapshotsCreate(c *gin.Context) {
} }
} }
list := deb.NewPackageList() // Merge packages from all source snapshots
var refList *deb.PackageRefList
// verify package refs and build package list using fresh factory if len(freshSources) > 0 {
for _, ref := range b.PackageRefs { refList = freshSources[0].RefList()
p, err := taskPackageCollection.ByKey([]byte(ref)) for i := 1; i < len(freshSources); i++ {
if err != nil { refList = refList.Merge(freshSources[i].RefList(), true, false)
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
} }
} else {
refList = deb.NewPackageRefList()
} }
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description) // Add any explicitly specified package refs on top
if len(b.PackageRefs) > 0 {
list := deb.NewPackageList()
for _, ref := range b.PackageRefs {
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
}
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = list.Add(p)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
}
refList = refList.Merge(deb.NewPackageRefListFromPackageList(list), true, false)
}
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, refList, b.Description)
err = taskSnapshotCollection.Add(snapshot) err = taskSnapshotCollection.Add(snapshot)
if err != nil { if err != nil {
@@ -278,7 +291,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
} }
// including snapshot resource key // including snapshot resource key
resources := []string{string(repo.Key()), "S" + b.Name} resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Create snapshot of repo %s", name) taskName := fmt.Sprintf("Create snapshot of repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory() taskCollectionFactory := context.NewCollectionFactory()
@@ -362,7 +375,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
} }
} }
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} resources := []string{string(snapshot.Key())}
taskName := fmt.Sprintf("Update snapshot %s", name) taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -457,7 +470,7 @@ func apiSnapshotsDrop(c *gin.Context) {
return return
} }
resources := []string{string(snapshot.ResourceKey())} resources := []string{string(snapshot.Key())}
taskName := fmt.Sprintf("Delete snapshot %s", name) taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -655,7 +668,7 @@ func apiSnapshotsMerge(c *gin.Context) {
return return
} }
resources[i] = string(sources[i].ResourceKey()) resources[i] = string(sources[i].Key())
} }
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
@@ -776,7 +789,7 @@ func apiSnapshotsPull(c *gin.Context) {
return return
} }
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())} resources := []string{string(sourceSnapshot.Key()), string(toSnapshot.Key())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination) taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory // Phase 2: Inside task lock - create fresh factory
-6
View File
@@ -143,12 +143,6 @@ func (s *Snapshot) Key() []byte {
return []byte("S" + s.UUID) return []byte("S" + s.UUID)
} }
// ResourceKey is a unique identifier of the resource
// this snapshot uses. Instead of uuid it uses name
// which needs to be unique as well.
func (s *Snapshot) ResourceKey() []byte {
return []byte("S" + s.Name)
}
// RefKey is a unique id for package reference list // RefKey is a unique id for package reference list
func (s *Snapshot) RefKey() []byte { func (s *Snapshot) RefKey() []byte {
-7
View File
@@ -1,7 +0,0 @@
FROM aptly-dev
ADD --chown=aptly:aptly . /work/src/
# Pre-populate the Go module cache so go mod verify works offline
RUN chown aptly /work/src && mkdir -p /work/src/.go && chown aptly /work/src/.go && \
cd /work/src && sudo -u aptly GOPATH=/work/src/.go GOCACHE=/work/src/.go/cache go mod download
+2 -3
View File
@@ -1207,11 +1207,10 @@ class PublishSwitchAPITestSnapshot(APITest):
self.check_equal(all_repos.status_code, 200) self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json()) self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists( self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix + self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy") task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task) self.check_task(task)
+4 -4
View File
@@ -67,7 +67,7 @@ func (list *List) consumer() {
task.processReturnValue = retValue task.processReturnValue = retValue
} }
list.usedResources.Free(task.resources) list.usedResources.Free(task.Resources)
task.wgTask.Done() task.wgTask.Done()
list.wg.Done() list.wg.Done()
@@ -76,9 +76,9 @@ func (list *List) consumer() {
for _, t := range list.tasks { for _, t := range list.tasks {
if t.State == IDLE { if t.State == IDLE {
// check resources // check resources
blockingTasks := list.usedResources.UsedBy(t.resources) blockingTasks := list.usedResources.UsedBy(t.Resources)
if len(blockingTasks) == 0 { if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t) list.usedResources.MarkInUse(t.Resources, t)
// unlock list since queueing may block // unlock list since queueing may block
list.Unlock() list.Unlock()
unlocked = true unlocked = true
@@ -219,7 +219,7 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
// if not, task will be queued by the consumer once resources are available // if not, task will be queued by the consumer once resources are available
tasks := list.usedResources.UsedBy(resources) tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 { if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task) list.usedResources.MarkInUse(task.Resources, task)
// queueing task might block if channel not ready, unlock list before queueing // queueing task might block if channel not ready, unlock list before queueing
list.Unlock() list.Unlock()
list.queue <- task list.queue <- task
+2 -2
View File
@@ -52,7 +52,7 @@ type Task struct {
Name string Name string
ID int ID int
State State State State
resources []string Resources []string
wgTask *sync.WaitGroup wgTask *sync.WaitGroup
} }
@@ -65,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s
Name: name, Name: name,
ID: ID, ID: ID,
State: IDLE, State: IDLE,
resources: resources, Resources: resources,
wgTask: wgTask, wgTask: wgTask,
} }
return task return task