Compare commits

..

9 Commits

Author SHA1 Message Date
André Roth c825707fb9 system tests: add repo and publish tests 2026-06-08 00:00:20 +02:00
André Roth e52962955b publish: lock pool on non MultiDist publish
* revert mutex on LinkFromPool
* add tests
2026-06-08 00:00:20 +02:00
André Roth 4445839adc tasks: fix task state locking
Race condition iexisted where task State, err, and processReturnValue fields
were written by consumer goroutine and read by concurrent accessors without
proper synchronization, causing torn reads and data races.

Implemented single-lock model with optimal lock scope:

- Removed 8 accessor methods (direct field access is simpler)
- Lock only during brief state transitions (IDLE→RUNNING, RUNNING→SUCCEEDED/FAILED)
- Release lock during task.process() execution to enable full concurrency
- Readers hold list.Lock() only during atomic struct copy
- Moved State = RUNNING before goroutine spawn for clearer semantics
- task/list.go: RunTaskInBackground() copies *task before unlock,
    returns the pre-made copy instead of dereferencing after unlock
2026-06-07 23:47:09 +02:00
André Roth ca4cbd89f0 mirror: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth 4b76b803e7 snapshot: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

 * perform collection.LoadComplete inside maybeRunTaskInBackground
 * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth e4a4cd9640 repos: load data inside background tasks
This fixes a flaw in async apis, which loaded data from the
DB and mutated it outside the task closure, before the task lock was acquired.

    * perform collection.LoadComplete inside maybeRunTaskInBackground
    * have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth 2266eba019 publish: lock source repos/snapshots
Concurrent tasks were not properly locking their resources, leading to
inconsistent published indexes:

  Task A: apiPublishUpdateSwitch loads published, reads source repo/snapshot
  Request B: modifies same source repo or snapshot (add/remove packages, etc)
  Task A: Update() + Publish() reads stale/modified source -> inconsistent
           published index, or partial write if source deleted mid-task.

This changes introduces resource locking for publishing:
* SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append
string(repo.Key()) to resources
* SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append
string(snapshot.ResourceKey()) to resources.
2026-06-07 23:47:09 +02:00
André Roth 826c6a19fd publish: load data inside background tasks
This fixes a flaw in async apis, which loaded the published repo from the
DB and mutated it outside the task closure, before the task lock was acquired.

* perform collection.LoadComplete inside maybeRunTaskInBackground
* have tasks use a fresh copy of taskCollectionFactory, taskCollection
2026-06-07 23:47:09 +02:00
André Roth 25a0318d27 publish: remove useless resource lock
resource locks need to be before the background task. creating same publish endpoint at the same time is unlikely...
2026-06-07 23:47:09 +02:00
14 changed files with 1705 additions and 680 deletions
+47 -15
View File
@@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
mirrorCollection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err := mirrorCollection.ByName(name)
if err != nil {
@@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
// Phase 2: Inside task lock - create fresh collections
taskCollectionFactory := context.NewCollectionFactory()
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
repo, err := taskMirrorCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
err = repo.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
if !force {
snapshots := snapshotCollection.ByRemoteRepoSource(repo)
// Fresh checks with current collections
snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override")
}
}
err = mirrorCollection.Drop(repo)
err = taskMirrorCollection.Drop(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err)
}
@@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
remote, err = collection.ByName(c.Params.ByName("name"))
name := c.Params.ByName("name")
remote, err = collection.ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) {
return
}
// Pre-task validation of new name if provided
if b.Name != remote.Name {
_, err = collection.ByName(b.Name)
if err == nil {
@@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) {
resources := []string{string(remote.Key())}
maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.RemoteRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
remote, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Fresh rename check inside lock (if renaming)
if b.Name != remote.Name {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name)
}
}
downloader := context.NewDownloader(out)
err := remote.Fetch(downloader, verifier, b.IgnoreSignatures)
err = remote.Fetch(downloader, verifier, b.IgnoreSignatures)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -580,14 +612,14 @@ func apiMirrorsUpdate(c *gin.Context) {
}
}
err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck)
err = remote.DownloadPackageIndexes(out, downloader, verifier, taskCollectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
if remote.DownloadAppStream && !remote.IsFlat() {
err = remote.DownloadAppStreamFiles(out, downloader,
context.PackagePool(), collectionFactory.ChecksumCollection(nil), b.IgnoreChecksums)
context.PackagePool(), taskCollectionFactory.ChecksumCollection(nil), b.IgnoreChecksums)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -607,8 +639,8 @@ func apiMirrorsUpdate(c *gin.Context) {
}
}
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(),
collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly)
queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), taskCollectionFactory.PackageCollection(),
taskCollectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -618,12 +650,12 @@ func apiMirrorsUpdate(c *gin.Context) {
e := context.ReOpenDatabase()
if e == nil {
remote.MarkAsIdle()
_ = collection.Update(remote)
_ = taskCollection.Update(remote)
}
}()
remote.MarkAsUpdating()
err = collection.Update(remote)
err = taskCollection.Update(remote)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -727,7 +759,7 @@ func apiMirrorsUpdate(c *gin.Context) {
}
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil))
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, taskCollectionFactory.ChecksumCollection(nil))
if err != nil {
//return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err)
pushError(err)
@@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) {
}
log.Info().Msgf("%s: Finalizing download...", b.Name)
_ = remote.FinalizeDownload(collectionFactory, out)
err = collectionFactory.RemoteRepoCollection().Update(remote)
_ = remote.FinalizeDownload(taskCollectionFactory, out)
err = taskCollection.Update(remote)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
+319 -198
View File
@@ -298,11 +298,24 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist
}
collection := collectionFactory.PublishedRepoCollection()
// Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
taskDetail := task.PublishDetail{
Detail: detail,
}
@@ -314,10 +327,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
snapshotCollection := taskCollectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
localCollection := taskCollectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)
@@ -327,13 +340,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}
}
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
resources = append(resources, string(published.Key()))
if b.Origin != "" {
published.Origin = b.Origin
}
@@ -367,18 +378,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.Version = b.Version
}
duplicate := collection.CheckDuplicate(published)
duplicate := taskCollection.CheckDuplicate(published)
if duplicate != nil {
_ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
_ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
err = collection.Add(published)
err = taskCollection.Add(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -458,6 +469,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
@@ -465,64 +477,85 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
resources := []string{string(published.Key())}
if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
_, err2 := snapshotCollection.ByName(snapshotInfo.Name)
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
} else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
return
}
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())}
// Field mutations and fresh DB load are deferred to inside the task so
// they always operate on a consistent state after the lock is held.
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
revision := published.ObtainRevision()
sources := revision.Sources
@@ -534,17 +567,17 @@ func apiPublishUpdateSwitch(c *gin.Context) {
}
}
result, err := published.Update(collectionFactory, out)
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -552,7 +585,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -600,10 +633,19 @@ func apiPublishDrop(c *gin.Context) {
}
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that a drop cannot race
// with a concurrent update or drop of a sibling distribution during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution,
collectionFactory, out, force, skipCleanup)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
err := taskCollection.Remove(context, storage, prefix, distribution,
taskCollectionFactory, out, force, skipCleanup)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
@@ -639,43 +681,52 @@ func apiPublishAddSource(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly (no LoadComplete) to verify existence and obtain the
// resource key and task name. The actual mutation is performed inside
// the task on a freshly loaded copy to prevent lost-update races.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
return
}
sources[component] = name
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
}
sources[component] = name
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -757,39 +808,48 @@ func apiPublishSetSources(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -822,24 +882,33 @@ func apiPublishDropChanges(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and DropRevision happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
published.DropRevision()
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
published.DropRevision()
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -875,51 +944,58 @@ func apiPublishUpdateSource(c *gin.Context) {
param := slashEscape(c.Params.ByName("prefix"))
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
component := slashEscape(c.Params.ByName("component"))
urlComponent := slashEscape(c.Params.ByName("component"))
// Default component to the URL path segment; the body may rename it.
b.Component = urlComponent
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
return
}
b.Component = component
b.Name = revision.Sources[component]
if c.Bind(&b) != nil {
return
}
if b.Component != component {
delete(sources, component)
}
component = b.Component
name := b.Name
sources[component] = name
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[urlComponent]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
}
if b.Component != urlComponent {
delete(sources, urlComponent)
}
newComponent := b.Component
name := b.Name
sources[newComponent] = name
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -956,33 +1032,41 @@ func apiPublishRemoveSource(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
return
}
delete(sources, component)
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
}
delete(sources, component)
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -1054,64 +1138,101 @@ func apiPublishUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and field mutations happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
// because published.Update() reads from them and concurrent modification
// would produce an inconsistent view.
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
if published.SourceKind == deb.SourceLocalRepo {
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, uuid := range published.Sources {
snapshot, err2 := snapshotCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.ResourceKey()))
}
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
result, err := published.Update(collectionFactory, out)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -1119,7 +1240,7 @@ func apiPublishUpdate(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
+763
View File
@@ -0,0 +1,763 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
"github.com/aptly-dev/aptly/aptly"
ctx "github.com/aptly-dev/aptly/context"
"github.com/aptly-dev/aptly/deb"
"github.com/gin-gonic/gin"
"github.com/smira/flag"
. "gopkg.in/check.v1"
)
// PublishedFileMissingSuite reproduces the exact bug where:
// - Package import succeeds
// - Metadata is updated (Packages.gz shows the package)
// - Publish reports success
// - BUT the .deb file is missing from the published pool directory
// - Result: apt-get returns 404 when trying to download the package
type PublishedFileMissingSuite struct {
context *ctx.AptlyContext
flags *flag.FlagSet
configFile *os.File
router http.Handler
tempDir string
poolPath string
publicPath string
}
var _ = Suite(&PublishedFileMissingSuite{})
func (s *PublishedFileMissingSuite) SetUpSuite(c *C) {
aptly.Version = "publishedFileMissingTest"
tempDir, err := os.MkdirTemp("", "aptly-published-missing-test")
c.Assert(err, IsNil)
s.tempDir = tempDir
s.poolPath = filepath.Join(tempDir, "pool")
s.publicPath = filepath.Join(tempDir, "public")
file, err := os.CreateTemp("", "aptly-published-missing-config")
c.Assert(err, IsNil)
s.configFile = file
config := gin.H{
"rootDir": tempDir,
"downloadDir": filepath.Join(tempDir, "download"),
"architectures": []string{"amd64"},
"dependencyFollowSuggests": false,
"dependencyFollowRecommends": false,
"gpgDisableSign": true,
"gpgDisableVerify": true,
"gpgProvider": "internal",
"skipLegacyPool": true,
"enableMetricsEndpoint": false,
}
jsonString, err := json.Marshal(config)
c.Assert(err, IsNil)
_, err = file.Write(jsonString)
c.Assert(err, IsNil)
flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError)
flags.Bool("no-lock", true, "disable database locking for test")
flags.Int("db-open-attempts", 3, "dummy")
flags.String("config", s.configFile.Name(), "config file")
flags.String("architectures", "", "dummy")
s.flags = flags
context, err := ctx.NewContext(s.flags)
c.Assert(err, IsNil)
s.context = context
s.router = Router(context)
}
func (s *PublishedFileMissingSuite) TearDownSuite(c *C) {
if s.configFile != nil {
_ = os.Remove(s.configFile.Name())
}
if s.context != nil {
s.context.Shutdown()
}
if s.tempDir != "" {
_ = os.RemoveAll(s.tempDir)
}
}
func (s *PublishedFileMissingSuite) SetUpTest(c *C) {
collectionFactory := s.context.NewCollectionFactory()
localRepoCollection := collectionFactory.LocalRepoCollection()
_ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error {
_ = localRepoCollection.Drop(repo)
return nil
})
publishedCollection := collectionFactory.PublishedRepoCollection()
_ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error {
_ = publishedCollection.Remove(s.context, published.Storage, published.Prefix,
published.Distribution, collectionFactory, nil, true, true)
return nil
})
}
func (s *PublishedFileMissingSuite) TearDownTest(c *C) {
s.SetUpTest(c)
}
func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder {
w := httptest.NewRecorder()
var req *http.Request
var err error
if body != nil {
req, err = http.NewRequest(method, url, bytes.NewReader(body))
} else {
req, err = http.NewRequest(method, url, nil)
}
c.Assert(err, IsNil)
req.Header.Add("Content-Type", "application/json")
s.router.ServeHTTP(w, req)
return w
}
func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) {
uploadPath := s.context.UploadPath()
uploadDir := filepath.Join(uploadPath, uploadID)
err := os.MkdirAll(uploadDir, 0755)
c.Assert(err, IsNil)
tempDir, err := os.MkdirTemp("", "deb-build")
c.Assert(err, IsNil)
defer func() { _ = os.RemoveAll(tempDir) }()
debianDir := filepath.Join(tempDir, "DEBIAN")
err = os.MkdirAll(debianDir, 0755)
c.Assert(err, IsNil)
controlContent := fmt.Sprintf(`Package: %s
Version: %s
Section: libs
Priority: optional
Architecture: amd64
Maintainer: Test <test@example.com>
Description: Test package
Test package for published file missing bug.
`, packageName, version)
err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644)
c.Assert(err, IsNil)
usrDir := filepath.Join(tempDir, "usr", "lib")
err = os.MkdirAll(usrDir, 0755)
c.Assert(err, IsNil)
err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644)
c.Assert(err, IsNil)
debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile)
err = cmd.Run()
c.Assert(err, IsNil)
}
// TestPublishedFileGoMissing reproduces the exact production bug
func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) {
c.Log("=== Reproducing: Package in metadata but 404 on download ===")
// Create and publish a repository
repoName := "test-repo"
distribution := "bullseye"
createBody, _ := json.Marshal(gin.H{
"Name": repoName,
"DefaultDistribution": distribution,
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String()))
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": distribution,
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repoName},
},
"Signing": gin.H{"Skip": true},
})
resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String()))
// Create package
packageName := "hrt-libblobbyclient1"
version := "20250926.152427+hrtdeb11"
uploadID := "test-upload-1"
s.createDebPackage(c, uploadID, packageName, version)
// Add package
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String()))
// Update publish
updateBody, _ := json.Marshal(gin.H{
"Signing": gin.H{"Skip": true},
"ForceOverwrite": true,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String()))
// Now check if the file is actually accessible in the published location
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
// Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb
expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
c.Logf("Checking for published file at: %s", expectedPath)
fileInfo, err := os.Stat(expectedPath)
fileExists := err == nil
c.Logf("File exists: %v", fileExists)
if fileExists {
c.Logf("File size: %d bytes", fileInfo.Size())
}
// Check metadata
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil)
var packages []string
err = json.Unmarshal(resp.Body.Bytes(), &packages)
c.Assert(err, IsNil)
c.Logf("Packages in metadata: %d", len(packages))
// THE BUG: Metadata says package exists, but file is missing from published location
if len(packages) > 0 && !fileExists {
c.Logf("★★★ BUG REPRODUCED! ★★★")
c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath)
c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]")
c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!")
}
c.Assert(fileExists, Equals, true, Commentf(
"Published file should exist at %s when package is in metadata", expectedPath))
}
// TestConcurrentPublishRace tries to trigger the race with concurrent publishes
func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) {
c.Log("=== Testing concurrent publish race condition ===")
const numIterations = 4
for iteration := 0; iteration < numIterations; iteration++ {
c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations)
// Create repo
repoName := fmt.Sprintf("race-repo-%d", iteration)
distribution := fmt.Sprintf("dist-%d", iteration)
createBody, _ := json.Marshal(gin.H{
"Name": repoName,
"DefaultDistribution": distribution,
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201)
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": distribution,
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repoName},
},
"Signing": gin.H{"Skip": true},
})
resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody)
c.Assert(resp.Code, Equals, 201)
// Create multiple packages
var wg sync.WaitGroup
numPackages := 5
for i := 0; i < numPackages; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx)
version := "1.0.0"
uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx)
s.createDebPackage(c, uploadID, packageName, version)
// Add package
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil)
c.Logf("Package %d add: %d", idx, resp.Code)
// Small delay
time.Sleep(time.Duration(5+idx*2) * time.Millisecond)
// Publish
updateBody, _ := json.Marshal(gin.H{
"Signing": gin.H{"Skip": true},
"ForceOverwrite": true,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody)
c.Logf("Publish %d: %d", idx, resp.Code)
}(i)
}
wg.Wait()
time.Sleep(100 * time.Millisecond)
// Check all packages
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil)
var packages []string
err := json.Unmarshal(resp.Body.Bytes(), &packages)
c.Assert(err, IsNil)
// Check published files
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
missingFiles := []string{}
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("pkg-%d-%d", iteration, i)
version := "1.0.0"
// Calculate pool path
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, expectedPath)
}
}
if len(missingFiles) > 0 {
c.Logf("★★★ BUG DETECTED in iteration %d/%d! ★★★", iteration+1, numIterations)
c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles))
for i, f := range missingFiles {
c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f)
}
c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles))
} else {
c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages)
}
}
c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations)
}
// TestIdenticalPackageRace tests the specific case of identical SHA256 packages
func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) {
c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===")
const numIterations = 4
packageName := "shared-package"
for iter := 0; iter < numIterations; iter++ {
c.Logf("Iteration %d/%d", iter+1, numIterations)
// Create two repos that will get the SAME package (unique per iteration)
repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)}
dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)}
for i := range repos {
createBody, _ := json.Marshal(gin.H{
"Name": repos[i],
"DefaultDistribution": dists[i],
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201)
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": dists[i],
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repos[i]},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
})
resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody)
c.Assert(resp.Code, Equals, 201)
}
// Create IDENTICAL package file with UNIQUE VERSION per iteration
version := fmt.Sprintf("1.0.%d", iter)
uploadID1 := fmt.Sprintf("identical-upload-1-%d", iter)
uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter)
s.createDebPackage(c, uploadID1, packageName, version)
// Copy to second upload (same SHA256)
uploadPath := s.context.UploadPath()
src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
destDir := filepath.Join(uploadPath, uploadID2)
err := os.MkdirAll(destDir, 0755)
c.Assert(err, IsNil)
dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
srcData, readErr := os.ReadFile(src)
c.Assert(readErr, IsNil)
err = os.WriteFile(dest, srcData, 0644)
c.Assert(err, IsNil)
// Race: add and publish both simultaneously
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
//time.Sleep(5 * time.Millisecond)
c.Logf("[iter %d] Import A", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish A", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
}()
go func() {
defer wg.Done()
//time.Sleep(7 * time.Millisecond)
c.Logf("[iter %d] Import B", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish B", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
}()
//go func() {
//defer wg.Done()
//time.Sleep(15 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish A", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
//c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
//}()
//go func() {
//defer wg.Done()
//time.Sleep(18 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish B", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
//c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
//}()
wg.Wait()
time.Sleep(200 * time.Millisecond)
c.Logf("[iter %d] All operations complete", iter)
// Check the shared pool location
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
poolSubdir := string(packageName[0])
sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
fileInfo, err := os.Stat(sharedPoolPath)
fileExists := err == nil
if fileExists {
c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size())
} else {
c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err)
}
// Check metadata
var packagesA, packagesB []string
resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil)
err = json.Unmarshal(resp.Body.Bytes(), &packagesA)
c.Assert(err, IsNil)
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil)
err = json.Unmarshal(resp.Body.Bytes(), &packagesB)
c.Assert(err, IsNil)
c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB))
// THE BUG: Both repos show packages in metadata, but the shared pool file is missing
if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists {
c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1)
c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB))
c.Logf("Shared pool file exists: %v", fileExists)
c.Logf("Pool path: %s", sharedPoolPath)
// List what files ARE in the pool directory
poolDir := filepath.Dir(sharedPoolPath)
if entries, err := os.ReadDir(poolDir); err == nil {
c.Logf("Files in pool directory %s:", poolDir)
for _, entry := range entries {
c.Logf(" - %s", entry.Name())
}
}
c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1)
}
}
c.Logf("All %d iterations passed - bug not reproduced", numIterations)
}
// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug:
// Multiple snapshots are published concurrently to the SAME prefix but different distributions.
// Example from production logs:
// - trixie-pgdg published to "external/postgres-auto/trixie"
// - bullseye-pgdg published to "external/postgres-auto/bullseye"
// Both share the same pool directory, causing cleanup race conditions.
func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) {
const numIterations = 4
for iter := 0; iter < numIterations; iter++ {
c.Logf("--- Iteration %d/%d ---", iter+1, numIterations)
// Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg)
repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter)
repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter)
// Create trixie repo
createBody, _ := json.Marshal(gin.H{
"Name": repoTrixie,
"DefaultDistribution": "trixie",
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo"))
// Create bullseye repo
createBody, _ = json.Marshal(gin.H{
"Name": repoBullseye,
"DefaultDistribution": "bullseye",
"DefaultComponent": "main",
})
resp = s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo"))
// Add packages to both repos
numPackages := 3
// Add packages to trixie repo
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i)
s.createDebPackage(c, uploadID, packageName, version)
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie"))
}
// Add packages to bullseye repo
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i)
s.createDebPackage(c, uploadID, packageName, version)
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to bullseye"))
}
// Create snapshots from both repos
snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie)
snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye)
createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot"))
createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot"))
// Publish both snapshots CONCURRENTLY to the SAME prefix
// This mimics production where both are published to "external/postgres-auto"
// Use the SAME prefix across all iterations to trigger the race more aggressively
sharedPrefix := "postgres-auto"
var wg sync.WaitGroup
var trixiePublishCode, bullseyePublishCode int
wg.Add(2)
// Publish or update trixie snapshot
go func() {
defer wg.Done()
var resp *httptest.ResponseRecorder
if iter == 0 {
// First iteration: CREATE
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "snapshot",
"Distribution": "trixie",
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Name": snapshotTrixie},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false, // Force cleanup to run
})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody)
} else {
// Subsequent iterations: UPDATE (this is what happens in production)
updateBody, _ := json.Marshal(gin.H{
"Snapshots": []gin.H{
{"Component": "main", "Name": snapshotTrixie},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody)
}
trixiePublishCode = resp.Code
c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code)
}()
// Publish or update bullseye snapshot
go func() {
defer wg.Done()
var resp *httptest.ResponseRecorder
if iter == 0 {
// First iteration: CREATE
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "snapshot",
"Distribution": "bullseye",
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Name": snapshotBullseye},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody)
} else {
// Subsequent iterations: UPDATE
updateBody, _ := json.Marshal(gin.H{
"Snapshots": []gin.H{
{"Component": "main", "Name": snapshotBullseye},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody)
}
bullseyePublishCode = resp.Code
c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code)
}()
wg.Wait()
time.Sleep(50 * time.Millisecond)
// Verify publishes succeeded (201 for create, 200 for update)
expectedCode := 201
if iter > 0 {
expectedCode = 200
}
c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed"))
c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed"))
// Verify ALL package files exist in the published pool
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
missingFiles := []string{}
expectedFiles := []string{}
// Check trixie packages
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
expectedFiles = append(expectedFiles, expectedPath)
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath)))
}
}
// Check bullseye packages
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
expectedFiles = append(expectedFiles, expectedPath)
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath)))
}
}
// BUG: Files from one distribution are deleted by the other's cleanup
if len(missingFiles) > 0 {
c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations)
c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles))
for i, f := range missingFiles {
c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f)
}
c.Logf("\nThis reproduces the exact production bug where:")
c.Logf(" 1. Mirror updates complete successfully")
c.Logf(" 2. Snapshots are created")
c.Logf(" 3. Both snapshots publish to same prefix (different distributions)")
c.Logf(" 4. Cleanup from one publish DELETES files from the other")
c.Logf(" 5. Result: apt-get returns 404 when downloading packages")
// List what's actually in the pool
poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main")
if entries, err := os.ReadDir(poolDir); err == nil {
c.Logf("\nActual pool directory contents (%s):", poolDir)
for _, entry := range entries {
c.Logf(" - %s/", entry.Name())
}
}
c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool",
iter+1, numIterations, len(missingFiles))
} else {
c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles))
}
}
c.Logf("✓ All %d iterations passed - no files missing", numIterations)
}
+168 -81
View File
@@ -3,7 +3,6 @@ package api
import (
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"sort"
@@ -132,51 +131,74 @@ func apiReposCreate(c *gin.Context) {
return
}
repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
// Handler: Pre-task validations (shallow)
collectionFactory := context.NewCollectionFactory()
if b.FromSnapshot != "" {
var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
_, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err))
return
}
// Just verify it exists - don't load here
}
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err))
return
// Use generated key resource for repo being created
resources := []string{"LocalRepo:" + b.Name}
if b.FromSnapshot != "" {
resources = append(resources, "Snapshot:"+b.FromSnapshot)
}
taskName := fmt.Sprintf("Create repository %s", b.Name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collection and check/create ATOMIC inside task
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Check duplicate inside lock
if _, err := taskCollection.ByName(b.Name); err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %s already exists", b.Name)
}
repo.UpdateRefList(snapshot.RefList())
}
// Create repo
repo := deb.NewLocalRepo(b.Name, b.Comment)
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
localRepoCollection := collectionFactory.LocalRepoCollection()
if b.FromSnapshot != "" {
snapshotCollection := taskCollectionFactory.SnapshotCollection()
if _, err := localRepoCollection.ByName(b.Name); err == nil {
AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name))
return
}
snapshot, err := snapshotCollection.ByName(b.FromSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil},
fmt.Errorf("source snapshot not found: %s", err)
}
err := localRepoCollection.Add(repo)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, err)
return
}
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil},
fmt.Errorf("unable to load source snapshot: %s", err)
}
c.JSON(http.StatusCreated, repo)
repo.UpdateRefList(snapshot.RefList())
}
err := taskCollection.Add(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil
})
}
type reposEditParams struct {
// Name of repository to modify
Name *string ` json:"Name" example:"new-repo-name"`
Name *string `binding:"required" json:"Name" example:"repo1"`
// Change Comment of repository
Comment *string ` json:"Comment" example:"example repo"`
// Change Default Distribution for publishing
@@ -188,7 +210,7 @@ type reposEditParams struct {
// @Summary Update Repository
// @Description **Update local repository meta information**
// @Tags Repos
// @Param name path string true "Repository name to modify"
// @Param name path string true "Repository name"
// @Consume json
// @Param request body reposEditParams true "Parameters"
// @Produce json
@@ -201,52 +223,60 @@ func apiReposEdit(c *gin.Context) {
if c.Bind(&b) != nil {
return
}
if b.Name != nil {
new_name, err := url.PathUnescape(*b.Name)
if err != nil {
AbortWithJSONError(c, 400, err)
return
}
*b.Name = new_name
}
// Load shallowly for 404 check and resource key.
// Mutation and duplicate check happen inside the task for atomicity.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
name := c.Params.ByName("name") // repo to modify
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
}
if b.Name != nil && *b.Name != name {
_, err := collection.ByName(*b.Name)
if err == nil {
// already exists
AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name))
return
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Edit repository %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
err = collection.Update(repo)
if err != nil {
AbortWithJSONError(c, 500, err)
return
}
// Check and update ATOMIC (inside lock)
if b.Name != nil && *b.Name != name {
_, err := taskCollection.ByName(*b.Name)
if err == nil {
// already exists
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil},
fmt.Errorf("local repo with name %q already exists", *b.Name)
}
repo.Name = *b.Name
}
if b.Comment != nil {
repo.Comment = *b.Comment
}
if b.DefaultDistribution != nil {
repo.DefaultDistribution = *b.DefaultDistribution
}
if b.DefaultComponent != nil {
repo.DefaultComponent = *b.DefaultComponent
}
c.JSON(200, repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil
})
}
// GET /api/repos/:name
@@ -288,10 +318,10 @@ func apiReposDrop(c *gin.Context) {
force := c.Request.URL.Query().Get("force") == "1"
name := c.Params.ByName("name")
// Load shallowly for 404 check, resource key, and task name.
// Full checks (published/snapshots) happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
repo, err := collection.ByName(name)
if err != nil {
@@ -302,19 +332,32 @@ func apiReposDrop(c *gin.Context) {
resources := []string{string(repo.Key())}
taskName := fmt.Sprintf("Delete repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.ByLocalRepo(repo)
// Task: Create fresh collections inside task after lock acquired
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Re-read repo with fresh collection after lock
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
// Check with fresh collections
published := taskPublishedCollection.ByLocalRepo(repo)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published")
}
if !force {
snapshots := snapshotCollection.ByLocalRepoSource(repo)
snapshots := taskSnapshotCollection.ByLocalRepoSource(repo)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override")
}
}
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo)
return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo)
})
}
@@ -371,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
return
}
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
name := c.Params.ByName("name")
repo, err := collection.ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -383,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
resources := []string{string(repo.Key())}
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired (use captured `name` variable, not gin context)
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
out.Printf("Loading packages...\n")
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -398,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err)
@@ -414,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -521,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}
// Load shallowly for 404 check and resource key.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
@@ -544,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) {
resources := []string{string(repo.Key())}
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(repo)
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.LocalRepoCollection()
// Fresh load after lock acquired
repo, err := taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -565,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) {
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err)
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
@@ -581,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) {
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = collectionFactory.LocalRepoCollection().Update(repo)
err = taskCollection.Update(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -660,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) {
return
}
// Load shallowly for 404 check and resource keys.
// Full load and mutations happen inside the task.
collectionFactory := context.NewCollectionFactory()
dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil {
@@ -683,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) {
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
// Task: Create fresh factory and collections inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of both repos after lock acquired
dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
}
err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
}
@@ -701,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) {
RemovedLines: []string{},
}
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress())
dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err)
}
srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress())
srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err)
}
@@ -774,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) {
} else {
dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList))
err = collectionFactory.LocalRepoCollection().Update(dstRepo)
err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
}
@@ -877,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
resources = append(resources, sources...)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
// Task: Create fresh factory and collection inside task after lock
taskCollectionFactory := context.NewCollectionFactory()
var (
err error
verifier = context.GetVerifier()
@@ -892,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(),
context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
+130 -44
View File
@@ -83,11 +83,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
repo, err = collection.ByName(name)
repo, err = collectionFactory.RemoteRepoCollection().ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -97,12 +95,21 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of mirror %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := repo.CheckLock()
taskCollectionFactory := context.NewCollectionFactory()
taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
repo, err := taskMirrorCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = repo.CheckLock()
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err
}
err = collection.LoadComplete(repo)
err = taskMirrorCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
snapshot.Description = b.Description
}
err = snapshotCollection.Add(snapshot)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
@@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
var resources []string
@@ -182,8 +190,20 @@ func apiSnapshotsCreate(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
for i := range sources {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPackageCollection := taskCollectionFactory.PackageCollection()
// Fresh load of all sources after lock acquired
freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots))
for i := range b.SourceSnapshots {
freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -191,9 +211,9 @@ func apiSnapshotsCreate(c *gin.Context) {
list := deb.NewPackageList()
// verify package refs and build package list
// verify package refs and build package list using fresh factory
for _, ref := range b.PackageRefs {
p, err := collectionFactory.PackageCollection().ByKey([]byte(ref))
p, err := taskPackageCollection.ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err)
@@ -206,9 +226,9 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description)
snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, deb.NewPackageRefListFromPackageList(list), b.Description)
err = snapshotCollection.Add(snapshot)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
@@ -249,11 +269,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
repo, err = collection.ByName(name)
repo, err = collectionFactory.LocalRepoCollection().ByName(name)
if err != nil {
AbortWithJSONError(c, 404, err)
return
@@ -263,7 +281,16 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
resources := []string{string(repo.Key()), "S" + b.Name}
taskName := fmt.Sprintf("Create snapshot of repo %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.LoadComplete(repo)
taskCollectionFactory := context.NewCollectionFactory()
taskRepoCollection := taskCollectionFactory.LocalRepoCollection()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
repo, err := taskRepoCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskRepoCollection.LoadComplete(repo)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -277,7 +304,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
snapshot.Description = b.Description
}
err = snapshotCollection.Add(snapshot)
err = taskSnapshotCollection.Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err
}
@@ -315,6 +342,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
name := c.Params.ByName("name")
@@ -325,14 +353,38 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
// Pre-task validation of new name if provided (skip if renaming to same name)
if b.Name != "" && b.Name != name {
_, err = collection.ByName(b.Name)
if err == nil {
AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name))
return
}
}
resources := []string{string(snapshot.ResourceKey()), "S" + b.Name}
taskName := fmt.Sprintf("Update snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
_, err := collection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load after lock acquired
snapshot, err = taskCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh duplicate check inside lock
if b.Name != "" {
_, err := taskCollection.ByName(b.Name)
if err == nil {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)
}
}
// Update fresh copy
if b.Name != "" {
snapshot.Name = b.Name
}
@@ -341,7 +393,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description
}
err = collectionFactory.SnapshotCollection().Update(snapshot)
err = taskCollection.Update(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -395,9 +447,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
// Phase 1: Pre-task validation (shallow load for 404 check only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name)
if err != nil {
@@ -407,21 +459,35 @@ func apiSnapshotsDrop(c *gin.Context) {
resources := []string{string(snapshot.ResourceKey())}
taskName := fmt.Sprintf("Delete snapshot %s", name)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
published := publishedCollection.BySnapshot(snapshot)
// Phase 2: Inside task lock - create fresh collections
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection()
// Fresh load after lock acquired
snapshot, err := taskSnapshotCollection.ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// Fresh checks with current collections
published := taskPublishedCollection.BySnapshot(snapshot)
if len(published) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published")
}
if !force {
snapshots := snapshotCollection.BySnapshotSource(snapshot)
// Using fresh collection for dependency check
snapshots := taskSnapshotCollection.BySnapshotSource(snapshot)
if len(snapshots) > 0 {
return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override")
}
}
err = snapshotCollection.Drop(snapshot)
err = taskSnapshotCollection.Drop(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -576,6 +642,7 @@ func apiSnapshotsMerge(c *gin.Context) {
return
}
// Phase 1: Pre-task validation (shallow load for 404 checks only)
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
@@ -592,32 +659,43 @@ func apiSnapshotsMerge(c *gin.Context) {
}
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = snapshotCollection.LoadComplete(sources[0])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result := sources[0].RefList()
for i := 1; i < len(sources); i++ {
err = snapshotCollection.LoadComplete(sources[i])
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
taskSnapshotCollection := taskCollectionFactory.SnapshotCollection()
// Fresh load of all sources inside task
freshSources := make([]*deb.Snapshot, len(body.Sources))
for i := range body.Sources {
freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
result = result.Merge(sources[i].RefList(), overrideMatching, false)
// LoadComplete on fresh copy
err = taskSnapshotCollection.LoadComplete(freshSources[i])
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
}
// Merge using fresh sources
result := freshSources[0].RefList()
for i := 1; i < len(freshSources); i++ {
result = result.Merge(freshSources[i].RefList(), overrideMatching, false)
}
if latest {
result.FilterLatestRefs()
}
sourceDescription := make([]string, len(sources))
for i, s := range sources {
sourceDescription := make([]string, len(freshSources))
for i, s := range freshSources {
sourceDescription[i] = fmt.Sprintf("'%s'", s.Name)
}
snapshot = deb.NewSnapshotFromRefList(name, sources, result,
snapshot = deb.NewSnapshotFromRefList(name, freshSources, result,
fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", ")))
err = collectionFactory.SnapshotCollection().Add(snapshot)
err = taskCollectionFactory.SnapshotCollection().Add(snapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err)
}
@@ -701,21 +779,29 @@ func apiSnapshotsPull(c *gin.Context) {
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
// Phase 2: Inside task lock - create fresh factory
taskCollectionFactory := context.NewCollectionFactory()
// Fresh load of snapshots after lock acquired
freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
// convert snapshots to package list
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
@@ -812,10 +898,10 @@ func apiSnapshotsPull(c *gin.Context) {
}
// Create <destination> snapshot
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", ")))
destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList,
fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", ")))
err = collectionFactory.SnapshotCollection().Add(destinationSnapshot)
err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
}
+9
View File
@@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
}
// PrefixPoolLockKey returns the task-queue resource key that serialises all
// publish operations sharing the same pool directory under storagePrefix.
// It must be held whenever a non-MultiDist publish may read or clean the
// shared pool, to prevent concurrent cleanup runs from deleting each other's
// files. See docs/Resource-Locking.md for the full key-namespace table.
func PrefixPoolLockKey(storagePrefix string) string {
return "P" + storagePrefix
}
// RefKey is a unique id for package reference list
func (p *PublishedRepo) RefKey(component string) []byte {
return []byte("E" + p.UUID + component)
+7 -2
View File
@@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) {
snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3")
_ = s.snapshotCollection.Add(snap3)
// Ensure that adding a second publish point with matching files doesn't give duplicate results.
// When a second publish point references the same package (snap3 is a clone of snap2,
// both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by
// package ref so the file appears only once. StrSlicesSubstract handles a single entry
// correctly, so no duplicate is needed for cleanup safety.
repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false)
c.Check(err, IsNil)
c.Check(s.collection.Add(repo3), IsNil)
@@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) {
"a/alien-arena/alien-arena-common_7.40-2_i386.deb",
"a/alien-arena/mars-invaders_7.40-2_i386.deb",
},
"main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"},
"main": {
"a/alien-arena/lonely-strangers_7.40-2_i386.deb",
},
})
}
-283
View File
@@ -1,283 +0,0 @@
package files
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/utils"
. "gopkg.in/check.v1"
)
type LinkFromPoolConcurrencySuite struct {
root string
poolDir string
storage *PublishedStorage
pool *PackagePool
cs aptly.ChecksumStorage
testFile string
testContent []byte
testChecksums utils.ChecksumInfo
srcPoolPath string
}
var _ = Suite(&LinkFromPoolConcurrencySuite{})
func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) {
s.root = c.MkDir()
s.poolDir = filepath.Join(s.root, "pool")
publishDir := filepath.Join(s.root, "public")
// Create package pool and published storage
s.pool = NewPackagePool(s.poolDir, true)
s.storage = NewPublishedStorage(publishDir, "copy", "checksum")
s.cs = NewMockChecksumStorage()
// Create test file content
s.testContent = []byte("test package content for concurrency testing")
s.testFile = filepath.Join(s.root, "test-package.deb")
err := os.WriteFile(s.testFile, s.testContent, 0644)
c.Assert(err, IsNil)
// Calculate checksums
md5sum, err := utils.MD5ChecksumForFile(s.testFile)
c.Assert(err, IsNil)
s.testChecksums = utils.ChecksumInfo{
Size: int64(len(s.testContent)),
MD5: md5sum,
}
// Import the test file into the pool
s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs)
c.Assert(err, IsNil)
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) {
// Test concurrent LinkFromPool operations to ensure no race conditions
concurrency := 5000
iterations := 10
for iter := 0; iter < iterations; iter++ {
c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency)
destPath := fmt.Sprintf("main/t/test%d", iter)
var wg sync.WaitGroup
errors := make(chan error, concurrency)
successes := make(chan struct{}, concurrency)
start := time.Now()
// Launch concurrent LinkFromPool operations
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Use force=true to test the most vulnerable code path (remove-then-create)
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
true, // force - this triggers vulnerable remove-then-create pattern
)
if err != nil {
errors <- fmt.Errorf("goroutine %d failed: %v", id, err)
} else {
successes <- struct{}{}
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
close(successes)
// Count results
errorCount := 0
successCount := 0
var firstError error
for err := range errors {
errorCount++
if firstError == nil {
firstError = err
}
c.Logf("Race condition error: %v", err)
}
for range successes {
successCount++
}
c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration)
// Assert no race conditions occurred
if errorCount > 0 {
c.Fatalf("Race condition detected in iteration %d! "+
"Errors: %d out of %d operations (%.1f%% failure rate). "+
"First error: %v. "+
"This indicates the fix is not working properly.",
iter+1, errorCount, concurrency,
float64(errorCount)/float64(concurrency)*100, firstError)
}
// Verify the final file exists and has correct content
finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb")
_, err := os.Stat(finalFile)
c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations"))
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil, Commentf("Should be able to read final file"))
c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations"))
c.Logf("✓ Iteration %d: No race conditions detected", iter+1)
}
c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions",
concurrency*iterations, iterations)
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) {
// Test concurrent operations on different files to ensure no blocking
concurrency := 10
var wg sync.WaitGroup
errors := make(chan error, concurrency)
start := time.Now()
// Launch concurrent operations on different destination files
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
destPath := fmt.Sprintf("main/t/test-file-%d", id)
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
false, // force
)
if err != nil {
errors <- fmt.Errorf("goroutine %d failed: %v", id, err)
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
// Count errors
errorCount := 0
for err := range errors {
errorCount++
c.Logf("Error: %v", err)
}
c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files"))
c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration)
// Verify all files were created correctly
for i := 0; i < concurrency; i++ {
finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb")
_, err := os.Stat(finalFile)
c.Assert(err, IsNil, Commentf("File %d should exist", i))
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil, Commentf("Should be able to read file %d", i))
c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i))
}
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) {
// Test that when force=false, concurrent operations fail gracefully without corruption
concurrency := 20
destPath := "main/t/single-dest"
var wg sync.WaitGroup
errors := make(chan error, concurrency)
successes := make(chan struct{}, concurrency)
// First, create the file so subsequent operations will conflict
err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false)
c.Assert(err, IsNil)
start := time.Now()
// Launch concurrent operations that should mostly fail
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
false, // force=false - should fail if file exists and is same
)
if err != nil {
errors <- err
} else {
successes <- struct{}{}
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
close(successes)
errorCount := 0
successCount := 0
for range errors {
errorCount++
}
for range successes {
successCount++
}
c.Logf("Results with force=false: %d successes, %d errors, took %v", successCount, errorCount, duration)
// With force=false and identical files, operations should succeed (file already exists with same content)
// No race conditions should cause crashes or corruption
c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed"))
// Verify the file still exists and has correct content
finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb")
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil)
c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access"))
}
-25
View File
@@ -26,26 +26,6 @@ type PublishedStorage struct {
verifyMethod uint
}
// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool
var (
fileLockMutex sync.Mutex
fileLocks = make(map[string]*sync.Mutex)
)
// getFileLock returns a mutex for a specific file path to prevent concurrent modifications
func getFileLock(filePath string) *sync.Mutex {
fileLockMutex.Lock()
defer fileLockMutex.Unlock()
if mutex, exists := fileLocks[filePath]; exists {
return mutex
}
mutex := &sync.Mutex{}
fileLocks[filePath] = mutex
return mutex
}
// Check interfaces
var (
_ aptly.PublishedStorage = (*PublishedStorage)(nil)
@@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName))
destinationPath := filepath.Join(poolPath, baseName)
// Acquire file-specific lock to prevent concurrent access to the same file
fileLock := getFileLock(destinationPath)
fileLock.Lock()
defer fileLock.Unlock()
var localSourcePool aptly.LocalPackagePool
if storage.linkMethod != LinkMethodCopy {
pp, ok := sourcePool.(aptly.LocalPackagePool)
-10
View File
@@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) {
c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true)
}
func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) {
a := getFileLock(filepath.Join(s.root, "a"))
b := getFileLock(filepath.Join(s.root, "a"))
c.Check(a == b, Equals, true)
c1 := getFileLock(filepath.Join(s.root, "c1"))
c2 := getFileLock(filepath.Join(s.root, "c2"))
c.Check(c1 == c2, Equals, false)
}
func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) {
storage := NewPublishedStorage(s.root, "", "")
+226
View File
@@ -992,6 +992,232 @@ class PublishSwitchAPITestRepo(APITest):
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestMirror(APITest):
"""
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
"""
fixtureGpg = True
def check(self):
mirror_name = self.random_name()
mirror_desc = {'Name': mirror_name,
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
'Distribution': 'wheezy',
'Keyrings': ["aptlytest.gpg"],
'Architectures': ["amd64"],
'Components': ['main']}
mirror_desc['IgnoreSignatures'] = True
# Create Mirror
resp = self.post("/api/mirrors", json=mirror_desc)
self.check_equal(resp.status_code, 201)
# Get Mirror
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
self.check_equal(resp.status_code, 404)
# Update Mirror
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
self.check_task(resp)
# Snapshot Mirror
snapshot1_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
# Publish Snapshot
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': 'packagecloud.io/varnishcache/varnish30',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# Snapshot Mirror 2
snapshot2_name = self.random_name()
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestSnapshot(APITest):
"""
publish snapshot of snapshot
"""
fixtureGpg = True
def check(self):
repo_name = self.random_name()
self.check_equal(self.post(
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
d = self.random_name()
self.check_equal(
self.upload("/api/files/" + d,
"pyspi_0.6.1-1.3.dsc",
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
self.check_task(task)
snapshot1_name = self.random_name()
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
self.check_task(task)
prefix = self.random_name()
task = self.post_task(
"/api/publish/" + prefix,
json={
"Architectures": ["i386", "source"],
"SourceKind": "snapshot",
"Sources": [{"Name": snapshot1_name}],
"Signing": DefaultSigningOptions,
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': '',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Origin': '',
'Version': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': False,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
snapshot2_name = self.random_name()
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
self.check_task(task)
task = self.put_task(
"/api/publish/" + prefix + "/wheezy",
json={
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
"Signing": DefaultSigningOptions,
"SkipContents": True,
"Label": "fun",
"Origin": "earth",
"Version": "13.3",
})
self.check_task(task)
repo_expected = {
'AcquireByHash': False,
'Architectures': ['i386', 'source'],
'Codename': '',
'Distribution': 'wheezy',
'Label': 'fun',
'Origin': 'earth',
'Version': '13.3',
'NotAutomatic': '',
'ButAutomaticUpgrades': '',
'Path': prefix + '/' + 'wheezy',
'Prefix': prefix,
'SignedBy': '',
'SkipContents': True,
'MultiDist': False,
'SourceKind': 'snapshot',
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
'Storage': '',
'Suite': ''}
all_repos = self.get("/api/publish")
self.check_equal(all_repos.status_code, 200)
self.check_in(repo_expected, all_repos.json())
# FIXME: what should exist here ? publish snapshot of snapshot
self.check_not_exists(
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
self.check_not_exists("public/" + prefix +
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
self.check_task(task)
self.check_not_exists("public/" + prefix + "dists/")
class PublishSwitchAPITestRepoSignedBy(APITest):
"""
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
+1 -1
View File
@@ -3,7 +3,7 @@ import tempfile
class TestOut:
def __init__(self):
self.tmp_file = tempfile.NamedTemporaryFile(delete=True)
self.tmp_file = tempfile.NamedTemporaryFile(delete=False)
self.read_pos = 0
def fileno(self):
+34 -21
View File
@@ -44,25 +44,27 @@ func (list *List) consumer() {
for {
select {
case task := <-list.queue:
// Set task state to RUNNING before processing
list.Lock()
{
task.State = RUNNING
}
task.State = RUNNING
list.Unlock()
go func() {
retValue, err := task.process(aptly.Progress(task.output), task.detail)
// Update task completion state and cleanup with list lock held
list.Lock()
{
task.processReturnValue = retValue
task.err = err
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
task.err = err
task.processReturnValue = retValue
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
task.err = nil
task.processReturnValue = retValue
}
list.usedResources.Free(task.resources)
@@ -105,13 +107,15 @@ func (list *List) Stop() {
// GetTasks gets complete list of tasks
func (list *List) GetTasks() []Task {
tasks := []Task{}
list.Lock()
defer list.Unlock()
tasks := []Task{}
for _, task := range list.tasks {
// Copy task while holding list lock
tasks = append(tasks, *task)
}
list.Unlock()
return tasks
}
@@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) {
// GetTaskByID returns task with given id
func (list *List) GetTaskByID(ID int) (Task, error) {
list.Lock()
tasks := list.tasks
list.Unlock()
defer list.Unlock()
for _, task := range tasks {
for _, task := range list.tasks {
if task.ID == ID {
// Copy task while holding list lock
return *task, nil
}
}
@@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
// GetTaskReturnValueByID returns process return value of task with given id
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
task, err := list.GetTaskByID(ID)
list.Lock()
defer list.Unlock()
if err != nil {
return nil, err
for _, task := range list.tasks {
if task.ID == ID {
return task.processReturnValue, nil
}
}
return task.processReturnValue, nil
return nil, fmt.Errorf("could not find task with id %v", ID)
}
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
@@ -204,6 +211,10 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
list.wg.Add(1)
task.wgTask.Add(1)
// Copy task while still holding the lock to avoid racing with consumer
// setting State=RUNNING after receiving from queue
taskCopy := *task
// add task to queue for processing if resources are available
// if not, task will be queued by the consumer once resources are available
tasks := list.usedResources.UsedBy(resources)
@@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
list.Unlock()
}
return *task, nil
return taskCopy, nil
}
// Clear removes finished tasks from list
func (list *List) Clear() {
list.Lock()
defer list.Unlock()
var tasks []*Task
for _, task := range list.tasks {
@@ -230,8 +242,6 @@ func (list *List) Clear() {
}
}
list.tasks = tasks
list.Unlock()
}
// Wait waits till all tasks are processed
@@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) {
// GetTaskErrorByID returns the Task error for a given id
func (list *List) GetTaskErrorByID(ID int) (error, error) {
task, err := list.GetTaskByID(ID)
list.Lock()
defer list.Unlock()
if err != nil {
return nil, err
for _, task := range list.tasks {
if task.ID == ID {
return task.err, nil
}
}
return task.err, nil
return nil, fmt.Errorf("could not find task with id %v", ID)
}
+1
View File
@@ -42,6 +42,7 @@ const (
)
// Task represents as task in a queue encapsulates process code
// All fields are protected by List.Mutex - access task fields only while holding list.Lock()
type Task struct {
output *Output
detail *Detail