diff --git a/api/mirror.go b/api/mirror.go index 743101fc..afc7d007 100644 --- a/api/mirror.go +++ b/api/mirror.go @@ -216,9 +216,9 @@ func apiMirrorsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() mirrorCollection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() repo, err := mirrorCollection.ByName(name) if err != nil { @@ -228,21 +228,34 @@ func apiMirrorsDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete mirror %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } if !force { - snapshots := snapshotCollection.ByRemoteRepoSource(repo) + // Fresh checks with current collections + snapshots := taskSnapshotCollection.ByRemoteRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusForbidden, Value: nil}, fmt.Errorf("won't delete mirror with snapshots, use 'force=1' to override") } } - err = mirrorCollection.Drop(repo) + err = taskMirrorCollection.Drop(repo) if err != nil { return 
&task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %v", err) } @@ -535,7 +548,8 @@ func apiMirrorsUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.RemoteRepoCollection() - remote, err = collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + remote, err = collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -550,6 +564,7 @@ func apiMirrorsUpdate(c *gin.Context) { return } + // Pre-task validation of new name if provided if b.Name != remote.Name { _, err = collection.ByName(b.Name) if err == nil { @@ -566,9 +581,26 @@ func apiMirrorsUpdate(c *gin.Context) { resources := []string{string(remote.Key())} maybeRunTaskInBackground(c, "Update mirror "+b.Name, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.RemoteRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + remote, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Fresh rename check inside lock (if renaming) + if b.Name != remote.Name { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: mirror %s already exists", b.Name) + } + } downloader := context.NewDownloader(out) - err := remote.Fetch(downloader, verifier, b.IgnoreSignatures) + err = remote.Fetch(downloader, verifier, b.IgnoreSignatures) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -580,14 +612,14 @@ func 
apiMirrorsUpdate(c *gin.Context) { } } - err = remote.DownloadPackageIndexes(out, downloader, verifier, collectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) + err = remote.DownloadPackageIndexes(out, downloader, verifier, taskCollectionFactory, b.IgnoreSignatures, remote.SkipComponentCheck) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } if remote.DownloadAppStream && !remote.IsFlat() { err = remote.DownloadAppStreamFiles(out, downloader, - context.PackagePool(), collectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) + context.PackagePool(), taskCollectionFactory.ChecksumCollection(nil), b.IgnoreChecksums) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -607,8 +639,8 @@ func apiMirrorsUpdate(c *gin.Context) { } } - queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), collectionFactory.PackageCollection(), - collectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) + queue, downloadSize, err := remote.BuildDownloadQueue(context.PackagePool(), taskCollectionFactory.PackageCollection(), + taskCollectionFactory.ChecksumCollection(nil), b.SkipExistingPackages, b.LatestOnly) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -618,12 +650,12 @@ func apiMirrorsUpdate(c *gin.Context) { e := context.ReOpenDatabase() if e == nil { remote.MarkAsIdle() - _ = collection.Update(remote) + _ = taskCollection.Update(remote) } }() remote.MarkAsUpdating() - err = collection.Update(remote) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -727,7 +759,7 @@ func apiMirrorsUpdate(c *gin.Context) { } // and import it 
back to the pool - task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, collectionFactory.ChecksumCollection(nil)) + task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, taskCollectionFactory.ChecksumCollection(nil)) if err != nil { //return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import file: %s", err) pushError(err) @@ -780,8 +812,8 @@ func apiMirrorsUpdate(c *gin.Context) { } log.Info().Msgf("%s: Finalizing download...", b.Name) - _ = remote.FinalizeDownload(collectionFactory, out) - err = collectionFactory.RemoteRepoCollection().Update(remote) + _ = remote.FinalizeDownload(taskCollectionFactory, out) + err = taskCollection.Update(remote) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } diff --git a/api/publish.go b/api/publish.go index 67b260d4..80859680 100644 --- a/api/publish.go +++ b/api/publish.go @@ -267,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } - resources = append(resources, string(snapshot.ResourceKey())) + resources = append(resources, string(snapshot.Key())) sources = append(sources, snapshot) } } else if b.SourceKind == deb.SourceLocalRepo { @@ -298,11 +298,24 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } - collection := collectionFactory.PublishedRepoCollection() + // Non-MultiDist publishes share a single pool/ directory under the + // prefix. Lock at the prefix level so that concurrent publish/drop + // operations on sibling distributions cannot race during cleanup. 
+ if !multiDist { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + + resources = append(resources, deb.PrefixPoolLockKey(storagePrefix)) + } taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -314,10 +327,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -327,13 +340,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } @@ -367,18 +378,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := 
collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -458,6 +469,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -465,64 +477,88 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } - } else if published.SourceKind == deb.SourceSnapshot { - for _, snapshotInfo := range b.Snapshots { - _, err2 := 
snapshotCollection.ByName(snapshotInfo.Name) + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, snapshotInfo := range b.Snapshots { + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) } - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - - resources := []string{string(published.Key())} + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. 
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Capture MultiDist before mutations to detect a toggle in either direction. + prevMultiDist := published.MultiDist + + // Apply field mutations on the freshly loaded object. 
+ if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -534,17 +570,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -552,10 +588,19 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) 
- err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // When MultiDist is toggled, the old pool layout still has files that + // CleanupPrefixComponentFiles won't touch (it only scans the new layout). + // Run a second pass over the previous layout to remove stale files. + if prevMultiDist != published.MultiDist { + err = taskCollection.CleanupAfterMultiDistToggle(context, published, prevMultiDist, cleanComponents, taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to clean legacy pool: %s", err) + } + } } return &task.ProcessReturnValue{Code: http.StatusOK, Value: published}, nil @@ -600,10 +645,19 @@ func apiPublishDrop(c *gin.Context) { } resources := []string{string(published.Key())} + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that a drop cannot race + // with a concurrent update or drop of a sibling distribution during cleanup. 
+ if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) } @@ -639,43 +693,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. 
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - component := b.Component - name := b.Name - - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } - - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + component := b.Component + name := b.Name + + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", 
component) + } + + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -757,39 +820,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources - - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, 
fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources + + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -822,24 +894,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return 
&task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -875,51 +956,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. 
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } - - b.Component = component - b.Name = revision.Sources[component] - - if c.Bind(&b) != nil { - return - } - - if b.Component != component { - delete(sources, component) - } - - component = b.Component - name := b.Name - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: 
http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } + + if b.Component != urlComponent { + delete(sources, urlComponent) + } + + newComponent := b.Component + name := b.Name + sources[newComponent] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -956,33 +1044,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } - - delete(sources, component) - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if 
err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } + + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1054,64 +1150,104 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. 
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - resources := []string{string(published.Key())} + + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } + + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. 
+ snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() + + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) + } + } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + // Capture MultiDist before mutations to detect a false→true transition. + prevMultiDist := published.MultiDist + + // Apply field mutations on the freshly loaded object. 
+ if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + + result, err := published.Update(taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1119,10 +1255,19 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) 
- err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + // When MultiDist is toggled, the old pool layout still has files that + // CleanupPrefixComponentFiles won't touch (it only scans the new layout). + // Run a second pass over the previous layout to remove stale files. + if prevMultiDist != published.MultiDist { + err = taskCollection.CleanupAfterMultiDistToggle(context, published, prevMultiDist, cleanComponents, taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to clean legacy pool: %s", err) + } + } } return &task.ProcessReturnValue{Code: http.StatusOK, Value: published}, nil diff --git a/api/published_file_missing_test.go b/api/published_file_missing_test.go new file mode 100644 index 00000000..6d7b77f8 --- /dev/null +++ b/api/published_file_missing_test.go @@ -0,0 +1,733 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "github.com/aptly-dev/aptly/aptly" + ctx "github.com/aptly-dev/aptly/context" + "github.com/aptly-dev/aptly/deb" + "github.com/gin-gonic/gin" + "github.com/smira/flag" + + . 
"gopkg.in/check.v1" +) + +// PublishedFileMissingSuite reproduces the exact bug where: +// - Package import succeeds +// - Metadata is updated (Packages.gz shows the package) +// - Publish reports success +// - BUT the .deb file is missing from the published pool directory +// - Result: apt-get returns 404 when trying to download the package +type PublishedFileMissingSuite struct { + context *ctx.AptlyContext + flags *flag.FlagSet + configFile *os.File + router http.Handler + tempDir string + poolPath string + publicPath string +} + +var _ = Suite(&PublishedFileMissingSuite{}) + +func (s *PublishedFileMissingSuite) SetUpSuite(c *C) { + aptly.Version = "publishedFileMissingTest" + + tempDir, err := os.MkdirTemp("", "aptly-published-missing-test") + c.Assert(err, IsNil) + s.tempDir = tempDir + s.poolPath = filepath.Join(tempDir, "pool") + s.publicPath = filepath.Join(tempDir, "public") + + file, err := os.CreateTemp("", "aptly-published-missing-config") + c.Assert(err, IsNil) + s.configFile = file + + config := gin.H{ + "rootDir": tempDir, + "downloadDir": filepath.Join(tempDir, "download"), + "architectures": []string{"amd64"}, + "dependencyFollowSuggests": false, + "dependencyFollowRecommends": false, + "gpgDisableSign": true, + "gpgDisableVerify": true, + "gpgProvider": "internal", + "skipLegacyPool": true, + "enableMetricsEndpoint": false, + } + + jsonString, err := json.Marshal(config) + c.Assert(err, IsNil) + _, err = file.Write(jsonString) + c.Assert(err, IsNil) + + flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError) + flags.Bool("no-lock", true, "disable database locking for test") + flags.Int("db-open-attempts", 3, "dummy") + flags.String("config", s.configFile.Name(), "config file") + flags.String("architectures", "", "dummy") + s.flags = flags + + context, err := ctx.NewContext(s.flags) + c.Assert(err, IsNil) + + s.context = context + s.router = Router(context) +} + +func (s *PublishedFileMissingSuite) TearDownSuite(c *C) { + if 
s.configFile != nil { + _ = os.Remove(s.configFile.Name()) + } + if s.context != nil { + s.context.Shutdown() + } + if s.tempDir != "" { + _ = os.RemoveAll(s.tempDir) + } +} + +func (s *PublishedFileMissingSuite) SetUpTest(c *C) { + collectionFactory := s.context.NewCollectionFactory() + + localRepoCollection := collectionFactory.LocalRepoCollection() + _ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error { + _ = localRepoCollection.Drop(repo) + return nil + }) + + publishedCollection := collectionFactory.PublishedRepoCollection() + _ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error { + _ = publishedCollection.Remove(s.context, published.Storage, published.Prefix, + published.Distribution, collectionFactory, nil, true, true) + return nil + }) +} + +func (s *PublishedFileMissingSuite) TearDownTest(c *C) { + s.SetUpTest(c) +} + +func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + var req *http.Request + var err error + + if body != nil { + req, err = http.NewRequest(method, url, bytes.NewReader(body)) + } else { + req, err = http.NewRequest(method, url, nil) + } + c.Assert(err, IsNil) + req.Header.Add("Content-Type", "application/json") + s.router.ServeHTTP(w, req) + return w +} + +func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) { + uploadPath := s.context.UploadPath() + uploadDir := filepath.Join(uploadPath, uploadID) + err := os.MkdirAll(uploadDir, 0755) + c.Assert(err, IsNil) + + tempDir, err := os.MkdirTemp("", "deb-build") + c.Assert(err, IsNil) + defer func() { _ = os.RemoveAll(tempDir) }() + + debianDir := filepath.Join(tempDir, "DEBIAN") + err = os.MkdirAll(debianDir, 0755) + c.Assert(err, IsNil) + + controlContent := fmt.Sprintf(`Package: %s +Version: %s +Section: libs +Priority: optional +Architecture: amd64 +Maintainer: Test +Description: Test package + Test package for 
published file missing bug. +`, packageName, version) + + err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644) + c.Assert(err, IsNil) + + usrDir := filepath.Join(tempDir, "usr", "lib") + err = os.MkdirAll(usrDir, 0755) + c.Assert(err, IsNil) + err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644) + c.Assert(err, IsNil) + + debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile) + err = cmd.Run() + c.Assert(err, IsNil) +} + +// TestPublishedFileGoMissing reproduces the exact production bug +func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) { + c.Log("=== Reproducing: Package in metadata but 404 on download ===") + + // Create and publish a repository + repoName := "test-repo" + distribution := "bullseye" + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String())) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String())) + + // Create package + packageName := "hrt-libblobbyclient1" + version := "20250926.152427+hrtdeb11" + uploadID := "test-upload-1" + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String())) + + // 
Update publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String())) + + // Now check if the file is actually accessible in the published location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + // Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb + expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + c.Logf("Checking for published file at: %s", expectedPath) + + fileInfo, err := os.Stat(expectedPath) + fileExists := err == nil + + c.Logf("File exists: %v", fileExists) + if fileExists { + c.Logf("File size: %d bytes", fileInfo.Size()) + } + + // Check metadata + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err = json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + c.Logf("Packages in metadata: %d", len(packages)) + + // THE BUG: Metadata says package exists, but file is missing from published location + if len(packages) > 0 && !fileExists { + c.Logf("★★★ BUG REPRODUCED! 
★★★") + c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath) + c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]") + + c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!") + } + + c.Assert(fileExists, Equals, true, Commentf( + "Published file should exist at %s when package is in metadata", expectedPath)) +} + +// TestConcurrentPublishRace tries to trigger the race with concurrent publishes +func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) { + c.Log("=== Testing concurrent publish race condition ===") + + const numIterations = 4 + + for iteration := 0; iteration < numIterations; iteration++ { + c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations) + + // Create repo + repoName := fmt.Sprintf("race-repo-%d", iteration) + distribution := fmt.Sprintf("dist-%d", iteration) + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody) + c.Assert(resp.Code, Equals, 201) + + // Create multiple packages + var wg sync.WaitGroup + numPackages := 5 + + for i := 0; i < numPackages; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx) + version := "1.0.0" + uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx) + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Logf("Package 
%d add: %d", idx, resp.Code) + + // Small delay + time.Sleep(time.Duration(5+idx*2) * time.Millisecond) + + // Publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody) + c.Logf("Publish %d: %d", idx, resp.Code) + }(i) + } + + wg.Wait() + time.Sleep(100 * time.Millisecond) + + // Check all packages + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err := json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + + // Check published files + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("pkg-%d-%d", iteration, i) + version := "1.0.0" + + // Calculate pool path + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, expectedPath) + } + } + + if len(missingFiles) > 0 { + c.Logf("★★★ BUG DETECTED in iteration %d/%d! 
★★★", iteration+1, numIterations) + c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f) + } + + c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages) + } + } + + c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations) +} + +// TestIdenticalPackageRace tests the specific case of identical SHA256 packages +func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) { + c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===") + + const numIterations = 4 + packageName := "shared-package" + + for iter := 0; iter < numIterations; iter++ { + c.Logf("Iteration %d/%d", iter+1, numIterations) + + // Create two repos that will get the SAME package (unique per iteration) + repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)} + dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)} + + for i := range repos { + createBody, _ := json.Marshal(gin.H{ + "Name": repos[i], + "DefaultDistribution": dists[i], + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": dists[i], + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repos[i]}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + }) + resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody) + c.Assert(resp.Code, Equals, 201) + } + + // Create IDENTICAL package file with UNIQUE VERSION per iteration + version := fmt.Sprintf("1.0.%d", iter) + uploadID1 := 
fmt.Sprintf("identical-upload-1-%d", iter) + uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter) + + s.createDebPackage(c, uploadID1, packageName, version) + + // Copy to second upload (same SHA256) + uploadPath := s.context.UploadPath() + src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + destDir := filepath.Join(uploadPath, uploadID2) + err := os.MkdirAll(destDir, 0755) + c.Assert(err, IsNil) + dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + srcData, readErr := os.ReadFile(src) + c.Assert(readErr, IsNil) + err = os.WriteFile(dest, srcData, 0644) + c.Assert(err, IsNil) + + // Race: add and publish both simultaneously + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) + }() + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) + }() + + wg.Wait() + time.Sleep(200 * time.Millisecond) + c.Logf("[iter %d] All operations complete", iter) + + // Check the shared pool location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + poolSubdir := string(packageName[0]) + sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + fileInfo, err := os.Stat(sharedPoolPath) + fileExists := err == nil + + if 
fileExists { + c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size()) + } else { + c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err) + } + + // Check metadata + var packagesA, packagesB []string + resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesA) + c.Assert(err, IsNil) + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesB) + c.Assert(err, IsNil) + + c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB)) + + // THE BUG: Both repos show packages in metadata, but the shared pool file is missing + if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists { + c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1) + c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB)) + c.Logf("Shared pool file exists: %v", fileExists) + c.Logf("Pool path: %s", sharedPoolPath) + + // List what files ARE in the pool directory + poolDir := filepath.Dir(sharedPoolPath) + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("Files in pool directory %s:", poolDir) + for _, entry := range entries { + c.Logf(" - %s", entry.Name()) + } + } + + c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1) + } + } + + c.Logf("All %d iterations passed - bug not reproduced", numIterations) +} + +// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug: +// Multiple snapshots are published concurrently to the SAME prefix but different distributions. +// Example from production logs: +// - trixie-pgdg published to "external/postgres-auto/trixie" +// - bullseye-pgdg published to "external/postgres-auto/bullseye" +// Both share the same pool directory, causing cleanup race conditions. 
+func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) { + const numIterations = 4 + + for iter := 0; iter < numIterations; iter++ { + c.Logf("--- Iteration %d/%d ---", iter+1, numIterations) + + // Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg) + repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter) + repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter) + + // Create trixie repo + createBody, _ := json.Marshal(gin.H{ + "Name": repoTrixie, + "DefaultDistribution": "trixie", + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo")) + + // Create bullseye repo + createBody, _ = json.Marshal(gin.H{ + "Name": repoBullseye, + "DefaultDistribution": "bullseye", + "DefaultComponent": "main", + }) + resp = s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo")) + + // Add packages to both repos + numPackages := 3 + + // Add packages to trixie repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie")) + } + + // Add packages to bullseye repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil) + c.Assert(resp.Code, Equals, 
200, Commentf("Failed to add package to bullseye")) + } + + // Create snapshots from both repos + snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie) + snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye) + + createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot")) + + createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot")) + + // Publish both snapshots CONCURRENTLY to the SAME prefix + // This mimics production where both are published to "external/postgres-auto" + // Use the SAME prefix across all iterations to trigger the race more aggressively + sharedPrefix := "postgres-auto" + + var wg sync.WaitGroup + var trixiePublishCode, bullseyePublishCode int + + wg.Add(2) + + // Publish or update trixie snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "trixie", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, // Force cleanup to run + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE (this is what happens in production) + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", 
fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody) + } + trixiePublishCode = resp.Code + c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code) + }() + + // Publish or update bullseye snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "bullseye", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody) + } + bullseyePublishCode = resp.Code + c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code) + }() + + wg.Wait() + time.Sleep(50 * time.Millisecond) + + // Verify publishes succeeded (201 for create, 200 for update) + expectedCode := 201 + if iter > 0 { + expectedCode = 200 + } + c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed")) + c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed")) + + // Verify ALL package files exist in the published pool + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + expectedFiles := []string{} + + // Check trixie packages + for i := 0; i < numPackages; i++ { + packageName := 
fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath))) + } + } + + // Check bullseye packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath))) + } + } + + // BUG: Files from one distribution are deleted by the other's cleanup + if len(missingFiles) > 0 { + c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations) + c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f) + } + + c.Logf("\nThis reproduces the exact production bug where:") + c.Logf(" 1. Mirror updates complete successfully") + c.Logf(" 2. Snapshots are created") + c.Logf(" 3. Both snapshots publish to same prefix (different distributions)") + c.Logf(" 4. Cleanup from one publish DELETES files from the other") + c.Logf(" 5. 
Result: apt-get returns 404 when downloading packages") + + // List what's actually in the pool + poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main") + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("\nActual pool directory contents (%s):", poolDir) + for _, entry := range entries { + c.Logf(" - %s/", entry.Name()) + } + } + + c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool", + iter+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles)) + } + } + c.Logf("✓ All %d iterations passed - no files missing", numIterations) +} diff --git a/api/repos.go b/api/repos.go index 1d6dfae3..fb002039 100644 --- a/api/repos.go +++ b/api/repos.go @@ -131,46 +131,62 @@ func apiReposCreate(c *gin.Context) { return } - repo := deb.NewLocalRepo(b.Name, b.Comment) - repo.DefaultComponent = b.DefaultComponent - repo.DefaultDistribution = b.DefaultDistribution - + // Handler: Pre-task validations (shallow) collectionFactory := context.NewCollectionFactory() + var resources []string if b.FromSnapshot != "" { - var snapshot *deb.Snapshot - - snapshotCollection := collectionFactory.SnapshotCollection() - - snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + snapshot, err := collectionFactory.SnapshotCollection().ByName(b.FromSnapshot) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("source snapshot not found: %s", err)) return } + resources = append(resources, string(snapshot.Key())) + } - err = snapshotCollection.LoadComplete(snapshot) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to load source snapshot: %s", err)) - return + taskName := fmt.Sprintf("Create repository %s", b.Name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection and check/create ATOMIC inside 
task + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Check duplicate inside lock + if _, err := taskCollection.ByName(b.Name); err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %s already exists", b.Name) } - repo.UpdateRefList(snapshot.RefList()) - } + // Create repo + repo := deb.NewLocalRepo(b.Name, b.Comment) + repo.DefaultComponent = b.DefaultComponent + repo.DefaultDistribution = b.DefaultDistribution - localRepoCollection := collectionFactory.LocalRepoCollection() + if b.FromSnapshot != "" { + snapshotCollection := taskCollectionFactory.SnapshotCollection() - if _, err := localRepoCollection.ByName(b.Name); err == nil { - AbortWithJSONError(c, http.StatusConflict, fmt.Errorf("local repo with name %s already exists", b.Name)) - return - } + snapshot, err := snapshotCollection.ByName(b.FromSnapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, + fmt.Errorf("source snapshot not found: %s", err) + } - err := localRepoCollection.Add(repo) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, err) - return - } + err = snapshotCollection.LoadComplete(snapshot) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, + fmt.Errorf("unable to load source snapshot: %s", err) + } - c.JSON(http.StatusCreated, repo) + repo.UpdateRefList(snapshot.RefList()) + } + + err := taskCollection.Add(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusCreated, Value: repo}, nil + }) } type reposEditParams struct { @@ -201,6 +217,8 @@ func apiReposEdit(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Mutation and duplicate check happen inside the task for atomicity. 
collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -212,31 +230,53 @@ func apiReposEdit(c *gin.Context) { } if b.Name != nil && *b.Name != name { - _, err := collection.ByName(*b.Name) - if err == nil { - // already exists - AbortWithJSONError(c, 404, fmt.Errorf("local repo with name %q already exists", *b.Name)) + if _, err = collection.ByName(*b.Name); err == nil { + AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: local repo %q already exists", *b.Name)) return } - repo.Name = *b.Name - } - if b.Comment != nil { - repo.Comment = *b.Comment - } - if b.DefaultDistribution != nil { - repo.DefaultDistribution = *b.DefaultDistribution - } - if b.DefaultComponent != nil { - repo.DefaultComponent = *b.DefaultComponent } - err = collection.Update(repo) - if err != nil { - AbortWithJSONError(c, 500, err) - return - } + resources := []string{string(repo.Key())} + taskName := fmt.Sprintf("Edit repository %s", name) - c.JSON(200, repo) + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + // Check and update ATOMIC (inside lock) + if b.Name != nil && *b.Name != name { + _, err := taskCollection.ByName(*b.Name) + if err == nil { + // already exists + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, + fmt.Errorf("local repo with name %q already exists", *b.Name) + } + repo.Name = *b.Name + } + if b.Comment != nil { + repo.Comment = *b.Comment + } + if b.DefaultDistribution != nil { + repo.DefaultDistribution = *b.DefaultDistribution + } + if b.DefaultComponent 
!= nil { + repo.DefaultComponent = *b.DefaultComponent + } + + err = taskCollection.Update(repo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + return &task.ProcessReturnValue{Code: http.StatusOK, Value: repo}, nil + }) } // GET /api/repos/:name @@ -278,10 +318,10 @@ func apiReposDrop(c *gin.Context) { force := c.Request.URL.Query().Get("force") == "1" name := c.Params.ByName("name") + // Load shallowly for 404 check, resource key, and task name. + // Full checks (published/snapshots) happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() repo, err := collection.ByName(name) if err != nil { @@ -292,19 +332,32 @@ func apiReposDrop(c *gin.Context) { resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Delete repo %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.ByLocalRepo(repo) + // Task: Create fresh collections inside task after lock acquired + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Re-read repo with fresh collection after lock + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: %s", err) + } + + // Check with fresh collections + published := taskPublishedCollection.ByLocalRepo(repo) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo is 
published") } if !force { - snapshots := snapshotCollection.ByLocalRepoSource(repo) + snapshots := taskSnapshotCollection.ByLocalRepoSource(repo) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop, local repo has snapshots, use ?force=1 to override") } } - return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, collection.Drop(repo) + return &task.ProcessReturnValue{Code: http.StatusOK, Value: gin.H{}}, taskCollection.Drop(repo) }) } @@ -361,10 +414,13 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() - repo, err := collection.ByName(c.Params.ByName("name")) + name := c.Params.ByName("name") + repo, err := collection.ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return @@ -373,13 +429,23 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li resources := []string{string(repo.Key())} maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired (use captured `name` variable, not gin context) + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } out.Printf("Loading packages...\n") - list, err := 
deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err := deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -388,7 +454,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li for _, ref := range b.PackageRefs { var p *deb.Package - p, err = collectionFactory.PackageCollection().ByKey([]byte(ref)) + p, err = taskCollectionFactory.PackageCollection().ByKey([]byte(ref)) if err != nil { if err == database.ErrNotFound { return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("packages %s: %s", ref, err) @@ -404,7 +470,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -511,6 +577,8 @@ func apiReposPackageFromDir(c *gin.Context) { return } + // Load shallowly for 404 check and resource key. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() collection := collectionFactory.LocalRepoCollection() @@ -534,7 +602,17 @@ func apiReposPackageFromDir(c *gin.Context) { resources := []string{string(repo.Key())} resources = append(resources, sources...) 
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(repo) + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.LocalRepoCollection() + + // Fresh load after lock acquired + repo, err := taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -555,13 +633,13 @@ func apiReposPackageFromDir(c *gin.Context) { packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter) - list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil) + list, err = deb.NewPackageListFromRefList(repo.RefList(), taskCollectionFactory.PackageCollection(), nil) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages: %s", err) } processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(), - collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection) + taskCollectionFactory.PackageCollection(), reporter, nil, taskCollectionFactory.ChecksumCollection) failedFiles = append(failedFiles, failedFiles2...) processedFiles = append(processedFiles, otherFiles...) 
@@ -571,7 +649,7 @@ func apiReposPackageFromDir(c *gin.Context) { repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list)) - err = collectionFactory.LocalRepoCollection().Update(repo) + err = taskCollection.Update(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -650,6 +728,8 @@ func apiReposCopyPackage(c *gin.Context) { return } + // Load shallowly for 404 check and resource keys. + // Full load and mutations happen inside the task. collectionFactory := context.NewCollectionFactory() dstRepo, err := collectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { @@ -673,12 +753,26 @@ func apiReposCopyPackage(c *gin.Context) { resources := []string{string(dstRepo.Key()), string(srcRepo.Key())} maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + // Task: Create fresh factory and collections inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of both repos after lock acquired + dstRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(dstRepoName) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) } - err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo) + srcRepo, err := taskCollectionFactory.LocalRepoCollection().ByName(srcRepoName) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(dstRepo) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err) + } + + err = taskCollectionFactory.LocalRepoCollection().LoadComplete(srcRepo) if err != nil 
{ return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err) } @@ -691,12 +785,12 @@ func apiReposCopyPackage(c *gin.Context) { RemovedLines: []string{}, } - dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), collectionFactory.PackageCollection(), context.Progress()) + dstList, err := deb.NewPackageListFromRefList(dstRepo.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in dest: %s", err) } - srcList, err := deb.NewPackageListFromRefList(srcRefList, collectionFactory.PackageCollection(), context.Progress()) + srcList, err := deb.NewPackageListFromRefList(srcRefList, taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to load packages in src: %s", err) } @@ -764,7 +858,7 @@ func apiReposCopyPackage(c *gin.Context) { } else { dstRepo.UpdateRefList(deb.NewPackageRefListFromPackageList(dstList)) - err = collectionFactory.LocalRepoCollection().Update(dstRepo) + err = taskCollectionFactory.LocalRepoCollection().Update(dstRepo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err) } @@ -867,6 +961,9 @@ func apiReposIncludePackageFromDir(c *gin.Context) { resources = append(resources, sources...) 
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Task: Create fresh factory and collection inside task after lock + taskCollectionFactory := context.NewCollectionFactory() + var ( err error verifier = context.GetVerifier() @@ -882,8 +979,8 @@ func apiReposIncludePackageFromDir(c *gin.Context) { changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter) _, failedFiles2, err = deb.ImportChangesFiles( changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier, - repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(), - context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse) + repoTemplate, context.Progress(), taskCollectionFactory.LocalRepoCollection(), taskCollectionFactory.PackageCollection(), + context.PackagePool(), taskCollectionFactory.ChecksumCollection, nil, query.Parse) failedFiles = append(failedFiles, failedFiles2...) 
if err != nil { diff --git a/api/snapshot.go b/api/snapshot.go index 2c50566f..7b6422f0 100644 --- a/api/snapshot.go +++ b/api/snapshot.go @@ -83,26 +83,33 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.RemoteRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.RemoteRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return } // including snapshot resource key - resources := []string{string(repo.Key()), "S" + b.Name} + resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Create snapshot of mirror %s", name) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := repo.CheckLock() + taskCollectionFactory := context.NewCollectionFactory() + taskMirrorCollection := taskCollectionFactory.RemoteRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskMirrorCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = repo.CheckLock() if err != nil { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, err } - err = collection.LoadComplete(repo) + err = taskMirrorCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -116,7 +123,7 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -165,6 +172,7 @@ func apiSnapshotsCreate(c *gin.Context) { } } + // Phase 1: Pre-task validation 
(shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() var resources []string @@ -178,37 +186,62 @@ func apiSnapshotsCreate(c *gin.Context) { return } - resources = append(resources, string(sources[i].ResourceKey())) + resources = append(resources, string(sources[i].Key())) } maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - for i := range sources { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPackageCollection := taskCollectionFactory.PackageCollection() + + // Fresh load of all sources after lock acquired + freshSources := make([]*deb.Snapshot, len(b.SourceSnapshots)) + for i := range b.SourceSnapshots { + freshSources[i], err = taskSnapshotCollection.ByName(b.SourceSnapshots[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } } - list := deb.NewPackageList() + // Merge packages from all source snapshots + var refList *deb.PackageRefList + if len(freshSources) > 0 { + refList = freshSources[0].RefList() + for i := 1; i < len(freshSources); i++ { + refList = refList.Merge(freshSources[i].RefList(), true, false) + } + } else { + refList = deb.NewPackageRefList() + } - // verify package refs and build package list - for _, ref := range b.PackageRefs { - p, err := collectionFactory.PackageCollection().ByKey([]byte(ref)) - if err != nil { - if err == database.ErrNotFound { - return &task.ProcessReturnValue{Code: http.StatusNotFound, 
Value: nil}, fmt.Errorf("package %s: %s", ref, err) + // Add any explicitly specified package refs on top + if len(b.PackageRefs) > 0 { + list := deb.NewPackageList() + for _, ref := range b.PackageRefs { + p, err := taskPackageCollection.ByKey([]byte(ref)) + if err != nil { + if err == database.ErrNotFound { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("package %s: %s", ref, err) + } + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + err = list.Add(p) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } - return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err - } - err = list.Add(p) - if err != nil { - return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } + refList = refList.Merge(deb.NewPackageRefListFromPackageList(list), true, false) } - snapshot = deb.NewSnapshotFromRefList(b.Name, sources, deb.NewPackageRefListFromPackageList(list), b.Description) + snapshot = deb.NewSnapshotFromRefList(b.Name, freshSources, refList, b.Description) - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -249,21 +282,28 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { } collectionFactory := context.NewCollectionFactory() - collection := collectionFactory.LocalRepoCollection() - snapshotCollection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") - repo, err = collection.ByName(name) + repo, err = collectionFactory.LocalRepoCollection().ByName(name) if err != nil { AbortWithJSONError(c, 404, err) return } // including snapshot resource key - resources := []string{string(repo.Key()), "S" + b.Name} + resources := []string{string(repo.Key())} taskName := fmt.Sprintf("Create snapshot of repo %s", name) maybeRunTaskInBackground(c, 
taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.LoadComplete(repo) + taskCollectionFactory := context.NewCollectionFactory() + taskRepoCollection := taskCollectionFactory.LocalRepoCollection() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + repo, err := taskRepoCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + err = taskRepoCollection.LoadComplete(repo) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -277,7 +317,7 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) { snapshot.Description = b.Description } - err = snapshotCollection.Add(snapshot) + err = taskSnapshotCollection.Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, err } @@ -315,6 +355,7 @@ func apiSnapshotsUpdate(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() collection := collectionFactory.SnapshotCollection() name := c.Params.ByName("name") @@ -325,14 +366,38 @@ func apiSnapshotsUpdate(c *gin.Context) { return } - resources := []string{string(snapshot.ResourceKey()), "S" + b.Name} - taskName := fmt.Sprintf("Update snapshot %s", name) - maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - _, err := collection.ByName(b.Name) + // Pre-task validation of new name if provided (skip if renaming to same name) + if b.Name != "" && b.Name != name { + _, err = collection.ByName(b.Name) if err == nil { - return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + AbortWithJSONError(c, 409, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name)) + return + 
} + } + + resources := []string{string(snapshot.Key())} + taskName := fmt.Sprintf("Update snapshot %s", name) + + maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load after lock acquired + snapshot, err = taskCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } + // Fresh duplicate check inside lock (skipped when the name is unchanged, matching the pre-task check) + if b.Name != "" && b.Name != name { + _, err := taskCollection.ByName(b.Name) + if err == nil { + return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to rename: snapshot %s already exists", b.Name) + } + } + + // Update fresh copy if b.Name != "" { snapshot.Name = b.Name } @@ -341,7 +406,7 @@ func apiSnapshotsUpdate(c *gin.Context) { snapshot.Description = b.Description } - err = collectionFactory.SnapshotCollection().Update(snapshot) + err = taskCollection.Update(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -395,9 +460,9 @@ func apiSnapshotsDrop(c *gin.Context) { name := c.Params.ByName("name") force := c.Request.URL.Query().Get("force") == "1" + // Phase 1: Pre-task validation (shallow load for 404 check only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() - publishedCollection := collectionFactory.PublishedRepoCollection() snapshot, err := snapshotCollection.ByName(name) if err != nil { @@ -405,23 +470,37 @@ func apiSnapshotsDrop(c *gin.Context) { return } - resources := []string{string(snapshot.ResourceKey())} + resources := []string{string(snapshot.Key())} taskName := fmt.Sprintf("Delete snapshot %s", name) + maybeRunTaskInBackground(c, taskName, resources, func(_
aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - published := publishedCollection.BySnapshot(snapshot) + // Phase 2: Inside task lock - create fresh collections + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + taskPublishedCollection := taskCollectionFactory.PublishedRepoCollection() + + // Fresh load after lock acquired + snapshot, err := taskSnapshotCollection.ByName(name) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + + // Fresh checks with current collections + published := taskPublishedCollection.BySnapshot(snapshot) if len(published) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("unable to drop: snapshot is published") } if !force { - snapshots := snapshotCollection.BySnapshotSource(snapshot) + // Using fresh collection for dependency check + snapshots := taskSnapshotCollection.BySnapshotSource(snapshot) if len(snapshots) > 0 { return &task.ProcessReturnValue{Code: http.StatusConflict, Value: nil}, fmt.Errorf("won't delete snapshot that was used as source for other snapshots, use ?force=1 to override") } } - err = snapshotCollection.Drop(snapshot) + err = taskSnapshotCollection.Drop(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -576,6 +655,7 @@ func apiSnapshotsMerge(c *gin.Context) { return } + // Phase 1: Pre-task validation (shallow load for 404 checks only) collectionFactory := context.NewCollectionFactory() snapshotCollection := collectionFactory.SnapshotCollection() @@ -588,36 +668,47 @@ func apiSnapshotsMerge(c *gin.Context) { return } - resources[i] = string(sources[i].ResourceKey()) + resources[i] = string(sources[i].Key()) } maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = 
snapshotCollection.LoadComplete(sources[0]) - if err != nil { - return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err - } - result := sources[0].RefList() - for i := 1; i < len(sources); i++ { - err = snapshotCollection.LoadComplete(sources[i]) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + taskSnapshotCollection := taskCollectionFactory.SnapshotCollection() + + // Fresh load of all sources inside task + freshSources := make([]*deb.Snapshot, len(body.Sources)) + for i := range body.Sources { + freshSources[i], err = taskSnapshotCollection.ByName(body.Sources[i]) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - result = result.Merge(sources[i].RefList(), overrideMatching, false) + // LoadComplete on fresh copy + err = taskSnapshotCollection.LoadComplete(freshSources[i]) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + } + + // Merge using fresh sources + result := freshSources[0].RefList() + for i := 1; i < len(freshSources); i++ { + result = result.Merge(freshSources[i].RefList(), overrideMatching, false) } if latest { result.FilterLatestRefs() } - sourceDescription := make([]string, len(sources)) - for i, s := range sources { + sourceDescription := make([]string, len(freshSources)) + for i, s := range freshSources { sourceDescription[i] = fmt.Sprintf("'%s'", s.Name) } - snapshot = deb.NewSnapshotFromRefList(name, sources, result, + snapshot = deb.NewSnapshotFromRefList(name, freshSources, result, fmt.Sprintf("Merged from sources: %s", strings.Join(sourceDescription, ", "))) - err = collectionFactory.SnapshotCollection().Add(snapshot) + err = taskCollectionFactory.SnapshotCollection().Add(snapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create snapshot: %s", err) } 
@@ -698,24 +789,32 @@ func apiSnapshotsPull(c *gin.Context) { return } - resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())} + resources := []string{string(sourceSnapshot.Key()), string(toSnapshot.Key())} taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot) + // Phase 2: Inside task lock - create fresh factory + taskCollectionFactory := context.NewCollectionFactory() + + // Fresh load of snapshots after lock acquired + freshToSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(name) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot) + freshSourceSnapshot, err := taskCollectionFactory.SnapshotCollection().ByName(body.Source) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err + } + err = taskCollectionFactory.SnapshotCollection().LoadComplete(freshSourceSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } // convert snapshots to package list - toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + toPackageList, err := deb.NewPackageListFromRefList(freshToSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } - sourcePackageList, err := deb.NewPackageListFromRefList(sourceSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress()) + sourcePackageList, err := 
deb.NewPackageListFromRefList(freshSourceSnapshot.RefList(), taskCollectionFactory.PackageCollection(), context.Progress()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } @@ -812,10 +911,10 @@ func apiSnapshotsPull(c *gin.Context) { } // Create snapshot - destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{toSnapshot, sourceSnapshot}, toPackageList, - fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", toSnapshot.Name, sourceSnapshot.Name, strings.Join(body.Queries, ", "))) + destinationSnapshot = deb.NewSnapshotFromPackageList(body.Destination, []*deb.Snapshot{freshToSnapshot, freshSourceSnapshot}, toPackageList, + fmt.Sprintf("Pulled into '%s' with '%s' as source, pull request was: '%s'", freshToSnapshot.Name, freshSourceSnapshot.Name, strings.Join(body.Queries, ", "))) - err = collectionFactory.SnapshotCollection().Add(destinationSnapshot) + err = taskCollectionFactory.SnapshotCollection().Add(destinationSnapshot) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err } diff --git a/deb/publish.go b/deb/publish.go index cc4c2a0f..a17dae81 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte { return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) } +// PrefixPoolLockKey returns the task-queue resource key that serialises all +// publish operations sharing the same pool directory under storagePrefix. +// It must be held whenever a non-MultiDist publish may read or clean the +// shared pool, to prevent concurrent cleanup runs from deleting each other's +// files. See docs/Resource-Locking.md for the full key-namespace table. 
+func PrefixPoolLockKey(storagePrefix string) string { + return "P" + storagePrefix +} + // RefKey is a unique id for package reference list func (p *PublishedRepo) RefKey(component string) []byte { return []byte("E" + p.UUID + component) @@ -1589,6 +1598,52 @@ func (collection *PublishedRepoCollection) listReferencedFilesByComponent(prefix return referencedFiles, nil } +// CleanupAfterMultiDistToggle cleans up stale pool files left behind when the +// MultiDist flag is toggled on a published repository. +// +// - false→true: Publish() wrote packages into pool/<distribution>/<component>/ +// but the old flat pool/<component>/ files were not removed because +// CleanupPrefixComponentFiles only scans the new MultiDist tree. +// A second pass with MultiDist=false cleans the legacy flat layout by +// reusing the existing orphan-detection logic (the repo is now MultiDist=true +// so it is excluded from the referenced-files scan, making its old pool +// entries appear orphaned). +// +// - true→false: Publish() wrote packages into pool/<component>/ but the old +// per-distribution pool/<distribution>/<component>/ directories were not +// removed. The orphan-detection approach cannot be used here because the +// repo's RefList still contains all packages (they just moved locations). +// Instead we directly remove each pool/<distribution>/<component>/ directory. +// This is safe because per-distribution pool dirs are exclusive to a single +// prefix+distribution combination — no other published repo can share them. +func (collection *PublishedRepoCollection) CleanupAfterMultiDistToggle(publishedStorageProvider aptly.PublishedStorageProvider, + published *PublishedRepo, prevMultiDist bool, cleanComponents []string, collectionFactory *CollectionFactory, progress aptly.Progress) error { + if prevMultiDist == published.MultiDist { + return nil + } + + if !prevMultiDist && published.MultiDist { + // false→true: use orphan-detection via the existing cleanup, but with + // MultiDist temporarily set to false so it scans the flat pool layout.
+ legacy := *published + legacy.MultiDist = false + return collection.CleanupPrefixComponentFiles(publishedStorageProvider, &legacy, cleanComponents, collectionFactory, progress) + } + + // true→false: directly remove the per-distribution pool directories. + publishedStorage := publishedStorageProvider.GetPublishedStorage(published.Storage) + for _, component := range cleanComponents { + poolDir := filepath.Join(published.Prefix, "pool", published.Distribution, component) + if err := publishedStorage.RemoveDirs(poolDir, progress); err != nil { + return err + } + } + // Remove the distribution-level pool dir if it is now empty. + distPoolDir := filepath.Join(published.Prefix, "pool", published.Distribution) + _ = publishedStorage.RemoveDirs(distPoolDir, progress) + return nil +} + // CleanupPrefixComponentFiles removes all unreferenced files in published storage under prefix/component pair func (collection *PublishedRepoCollection) CleanupPrefixComponentFiles(publishedStorageProvider aptly.PublishedStorageProvider, published *PublishedRepo, cleanComponents []string, collectionFactory *CollectionFactory, progress aptly.Progress) error { diff --git a/deb/publish_test.go b/deb/publish_test.go index 125397b2..933f9ef6 100644 --- a/deb/publish_test.go +++ b/deb/publish_test.go @@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3") _ = s.snapshotCollection.Add(snap3) - // Ensure that adding a second publish point with matching files doesn't give duplicate results. + // When a second publish point references the same package (snap3 is a clone of snap2, + // both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by + // package ref so the file appears only once. StrSlicesSubstract handles a single entry + // correctly, so no duplicate is needed for cleanup safety. 
repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false) c.Check(err, IsNil) c.Check(s.collection.Add(repo3), IsNil) @@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { "a/alien-arena/alien-arena-common_7.40-2_i386.deb", "a/alien-arena/mars-invaders_7.40-2_i386.deb", }, - "main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"}, + "main": { + "a/alien-arena/lonely-strangers_7.40-2_i386.deb", + }, }) } diff --git a/deb/snapshot.go b/deb/snapshot.go index f79a50d2..c308403c 100644 --- a/deb/snapshot.go +++ b/deb/snapshot.go @@ -143,12 +143,6 @@ func (s *Snapshot) Key() []byte { return []byte("S" + s.UUID) } -// ResourceKey is a unique identifier of the resource -// this snapshot uses. Instead of uuid it uses name -// which needs to be unique as well. -func (s *Snapshot) ResourceKey() []byte { - return []byte("S" + s.Name) -} // RefKey is a unique id for package reference list func (s *Snapshot) RefKey() []byte { diff --git a/files/linkfrompool_concurrency_test.go b/files/linkfrompool_concurrency_test.go deleted file mode 100644 index 0acbab3f..00000000 --- a/files/linkfrompool_concurrency_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package files - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/aptly-dev/aptly/aptly" - "github.com/aptly-dev/aptly/utils" - - . 
"gopkg.in/check.v1" -) - -type LinkFromPoolConcurrencySuite struct { - root string - poolDir string - storage *PublishedStorage - pool *PackagePool - cs aptly.ChecksumStorage - testFile string - testContent []byte - testChecksums utils.ChecksumInfo - srcPoolPath string -} - -var _ = Suite(&LinkFromPoolConcurrencySuite{}) - -func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) { - s.root = c.MkDir() - s.poolDir = filepath.Join(s.root, "pool") - publishDir := filepath.Join(s.root, "public") - - // Create package pool and published storage - s.pool = NewPackagePool(s.poolDir, true) - s.storage = NewPublishedStorage(publishDir, "copy", "checksum") - s.cs = NewMockChecksumStorage() - - // Create test file content - s.testContent = []byte("test package content for concurrency testing") - s.testFile = filepath.Join(s.root, "test-package.deb") - - err := os.WriteFile(s.testFile, s.testContent, 0644) - c.Assert(err, IsNil) - - // Calculate checksums - md5sum, err := utils.MD5ChecksumForFile(s.testFile) - c.Assert(err, IsNil) - - s.testChecksums = utils.ChecksumInfo{ - Size: int64(len(s.testContent)), - MD5: md5sum, - } - - // Import the test file into the pool - s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs) - c.Assert(err, IsNil) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) { - // Test concurrent LinkFromPool operations to ensure no race conditions - concurrency := 5000 - iterations := 10 - - for iter := 0; iter < iterations; iter++ { - c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency) - - destPath := fmt.Sprintf("main/t/test%d", iter) - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - start := time.Now() - - // Launch concurrent LinkFromPool operations - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - // Use force=true to test the 
most vulnerable code path (remove-then-create) - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - true, // force - this triggers vulnerable remove-then-create pattern - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - // Count results - errorCount := 0 - successCount := 0 - var firstError error - - for err := range errors { - errorCount++ - if firstError == nil { - firstError = err - } - c.Logf("Race condition error: %v", err) - } - - for range successes { - successCount++ - } - - c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // Assert no race conditions occurred - if errorCount > 0 { - c.Fatalf("Race condition detected in iteration %d! "+ - "Errors: %d out of %d operations (%.1f%% failure rate). "+ - "First error: %v. 
"+ - "This indicates the fix is not working properly.", - iter+1, errorCount, concurrency, - float64(errorCount)/float64(concurrency)*100, firstError) - } - - // Verify the final file exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations")) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read final file")) - c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations")) - - c.Logf("✓ Iteration %d: No race conditions detected", iter+1) - } - - c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions", - concurrency*iterations, iterations) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) { - // Test concurrent operations on different files to ensure no blocking - concurrency := 10 - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - - start := time.Now() - - // Launch concurrent operations on different destination files - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - destPath := fmt.Sprintf("main/t/test-file-%d", id) - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - - // Count errors - errorCount := 0 - for err := range errors { - errorCount++ - c.Logf("Error: %v", err) - } - - c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files")) - 
c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration) - - // Verify all files were created correctly - for i := 0; i < concurrency; i++ { - finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("File %d should exist", i)) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read file %d", i)) - c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i)) - } -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) { - // Test that when force=false, concurrent operations fail gracefully without corruption - concurrency := 20 - destPath := "main/t/single-dest" - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - // First, create the file so subsequent operations will conflict - err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false) - c.Assert(err, IsNil) - - start := time.Now() - - // Launch concurrent operations that should mostly fail - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force=false - should fail if file exists and is same - ) - - if err != nil { - errors <- err - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - errorCount := 0 - successCount := 0 - - for range errors { - errorCount++ - } - - for range successes { - successCount++ - } - - c.Logf("Results with force=false: %d 
successes, %d errors, took %v", successCount, errorCount, duration) - - // With force=false and identical files, operations should succeed (file already exists with same content) - // No race conditions should cause crashes or corruption - c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed")) - - // Verify the file still exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil) - c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access")) -} diff --git a/files/public.go b/files/public.go index a2d3f815..dea35ea8 100644 --- a/files/public.go +++ b/files/public.go @@ -26,26 +26,6 @@ type PublishedStorage struct { verifyMethod uint } -// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool -var ( - fileLockMutex sync.Mutex - fileLocks = make(map[string]*sync.Mutex) -) - -// getFileLock returns a mutex for a specific file path to prevent concurrent modifications -func getFileLock(filePath string) *sync.Mutex { - fileLockMutex.Lock() - defer fileLockMutex.Unlock() - - if mutex, exists := fileLocks[filePath]; exists { - return mutex - } - - mutex := &sync.Mutex{} - fileLocks[filePath] = mutex - return mutex -} - // Check interfaces var ( _ aptly.PublishedStorage = (*PublishedStorage)(nil) @@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName)) destinationPath := filepath.Join(poolPath, baseName) - // Acquire file-specific lock to prevent concurrent access to the same file - fileLock := getFileLock(destinationPath) - fileLock.Lock() - defer fileLock.Unlock() - var localSourcePool aptly.LocalPackagePool if storage.linkMethod != LinkMethodCopy { pp, ok := 
sourcePool.(aptly.LocalPackagePool) diff --git a/files/public_test.go b/files/public_test.go index 058bd8c4..9f9198f3 100644 --- a/files/public_test.go +++ b/files/public_test.go @@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) { c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true) } -func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) { - a := getFileLock(filepath.Join(s.root, "a")) - b := getFileLock(filepath.Join(s.root, "a")) - c.Check(a == b, Equals, true) - - c1 := getFileLock(filepath.Join(s.root, "c1")) - c2 := getFileLock(filepath.Join(s.root, "c2")) - c.Check(c1 == c2, Equals, false) -} - func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) { storage := NewPublishedStorage(s.root, "", "") diff --git a/system/t12_api/publish.py b/system/t12_api/publish.py index 964a71a9..e83ae2c4 100644 --- a/system/t12_api/publish.py +++ b/system/t12_api/publish.py @@ -666,6 +666,160 @@ class PublishUpdateAPIMultiDist(APITest): self.check_not_exists("public/" + prefix + "dists/") +class PublishUpdateAPIMultiDistToggle(APITest): + """ + POST /publish/:prefix with MultiDist=false, then PUT to enable MultiDist=true + """ + fixtureGpg = True + + def check(self): + repo_name = self.random_name() + self.check_equal(self.post( + "/api/repos", json={"Name": repo_name, "DefaultDistribution": "bookworm"}).status_code, 201) + + d = self.random_name() + self.check_equal(self.upload("/api/files/" + d, + "libboost-program-options-dev_1.49.0.1_i386.deb", "pyspi_0.6.1-1.3.dsc", + "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", + "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + + task = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_task(task) + + # Publish with MultiDist=false (default) + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "local", + "Sources": 
[{"Name": repo_name}], + "Signing": DefaultSigningOptions, + "MultiDist": False, + } + ) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'bookworm', + 'Label': '', + 'Origin': '', + 'Version': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'bookworm', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'local', + 'Sources': [{'Component': 'main', 'Name': repo_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # With MultiDist=false packages are stored under pool/main/... + self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # MultiDist-style per-distribution pool must not exist yet + self.check_not_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + # Now update the published repo enabling MultiDist=true + task = self.put_task( + "/api/publish/" + prefix + "/bookworm", + json={ + "MultiDist": True, + "Signing": DefaultSigningOptions, + } + ) + self.check_task(task) + + repo_expected_multidist = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'bookworm', + 'Label': '', + 'Origin': '', + 'Version': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'bookworm', + 'Prefix': prefix, + 'SignedBy': '', + 
'SkipContents': False, + 'MultiDist': True, + 'SourceKind': 'local', + 'Sources': [{'Component': 'main', 'Name': repo_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected_multidist, all_repos.json()) + + # After enabling MultiDist, packages are stored under pool/<distribution>/main/... + self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/bookworm/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # Flat pool must not exist while MultiDist is on + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + # Switch MultiDist back to false + task = self.put_task( + "/api/publish/" + prefix + "/bookworm", + json={ + "MultiDist": False, + "Signing": DefaultSigningOptions, + } + ) + self.check_task(task) + + repo_expected["MultiDist"] = False + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # Packages are back under the flat pool/main/...
+ self.check_exists("public/" + prefix + "/dists/bookworm/Release") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/binary-i386/Packages") + self.check_exists("public/" + prefix + + "/dists/bookworm/main/source/Sources") + self.check_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists( + "public/" + prefix + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + # Per-distribution pool must be gone + self.check_not_exists( + "public/" + prefix + "/pool/bookworm/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + + task = self.delete_task("/api/publish/" + prefix + "/bookworm") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + class PublishConcurrentUpdateAPITestRepo(APITest): """ PUT /publish/:prefix/:distribution (local repos), DELETE /publish/:prefix/:distribution @@ -992,6 +1146,231 @@ class PublishSwitchAPITestRepo(APITest): self.check_not_exists("public/" + prefix + "dists/") +class PublishSwitchAPITestMirror(APITest): + """ + PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution + """ + fixtureGpg = True + + def check(self): + mirror_name = self.random_name() + mirror_desc = {'Name': mirror_name, + 'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/', + 'Distribution': 'wheezy', + 'Keyrings': ["aptlytest.gpg"], + 'Architectures': ["amd64"], + 'Components': ['main']} + mirror_desc['IgnoreSignatures'] = True + + # Create Mirror + resp = self.post("/api/mirrors", json=mirror_desc) + self.check_equal(resp.status_code, 201) + + # Get Mirror + resp = self.get("/api/mirrors/" + mirror_name + "/packages") + self.check_equal(resp.status_code, 404) + + # Update Mirror + resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc) + self.check_task(resp) + + # Snapshot Mirror + snapshot1_name = self.random_name() + task = 
self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + # Publish Snapshot + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': 'packagecloud.io/varnishcache/varnish30', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # Snapshot Mirror 2 + snapshot2_name = self.random_name() + task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 
'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + +class PublishSwitchAPITestSnapshot(APITest): + """ + publish snapshot of snapshot + """ + fixtureGpg = True + + def check(self): + repo_name = self.random_name() + self.check_equal(self.post( + "/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201) + + d = self.random_name() + self.check_equal( + self.upload("/api/files/" + d, + "pyspi_0.6.1-1.3.dsc", + "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", + "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + task = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_task(task) + + snapshot1_name = self.random_name() + task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': '', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + 
"/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + snapshot2_name = self.random_name() + task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + class PublishSwitchAPITestRepoSignedBy(APITest): """ PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution diff --git a/system/t12_api/repos.py b/system/t12_api/repos.py index 424f9f49..6448a557 100644 --- a/system/t12_api/repos.py +++ b/system/t12_api/repos.py @@ -461,3 +461,34 @@ class ReposAPITestCopyPackage(APITest): 
self.check_equal(self.get(f"/api/repos/{repo2_name}/packages").json(), ['Pi386 libboost-program-options-dev 1.49.0.1 918d2f433384e378']) + + +class ReposAPITestCreateEdit(APITest): + """ + POST /api/repos, + """ + def check(self): + repo_name = self.random_name() + ' with space' + repo_desc = {'Comment': 'fun repo', + 'DefaultComponent': 'contrib', + 'DefaultDistribution': 'bookworm', + 'Name': repo_name} + + resp = self.post("/api/repos", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 201) + + repo_desc = {'Comment': 'modified repo', + 'DefaultComponent': 'main', + 'DefaultDistribution': 'trixie', + 'Name': repo_name + '@renamed'} + resp = self.put(f"/api/repos/{repo_name}", json=repo_desc) + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.get("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.json(), repo_desc) + self.check_equal(resp.status_code, 200) + + resp = self.delete("/api/repos/" + repo_name + '@renamed') + self.check_equal(resp.status_code, 200) diff --git a/task/list.go b/task/list.go index 5b9e9395..6a1a720d 100644 --- a/task/list.go +++ b/task/list.go @@ -44,28 +44,30 @@ func (list *List) consumer() { for { select { case task := <-list.queue: + // Set task state to RUNNING before processing list.Lock() - { - task.State = RUNNING - } + task.State = RUNNING list.Unlock() go func() { retValue, err := task.process(aptly.Progress(task.output), task.detail) + // Update task completion state and cleanup with list lock held list.Lock() { - task.processReturnValue = retValue - task.err = err if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED + task.err = err + task.processReturnValue = retValue } else { task.output.Print("Task succeeded") task.State = SUCCEEDED + task.err = nil + task.processReturnValue = retValue } - list.usedResources.Free(task.resources) + list.usedResources.Free(task.Resources) 
task.wgTask.Done() list.wg.Done() @@ -74,9 +76,9 @@ func (list *List) consumer() { for _, t := range list.tasks { if t.State == IDLE { // check resources - blockingTasks := list.usedResources.UsedBy(t.resources) + blockingTasks := list.usedResources.UsedBy(t.Resources) if len(blockingTasks) == 0 { - list.usedResources.MarkInUse(t.resources, t) + list.usedResources.MarkInUse(t.Resources, t) // unlock list since queueing may block list.Unlock() unlocked = true @@ -105,13 +107,15 @@ func (list *List) Stop() { // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { - tasks := []Task{} list.Lock() + defer list.Unlock() + + tasks := []Task{} for _, task := range list.tasks { + // Copy task while holding list lock tasks = append(tasks, *task) } - list.Unlock() return tasks } @@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) { // GetTaskByID returns task with given id func (list *List) GetTaskByID(ID int) (Task, error) { list.Lock() - tasks := list.tasks - list.Unlock() + defer list.Unlock() - for _, task := range tasks { + for _, task := range list.tasks { if task.ID == ID { + // Copy task while holding list lock return *task, nil } } @@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { // GetTaskReturnValueByID returns process return value of task with given id func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.processReturnValue, nil + } } - return task.processReturnValue, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } // RunTaskInBackground creates task and runs it in background. 
This will block until the necessary resources @@ -204,11 +211,15 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.wg.Add(1) task.wgTask.Add(1) + // Copy task while still holding the lock to avoid racing with consumer + // setting State=RUNNING after receiving from queue + taskCopy := *task + // add task to queue for processing if resources are available // if not, task will be queued by the consumer once resources are available tasks := list.usedResources.UsedBy(resources) if len(tasks) == 0 { - list.usedResources.MarkInUse(task.resources, task) + list.usedResources.MarkInUse(task.Resources, task) // queueing task might block if channel not ready, unlock list before queueing list.Unlock() list.queue <- task @@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Unlock() } - return *task, nil + return taskCopy, nil } // Clear removes finished tasks from list func (list *List) Clear() { list.Lock() + defer list.Unlock() var tasks []*Task for _, task := range list.tasks { @@ -230,8 +242,6 @@ func (list *List) Clear() { } } list.tasks = tasks - - list.Unlock() } // Wait waits till all tasks are processed @@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) { // GetTaskErrorByID returns the Task error for a given id func (list *List) GetTaskErrorByID(ID int) (error, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.err, nil + } } - return task.err, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } diff --git a/task/task.go b/task/task.go index 02aa7037..72f60699 100644 --- a/task/task.go +++ b/task/task.go @@ -42,6 +42,7 @@ const ( ) // Task represents as task in a queue encapsulates process code +// All fields are protected by List.Mutex - access task fields only while holding list.Lock() type Task 
struct { output *Output detail *Detail @@ -51,7 +52,7 @@ type Task struct { Name string ID int State State - resources []string + Resources []string wgTask *sync.WaitGroup } @@ -64,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s Name: name, ID: ID, State: IDLE, - resources: resources, + Resources: resources, wgTask: wgTask, } return task