fix race condition with repo add files

Do all relevant database reading/modifying inside `maybeRunTaskInBackground`.

Notably, `LoadComplete` will load the reflist of a repo. if this is done outside of a background operation,
the data might be outdated when the background tasks runs.
This commit is contained in:
André Roth
2024-10-21 14:35:08 +02:00
parent 0e6f9c38fb
commit f16a68f59c
3 changed files with 102 additions and 93 deletions
+54 -42
View File
@@ -150,11 +150,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
}
resources = append(resources, string(snapshot.ResourceKey()))
err = snapshotCollection.LoadComplete(snapshot)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
return
}
sources = append(sources, snapshot)
}
@@ -173,12 +168,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
resources = append(resources, string(localRepo.Key()))
err = localCollection.LoadComplete(localRepo)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
}
sources = append(sources, localRepo)
}
} else {
@@ -186,15 +175,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
return
}
resources = append(resources, string(published.Key()))
collection := collectionFactory.PublishedRepoCollection()
taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
taskDetail := task.PublishDetail{
@@ -205,6 +185,29 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
PublishDetail: taskDetail,
}
for _, source := range sources {
switch s := source.(type) {
case *deb.Snapshot:
snapshotCollection := collectionFactory.SnapshotCollection()
err = snapshotCollection.LoadComplete(s)
case *deb.LocalRepo:
localCollection := collectionFactory.LocalRepoCollection()
err = localCollection.LoadComplete(s)
default:
err = fmt.Errorf("unexpected type for source: %T", source)
}
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
}
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
resources = append(resources, string(published.Key()))
if b.Origin != "" {
published.Origin = b.Origin
}
@@ -230,13 +233,14 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
published.AcquireByHash = *b.AcquireByHash
}
collection := collectionFactory.PublishedRepoCollection()
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
}
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
}
@@ -282,47 +286,29 @@ func apiPublishUpdateSwitch(c *gin.Context) {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
AbortWithJSONError(c, 404, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
AbortWithJSONError(c, 500, fmt.Errorf("unable to update: %s", err))
return
}
var updatedComponents []string
var updatedSnapshots []string
var resources []string
if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 {
AbortWithJSONError(c, 400, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return
}
updatedComponents = published.Components()
for _, component := range updatedComponents {
published.UpdateLocalRepo(component)
}
} else if published.SourceKind == "snapshot" {
for _, snapshotInfo := range b.Snapshots {
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
AbortWithJSONError(c, 404, err2)
AbortWithJSONError(c, http.StatusNotFound, err2)
return
}
err2 = snapshotCollection.LoadComplete(snapshot)
if err2 != nil {
AbortWithJSONError(c, 500, err2)
return
}
published.UpdateSnapshot(snapshotInfo.Component, snapshot)
updatedComponents = append(updatedComponents, snapshotInfo.Component)
updatedSnapshots = append(updatedSnapshots, snapshot.Name)
}
@@ -347,10 +333,36 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.MultiDist = *b.MultiDist
}
var resources []string
resources = append(resources, string(published.Key()))
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
}
if published.SourceKind == deb.SourceLocalRepo {
for _, component := range updatedComponents {
published.UpdateLocalRepo(component)
}
} else if published.SourceKind == "snapshot" {
for _, snapshotInfo := range b.Snapshots {
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err2
}
err2 = snapshotCollection.LoadComplete(snapshot)
if err2 != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err2
}
published.UpdateSnapshot(snapshotInfo.Component, snapshot)
}
}
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
if err != nil {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
}