From 6ee51b64547b895ce5d6423dcd013672e6873790 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:40:37 +0000 Subject: [PATCH 1/6] publish: fix race conditions * remove useless resource lock Resource locks need to be before the background task. creating same publish endpoint at the same time is unlikely... * load data inside background tasks This fixes a flaw in async apis, which loaded the published repo from the DB and mutated it outside the task closure, before the task lock was acquired. Perform collection.LoadComplete inside maybeRunTaskInBackground and have tasks use a fresh copy of taskCollectionFactory, taskCollection * lock source repos/snapshots for publish operations Concurrent tasks were not properly locking their resources, leading to inconsistent published indexes: SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append string(repo.Key()) to resources SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append string(snapshot.ResourceKey()) to resources. * lock pool on non MultiDist publish * revert mutex on LinkFromPool * use uuids, since names can be renamed * add test for MultiDist change --- api/publish.go | 519 ++++++++++------- api/published_file_missing_test.go | 733 +++++++++++++++++++++++++ deb/publish.go | 9 + deb/publish_test.go | 9 +- files/linkfrompool_concurrency_test.go | 283 ---------- files/public.go | 25 - files/public_test.go | 10 - system/t12_api/publish.py | 225 ++++++++ 8 files changed, 1294 insertions(+), 519 deletions(-) create mode 100644 api/published_file_missing_test.go delete mode 100644 files/linkfrompool_concurrency_test.go diff --git a/api/publish.go b/api/publish.go index 67b260d4..5ab7ccdb 100644 --- a/api/publish.go +++ b/api/publish.go @@ -267,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } - resources = append(resources, string(snapshot.ResourceKey())) + resources = append(resources, string(snapshot.Key())) sources = append(sources, snapshot) } } else if b.SourceKind == deb.SourceLocalRepo { @@ -298,11 +298,24 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } - collection := collectionFactory.PublishedRepoCollection() + // Non-MultiDist publishes share a single pool/ directory under the + // prefix. Lock at the prefix level so that concurrent publish/drop + // operations on sibling distributions cannot race during cleanup. + if !multiDist { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + + resources = append(resources, deb.PrefixPoolLockKey(storagePrefix)) + } taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -314,10 +327,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -327,13 +340,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } @@ -367,18 +378,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -458,6 +469,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -465,64 +477,85 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } - } else if published.SourceKind == deb.SourceSnapshot { - for _, snapshotInfo := range b.Snapshots { - _, err2 := snapshotCollection.ByName(snapshotInfo.Name) + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, snapshotInfo := range b.Snapshots { + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) } - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - - resources := []string{string(published.Key())} + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -534,17 +567,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -552,7 +585,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -600,10 +633,19 @@ func apiPublishDrop(c *gin.Context) { } resources := []string{string(published.Key())} + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that a drop cannot race + // with a concurrent update or drop of a sibling distribution during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) } @@ -639,43 +681,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - component := b.Component - name := b.Name - - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } - - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + component := b.Component + name := b.Name + + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } + + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -757,39 +808,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources - - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources + + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -822,24 +882,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -875,51 +944,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } - - b.Component = component - b.Name = revision.Sources[component] - - if c.Bind(&b) != nil { - return - } - - if b.Component != component { - delete(sources, component) - } - - component = b.Component - name := b.Name - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } + + if b.Component != urlComponent { + delete(sources, urlComponent) + } + + newComponent := b.Component + name := b.Name + sources[newComponent] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -956,33 +1032,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } - - delete(sources, component) - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } + + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1054,64 +1138,101 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - resources := []string{string(published.Key())} + + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } + + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. + snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() + + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) + } + } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + + result, err := published.Update(taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1119,7 +1240,7 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } diff --git a/api/published_file_missing_test.go b/api/published_file_missing_test.go new file mode 100644 index 00000000..6d7b77f8 --- /dev/null +++ b/api/published_file_missing_test.go @@ -0,0 +1,733 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "github.com/aptly-dev/aptly/aptly" + ctx "github.com/aptly-dev/aptly/context" + "github.com/aptly-dev/aptly/deb" + "github.com/gin-gonic/gin" + "github.com/smira/flag" + + . "gopkg.in/check.v1" +) + +// PublishedFileMissingSuite reproduces the exact bug where: +// - Package import succeeds +// - Metadata is updated (Packages.gz shows the package) +// - Publish reports success +// - BUT the .deb file is missing from the published pool directory +// - Result: apt-get returns 404 when trying to download the package +type PublishedFileMissingSuite struct { + context *ctx.AptlyContext + flags *flag.FlagSet + configFile *os.File + router http.Handler + tempDir string + poolPath string + publicPath string +} + +var _ = Suite(&PublishedFileMissingSuite{}) + +func (s *PublishedFileMissingSuite) SetUpSuite(c *C) { + aptly.Version = "publishedFileMissingTest" + + tempDir, err := os.MkdirTemp("", "aptly-published-missing-test") + c.Assert(err, IsNil) + s.tempDir = tempDir + s.poolPath = filepath.Join(tempDir, "pool") + s.publicPath = filepath.Join(tempDir, "public") + + file, err := os.CreateTemp("", "aptly-published-missing-config") + c.Assert(err, IsNil) + s.configFile = file + + config := gin.H{ + "rootDir": tempDir, + "downloadDir": filepath.Join(tempDir, "download"), + "architectures": []string{"amd64"}, + "dependencyFollowSuggests": false, + "dependencyFollowRecommends": false, + "gpgDisableSign": true, + "gpgDisableVerify": true, + "gpgProvider": "internal", + "skipLegacyPool": true, + "enableMetricsEndpoint": false, + } + + jsonString, err := json.Marshal(config) + c.Assert(err, IsNil) + _, err = file.Write(jsonString) + c.Assert(err, IsNil) + + flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError) + flags.Bool("no-lock", true, "disable database locking for test") + flags.Int("db-open-attempts", 3, "dummy") + flags.String("config", s.configFile.Name(), "config file") + flags.String("architectures", "", "dummy") + s.flags = flags + + context, err := ctx.NewContext(s.flags) + c.Assert(err, IsNil) + + s.context = context + s.router = Router(context) +} + +func (s *PublishedFileMissingSuite) TearDownSuite(c *C) { + if s.configFile != nil { + _ = os.Remove(s.configFile.Name()) + } + if s.context != nil { + s.context.Shutdown() + } + if s.tempDir != "" { + _ = os.RemoveAll(s.tempDir) + } +} + +func (s *PublishedFileMissingSuite) SetUpTest(c *C) { + collectionFactory := s.context.NewCollectionFactory() + + localRepoCollection := collectionFactory.LocalRepoCollection() + _ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error { + _ = localRepoCollection.Drop(repo) + return nil + }) + + publishedCollection := collectionFactory.PublishedRepoCollection() + _ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error { + _ = publishedCollection.Remove(s.context, published.Storage, published.Prefix, + published.Distribution, collectionFactory, nil, true, true) + return nil + }) +} + +func (s *PublishedFileMissingSuite) TearDownTest(c *C) { + s.SetUpTest(c) +} + +func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + var req *http.Request + var err error + + if body != nil { + req, err = http.NewRequest(method, url, bytes.NewReader(body)) + } else { + req, err = http.NewRequest(method, url, nil) + } + c.Assert(err, IsNil) + req.Header.Add("Content-Type", "application/json") + s.router.ServeHTTP(w, req) + return w +} + +func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) { + uploadPath := s.context.UploadPath() + uploadDir := filepath.Join(uploadPath, uploadID) + err := os.MkdirAll(uploadDir, 0755) + c.Assert(err, IsNil) + + tempDir, err := os.MkdirTemp("", "deb-build") + c.Assert(err, IsNil) + defer func() { _ = os.RemoveAll(tempDir) }() + + debianDir := filepath.Join(tempDir, "DEBIAN") + err = os.MkdirAll(debianDir, 0755) + c.Assert(err, IsNil) + + controlContent := fmt.Sprintf(`Package: %s +Version: %s +Section: libs +Priority: optional +Architecture: amd64 +Maintainer: Test +Description: Test package + Test package for published file missing bug. +`, packageName, version) + + err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644) + c.Assert(err, IsNil) + + usrDir := filepath.Join(tempDir, "usr", "lib") + err = os.MkdirAll(usrDir, 0755) + c.Assert(err, IsNil) + err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644) + c.Assert(err, IsNil) + + debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile) + err = cmd.Run() + c.Assert(err, IsNil) +} + +// TestPublishedFileGoMissing reproduces the exact production bug +func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) { + c.Log("=== Reproducing: Package in metadata but 404 on download ===") + + // Create and publish a repository + repoName := "test-repo" + distribution := "bullseye" + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String())) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String())) + + // Create package + packageName := "hrt-libblobbyclient1" + version := "20250926.152427+hrtdeb11" + uploadID := "test-upload-1" + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String())) + + // Update publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String())) + + // Now check if the file is actually accessible in the published location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + // Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb + expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + c.Logf("Checking for published file at: %s", expectedPath) + + fileInfo, err := os.Stat(expectedPath) + fileExists := err == nil + + c.Logf("File exists: %v", fileExists) + if fileExists { + c.Logf("File size: %d bytes", fileInfo.Size()) + } + + // Check metadata + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err = json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + c.Logf("Packages in metadata: %d", len(packages)) + + // THE BUG: Metadata says package exists, but file is missing from published location + if len(packages) > 0 && !fileExists { + c.Logf("★★★ BUG REPRODUCED! ★★★") + c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath) + c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]") + + c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!") + } + + c.Assert(fileExists, Equals, true, Commentf( + "Published file should exist at %s when package is in metadata", expectedPath)) +} + +// TestConcurrentPublishRace tries to trigger the race with concurrent publishes +func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) { + c.Log("=== Testing concurrent publish race condition ===") + + const numIterations = 4 + + for iteration := 0; iteration < numIterations; iteration++ { + c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations) + + // Create repo + repoName := fmt.Sprintf("race-repo-%d", iteration) + distribution := fmt.Sprintf("dist-%d", iteration) + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody) + c.Assert(resp.Code, Equals, 201) + + // Create multiple packages + var wg sync.WaitGroup + numPackages := 5 + + for i := 0; i < numPackages; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx) + version := "1.0.0" + uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx) + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Logf("Package %d add: %d", idx, resp.Code) + + // Small delay + time.Sleep(time.Duration(5+idx*2) * time.Millisecond) + + // Publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody) + c.Logf("Publish %d: %d", idx, resp.Code) + }(i) + } + + wg.Wait() + time.Sleep(100 * time.Millisecond) + + // Check all packages + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err := json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + + // Check published files + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("pkg-%d-%d", iteration, i) + version := "1.0.0" + + // Calculate pool path + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, expectedPath) + } + } + + if len(missingFiles) > 0 { + c.Logf("★★★ BUG DETECTED in iteration %d/%d! ★★★", iteration+1, numIterations) + c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f) + } + + c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages) + } + } + + c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations) +} + +// TestIdenticalPackageRace tests the specific case of identical SHA256 packages +func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) { + c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===") + + const numIterations = 4 + packageName := "shared-package" + + for iter := 0; iter < numIterations; iter++ { + c.Logf("Iteration %d/%d", iter+1, numIterations) + + // Create two repos that will get the SAME package (unique per iteration) + repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)} + dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)} + + for i := range repos { + createBody, _ := json.Marshal(gin.H{ + "Name": repos[i], + "DefaultDistribution": dists[i], + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": dists[i], + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repos[i]}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + }) + resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody) + c.Assert(resp.Code, Equals, 201) + } + + // Create IDENTICAL package file with UNIQUE VERSION per iteration + version := fmt.Sprintf("1.0.%d", iter) + uploadID1 := fmt.Sprintf("identical-upload-1-%d", iter) + uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter) + + s.createDebPackage(c, uploadID1, packageName, version) + + // Copy to second upload (same SHA256) + uploadPath := s.context.UploadPath() + src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + destDir := filepath.Join(uploadPath, uploadID2) + err := os.MkdirAll(destDir, 0755) + c.Assert(err, IsNil) + dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + srcData, readErr := os.ReadFile(src) + c.Assert(readErr, IsNil) + err = os.WriteFile(dest, srcData, 0644) + c.Assert(err, IsNil) + + // Race: add and publish both simultaneously + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) + }() + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) + }() + + wg.Wait() + time.Sleep(200 * time.Millisecond) + c.Logf("[iter %d] All operations complete", iter) + + // Check the shared pool location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + poolSubdir := string(packageName[0]) + sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + fileInfo, err := os.Stat(sharedPoolPath) + fileExists := err == nil + + if fileExists { + c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size()) + } else { + c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err) + } + + // Check metadata + var packagesA, packagesB []string + resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesA) + c.Assert(err, IsNil) + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesB) + c.Assert(err, IsNil) + + c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB)) + + // THE BUG: Both repos show packages in metadata, but the shared pool file is missing + if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists { + c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1) + c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB)) + c.Logf("Shared pool file exists: %v", fileExists) + c.Logf("Pool path: %s", sharedPoolPath) + + // List what files ARE in the pool directory + poolDir := filepath.Dir(sharedPoolPath) + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("Files in pool directory %s:", poolDir) + for _, entry := range entries { + c.Logf(" - %s", entry.Name()) + } + } + + c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1) + } + } + + c.Logf("All %d iterations passed - bug not reproduced", numIterations) +} + +// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug: +// Multiple snapshots are published concurrently to the SAME prefix but different distributions. +// Example from production logs: +// - trixie-pgdg published to "external/postgres-auto/trixie" +// - bullseye-pgdg published to "external/postgres-auto/bullseye" +// Both share the same pool directory, causing cleanup race conditions. +func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) { + const numIterations = 4 + + for iter := 0; iter < numIterations; iter++ { + c.Logf("--- Iteration %d/%d ---", iter+1, numIterations) + + // Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg) + repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter) + repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter) + + // Create trixie repo + createBody, _ := json.Marshal(gin.H{ + "Name": repoTrixie, + "DefaultDistribution": "trixie", + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo")) + + // Create bullseye repo + createBody, _ = json.Marshal(gin.H{ + "Name": repoBullseye, + "DefaultDistribution": "bullseye", + "DefaultComponent": "main", + }) + resp = s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo")) + + // Add packages to both repos + numPackages := 3 + + // Add packages to trixie repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie")) + } + + // Add packages to bullseye repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to bullseye")) + } + + // Create snapshots from both repos + snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie) + snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye) + + createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot")) + + createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot")) + + // Publish both snapshots CONCURRENTLY to the SAME prefix + // This mimics production where both are published to "external/postgres-auto" + // Use the SAME prefix across all iterations to trigger the race more aggressively + sharedPrefix := "postgres-auto" + + var wg sync.WaitGroup + var trixiePublishCode, bullseyePublishCode int + + wg.Add(2) + + // Publish or update trixie snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "trixie", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, // Force cleanup to run + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE (this is what happens in production) + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody) + } + trixiePublishCode = resp.Code + c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code) + }() + + // Publish or update bullseye snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "bullseye", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody) + } + bullseyePublishCode = resp.Code + c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code) + }() + + wg.Wait() + time.Sleep(50 * time.Millisecond) + + // Verify publishes succeeded (201 for create, 200 for update) + expectedCode := 201 + if iter > 0 { + expectedCode = 200 + } + c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed")) + c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed")) + + // Verify ALL package files exist in the published pool + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + expectedFiles := []string{} + + // Check trixie packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath))) + } + } + + // Check bullseye packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath))) + } + } + + // BUG: Files from one distribution are deleted by the other's cleanup + if len(missingFiles) > 0 { + c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations) + c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f) + } + + c.Logf("\nThis reproduces the exact production bug where:") + c.Logf(" 1. Mirror updates complete successfully") + c.Logf(" 2. Snapshots are created") + c.Logf(" 3. Both snapshots publish to same prefix (different distributions)") + c.Logf(" 4. Cleanup from one publish DELETES files from the other") + c.Logf(" 5. Result: apt-get returns 404 when downloading packages") + + // List what's actually in the pool + poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main") + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("\nActual pool directory contents (%s):", poolDir) + for _, entry := range entries { + c.Logf(" - %s/", entry.Name()) + } + } + + c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool", + iter+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles)) + } + } + c.Logf("✓ All %d iterations passed - no files missing", numIterations) +} diff --git a/deb/publish.go b/deb/publish.go index cc4c2a0f..dbaa81a7 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte { return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) } +// PrefixPoolLockKey returns the task-queue resource key that serialises all +// publish operations sharing the same pool directory under storagePrefix. +// It must be held whenever a non-MultiDist publish may read or clean the +// shared pool, to prevent concurrent cleanup runs from deleting each other's +// files. See docs/Resource-Locking.md for the full key-namespace table. +func PrefixPoolLockKey(storagePrefix string) string { + return "P" + storagePrefix +} + // RefKey is a unique id for package reference list func (p *PublishedRepo) RefKey(component string) []byte { return []byte("E" + p.UUID + component) diff --git a/deb/publish_test.go b/deb/publish_test.go index 125397b2..933f9ef6 100644 --- a/deb/publish_test.go +++ b/deb/publish_test.go @@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3") _ = s.snapshotCollection.Add(snap3) - // Ensure that adding a second publish point with matching files doesn't give duplicate results. + // When a second publish point references the same package (snap3 is a clone of snap2, + // both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by + // package ref so the file appears only once. StrSlicesSubstract handles a single entry + // correctly, so no duplicate is needed for cleanup safety. repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false) c.Check(err, IsNil) c.Check(s.collection.Add(repo3), IsNil) @@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { "a/alien-arena/alien-arena-common_7.40-2_i386.deb", "a/alien-arena/mars-invaders_7.40-2_i386.deb", }, - "main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"}, + "main": { + "a/alien-arena/lonely-strangers_7.40-2_i386.deb", + }, }) } diff --git a/files/linkfrompool_concurrency_test.go b/files/linkfrompool_concurrency_test.go deleted file mode 100644 index 0acbab3f..00000000 --- a/files/linkfrompool_concurrency_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package files - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/aptly-dev/aptly/aptly" - "github.com/aptly-dev/aptly/utils" - - . "gopkg.in/check.v1" -) - -type LinkFromPoolConcurrencySuite struct { - root string - poolDir string - storage *PublishedStorage - pool *PackagePool - cs aptly.ChecksumStorage - testFile string - testContent []byte - testChecksums utils.ChecksumInfo - srcPoolPath string -} - -var _ = Suite(&LinkFromPoolConcurrencySuite{}) - -func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) { - s.root = c.MkDir() - s.poolDir = filepath.Join(s.root, "pool") - publishDir := filepath.Join(s.root, "public") - - // Create package pool and published storage - s.pool = NewPackagePool(s.poolDir, true) - s.storage = NewPublishedStorage(publishDir, "copy", "checksum") - s.cs = NewMockChecksumStorage() - - // Create test file content - s.testContent = []byte("test package content for concurrency testing") - s.testFile = filepath.Join(s.root, "test-package.deb") - - err := os.WriteFile(s.testFile, s.testContent, 0644) - c.Assert(err, IsNil) - - // Calculate checksums - md5sum, err := utils.MD5ChecksumForFile(s.testFile) - c.Assert(err, IsNil) - - s.testChecksums = utils.ChecksumInfo{ - Size: int64(len(s.testContent)), - MD5: md5sum, - } - - // Import the test file into the pool - s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs) - c.Assert(err, IsNil) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) { - // Test concurrent LinkFromPool operations to ensure no race conditions - concurrency := 5000 - iterations := 10 - - for iter := 0; iter < iterations; iter++ { - c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency) - - destPath := fmt.Sprintf("main/t/test%d", iter) - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - start := time.Now() - - // Launch concurrent LinkFromPool operations - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - // Use force=true to test the most vulnerable code path (remove-then-create) - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - true, // force - this triggers vulnerable remove-then-create pattern - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - // Count results - errorCount := 0 - successCount := 0 - var firstError error - - for err := range errors { - errorCount++ - if firstError == nil { - firstError = err - } - c.Logf("Race condition error: %v", err) - } - - for range successes { - successCount++ - } - - c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // Assert no race conditions occurred - if errorCount > 0 { - c.Fatalf("Race condition detected in iteration %d! "+ - "Errors: %d out of %d operations (%.1f%% failure rate). "+ - "First error: %v. "+ - "This indicates the fix is not working properly.", - iter+1, errorCount, concurrency, - float64(errorCount)/float64(concurrency)*100, firstError) - } - - // Verify the final file exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations")) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read final file")) - c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations")) - - c.Logf("✓ Iteration %d: No race conditions detected", iter+1) - } - - c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions", - concurrency*iterations, iterations) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) { - // Test concurrent operations on different files to ensure no blocking - concurrency := 10 - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - - start := time.Now() - - // Launch concurrent operations on different destination files - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - destPath := fmt.Sprintf("main/t/test-file-%d", id) - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - - // Count errors - errorCount := 0 - for err := range errors { - errorCount++ - c.Logf("Error: %v", err) - } - - c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files")) - c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration) - - // Verify all files were created correctly - for i := 0; i < concurrency; i++ { - finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("File %d should exist", i)) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read file %d", i)) - c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i)) - } -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) { - // Test that when force=false, concurrent operations fail gracefully without corruption - concurrency := 20 - destPath := "main/t/single-dest" - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - // First, create the file so subsequent operations will conflict - err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false) - c.Assert(err, IsNil) - - start := time.Now() - - // Launch concurrent operations that should mostly fail - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force=false - should fail if file exists and is same - ) - - if err != nil { - errors <- err - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - errorCount := 0 - successCount := 0 - - for range errors { - errorCount++ - } - - for range successes { - successCount++ - } - - c.Logf("Results with force=false: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // With force=false and identical files, operations should succeed (file already exists with same content) - // No race conditions should cause crashes or corruption - c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed")) - - // Verify the file still exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil) - c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access")) -} diff --git a/files/public.go b/files/public.go index a2d3f815..dea35ea8 100644 --- a/files/public.go +++ b/files/public.go @@ -26,26 +26,6 @@ type PublishedStorage struct { verifyMethod uint } -// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool -var ( - fileLockMutex sync.Mutex - fileLocks = make(map[string]*sync.Mutex) -) - -// getFileLock returns a mutex for a specific file path to prevent concurrent modifications -func getFileLock(filePath string) *sync.Mutex { - fileLockMutex.Lock() - defer fileLockMutex.Unlock() - - if mutex, exists := fileLocks[filePath]; exists { - return mutex - } - - mutex := &sync.Mutex{} - fileLocks[filePath] = mutex - return mutex -} - // Check interfaces var ( _ aptly.PublishedStorage = (*PublishedStorage)(nil) @@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName)) destinationPath := filepath.Join(poolPath, baseName) - // Acquire file-specific lock to prevent concurrent access to the same file - fileLock := getFileLock(destinationPath) - fileLock.Lock() - defer fileLock.Unlock() - var localSourcePool aptly.LocalPackagePool if storage.linkMethod != LinkMethodCopy { pp, ok := sourcePool.(aptly.LocalPackagePool) diff --git a/files/public_test.go b/files/public_test.go index 058bd8c4..9f9198f3 100644 --- a/files/public_test.go +++ b/files/public_test.go @@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) { c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true) } -func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) { - a := getFileLock(filepath.Join(s.root, "a")) - b := getFileLock(filepath.Join(s.root, "a")) - c.Check(a == b, Equals, true) - - c1 := getFileLock(filepath.Join(s.root, "c1")) - c2 := getFileLock(filepath.Join(s.root, "c2")) - c.Check(c1 == c2, Equals, false) -} - func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) { storage := NewPublishedStorage(s.root, "", "") diff --git a/system/t12_api/publish.py b/system/t12_api/publish.py index 964a71a9..cd92f775 100644 --- a/system/t12_api/publish.py +++ b/system/t12_api/publish.py @@ -992,6 +992,231 @@ class PublishSwitchAPITestRepo(APITest): self.check_not_exists("public/" + prefix + "dists/") +class PublishSwitchAPITestMirror(APITest): + """ + PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution + """ + fixtureGpg = True + + def check(self): + mirror_name = self.random_name() + mirror_desc = {'Name': mirror_name, + 'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/', + 'Distribution': 'wheezy', + 'Keyrings': ["aptlytest.gpg"], + 'Architectures': ["amd64"], + 'Components': ['main']} + mirror_desc['IgnoreSignatures'] = True + + # Create Mirror + resp = self.post("/api/mirrors", json=mirror_desc) + self.check_equal(resp.status_code, 201) + + # Get Mirror + resp = self.get("/api/mirrors/" + mirror_name + "/packages") + self.check_equal(resp.status_code, 404) + + # Update Mirror + resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc) + self.check_task(resp) + + # Snapshot Mirror + snapshot1_name = self.random_name() + task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + # Publish Snapshot + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': 'packagecloud.io/varnishcache/varnish30', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # Snapshot Mirror 2 + snapshot2_name = self.random_name() + task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + +class PublishSwitchAPITestSnapshot(APITest): + """ + publish snapshot of snapshot + """ + fixtureGpg = True + + def check(self): + repo_name = self.random_name() + self.check_equal(self.post( + "/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201) + + d = self.random_name() + self.check_equal( + self.upload("/api/files/" + d, + "pyspi_0.6.1-1.3.dsc", + "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", + "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + task = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_task(task) + + snapshot1_name = self.random_name() + task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': '', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + snapshot2_name = self.random_name() + task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + class PublishSwitchAPITestRepoSignedBy(APITest): """ PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution From 9771f228bc263e5003d95456ce0fbfbdb0829aa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 12:01:49 +0000 Subject: [PATCH 2/6] repos: fix race conditions * load data inside background tasks Perform collection.LoadComplete inside maybeRunTaskInBackground Have tasks use a fresh copy of taskCollectionFactory, taskCollection * use uuids, since names can be renamed --- api/repos.go | 231 ++++++++++++++++++++++++++++------------ system/t12_api/repos.py | 31 ++++++ 2 files changed, 195 insertions(+), 67 deletions(-) diff --git a/api/repos.go b/api/repos.go index 1d6dfae3..fb002039 100644 --- a/api/repos.go +++ b/api/repos.go @@ -131,46 +131,62 @@ func apiReposCreate(c *gin.Context) { return } - repo := deb.NewLocalRepo(b.Name, b.Comment) - repo.DefaultComponent = b.DefaultComponent - repo.DefaultDistribution = b.DefaultDistribution - + // Handler: Pre-task validations (shallow) collectionFactory := context.NewCollectionFactory() + var resources []string if b.FromSnapshot != "" { - var snapshot *deb.Snapshot - - snapshotCollection := collectionFactory.SnapshotCollection() - - snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + snapshot, err := collectionFactory.SnapshotCollection().ByName(b.FromSnapshot) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) return } + resources = append(resources, string(snapshot.Key())) + } - err = snapshotCollection.LoadComplete(snapshot) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err)) - return + taskName := fmt.Sprintf("Create repository %s", b.Name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection and check/create ATOMIC inside task + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Check duplicate inside lock + if _, err := taskCollection.ByName(b.Name); err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %s already exists", b.Name) } - repo.UpdateRefList(snapshot.RefList()) - } + // Create repo + repo := deb.NewLocalRepo(b.Name, b.Comment) + repo.DefaultComponent = b.DefaultComponent + repo.DefaultDistribution = b.DefaultDistribution - localRepoCollection := collectionFactory.LocalRepoCollection() + if b.FromSnapshot != "" { + snapshotCollection := taskCollectionFactory.SnapshotCollection() - if _, err := localRepoCollection.ByName(b.Name); err == nil { - AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name)) - return - } + snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, + fmt.Errorf("source snapshot not found: %s", err) + } - err := localRepoCollection.Add(repo) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, err) - return - } + err = snapshotCollection.LoadComplete(snapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, + fmt.Errorf("unable to load source snapshot: %s", err) + } - c.JSON(http.StatusCreated, repo) + repo.UpdateRefList(snapshot.RefList()) + } + + err := taskCollection.Add(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil + }) } type reposEditParams struct { @@ -201,6 +217,8 @@ func apiReposEdit(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Mutation and duplicate check happen inside the task for atomicity. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -212,31 +230,53 @@ func apiReposEdit(c *gin.Context) { } if b.Name != nil && *b.Name != name { - _, err := collection.ByName(*b.Name) - if err == nil { - // already exists - AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name)) + if _, err = collection.ByName(*b.Name); err == nil { + AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: local repo %q already exists", *b.Name)) return } - repo.Name = *b.Name - } - if b.Comment != nil { - repo.Comment = *b.Comment - } - if b.DefaultDistribution != nil { - repo.DefaultDistribution = *b.DefaultDistribution - } - if b.DefaultComponent != nil { - repo.DefaultComponent = *b.DefaultComponent } - err = collection.Update(repo) - if err != nil { - AbortWithJSONError(c, 500, err) - return - } + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Edit repository %s", name) - c.JSON(200, repo) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + // Check and update ATOMIC (inside lock) + if b.Name != nil && *b.Name != name { + _, err := taskCollection.ByName(*b.Name) + if err == nil { + // already exists + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %q already exists", *b.Name) + } + repo.Name = *b.Name + } + if b.Comment != nil { + repo.Comment = *b.Comment + } + if b.DefaultDistribution != nil { + repo.DefaultDistribution = *b.DefaultDistribution + } + if b.DefaultComponent != nil { + repo.DefaultComponent = *b.DefaultComponent + } + + err = taskCollection.Update(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil + }) } // GET /api/repos/:name @@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" name := c.Params.ByName("name") + // Load shallowly for 404 check, resource key, and task name. + // Full checks (published/snapshots) happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() repo, err := collection.ByName(name) if err != nil { @@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.ByLocalRepo(repo) + // Task: Create fresh collections inside task after lock acquired + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Re-read repo with fresh collection after lock + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err) + } + + // Check with fresh collections + published := taskPublishedCollection.ByLocalRepo(repo) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is published") } if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) + snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo) + return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) }) } @@ -361,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - repo, err := collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + repo, err := collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -373,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li resources := []string{string(repo.Key())} maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } out.Printf("Loading packages...\n") - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -388,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li for _, ref := range b.PackageRefs { var p *deb.Package - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) @@ -404,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -511,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -534,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -555,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) { packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) @@ -571,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) { repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -650,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) { return } + // Load shallowly for 404 check and resource keys. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { @@ -673,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) { resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + // Task: Create fresh factory and collections inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of both repos after lock acquired + dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) } - err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo) + srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) } @@ -691,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) { RemovedLines: []string{}, } - dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress()) + dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) } - srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress()) + srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) } @@ -764,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) { } else { dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) - err = collectionFactory.LocalRepoCollection().Update(dstRepo) + err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -867,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) { resources = append(resources, sources...) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + var ( err error verifier = context.GetVerifier() @@ -882,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) { changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), + context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) if err != nil { diff --git a/system/t12_api/repos.py b/system/t12_api/repos.py index 424f9f49..6448a557 100644 --- a/system/t12_api/repos.py +++ b/system/t12_api/repos.py @@ -461,3 +461,34 @@ class ReposAPITestCopyPackage(APITest): self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']) + + +class ReposAPITestCreateEdit(APITest): + """ + POST /api/repos, + """ + def check(self): + repo_name = self.random_name() + ' with space' + repo_desc = {'Comment': 'fun repo', + 'DefaultComponent': 'contrib', + 'DefaultDistribution': 'bookworm', + 'Name': repo_name} + + resp = self.post("/api/repos", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 201) + + repo_desc = {'Comment': 'modified repo', + 'DefaultComponent': 'main', + 'DefaultDistribution': 'trixie', + 'Name': repo_name + '@renamed'} + resp = self.put(f"/api/repos/{repo_name}", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.get("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.delete("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.status_code, 200) From 722064363c885b1879e3250562f27053edd1d3c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 12:13:41 +0000 Subject: [PATCH 3/6] snapshot: fix race conditions * perform collection.LoadComplete inside maybeRunTaskInBackground * have tasks use a fresh copy of taskCollectionFactory, taskCollection * fix locking for snapshots of snapshots by locking SourceSnapshots * use uuids, since names can be renamed --- api/snapshot.go | 223 ++++++++++++++++++++++++++++++++++-------------- deb/snapshot.go | 6 -- 2 files changed, 161 insertions(+), 68 deletions(-) diff --git a/api/snapshot.go b/api/snapshot.go index 2c50566f..7b6422f0 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -83,26 +83,33 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.RemoteRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return } // including snapshot resource key - resources := []string{string(repo.Key()), "S" + b.Name} + resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Create snapshot of mirror %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err } - err = collection.LoadComplete(repo) + err = taskMirrorCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) { } } + // Phase 1: Pre-task validation (shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() var resources []string @@ -178,37 +186,62 @@ func apiSnapshotsCreate(c *gin.Context) { return } - resources = append(resources, string(sources[i].ResourceKey())) + resources = append(resources, string(sources[i].Key())) } maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - for i := range sources { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPackageCollection := taskCollectionFactory.PackageCollection() + + // Fresh load of all sources after lock acquired + freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots)) + for i := range b.SourceSnapshots { + freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } } - list := deb.NewPackageList() + // Merge packages from all source snapshots + var refList *deb.PackageRefList + if len(freshSources) > 0 { + refList = freshSources[0].RefList() + for i := 1; i < len(freshSources); i++ { + refList = refList.Merge(freshSources[i].RefList(), true, false) + } + } else { + refList = deb.NewPackageRefList() + } - // verify package refs and build package list - for _, ref := range b.PackageRefs { - p, err := collectionFactory.PackageCollection().ByKey([]byte(ref)) - if err != nil { - if err == database.ErrNotFound { - return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err) + // Add any explicitly specified package refs on top + if len(b.PackageRefs) > 0 { + list := deb.NewPackageList() + for _, ref := range b.PackageRefs { + p, err := taskPackageCollection.ByKey([]byte(ref)) + if err != nil { + if err == database.ErrNotFound { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err) + } + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + err = list.Add(p) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } - return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err - } - err = list.Add(p) - if err != nil { - return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } + refList = refList.Merge(deb.NewPackageRefListFromPackageList(list), true, false) } - snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description) + snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, refList, b.Description) - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -249,21 +282,28 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.LocalRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return } // including snapshot resource key - resources := []string{string(repo.Key()), "S" + b.Name} + resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Create snapshot of repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.LoadComplete(repo) + taskCollectionFactory := context.NewCollectionFactory() + taskRepoCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskRepoCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskRepoCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -277,7 +317,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -315,6 +355,7 @@ func apiSnapshotsUpdate(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() collection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") @@ -325,14 +366,38 @@ func apiSnapshotsUpdate(c *gin.Context) { return } - resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} - taskName := fmt.Sprintf("Update snapshot %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - _, err := collection.ByName(b.Name) + // Pre-task validation of new name if provided (skip if renaming to same name) + if b.Name != "" && b.Name != name { + _, err = collection.ByName(b.Name) if err == nil { - return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)) + return + } + } + + resources := []string{string(snapshot.Key())} + taskName := fmt.Sprintf("Update snapshot %s", name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + snapshot, err = taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } + // Fresh duplicate check inside lock + if b.Name != "" { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + } + } + + // Update fresh copy if b.Name != "" { snapshot.Name = b.Name } @@ -341,7 +406,7 @@ func apiSnapshotsUpdate(c *gin.Context) { snapshot.Description = b.Description } - err = collectionFactory.SnapshotCollection().Update(snapshot) + err = taskCollection.Update(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -395,9 +460,9 @@ func apiSnapshotsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() snapshot, err := snapshotCollection.ByName(name) if err != nil { @@ -405,23 +470,37 @@ func apiSnapshotsDrop(c *gin.Context) { return } - resources := []string{string(snapshot.ResourceKey())} + resources := []string{string(snapshot.Key())} taskName := fmt.Sprintf("Delete snapshot %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.BySnapshot(snapshot) + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Fresh load after lock acquired + snapshot, err := taskSnapshotCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + // Fresh checks with current collections + published := taskPublishedCollection.BySnapshot(snapshot) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published") } if !force { - snapshots := snapshotCollection.BySnapshotSource(snapshot) + // Using fresh collection for dependency check + snapshots := taskSnapshotCollection.BySnapshotSource(snapshot) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") } } - err = snapshotCollection.Drop(snapshot) + err = taskSnapshotCollection.Drop(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -576,6 +655,7 @@ func apiSnapshotsMerge(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() @@ -588,36 +668,47 @@ func apiSnapshotsMerge(c *gin.Context) { return } - resources[i] = string(sources[i].ResourceKey()) + resources[i] = string(sources[i].Key()) } maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = snapshotCollection.LoadComplete(sources[0]) - if err != nil { - return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err - } - result := sources[0].RefList() - for i := 1; i < len(sources); i++ { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load of all sources inside task + freshSources := make([]*deb.Snapshot, len(body.Sources)) + for i := range body.Sources { + freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - result = result.Merge(sources[i].RefList(), overrideMatching, false) + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + } + + // Merge using fresh sources + result := freshSources[0].RefList() + for i := 1; i < len(freshSources); i++ { + result = result.Merge(freshSources[i].RefList(), overrideMatching, false) } if latest { result.FilterLatestRefs() } - sourceDescription := make([]string, len(sources)) - for i, s := range sources { + sourceDescription := make([]string, len(freshSources)) + for i, s := range freshSources { sourceDescription[i] = fmt.Sprintf("'%s'", s.Name) } - snapshot = deb.NewSnapshotFromRefList(name, sources, result, + snapshot = deb.NewSnapshotFromRefList(name, freshSources, result, fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", "))) - err = collectionFactory.SnapshotCollection().Add(snapshot) + err = taskCollectionFactory.SnapshotCollection().Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err) } @@ -698,24 +789,32 @@ func apiSnapshotsPull(c *gin.Context) { return } - resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())} + resources := []string{string(sourceSnapshot.Key()), string(toSnapshot.Key())} taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of snapshots after lock acquired + freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot) + freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } // convert snapshots to package list - toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + sourcePackageList, err := deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -812,10 +911,10 @@ func apiSnapshotsPull(c *gin.Context) { } // Create snapshot - destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList, - fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", "))) + destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList, + fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", "))) - err = collectionFactory.SnapshotCollection().Add(destinationSnapshot) + err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } diff --git a/deb/snapshot.go b/deb/snapshot.go index f79a50d2..c308403c 100644 --- a/deb/snapshot.go +++ b/deb/snapshot.go @@ -143,12 +143,6 @@ func (s *Snapshot) Key() []byte { return []byte("S" + s.UUID) } -// ResourceKey is a unique identifier of the resource -// this snapshot uses. Instead of uuid it uses name -// which needs to be unique as well. -func (s *Snapshot) ResourceKey() []byte { - return []byte("S" + s.Name) -} // RefKey is a unique id for package reference list func (s *Snapshot) RefKey() []byte { From dfa34157a005c11c62a0c12d6689cba615bf87fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 12:17:05 +0000 Subject: [PATCH 4/6] mirror: fix race conditions * load data inside background tasks Perform collection.LoadComplete inside maybeRunTaskInBackground Have tasks use a fresh copy of taskCollectionFactory, taskCollection --- api/mirror.go | 62 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/api/mirror.go b/api/mirror.go index 743101fc..afc7d007 100644 --- a/api/mirror.go +++ b/api/mirror.go @@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() mirrorCollection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() repo, err := mirrorCollection.ByName(name) if err != nil { @@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete mirror %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } if !force { - snapshots := snapshotCollection.ByRemoteRepoSource(repo) + // Fresh checks with current collections + snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") } } - err = mirrorCollection.Drop(repo) + err = taskMirrorCollection.Drop(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } @@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.RemoteRepoCollection() - remote, err = collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + remote, err = collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) { return } + // Pre-task validation of new name if provided if b.Name != remote.Name { _, err = collection.ByName(b.Name) if err == nil { @@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) { resources := []string{string(remote.Key())} maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.RemoteRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + remote, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Fresh rename check inside lock (if renaming) + if b.Name != remote.Name { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name) + } + } downloader := context.NewDownloader(out) - err := remote.Fetch(downloader, verifier, b.IgnoreSignatures) + err = remote.Fetch(downloader, verifier, b.IgnoreSignatures) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -580,14 +612,14 @@ func apiMirrorsUpdate(c *gin.Context) { } } - err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) + err = remote.DownloadPackageIndexes(out, downloader, verifier, taskCollectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } if remote.DownloadAppStream && !remote.IsFlat() { err = remote.DownloadAppStreamFiles(out, downloader, - context.PackagePool(), collectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) + context.PackagePool(), taskCollectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -607,8 +639,8 @@ func apiMirrorsUpdate(c *gin.Context) { } } - queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(), - collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) + queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), taskCollectionFactory.PackageCollection(), + taskCollectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -618,12 +650,12 @@ func apiMirrorsUpdate(c *gin.Context) { e := context.ReOpenDatabase() if e == nil { remote.MarkAsIdle() - _ = collection.Update(remote) + _ = taskCollection.Update(remote) } }() remote.MarkAsUpdating() - err = collection.Update(remote) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -727,7 +759,7 @@ func apiMirrorsUpdate(c *gin.Context) { } // and import it back to the pool - task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil)) + task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, taskCollectionFactory.ChecksumCollection(nil)) if err != nil { //return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err) pushError(err) @@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) { } log.Info().Msgf("%s: Finalizing download...", b.Name) - _ = remote.FinalizeDownload(collectionFactory, out) - err = collectionFactory.RemoteRepoCollection().Update(remote) + _ = remote.FinalizeDownload(taskCollectionFactory, out) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } From af1df410a27e26da7b45dc8a6177fffe4fdc633a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 15:39:48 +0000 Subject: [PATCH 5/6] tasks: fix race conditions * show resources in task details * fix task state locking * return task object consistently Race condition iexisted where task State, err, and processReturnValue fields were written by consumer goroutine and read by concurrent accessors without proper synchronization, causing torn reads and data races. --- task/list.go | 63 +++++++++++++++++++++++++++++++--------------------- task/task.go | 5 +++-- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/task/list.go b/task/list.go index 5b9e9395..6a1a720d 100644 --- a/task/list.go +++ b/task/list.go @@ -44,28 +44,30 @@ func (list *List) consumer() { for { select { case task := <-list.queue: + // Set task state to RUNNING before processing list.Lock() - { - task.State = RUNNING - } + task.State = RUNNING list.Unlock() go func() { retValue, err := task.process(aptly.Progress(task.output), task.detail) + // Update task completion state and cleanup with list lock held list.Lock() { - task.processReturnValue = retValue - task.err = err if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED + task.err = err + task.processReturnValue = retValue } else { task.output.Print("Task succeeded") task.State = SUCCEEDED + task.err = nil + task.processReturnValue = retValue } - list.usedResources.Free(task.resources) + list.usedResources.Free(task.Resources) task.wgTask.Done() list.wg.Done() @@ -74,9 +76,9 @@ func (list *List) consumer() { for _, t := range list.tasks { if t.State == IDLE { // check resources - blockingTasks := list.usedResources.UsedBy(t.resources) + blockingTasks := list.usedResources.UsedBy(t.Resources) if len(blockingTasks) == 0 { - list.usedResources.MarkInUse(t.resources, t) + list.usedResources.MarkInUse(t.Resources, t) // unlock list since queueing may block list.Unlock() unlocked = true @@ -105,13 +107,15 @@ func (list *List) Stop() { // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { - tasks := []Task{} list.Lock() + defer list.Unlock() + + tasks := []Task{} for _, task := range list.tasks { + // Copy task while holding list lock tasks = append(tasks, *task) } - list.Unlock() return tasks } @@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) { // GetTaskByID returns task with given id func (list *List) GetTaskByID(ID int) (Task, error) { list.Lock() - tasks := list.tasks - list.Unlock() + defer list.Unlock() - for _, task := range tasks { + for _, task := range list.tasks { if task.ID == ID { + // Copy task while holding list lock return *task, nil } } @@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { // GetTaskReturnValueByID returns process return value of task with given id func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.processReturnValue, nil + } } - return task.processReturnValue, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } // RunTaskInBackground creates task and runs it in background. This will block until the necessary resources @@ -204,11 +211,15 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.wg.Add(1) task.wgTask.Add(1) + // Copy task while still holding the lock to avoid racing with consumer + // setting State=RUNNING after receiving from queue + taskCopy := *task + // add task to queue for processing if resources are available // if not, task will be queued by the consumer once resources are available tasks := list.usedResources.UsedBy(resources) if len(tasks) == 0 { - list.usedResources.MarkInUse(task.resources, task) + list.usedResources.MarkInUse(task.Resources, task) // queueing task might block if channel not ready, unlock list before queueing list.Unlock() list.queue <- task @@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Unlock() } - return *task, nil + return taskCopy, nil } // Clear removes finished tasks from list func (list *List) Clear() { list.Lock() + defer list.Unlock() var tasks []*Task for _, task := range list.tasks { @@ -230,8 +242,6 @@ func (list *List) Clear() { } } list.tasks = tasks - - list.Unlock() } // Wait waits till all tasks are processed @@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) { // GetTaskErrorByID returns the Task error for a given id func (list *List) GetTaskErrorByID(ID int) (error, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.err, nil + } } - return task.err, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } diff --git a/task/task.go b/task/task.go index 02aa7037..72f60699 100644 --- a/task/task.go +++ b/task/task.go @@ -42,6 +42,7 @@ const ( ) // Task represents as task in a queue encapsulates process code +// All fields are protected by List.Mutex - access task fields only while holding list.Lock() type Task struct { output *Output detail *Detail @@ -51,7 +52,7 @@ type Task struct { Name string ID int State State - resources []string + Resources []string wgTask *sync.WaitGroup } @@ -64,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s Name: name, ID: ID, State: IDLE, - resources: resources, + Resources: resources, wgTask: wgTask, } return task From caed9c234dd87b202a1d60bc01157b1a3987f9bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Sat, 13 Jun 2026 12:42:00 +0200 Subject: [PATCH 6/6] publish: support MultiDist toggle --- api/publish.go | 24 ++++++ deb/publish.go | 46 ++++++++++++ system/t12_api/publish.py | 154 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+) diff --git a/api/publish.go b/api/publish.go index 5ab7ccdb..80859680 100644 --- a/api/publish.go +++ b/api/publish.go @@ -530,6 +530,9 @@ func apiPublishUpdateSwitch(c *gin.Context) { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // Capture MultiDist before mutations to detect a false→true transition. + prevMultiDist := published.MultiDist + // Apply field mutations on the freshly loaded object. if b.SkipContents != nil { published.SkipContents = *b.SkipContents @@ -589,6 +592,15 @@ func apiPublishUpdateSwitch(c *gin.Context) { if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // When MultiDist is toggled, the old pool layout still has files that + // CleanupPrefixComponentFiles won't touch (it only scans the new layout). + // Run a second pass over the previous layout to remove stale files. + if prevMultiDist != published.MultiDist { + err = taskCollection.CleanupAfterMultiDistToggle(context, published, prevMultiDist, cleanComponents, taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to clean legacy pool: %s", err) + } + } } return &task.ProcessReturnValue{Code: http.StatusOK, Value: published}, nil @@ -1196,6 +1208,9 @@ func apiPublishUpdate(c *gin.Context) { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // Capture MultiDist before mutations to detect a false→true transition. + prevMultiDist := published.MultiDist + // Apply field mutations on the freshly loaded object. if b.SkipContents != nil { published.SkipContents = *b.SkipContents @@ -1244,6 +1259,15 @@ func apiPublishUpdate(c *gin.Context) { if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // When MultiDist is toggled, the old pool layout still has files that + // CleanupPrefixComponentFiles won't touch (it only scans the new layout). + // Run a second pass over the previous layout to remove stale files. + if prevMultiDist != published.MultiDist { + err = taskCollection.CleanupAfterMultiDistToggle(context, published, prevMultiDist, cleanComponents, taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to clean legacy pool: %s", err) + } + } } return &task.ProcessReturnValue{Code: http.StatusOK, Value: published}, nil diff --git a/deb/publish.go b/deb/publish.go index dbaa81a7..a17dae81 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -1598,6 +1598,52 @@ func (collection *PublishedRepoCollection) listReferencedFilesByComponent(prefix return referencedFiles, nil } +// CleanupAfterMultiDistToggle cleans up stale pool files left behind when the +// MultiDist flag is toggled on a published repository. +// +// - false→true: Publish() wrote packages into pool/// +// but the old flat pool// files were not removed because +// CleanupPrefixComponentFiles only scans the new MultiDist tree. +// A second pass with MultiDist=false cleans the legacy flat layout by +// reusing the existing orphan-detection logic (the repo is now MultiDist=true +// so it is excluded from the referenced-files scan, making its old pool +// entries appear orphaned). +// +// - true→false: Publish() wrote packages into pool// but the old +// per-distribution pool/// directories were not +// removed. The orphan-detection approach cannot be used here because the +// repo's RefList still contains all packages (they just moved locations). +// Instead we directly remove each pool/// directory. +// This is safe because per-distribution pool dirs are exclusive to a single +// prefix+distribution combination — no other published repo can share them. +func (collection *PublishedRepoCollection) CleanupAfterMultiDistToggle(publishedStorageProvider aptly.PublishedStorageProvider, + published *PublishedRepo, prevMultiDist bool, cleanComponents []string, collectionFactory *CollectionFactory, progress aptly.Progress) error { + if prevMultiDist == published.MultiDist { + return nil + } + + if !prevMultiDist && published.MultiDist { + // false→true: use orphan-detection via the existing cleanup, but with + // MultiDist temporarily set to false so it scans the flat pool layout. + legacy := *published + legacy.MultiDist = false + return collection.CleanupPrefixComponentFiles(publishedStorageProvider, &legacy, cleanComponents, collectionFactory, progress) + } + + // true→false: directly remove the per-distribution pool directories. + publishedStorage := publishedStorageProvider.GetPublishedStorage(published.Storage) + for _, component := range cleanComponents { + poolDir := filepath.Join(published.Prefix, "pool", published.Distribution, component) + if err := publishedStorage.RemoveDirs(poolDir, progress); err != nil { + return err + } + } + // Remove the distribution-level pool dir if it is now empty. + distPoolDir := filepath.Join(published.Prefix, "pool", published.Distribution) + _ = publishedStorage.RemoveDirs(distPoolDir, progress) + return nil +} + // CleanupPrefixComponentFiles removes all unreferenced files in published storage under prefix/component pair func (collection *PublishedRepoCollection) CleanupPrefixComponentFiles(publishedStorageProvider aptly.PublishedStorageProvider, published *PublishedRepo, cleanComponents []string, collectionFactory *CollectionFactory, progress aptly.Progress) error { diff --git a/system/t12_api/publish.py b/system/t12_api/publish.py index cd92f775..e83ae2c4 100644 --- a/system/t12_api/publish.py +++ b/system/t12_api/publish.py @@ -666,6 +666,160 @@ class PublishUpdateAPIMultiDist(APITest): self.check_not_exists("public/" + prefix + "dists/") +class PublishUpdateAPIMultiDistToggle(APITest): + """ + POST /publish/:prefix with MultiDist=false, then PUT to enable MultiDist=true + """ + fixtureGpg = True + + def check(self): + repo_name = self.random_name() + self.check_equal(self.post( + "/api/repos", json={"Name": repo_name, "DefaultDistribution": "bookworm"}).status_code, 201) + + d = self.random_name() + self.check_equal(self.upload("/api/files/" + d, + "libboost-program-options-dev_1.49.0.1_i386.deb", "pyspi_0.6.1-1.3.dsc", + "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", + "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + + task = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_task(task) + + # Publish with MultiDist=false (default) + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "local", + "Sources": [{"Name": repo_name}], + "Signing": DefaultSigningOptions, + "MultiDist": False, + } + ) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'bookworm', + 'Label': '', + 'Origin': '', + 'Version': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'bookworm', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'local', + 'Sources': [{'Component': 'main', 'Name': repo_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # With MultiDist=false packages are stored under pool/main/... + self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # MultiDist-style per-distribution pool must not exist yet + self.check_not_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + # Now update the published repo enabling MultiDist=true + task = self.put_task( + "/api/publish/" + prefix + "/bookworm", + json={ + "MultiDist": True, + "Signing": DefaultSigningOptions, + } + ) + self.check_task(task) + + repo_expected_multidist = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'bookworm', + 'Label': '', + 'Origin': '', + 'Version': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'bookworm', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': True, + 'SourceKind': 'local', + 'Sources': [{'Component': 'main', 'Name': repo_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected_multidist, all_repos.json()) + + # After enabling MultiDist, packages are stored under pool//main/... + self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/bookworm/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # Flat pool must not exist while MultiDist is on + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + # Switch MultiDist back to false + task = self.put_task( + "/api/publish/" + prefix + "/bookworm", + json={ + "MultiDist": False, + "Signing": DefaultSigningOptions, + } + ) + self.check_task(task) + + repo_expected["MultiDist"] = False + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # Packages are back under the flat pool/main/... + self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # Per-distribution pool must be gone + self.check_not_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + task = self.delete_task("/api/publish/" + prefix + "/bookworm") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + class PublishConcurrentUpdateAPITestRepo(APITest): """ PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution