mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-01-11 03:11:50 +00:00
Merge pull request #1380 from aptly-dev/fix/concurrent-api
Fix race condition with async API operations
This commit is contained in:
@@ -150,11 +150,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
||||
}
|
||||
|
||||
resources = append(resources, string(snapshot.ResourceKey()))
|
||||
err = snapshotCollection.LoadComplete(snapshot)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
sources = append(sources, snapshot)
|
||||
}
|
||||
@@ -173,12 +168,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
resources = append(resources, string(localRepo.Key()))
|
||||
err = localCollection.LoadComplete(localRepo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
|
||||
}
|
||||
|
||||
sources = append(sources, localRepo)
|
||||
}
|
||||
} else {
|
||||
@@ -186,15 +175,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
resources = append(resources, string(published.Key()))
|
||||
collection := collectionFactory.PublishedRepoCollection()
|
||||
|
||||
taskName := fmt.Sprintf("Publish %s: %s", b.SourceKind, strings.Join(names, ", "))
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
taskDetail := task.PublishDetail{
|
||||
@@ -205,6 +185,29 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
||||
PublishDetail: taskDetail,
|
||||
}
|
||||
|
||||
for _, source := range sources {
|
||||
switch s := source.(type) {
|
||||
case *deb.Snapshot:
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
err = snapshotCollection.LoadComplete(s)
|
||||
case *deb.LocalRepo:
|
||||
localCollection := collectionFactory.LocalRepoCollection()
|
||||
err = localCollection.LoadComplete(s)
|
||||
default:
|
||||
err = fmt.Errorf("unexpected type for source: %T", source)
|
||||
}
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory, b.MultiDist)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
||||
}
|
||||
|
||||
resources = append(resources, string(published.Key()))
|
||||
|
||||
if b.Origin != "" {
|
||||
published.Origin = b.Origin
|
||||
}
|
||||
@@ -230,13 +233,14 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
||||
published.AcquireByHash = *b.AcquireByHash
|
||||
}
|
||||
|
||||
collection := collectionFactory.PublishedRepoCollection()
|
||||
duplicate := collection.CheckDuplicate(published)
|
||||
if duplicate != nil {
|
||||
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate)
|
||||
}
|
||||
|
||||
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
|
||||
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, publishOutput, b.ForceOverwrite)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
|
||||
}
|
||||
@@ -282,47 +286,29 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
||||
|
||||
collectionFactory := context.NewCollectionFactory()
|
||||
collection := collectionFactory.PublishedRepoCollection()
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
|
||||
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 404, fmt.Errorf("unable to update: %s", err))
|
||||
return
|
||||
}
|
||||
err = collection.LoadComplete(published, collectionFactory)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, fmt.Errorf("unable to update: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
var updatedComponents []string
|
||||
var updatedSnapshots []string
|
||||
var resources []string
|
||||
|
||||
if published.SourceKind == deb.SourceLocalRepo {
|
||||
if len(b.Snapshots) > 0 {
|
||||
AbortWithJSONError(c, 400, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
|
||||
return
|
||||
}
|
||||
updatedComponents = published.Components()
|
||||
for _, component := range updatedComponents {
|
||||
published.UpdateLocalRepo(component)
|
||||
}
|
||||
} else if published.SourceKind == "snapshot" {
|
||||
for _, snapshotInfo := range b.Snapshots {
|
||||
snapshotCollection := collectionFactory.SnapshotCollection()
|
||||
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
|
||||
if err2 != nil {
|
||||
AbortWithJSONError(c, 404, err2)
|
||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
||||
return
|
||||
}
|
||||
|
||||
err2 = snapshotCollection.LoadComplete(snapshot)
|
||||
if err2 != nil {
|
||||
AbortWithJSONError(c, 500, err2)
|
||||
return
|
||||
}
|
||||
|
||||
published.UpdateSnapshot(snapshotInfo.Component, snapshot)
|
||||
updatedComponents = append(updatedComponents, snapshotInfo.Component)
|
||||
updatedSnapshots = append(updatedSnapshots, snapshot.Name)
|
||||
}
|
||||
@@ -347,10 +333,36 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
||||
published.MultiDist = *b.MultiDist
|
||||
}
|
||||
|
||||
var resources []string
|
||||
resources = append(resources, string(published.Key()))
|
||||
taskName := fmt.Sprintf("Update published %s (%s): %s", published.SourceKind, strings.Join(updatedComponents, " "), strings.Join(updatedSnapshots, ", "))
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err := published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
|
||||
err = collection.LoadComplete(published, collectionFactory)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("Unable to update: %s", err)
|
||||
}
|
||||
|
||||
if published.SourceKind == deb.SourceLocalRepo {
|
||||
for _, component := range updatedComponents {
|
||||
published.UpdateLocalRepo(component)
|
||||
}
|
||||
} else if published.SourceKind == "snapshot" {
|
||||
for _, snapshotInfo := range b.Snapshots {
|
||||
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
|
||||
if err2 != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, err2
|
||||
}
|
||||
|
||||
err2 = snapshotCollection.LoadComplete(snapshot)
|
||||
if err2 != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err2
|
||||
}
|
||||
|
||||
published.UpdateSnapshot(snapshotInfo.Component, snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
|
||||
}
|
||||
|
||||
54
api/repos.go
54
api/repos.go
@@ -288,14 +288,14 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
|
||||
return
|
||||
}
|
||||
|
||||
err = collection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, err)
|
||||
return
|
||||
}
|
||||
|
||||
resources := []string{string(repo.Key())}
|
||||
|
||||
maybeRunTaskInBackground(c, taskNamePrefix+repo.Name, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
out.Printf("Loading packages...\n")
|
||||
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
|
||||
if err != nil {
|
||||
@@ -394,12 +394,6 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
err = collection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, err)
|
||||
return
|
||||
}
|
||||
|
||||
var taskName string
|
||||
var sources []string
|
||||
if fileParam == "" {
|
||||
@@ -413,6 +407,11 @@ func apiReposPackageFromDir(c *gin.Context) {
|
||||
resources := []string{string(repo.Key())}
|
||||
resources = append(resources, sources...)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collection.LoadComplete(repo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
verifier := context.GetVerifier()
|
||||
|
||||
var (
|
||||
@@ -514,17 +513,7 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("dest repo error: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
srcRefList *deb.PackageRefList
|
||||
srcRepo *deb.LocalRepo
|
||||
)
|
||||
|
||||
var srcRepo *deb.LocalRepo
|
||||
srcRepo, err = collectionFactory.LocalRepoCollection().ByName(srcRepoName)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
|
||||
@@ -536,17 +525,22 @@ func apiReposCopyPackage(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("src repo error: %s", err))
|
||||
return
|
||||
}
|
||||
|
||||
srcRefList = srcRepo.RefList()
|
||||
taskName := fmt.Sprintf("Copy packages from repo %s to repo %s", srcRepoName, dstRepoName)
|
||||
resources := []string{string(dstRepo.Key()), string(srcRepo.Key())}
|
||||
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(dstRepo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("dest repo error: %s", err)
|
||||
}
|
||||
|
||||
err = collectionFactory.LocalRepoCollection().LoadComplete(srcRepo)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("src repo error: %s", err)
|
||||
}
|
||||
|
||||
srcRefList := srcRepo.RefList()
|
||||
|
||||
reporter := &aptly.RecordingResultReporter{
|
||||
Warnings: []string{},
|
||||
AddedLines: []string{},
|
||||
|
||||
@@ -135,16 +135,17 @@ func apiSnapshotsCreate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, 500, err)
|
||||
return
|
||||
}
|
||||
|
||||
resources = append(resources, string(sources[i].ResourceKey()))
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
for i := range sources {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
}
|
||||
|
||||
list := deb.NewPackageList()
|
||||
|
||||
// verify package refs and build package list
|
||||
@@ -468,17 +469,20 @@ func apiSnapshotsMerge(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
resources[i] = string(sources[i].ResourceKey())
|
||||
}
|
||||
|
||||
maybeRunTaskInBackground(c, "Merge snapshot "+name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = snapshotCollection.LoadComplete(sources[0])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result := sources[0].RefList()
|
||||
for i := 1; i < len(sources); i++ {
|
||||
err = snapshotCollection.LoadComplete(sources[i])
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
result = result.Merge(sources[i].RefList(), overrideMatching, false)
|
||||
}
|
||||
|
||||
@@ -566,11 +570,6 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
AbortWithJSONError(c, http.StatusNotFound, err)
|
||||
return
|
||||
}
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Load <Source> snapshot
|
||||
sourceSnapshot, err := collectionFactory.SnapshotCollection().ByName(body.Source)
|
||||
@@ -578,15 +577,19 @@ func apiSnapshotsPull(c *gin.Context) {
|
||||
AbortWithJSONError(c, http.StatusNotFound, err)
|
||||
return
|
||||
}
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
||||
if err != nil {
|
||||
AbortWithJSONError(c, http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
|
||||
resources := []string{string(sourceSnapshot.ResourceKey()), string(toSnapshot.ResourceKey())}
|
||||
taskName := fmt.Sprintf("Pull snapshot %s into %s and save as %s", body.Source, name, body.Destination)
|
||||
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(toSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
err = collectionFactory.SnapshotCollection().LoadComplete(sourceSnapshot)
|
||||
if err != nil {
|
||||
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, err
|
||||
}
|
||||
|
||||
// convert snapshots to package list
|
||||
toPackageList, err := deb.NewPackageListFromRefList(toSnapshot.RefList(), collectionFactory.PackageCollection(), context.Progress())
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user