publish: fix race conditions

* remove useless resource lock
  Resource locks need to be before the background task. creating same publish endpoint at the same time is unlikely...
* load data inside background tasks
  This fixes a flaw in async apis, which loaded the published repo from the DB and mutated it outside the task closure, before the task lock was acquired.
  Perform collection.LoadComplete inside maybeRunTaskInBackground and have tasks use a fresh copy of taskCollectionFactory, taskCollection
* lock source repos/snapshots for publish operations
  Concurrent tasks were not properly locking their resources, leading to inconsistent published indexes:
  SourceLocalRepo: iterate published.Sources (component -> source UUID), look up each local repo via localRepoCollection.ByUUID and append string(repo.Key()) to resources
  SourceSnapshot: iterate b.Snapshots,look up each snapshot via snapshotCollection.ByName and append string(snapshot.ResourceKey()) to resources.
* lock pool on non MultiDist publish
* revert mutex on LinkFromPool
* use uuids, since names can be renamed
This commit is contained in:
André Roth
2026-05-20 13:40:37 +00:00
parent 2974558aa7
commit 80e4e9bdac
8 changed files with 1294 additions and 519 deletions
+320 -199
View File
@@ -267,7 +267,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
resources = append(resources, string(snapshot.ResourceKey()))
resources = append(resources, string(snapshot.Key()))
sources = append(sources, snapshot)
}
} else if b.SourceKind == deb.SourceLocalRepo {
@@ -298,11 +298,24 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist
}
collection := collectionFactory.PublishedRepoCollection()
// Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
taskDetail := task.PublishDetail{
Detail: detail,
}
@@ -314,10 +327,10 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
snapshotCollection := taskCollectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
localCollection := taskCollectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)
@@ -327,13 +340,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}
}
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, multiDist)
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, taskCollectionFactory, multiDist)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
resources = append(resources, string(published.Key()))
if b.Origin != "" {
published.Origin = b.Origin
}
@@ -367,18 +378,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.Version = b.Version
}
duplicate := collection.CheckDuplicate(published)
duplicate := taskCollection.CheckDuplicate(published)
if duplicate != nil {
_ = collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
_ = taskCollectionFactory.PublishedRepoCollection().LoadComplete(duplicate, taskCollectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, publishOutput, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
err = collection.Add(published)
err = taskCollection.Add(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -458,6 +469,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
@@ -465,64 +477,85 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
resources := []string{string(published.Key())}
if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
_, err2 := snapshotCollection.ByName(snapshotInfo.Name)
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.Key()))
}
} else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
return
}
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())}
// Field mutations and fresh DB load are deferred to inside the task so
// they always operate on a consistent state after the lock is held.
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
revision := published.ObtainRevision()
sources := revision.Sources
@@ -534,17 +567,17 @@ func apiPublishUpdateSwitch(c *gin.Context) {
}
}
result, err := published.Update(collectionFactory, out)
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -552,7 +585,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
@@ -600,10 +633,19 @@ func apiPublishDrop(c *gin.Context) {
}
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that a drop cannot race
// with a concurrent update or drop of a sibling distribution during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := collection.Remove(context, storage, prefix, distribution,
collectionFactory, out, force, skipCleanup)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
err := taskCollection.Remove(context, storage, prefix, distribution,
taskCollectionFactory, out, force, skipCleanup)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to drop: %s", err)
}
@@ -639,43 +681,52 @@ func apiPublishAddSource(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly (no LoadComplete) to verify existence and obtain the
// resource key and task name. The actual mutation is performed inside
// the task on a freshly loaded copy to prevent lost-update races.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
return
}
sources[component] = name
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
component := b.Component
name := b.Name
_, exists := sources[component]
if exists {
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
}
sources[component] = name
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -757,39 +808,48 @@ func apiPublishSetSources(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if c.Bind(&b) != nil {
return
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := make(map[string]string, len(b))
revision.Sources = sources
for _, source := range b {
component := source.Component
name := source.Name
sources[component] = name
}
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -822,24 +882,33 @@ func apiPublishDropChanges(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and DropRevision happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
published.DropRevision()
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
published.DropRevision()
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -875,51 +944,58 @@ func apiPublishUpdateSource(c *gin.Context) {
param := slashEscape(c.Params.ByName("prefix"))
storage, prefix := deb.ParsePrefix(param)
distribution := slashEscape(c.Params.ByName("distribution"))
component := slashEscape(c.Params.ByName("component"))
urlComponent := slashEscape(c.Params.ByName("component"))
// Default component to the URL path segment; the body may rename it.
b.Component = urlComponent
if c.Bind(&b) != nil {
return
}
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
return
}
b.Component = component
b.Name = revision.Sources[component]
if c.Bind(&b) != nil {
return
}
if b.Component != component {
delete(sources, component)
}
component = b.Component
name := b.Name
sources[component] = name
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[urlComponent]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
}
if b.Component != urlComponent {
delete(sources, urlComponent)
}
newComponent := b.Component
name := b.Name
sources[newComponent] = name
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -956,33 +1032,41 @@ func apiPublishRemoveSource(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and mutation happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
return
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
return
}
delete(sources, component)
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.Update(published)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
}
revision := published.ObtainRevision()
sources := revision.Sources
_, exists := sources[component]
if !exists {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
}
delete(sources, component)
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -1054,64 +1138,101 @@ func apiPublishUpdate(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
// Load shallowly for 404 check, resource key, and task name.
// Full load and field mutations happen inside the task.
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
return
}
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
// because published.Update() reads from them and concurrent modification
// would produce an inconsistent view.
snapshotCollection := collectionFactory.SnapshotCollection()
localRepoCollection := collectionFactory.LocalRepoCollection()
if published.SourceKind == deb.SourceLocalRepo {
for _, uuid := range published.Sources {
repo, err2 := localRepoCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(repo.Key()))
}
} else if published.SourceKind == deb.SourceSnapshot {
for _, uuid := range published.Sources {
snapshot, err2 := snapshotCollection.ByUUID(uuid)
if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
resources = append(resources, string(snapshot.Key()))
}
}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
result, err := published.Update(collectionFactory, out)
taskCollectionFactory := context.NewCollectionFactory()
taskCollection := taskCollectionFactory.PublishedRepoCollection()
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
err = taskCollection.LoadComplete(published, taskCollectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = collection.Update(published)
// Apply field mutations on the freshly loaded object.
if b.SkipContents != nil {
published.SkipContents = *b.SkipContents
}
if b.SkipBz2 != nil {
published.SkipBz2 = *b.SkipBz2
}
if b.AcquireByHash != nil {
published.AcquireByHash = *b.AcquireByHash
}
if b.SignedBy != nil {
published.SignedBy = *b.SignedBy
}
if b.MultiDist != nil {
published.MultiDist = *b.MultiDist
}
if b.Label != nil {
published.Label = *b.Label
}
if b.Origin != nil {
published.Origin = *b.Origin
}
if b.Version != nil {
published.Version = *b.Version
}
result, err := published.Update(taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}
err = taskCollection.Update(published)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
}
@@ -1119,7 +1240,7 @@ func apiPublishUpdate(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup {
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}