mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-31 04:30:44 +00:00
fix(publish): reload published inside task for source-management endpoints
Affected endpoints: apiPublishAddSource, apiPublishSetSources,
apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges.
All five handlers shared the same flawed pattern: they loaded the
published repo from the DB and mutated it (ObtainRevision / DropRevision)
outside the task closure, before the task lock was acquired. Each task
closure then just wrote back the already-mutated, pre-lock object.
Because the task queue serialises tasks that share a resource key, two
concurrent requests appear safe — but each task closure holds a stale
copy of the object captured before the lock was taken:
Request A loads published: revision = {}
Request B loads published: revision = {} <- same DB state
A mutates: revision = {main: snap1}
B mutates: revision = {contrib: snap2}
Task A runs: saves {main: snap1} OK
Task B runs: saves {contrib: snap2} <- clobbers A's change
Fix: perform only a shallow ByStoragePrefixDistribution outside the task
(for the early 404 response, resource key, and task name). Inside the
task closure a dedicated taskCollectionFactory is created, the published
repo is re-read fresh from the DB (after the lock is acquired), and
LoadComplete + all mutations + Update are executed against that
authoritative copy.
This commit is contained in:
+147
-105
@@ -648,43 +648,52 @@ func apiPublishAddSource(c *gin.Context) {
|
|||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
|
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Load shallowly (no LoadComplete) to verify existence and obtain the
|
||||||
|
// resource key and task name. The actual mutation is performed inside
|
||||||
|
// the task on a freshly loaded copy to prevent lost-update races.
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(published, collectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
component := b.Component
|
|
||||||
name := b.Name
|
|
||||||
|
|
||||||
_, exists := sources[component]
|
|
||||||
if exists {
|
|
||||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
sources[component] = name
|
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.Update(published)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
component := b.Component
|
||||||
|
name := b.Name
|
||||||
|
|
||||||
|
_, exists := sources[component]
|
||||||
|
if exists {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
|
||||||
|
}
|
||||||
|
|
||||||
|
sources[component] = name
|
||||||
|
|
||||||
|
err = taskCollection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -766,39 +775,48 @@ func apiPublishSetSources(c *gin.Context) {
|
|||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
|
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Load shallowly for 404 check, resource key, and task name.
|
||||||
|
// Full load and mutation happen inside the task.
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(published, collectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := make(map[string]string, len(b))
|
|
||||||
revision.Sources = sources
|
|
||||||
|
|
||||||
for _, source := range b {
|
|
||||||
component := source.Component
|
|
||||||
name := source.Name
|
|
||||||
sources[component] = name
|
|
||||||
}
|
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.Update(published)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := make(map[string]string, len(b))
|
||||||
|
revision.Sources = sources
|
||||||
|
|
||||||
|
for _, source := range b {
|
||||||
|
component := source.Component
|
||||||
|
name := source.Name
|
||||||
|
sources[component] = name
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -831,24 +849,33 @@ func apiPublishDropChanges(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Load shallowly for 404 check, resource key, and task name.
|
||||||
|
// Full load and DropRevision happen inside the task.
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(published, collectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
published.DropRevision()
|
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.Update(published)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
published.DropRevision()
|
||||||
|
|
||||||
|
err = taskCollection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -884,51 +911,58 @@ func apiPublishUpdateSource(c *gin.Context) {
|
|||||||
param := slashEscape(c.Params.ByName("prefix"))
|
param := slashEscape(c.Params.ByName("prefix"))
|
||||||
storage, prefix := deb.ParsePrefix(param)
|
storage, prefix := deb.ParsePrefix(param)
|
||||||
distribution := slashEscape(c.Params.ByName("distribution"))
|
distribution := slashEscape(c.Params.ByName("distribution"))
|
||||||
component := slashEscape(c.Params.ByName("component"))
|
urlComponent := slashEscape(c.Params.ByName("component"))
|
||||||
|
|
||||||
|
// Default component to the URL path segment; the body may rename it.
|
||||||
|
b.Component = urlComponent
|
||||||
|
if c.Bind(&b) != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Load shallowly for 404 check, resource key, and task name.
|
||||||
|
// Full load and mutation happen inside the task.
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(published, collectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
_, exists := sources[component]
|
|
||||||
if !exists {
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
b.Component = component
|
|
||||||
b.Name = revision.Sources[component]
|
|
||||||
|
|
||||||
if c.Bind(&b) != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if b.Component != component {
|
|
||||||
delete(sources, component)
|
|
||||||
}
|
|
||||||
|
|
||||||
component = b.Component
|
|
||||||
name := b.Name
|
|
||||||
sources[component] = name
|
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.Update(published)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
_, exists := sources[urlComponent]
|
||||||
|
if !exists {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.Component != urlComponent {
|
||||||
|
delete(sources, urlComponent)
|
||||||
|
}
|
||||||
|
|
||||||
|
newComponent := b.Component
|
||||||
|
name := b.Name
|
||||||
|
sources[newComponent] = name
|
||||||
|
|
||||||
|
err = taskCollection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
@@ -965,33 +999,41 @@ func apiPublishRemoveSource(c *gin.Context) {
|
|||||||
collectionFactory := context.NewCollectionFactory()
|
collectionFactory := context.NewCollectionFactory()
|
||||||
collection := collectionFactory.PublishedRepoCollection()
|
collection := collectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
// Load shallowly for 404 check, resource key, and task name.
|
||||||
|
// Full load and mutation happen inside the task.
|
||||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.LoadComplete(published, collectionFactory)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
revision := published.ObtainRevision()
|
|
||||||
sources := revision.Sources
|
|
||||||
|
|
||||||
_, exists := sources[component]
|
|
||||||
if !exists {
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(sources, component)
|
|
||||||
|
|
||||||
resources := []string{string(published.Key())}
|
resources := []string{string(published.Key())}
|
||||||
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
|
||||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
err = collection.Update(published)
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
taskCollection := taskCollectionFactory.PublishedRepoCollection()
|
||||||
|
|
||||||
|
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = taskCollection.LoadComplete(published, taskCollectionFactory)
|
||||||
|
if err != nil {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := published.ObtainRevision()
|
||||||
|
sources := revision.Sources
|
||||||
|
|
||||||
|
_, exists := sources[component]
|
||||||
|
if !exists {
|
||||||
|
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(sources, component)
|
||||||
|
|
||||||
|
err = taskCollection.Update(published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user