From 2a5992c74eb6f396335fb62f57dde9376825f364 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Wed, 20 May 2026 13:41:58 +0000 Subject: [PATCH] fix(publish): reload published inside task for source-management endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Affected endpoints: apiPublishAddSource, apiPublishSetSources, apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges. All five handlers shared the same flawed pattern: they loaded the published repo from the DB and mutated it (ObtainRevision / DropRevision) outside the task closure, before the task lock was acquired. Each task closure then just wrote back the already-mutated, pre-lock object. Because the task queue serialises tasks that share a resource key, two concurrent requests appear safe — but each task closure holds a stale copy of the object captured before the lock was taken: Request A loads published: revision = {} Request B loads published: revision = {} <- same DB state A mutates: revision = {main: snap1} B mutates: revision = {contrib: snap2} Task A runs: saves {main: snap1} OK Task B runs: saves {contrib: snap2} <- clobbers A's change Fix: perform only a shallow ByStoragePrefixDistribution outside the task (for the early 404 response, resource key, and task name). Inside the task closure a dedicated taskCollectionFactory is created, the published repo is re-read fresh from the DB (after the lock is acquired), and LoadComplete + all mutations + Update are executed against that authoritative copy. --- api/publish.go | 252 ++++++++++++++++++++++++++++--------------------- 1 file changed, 147 insertions(+), 105 deletions(-) diff --git a/api/publish.go b/api/publish.go index b0cedfc0..73e049fc 100644 --- a/api/publish.go +++ b/api/publish.go @@ -648,43 +648,52 @@ func apiPublishAddSource(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly (no LoadComplete) to verify existence and obtain the + // resource key and task name. The actual mutation is performed inside + // the task on a freshly loaded copy to prevent lost-update races. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - component := b.Component - name := b.Name - - _, exists := sources[component] - if exists { - AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component)) - return - } - - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + component := b.Component + name := b.Name + + _, exists := sources[component] + if exists { + return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component) + } + + sources[component] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -766,39 +775,48 @@ func apiPublishSetSources(c *gin.Context) { storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) + if c.Bind(&b) != nil { + return + } + collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - if c.Bind(&b) != nil { - return - } - - revision := published.ObtainRevision() - sources := make(map[string]string, len(b)) - revision.Sources = sources - - for _, source := range b { - component := source.Component - name := source.Name - sources[component] = name - } - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := make(map[string]string, len(b)) + revision.Sources = sources + + for _, source := range b { + component := source.Component + name := source.Name + sources[component] = name + } + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -831,24 +849,33 @@ func apiPublishDropChanges(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and DropRevision happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - published.DropRevision() - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + published.DropRevision() + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -884,51 +911,58 @@ func apiPublishUpdateSource(c *gin.Context) { param := slashEscape(c.Params.ByName("prefix")) storage, prefix := deb.ParsePrefix(param) distribution := slashEscape(c.Params.ByName("distribution")) - component := slashEscape(c.Params.ByName("component")) + urlComponent := slashEscape(c.Params.ByName("component")) + + // Default component to the URL path segment; the body may rename it. + b.Component = urlComponent + if c.Bind(&b) != nil { + return + } collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component)) - return - } - - b.Component = component - b.Name = revision.Sources[component] - - if c.Bind(&b) != nil { - return - } - - if b.Component != component { - delete(sources, component) - } - - component = b.Component - name := b.Name - sources[component] = name - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[urlComponent] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent) + } + + if b.Component != urlComponent { + delete(sources, urlComponent) + } + + newComponent := b.Component + name := b.Name + sources[newComponent] = name + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) } @@ -965,33 +999,41 @@ func apiPublishRemoveSource(c *gin.Context) { collectionFactory := context.NewCollectionFactory() collection := collectionFactory.PublishedRepoCollection() + // Load shallowly for 404 check, resource key, and task name. + // Full load and mutation happen inside the task. published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution) if err != nil { AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err)) return } - err = collection.LoadComplete(published, collectionFactory) - if err != nil { - AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err)) - return - } - - revision := published.ObtainRevision() - sources := revision.Sources - - _, exists := sources[component] - if !exists { - AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component)) - return - } - - delete(sources, component) - resources := []string{string(published.Key())} taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { - err = collection.Update(published) + taskCollectionFactory := context.NewCollectionFactory() + taskCollection := taskCollectionFactory.PublishedRepoCollection() + + published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + err = taskCollection.LoadComplete(published, taskCollectionFactory) + if err != nil { + return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err) + } + + revision := published.ObtainRevision() + sources := revision.Sources + + _, exists := sources[component] + if !exists { + return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component) + } + + delete(sources, component) + + err = taskCollection.Update(published) if err != nil { return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err) }