diff --git a/api/publish.go b/api/publish.go index 67b260d4..5ab7ccdb 100644 --- a/api/publish.go +++ b/api/publish.go @@ -267,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { return } - resources = append(resources, string(snapshot.ResourceKey())) + resources = append(resources, string(snapshot.Key())) sources = append(sources, snapshot) } } else if b.SourceKind == deb.SourceLocalRepo { @@ -298,11 +298,24 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } - collection := collectionFactory.PublishedRepoCollection() + // Non-MultiDist publishes share a single pool/ directory under the + // prefix. Lock at the prefix level so that concurrent publish/drop + // operations on sibling distributions cannot race during cleanup. + if !multiDist { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + + resources = append(resources, deb.PrefixPoolLockKey(storagePrefix)) + } taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + taskDetail := task.PublishDetail{ Detail: detail, } @@ -314,10 +327,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { for _, source := range sources { switch s := source.(type) { case *deb.Snapshot: - snapshotCollection := collectionFactory.SnapshotCollection() + snapshotCollection := taskCollectionFactory.SnapshotCollection() err = snapshotCollection.LoadComplete(s) case *deb.LocalRepo: - localCollection := collectionFactory.LocalRepoCollection() + localCollection := taskCollectionFactory.LocalRepoCollection() err = localCollection.LoadComplete(s) default: err = fmt.Errorf("unexpected type for source: %T", source) @@ -327,13 +340,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { } } - published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist) + published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - resources = append(resources, string(published.Key())) - if b.Origin != "" { published.Origin = b.Origin } @@ -367,18 +378,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { published.Version = b.Version } - duplicate := collection.CheckDuplicate(published) + duplicate := taskCollection.CheckDuplicate(published) if duplicate != nil { - _ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory) + _ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory) return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) } - err = collection.Add(published) + err = taskCollection.Add(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -458,6 +469,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { @@ -465,64 +477,85 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + resources := []string{string(published.Key())} + if published.SourceKind == deb.SourceLocalRepo { if len(b.Snapshots) > 0 { AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) return } - } else if published.SourceKind == deb.SourceSnapshot { - for _, snapshotInfo := range b.Snapshots { - _, err2 := snapshotCollection.ByName(snapshotInfo.Name) + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) if err2 != nil { AbortWithJSONError(c, http.StatusNotFound, err2) return } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, snapshotInfo := range b.Snapshots { + snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) } } else { AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) return } - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) } - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - - resources := []string{string(published.Key())} + // Field mutations and fresh DB load are deferred to inside the task so + // they always operate on a consistent state after the lock is held. taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.LoadComplete(published, collectionFactory) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + revision := published.ObtainRevision() sources := revision.Sources @@ -534,17 +567,17 @@ func apiPublishUpdateSwitch(c *gin.Context) { } } - result, err := published.Update(collectionFactory, out) + result, err := published.Update(taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -552,7 +585,7 @@ func apiPublishUpdateSwitch(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } @@ -600,10 +633,19 @@ func apiPublishDrop(c *gin.Context) { } resources := []string{string(published.Key())} + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that a drop cannot race + // with a concurrent update or drop of a sibling distribution during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err := collection.Remove(context, storage, prefix, distribution, - collectionFactory, out, force, skipCleanup) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + err := taskCollection.Remove(context, storage, prefix, distribution, + taskCollectionFactory, out, force, skipCleanup) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err) } @@ -639,43 +681,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - component := b.Component - name := b.Name - - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } - - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + component := b.Component + name := b.Name + + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } + + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -757,39 +808,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources - - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources + + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -822,24 +882,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -875,51 +944,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } - - b.Component = component - b.Name = revision.Sources[component] - - if c.Bind(&b) != nil { - return - } - - if b.Component != component { - delete(sources, component) - } - - component = b.Component - name := b.Name - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } + + if b.Component != urlComponent { + delete(sources, urlComponent) + } + + newComponent := b.Component + name := b.Name + sources[newComponent] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -956,33 +1032,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } - - delete(sources, component) - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } + + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1054,64 +1138,101 @@ func apiPublishUpdate(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and field mutations happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if b.SkipContents != nil { - published.SkipContents = *b.SkipContents - } - - if b.SkipBz2 != nil { - published.SkipBz2 = *b.SkipBz2 - } - - if b.AcquireByHash != nil { - published.AcquireByHash = *b.AcquireByHash - } - - if b.SignedBy != nil { - published.SignedBy = *b.SignedBy - } - - if b.MultiDist != nil { - published.MultiDist = *b.MultiDist - } - - if b.Label != nil { - published.Label = *b.Label - } - - if b.Origin != nil { - published.Origin = *b.Origin - } - - if b.Version != nil { - published.Version = *b.Version - } - resources := []string{string(published.Key())} + + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } + + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, + // because published.Update() reads from them and concurrent modification + // would produce an inconsistent view. + snapshotCollection := collectionFactory.SnapshotCollection() + localRepoCollection := collectionFactory.LocalRepoCollection() + + if published.SourceKind == deb.SourceLocalRepo { + for _, uuid := range published.Sources { + repo, err2 := localRepoCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(repo.Key())) + } + } else if published.SourceKind == deb.SourceSnapshot { + for _, uuid := range published.Sources { + snapshot, err2 := snapshotCollection.ByUUID(uuid) + if err2 != nil { + AbortWithJSONError(c, http.StatusNotFound, err2) + return + } + resources = append(resources, string(snapshot.Key())) + } + } + taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - result, err := published.Update(collectionFactory, out) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + err = taskCollection.LoadComplete(published, taskCollectionFactory) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } - err = collection.Update(published) + // Apply field mutations on the freshly loaded object. + if b.SkipContents != nil { + published.SkipContents = *b.SkipContents + } + if b.SkipBz2 != nil { + published.SkipBz2 = *b.SkipBz2 + } + if b.AcquireByHash != nil { + published.AcquireByHash = *b.AcquireByHash + } + if b.SignedBy != nil { + published.SignedBy = *b.SignedBy + } + if b.MultiDist != nil { + published.MultiDist = *b.MultiDist + } + if b.Label != nil { + published.Label = *b.Label + } + if b.Origin != nil { + published.Origin = *b.Origin + } + if b.Version != nil { + published.Version = *b.Version + } + + result, err := published.Update(taskCollectionFactory, out) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath()) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -1119,7 +1240,7 @@ func apiPublishUpdate(c *gin.Context) { if b.SkipCleanup == nil || !*b.SkipCleanup { cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources)) cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...) - err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out) + err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) } diff --git a/api/published_file_missing_test.go b/api/published_file_missing_test.go new file mode 100644 index 00000000..6d7b77f8 --- /dev/null +++ b/api/published_file_missing_test.go @@ -0,0 +1,733 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "github.com/aptly-dev/aptly/aptly" + ctx "github.com/aptly-dev/aptly/context" + "github.com/aptly-dev/aptly/deb" + "github.com/gin-gonic/gin" + "github.com/smira/flag" + + . "gopkg.in/check.v1" +) + +// PublishedFileMissingSuite reproduces the exact bug where: +// - Package import succeeds +// - Metadata is updated (Packages.gz shows the package) +// - Publish reports success +// - BUT the .deb file is missing from the published pool directory +// - Result: apt-get returns 404 when trying to download the package +type PublishedFileMissingSuite struct { + context *ctx.AptlyContext + flags *flag.FlagSet + configFile *os.File + router http.Handler + tempDir string + poolPath string + publicPath string +} + +var _ = Suite(&PublishedFileMissingSuite{}) + +func (s *PublishedFileMissingSuite) SetUpSuite(c *C) { + aptly.Version = "publishedFileMissingTest" + + tempDir, err := os.MkdirTemp("", "aptly-published-missing-test") + c.Assert(err, IsNil) + s.tempDir = tempDir + s.poolPath = filepath.Join(tempDir, "pool") + s.publicPath = filepath.Join(tempDir, "public") + + file, err := os.CreateTemp("", "aptly-published-missing-config") + c.Assert(err, IsNil) + s.configFile = file + + config := gin.H{ + "rootDir": tempDir, + "downloadDir": filepath.Join(tempDir, "download"), + "architectures": []string{"amd64"}, + "dependencyFollowSuggests": false, + "dependencyFollowRecommends": false, + "gpgDisableSign": true, + "gpgDisableVerify": true, + "gpgProvider": "internal", + "skipLegacyPool": true, + "enableMetricsEndpoint": false, + } + + jsonString, err := json.Marshal(config) + c.Assert(err, IsNil) + _, err = file.Write(jsonString) + c.Assert(err, IsNil) + + flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError) + flags.Bool("no-lock", true, "disable database locking for test") + flags.Int("db-open-attempts", 3, "dummy") + flags.String("config", s.configFile.Name(), "config file") + flags.String("architectures", "", "dummy") + s.flags = flags + + context, err := ctx.NewContext(s.flags) + c.Assert(err, IsNil) + + s.context = context + s.router = Router(context) +} + +func (s *PublishedFileMissingSuite) TearDownSuite(c *C) { + if s.configFile != nil { + _ = os.Remove(s.configFile.Name()) + } + if s.context != nil { + s.context.Shutdown() + } + if s.tempDir != "" { + _ = os.RemoveAll(s.tempDir) + } +} + +func (s *PublishedFileMissingSuite) SetUpTest(c *C) { + collectionFactory := s.context.NewCollectionFactory() + + localRepoCollection := collectionFactory.LocalRepoCollection() + _ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error { + _ = localRepoCollection.Drop(repo) + return nil + }) + + publishedCollection := collectionFactory.PublishedRepoCollection() + _ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error { + _ = publishedCollection.Remove(s.context, published.Storage, published.Prefix, + published.Distribution, collectionFactory, nil, true, true) + return nil + }) +} + +func (s *PublishedFileMissingSuite) TearDownTest(c *C) { + s.SetUpTest(c) +} + +func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + var req *http.Request + var err error + + if body != nil { + req, err = http.NewRequest(method, url, bytes.NewReader(body)) + } else { + req, err = http.NewRequest(method, url, nil) + } + c.Assert(err, IsNil) + req.Header.Add("Content-Type", "application/json") + s.router.ServeHTTP(w, req) + return w +} + +func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) { + uploadPath := s.context.UploadPath() + uploadDir := filepath.Join(uploadPath, uploadID) + err := os.MkdirAll(uploadDir, 0755) + c.Assert(err, IsNil) + + tempDir, err := os.MkdirTemp("", "deb-build") + c.Assert(err, IsNil) + defer func() { _ = os.RemoveAll(tempDir) }() + + debianDir := filepath.Join(tempDir, "DEBIAN") + err = os.MkdirAll(debianDir, 0755) + c.Assert(err, IsNil) + + controlContent := fmt.Sprintf(`Package: %s +Version: %s +Section: libs +Priority: optional +Architecture: amd64 +Maintainer: Test +Description: Test package + Test package for published file missing bug. +`, packageName, version) + + err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644) + c.Assert(err, IsNil) + + usrDir := filepath.Join(tempDir, "usr", "lib") + err = os.MkdirAll(usrDir, 0755) + c.Assert(err, IsNil) + err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644) + c.Assert(err, IsNil) + + debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile) + err = cmd.Run() + c.Assert(err, IsNil) +} + +// TestPublishedFileGoMissing reproduces the exact production bug +func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) { + c.Log("=== Reproducing: Package in metadata but 404 on download ===") + + // Create and publish a repository + repoName := "test-repo" + distribution := "bullseye" + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String())) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String())) + + // Create package + packageName := "hrt-libblobbyclient1" + version := "20250926.152427+hrtdeb11" + uploadID := "test-upload-1" + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String())) + + // Update publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String())) + + // Now check if the file is actually accessible in the published location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + // Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb + expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + c.Logf("Checking for published file at: %s", expectedPath) + + fileInfo, err := os.Stat(expectedPath) + fileExists := err == nil + + c.Logf("File exists: %v", fileExists) + if fileExists { + c.Logf("File size: %d bytes", fileInfo.Size()) + } + + // Check metadata + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err = json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + c.Logf("Packages in metadata: %d", len(packages)) + + // THE BUG: Metadata says package exists, but file is missing from published location + if len(packages) > 0 && !fileExists { + c.Logf("★★★ BUG REPRODUCED! ★★★") + c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath) + c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]") + + c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!") + } + + c.Assert(fileExists, Equals, true, Commentf( + "Published file should exist at %s when package is in metadata", expectedPath)) +} + +// TestConcurrentPublishRace tries to trigger the race with concurrent publishes +func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) { + c.Log("=== Testing concurrent publish race condition ===") + + const numIterations = 4 + + for iteration := 0; iteration < numIterations; iteration++ { + c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations) + + // Create repo + repoName := fmt.Sprintf("race-repo-%d", iteration) + distribution := fmt.Sprintf("dist-%d", iteration) + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody) + c.Assert(resp.Code, Equals, 201) + + // Create multiple packages + var wg sync.WaitGroup + numPackages := 5 + + for i := 0; i < numPackages; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx) + version := "1.0.0" + uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx) + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Logf("Package %d add: %d", idx, resp.Code) + + // Small delay + time.Sleep(time.Duration(5+idx*2) * time.Millisecond) + + // Publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody) + c.Logf("Publish %d: %d", idx, resp.Code) + }(i) + } + + wg.Wait() + time.Sleep(100 * time.Millisecond) + + // Check all packages + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err := json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + + // Check published files + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("pkg-%d-%d", iteration, i) + version := "1.0.0" + + // Calculate pool path + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, expectedPath) + } + } + + if len(missingFiles) > 0 { + c.Logf("★★★ BUG DETECTED in iteration %d/%d! ★★★", iteration+1, numIterations) + c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f) + } + + c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages) + } + } + + c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations) +} + +// TestIdenticalPackageRace tests the specific case of identical SHA256 packages +func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) { + c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===") + + const numIterations = 4 + packageName := "shared-package" + + for iter := 0; iter < numIterations; iter++ { + c.Logf("Iteration %d/%d", iter+1, numIterations) + + // Create two repos that will get the SAME package (unique per iteration) + repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)} + dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)} + + for i := range repos { + createBody, _ := json.Marshal(gin.H{ + "Name": repos[i], + "DefaultDistribution": dists[i], + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": dists[i], + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repos[i]}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + }) + resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody) + c.Assert(resp.Code, Equals, 201) + } + + // Create IDENTICAL package file with UNIQUE VERSION per iteration + version := fmt.Sprintf("1.0.%d", iter) + uploadID1 := fmt.Sprintf("identical-upload-1-%d", iter) + uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter) + + s.createDebPackage(c, uploadID1, packageName, version) + + // Copy to second upload (same SHA256) + uploadPath := s.context.UploadPath() + src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + destDir := filepath.Join(uploadPath, uploadID2) + err := os.MkdirAll(destDir, 0755) + c.Assert(err, IsNil) + dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + srcData, readErr := os.ReadFile(src) + c.Assert(readErr, IsNil) + err = os.WriteFile(dest, srcData, 0644) + c.Assert(err, IsNil) + + // Race: add and publish both simultaneously + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) + }() + + go func() { + defer wg.Done() + s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil) + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) + }() + + wg.Wait() + time.Sleep(200 * time.Millisecond) + c.Logf("[iter %d] All operations complete", iter) + + // Check the shared pool location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + poolSubdir := string(packageName[0]) + sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + fileInfo, err := os.Stat(sharedPoolPath) + fileExists := err == nil + + if fileExists { + c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size()) + } else { + c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err) + } + + // Check metadata + var packagesA, packagesB []string + resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesA) + c.Assert(err, IsNil) + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesB) + c.Assert(err, IsNil) + + c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB)) + + // THE BUG: Both repos show packages in metadata, but the shared pool file is missing + if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists { + c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1) + c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB)) + c.Logf("Shared pool file exists: %v", fileExists) + c.Logf("Pool path: %s", sharedPoolPath) + + // List what files ARE in the pool directory + poolDir := filepath.Dir(sharedPoolPath) + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("Files in pool directory %s:", poolDir) + for _, entry := range entries { + c.Logf(" - %s", entry.Name()) + } + } + + c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1) + } + } + + c.Logf("All %d iterations passed - bug not reproduced", numIterations) +} + +// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug: +// Multiple snapshots are published concurrently to the SAME prefix but different distributions. +// Example from production logs: +// - trixie-pgdg published to "external/postgres-auto/trixie" +// - bullseye-pgdg published to "external/postgres-auto/bullseye" +// Both share the same pool directory, causing cleanup race conditions. +func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) { + const numIterations = 4 + + for iter := 0; iter < numIterations; iter++ { + c.Logf("--- Iteration %d/%d ---", iter+1, numIterations) + + // Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg) + repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter) + repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter) + + // Create trixie repo + createBody, _ := json.Marshal(gin.H{ + "Name": repoTrixie, + "DefaultDistribution": "trixie", + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo")) + + // Create bullseye repo + createBody, _ = json.Marshal(gin.H{ + "Name": repoBullseye, + "DefaultDistribution": "bullseye", + "DefaultComponent": "main", + }) + resp = s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo")) + + // Add packages to both repos + numPackages := 3 + + // Add packages to trixie repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie")) + } + + // Add packages to bullseye repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to bullseye")) + } + + // Create snapshots from both repos + snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie) + snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye) + + createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot")) + + createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot")) + + // Publish both snapshots CONCURRENTLY to the SAME prefix + // This mimics production where both are published to "external/postgres-auto" + // Use the SAME prefix across all iterations to trigger the race more aggressively + sharedPrefix := "postgres-auto" + + var wg sync.WaitGroup + var trixiePublishCode, bullseyePublishCode int + + wg.Add(2) + + // Publish or update trixie snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "trixie", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, // Force cleanup to run + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE (this is what happens in production) + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody) + } + trixiePublishCode = resp.Code + c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code) + }() + + // Publish or update bullseye snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "bullseye", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody) + } + bullseyePublishCode = resp.Code + c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code) + }() + + wg.Wait() + time.Sleep(50 * time.Millisecond) + + // Verify publishes succeeded (201 for create, 200 for update) + expectedCode := 201 + if iter > 0 { + expectedCode = 200 + } + c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed")) + c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed")) + + // Verify ALL package files exist in the published pool + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + expectedFiles := []string{} + + // Check trixie packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath))) + } + } + + // Check bullseye packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath))) + } + } + + // BUG: Files from one distribution are deleted by the other's cleanup + if len(missingFiles) > 0 { + c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations) + c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f) + } + + c.Logf("\nThis reproduces the exact production bug where:") + c.Logf(" 1. Mirror updates complete successfully") + c.Logf(" 2. Snapshots are created") + c.Logf(" 3. Both snapshots publish to same prefix (different distributions)") + c.Logf(" 4. Cleanup from one publish DELETES files from the other") + c.Logf(" 5. Result: apt-get returns 404 when downloading packages") + + // List what's actually in the pool + poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main") + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("\nActual pool directory contents (%s):", poolDir) + for _, entry := range entries { + c.Logf(" - %s/", entry.Name()) + } + } + + c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool", + iter+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles)) + } + } + c.Logf("✓ All %d iterations passed - no files missing", numIterations) +} diff --git a/deb/publish.go b/deb/publish.go index cc4c2a0f..dbaa81a7 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte { return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) } +// PrefixPoolLockKey returns the task-queue resource key that serialises all +// publish operations sharing the same pool directory under storagePrefix. +// It must be held whenever a non-MultiDist publish may read or clean the +// shared pool, to prevent concurrent cleanup runs from deleting each other's +// files. See docs/Resource-Locking.md for the full key-namespace table. +func PrefixPoolLockKey(storagePrefix string) string { + return "P" + storagePrefix +} + // RefKey is a unique id for package reference list func (p *PublishedRepo) RefKey(component string) []byte { return []byte("E" + p.UUID + component) diff --git a/deb/publish_test.go b/deb/publish_test.go index 125397b2..933f9ef6 100644 --- a/deb/publish_test.go +++ b/deb/publish_test.go @@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3") _ = s.snapshotCollection.Add(snap3) - // Ensure that adding a second publish point with matching files doesn't give duplicate results. + // When a second publish point references the same package (snap3 is a clone of snap2, + // both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by + // package ref so the file appears only once. StrSlicesSubstract handles a single entry + // correctly, so no duplicate is needed for cleanup safety. repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false) c.Check(err, IsNil) c.Check(s.collection.Add(repo3), IsNil) @@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { "a/alien-arena/alien-arena-common_7.40-2_i386.deb", "a/alien-arena/mars-invaders_7.40-2_i386.deb", }, - "main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"}, + "main": { + "a/alien-arena/lonely-strangers_7.40-2_i386.deb", + }, }) } diff --git a/files/linkfrompool_concurrency_test.go b/files/linkfrompool_concurrency_test.go deleted file mode 100644 index 0acbab3f..00000000 --- a/files/linkfrompool_concurrency_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package files - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/aptly-dev/aptly/aptly" - "github.com/aptly-dev/aptly/utils" - - . "gopkg.in/check.v1" -) - -type LinkFromPoolConcurrencySuite struct { - root string - poolDir string - storage *PublishedStorage - pool *PackagePool - cs aptly.ChecksumStorage - testFile string - testContent []byte - testChecksums utils.ChecksumInfo - srcPoolPath string -} - -var _ = Suite(&LinkFromPoolConcurrencySuite{}) - -func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) { - s.root = c.MkDir() - s.poolDir = filepath.Join(s.root, "pool") - publishDir := filepath.Join(s.root, "public") - - // Create package pool and published storage - s.pool = NewPackagePool(s.poolDir, true) - s.storage = NewPublishedStorage(publishDir, "copy", "checksum") - s.cs = NewMockChecksumStorage() - - // Create test file content - s.testContent = []byte("test package content for concurrency testing") - s.testFile = filepath.Join(s.root, "test-package.deb") - - err := os.WriteFile(s.testFile, s.testContent, 0644) - c.Assert(err, IsNil) - - // Calculate checksums - md5sum, err := utils.MD5ChecksumForFile(s.testFile) - c.Assert(err, IsNil) - - s.testChecksums = utils.ChecksumInfo{ - Size: int64(len(s.testContent)), - MD5: md5sum, - } - - // Import the test file into the pool - s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs) - c.Assert(err, IsNil) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) { - // Test concurrent LinkFromPool operations to ensure no race conditions - concurrency := 5000 - iterations := 10 - - for iter := 0; iter < iterations; iter++ { - c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency) - - destPath := fmt.Sprintf("main/t/test%d", iter) - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - start := time.Now() - - // Launch concurrent LinkFromPool operations - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - // Use force=true to test the most vulnerable code path (remove-then-create) - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - true, // force - this triggers vulnerable remove-then-create pattern - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - // Count results - errorCount := 0 - successCount := 0 - var firstError error - - for err := range errors { - errorCount++ - if firstError == nil { - firstError = err - } - c.Logf("Race condition error: %v", err) - } - - for range successes { - successCount++ - } - - c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // Assert no race conditions occurred - if errorCount > 0 { - c.Fatalf("Race condition detected in iteration %d! "+ - "Errors: %d out of %d operations (%.1f%% failure rate). "+ - "First error: %v. "+ - "This indicates the fix is not working properly.", - iter+1, errorCount, concurrency, - float64(errorCount)/float64(concurrency)*100, firstError) - } - - // Verify the final file exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations")) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read final file")) - c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations")) - - c.Logf("✓ Iteration %d: No race conditions detected", iter+1) - } - - c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions", - concurrency*iterations, iterations) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) { - // Test concurrent operations on different files to ensure no blocking - concurrency := 10 - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - - start := time.Now() - - // Launch concurrent operations on different destination files - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - destPath := fmt.Sprintf("main/t/test-file-%d", id) - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - - // Count errors - errorCount := 0 - for err := range errors { - errorCount++ - c.Logf("Error: %v", err) - } - - c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files")) - c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration) - - // Verify all files were created correctly - for i := 0; i < concurrency; i++ { - finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("File %d should exist", i)) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read file %d", i)) - c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i)) - } -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) { - // Test that when force=false, concurrent operations fail gracefully without corruption - concurrency := 20 - destPath := "main/t/single-dest" - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - // First, create the file so subsequent operations will conflict - err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false) - c.Assert(err, IsNil) - - start := time.Now() - - // Launch concurrent operations that should mostly fail - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force=false - should fail if file exists and is same - ) - - if err != nil { - errors <- err - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - errorCount := 0 - successCount := 0 - - for range errors { - errorCount++ - } - - for range successes { - successCount++ - } - - c.Logf("Results with force=false: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // With force=false and identical files, operations should succeed (file already exists with same content) - // No race conditions should cause crashes or corruption - c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed")) - - // Verify the file still exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil) - c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access")) -} diff --git a/files/public.go b/files/public.go index a2d3f815..dea35ea8 100644 --- a/files/public.go +++ b/files/public.go @@ -26,26 +26,6 @@ type PublishedStorage struct { verifyMethod uint } -// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool -var ( - fileLockMutex sync.Mutex - fileLocks = make(map[string]*sync.Mutex) -) - -// getFileLock returns a mutex for a specific file path to prevent concurrent modifications -func getFileLock(filePath string) *sync.Mutex { - fileLockMutex.Lock() - defer fileLockMutex.Unlock() - - if mutex, exists := fileLocks[filePath]; exists { - return mutex - } - - mutex := &sync.Mutex{} - fileLocks[filePath] = mutex - return mutex -} - // Check interfaces var ( _ aptly.PublishedStorage = (*PublishedStorage)(nil) @@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName)) destinationPath := filepath.Join(poolPath, baseName) - // Acquire file-specific lock to prevent concurrent access to the same file - fileLock := getFileLock(destinationPath) - fileLock.Lock() - defer fileLock.Unlock() - var localSourcePool aptly.LocalPackagePool if storage.linkMethod != LinkMethodCopy { pp, ok := sourcePool.(aptly.LocalPackagePool) diff --git a/files/public_test.go b/files/public_test.go index 058bd8c4..9f9198f3 100644 --- a/files/public_test.go +++ b/files/public_test.go @@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) { c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true) } -func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) { - a := getFileLock(filepath.Join(s.root, "a")) - b := getFileLock(filepath.Join(s.root, "a")) - c.Check(a == b, Equals, true) - - c1 := getFileLock(filepath.Join(s.root, "c1")) - c2 := getFileLock(filepath.Join(s.root, "c2")) - c.Check(c1 == c2, Equals, false) -} - func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) { storage := NewPublishedStorage(s.root, "", "") diff --git a/system/t12_api/publish.py b/system/t12_api/publish.py index 964a71a9..cd92f775 100644 --- a/system/t12_api/publish.py +++ b/system/t12_api/publish.py @@ -992,6 +992,231 @@ class PublishSwitchAPITestRepo(APITest): self.check_not_exists("public/" + prefix + "dists/") +class PublishSwitchAPITestMirror(APITest): + """ + PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution + """ + fixtureGpg = True + + def check(self): + mirror_name = self.random_name() + mirror_desc = {'Name': mirror_name, + 'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/', + 'Distribution': 'wheezy', + 'Keyrings': ["aptlytest.gpg"], + 'Architectures': ["amd64"], + 'Components': ['main']} + mirror_desc['IgnoreSignatures'] = True + + # Create Mirror + resp = self.post("/api/mirrors", json=mirror_desc) + self.check_equal(resp.status_code, 201) + + # Get Mirror + resp = self.get("/api/mirrors/" + mirror_name + "/packages") + self.check_equal(resp.status_code, 404) + + # Update Mirror + resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc) + self.check_task(resp) + + # Snapshot Mirror + snapshot1_name = self.random_name() + task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + # Publish Snapshot + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': 'packagecloud.io/varnishcache/varnish30', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + # Snapshot Mirror 2 + snapshot2_name = self.random_name() + task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + +class PublishSwitchAPITestSnapshot(APITest): + """ + publish snapshot of snapshot + """ + fixtureGpg = True + + def check(self): + repo_name = self.random_name() + self.check_equal(self.post( + "/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201) + + d = self.random_name() + self.check_equal( + self.upload("/api/files/" + d, + "pyspi_0.6.1-1.3.dsc", + "pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz", + "pyspi-0.6.1-1.3.stripped.dsc").status_code, 200) + task = self.post_task("/api/repos/" + repo_name + "/file/" + d) + self.check_task(task) + + snapshot1_name = self.random_name() + task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name}) + self.check_task(task) + + prefix = self.random_name() + task = self.post_task( + "/api/publish/" + prefix, + json={ + "Architectures": ["i386", "source"], + "SourceKind": "snapshot", + "Sources": [{"Name": snapshot1_name}], + "Signing": DefaultSigningOptions, + }) + self.check_task(task) + + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': '', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Origin': '', + 'Version': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': False, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot1_name}], + 'Storage': '', + 'Suite': ''} + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + snapshot2_name = self.random_name() + task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]}) + self.check_task(task) + + task = self.put_task( + "/api/publish/" + prefix + "/wheezy", + json={ + "Snapshots": [{"Component": "main", "Name": snapshot2_name}], + "Signing": DefaultSigningOptions, + "SkipContents": True, + "Label": "fun", + "Origin": "earth", + "Version": "13.3", + }) + self.check_task(task) + repo_expected = { + 'AcquireByHash': False, + 'Architectures': ['i386', 'source'], + 'Codename': '', + 'Distribution': 'wheezy', + 'Label': 'fun', + 'Origin': 'earth', + 'Version': '13.3', + 'NotAutomatic': '', + 'ButAutomaticUpgrades': '', + 'Path': prefix + '/' + 'wheezy', + 'Prefix': prefix, + 'SignedBy': '', + 'SkipContents': True, + 'MultiDist': False, + 'SourceKind': 'snapshot', + 'Sources': [{'Component': 'main', 'Name': snapshot2_name}], + 'Storage': '', + 'Suite': ''} + + all_repos = self.get("/api/publish") + self.check_equal(all_repos.status_code, 200) + self.check_in(repo_expected, all_repos.json()) + + self.check_not_exists( + "public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb") + self.check_exists("public/" + prefix + + "/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc") + + task = self.delete_task("/api/publish/" + prefix + "/wheezy") + self.check_task(task) + self.check_not_exists("public/" + prefix + "dists/") + + class PublishSwitchAPITestRepoSignedBy(APITest): """ PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution