Compare commits

...

7 Commits

Author SHA1 Message Date
André Roth 4defa49b7f publish: lock resources from all SourceKinds 2026-05-04 20:48:05 +02:00
André Roth 6fbcbc108c more debug 2026-05-04 18:41:50 +02:00
André Roth 41f5d22637 publish: remove useless ressource assignment 2026-05-04 17:19:36 +02:00
André Roth 8179f73bf0 publish: cleanup 2026-05-04 17:19:15 +02:00
André Roth f8efb3e9b7 publish update: lock all snapshots and repos as well 2026-05-04 16:12:54 +02:00
André Roth 55b2943f44 more debug 2026-05-04 13:49:24 +02:00
André Roth 9280231c1d publish: debug locking 2026-05-04 12:49:16 +02:00
2 changed files with 53 additions and 8 deletions
+48 -8
View File
@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
"errors"
"github.com/aptly-dev/aptly/aptly" "github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/deb"
@@ -255,13 +256,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
if b.SourceKind == deb.SourceSnapshot { if b.SourceKind == deb.SourceSnapshot {
var snapshot *deb.Snapshot var snapshot *deb.Snapshot
snapshotCollection := collectionFactory.SnapshotCollection() tmpCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
snapshot, err = snapshotCollection.ByName(source.Name) snapshot, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -273,13 +274,13 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo { } else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo var localRepo *deb.LocalRepo
localCollection := collectionFactory.LocalRepoCollection() tmpCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources { for _, source := range b.Sources {
components = append(components, source.Component) components = append(components, source.Component)
names = append(names, source.Name) names = append(names, source.Name)
localRepo, err = localCollection.ByName(source.Name) localRepo, err = tmpCollection.ByName(source.Name)
if err != nil { if err != nil {
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err)) AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
return return
@@ -332,8 +333,6 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err) return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
} }
resources = append(resources, string(published.Key()))
if b.Origin != "" { if b.Origin != "" {
published.Origin = b.Origin published.Origin = b.Origin
} }
@@ -465,18 +464,60 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return return
} }
resources := []string{string(published.Key())}
if published.SourceKind == deb.SourceLocalRepo { if published.SourceKind == deb.SourceLocalRepo {
if len(b.Snapshots) > 0 { if len(b.Snapshots) > 0 {
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo")) AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
return return
} }
fmt.Printf("RACE DEBUG: deb.SourceLocalRepo\n")
// FIXME: lock repo ?
// localCollection := collectionFactory.LocalRepoCollection()
// for _, source := range b.Sources {
// components = append(components, source.Component)
// names = append(names, source.Name)
// localRepo, err = localCollection.ByName(source.Name)
// if err != nil {
// AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
// return
// }
// resources = append(resources, string(localRepo.Key()))
// }
} else if published.SourceKind == deb.SourceSnapshot { } else if published.SourceKind == deb.SourceSnapshot {
fmt.Printf("RACE DEBUG: deb.SourceSnapshot: %s\n", b.Snapshots)
for _, snapshotInfo := range b.Snapshots { for _, snapshotInfo := range b.Snapshots {
_, err2 := snapshotCollection.ByName(snapshotInfo.Name) snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil { if err2 != nil {
AbortWithJSONError(c, http.StatusNotFound, err2) AbortWithJSONError(c, http.StatusNotFound, err2)
return return
} }
resources = append(resources, string(snapshot.ResourceKey()))
for _, sourceID := range snapshot.SourceIDs {
if snapshot.SourceKind == deb.SourceSnapshot {
// FIXME: implement
err := errors.New("not implemented deb.SourceSnapshot")
AbortWithJSONError(c, http.StatusNotFound, err)
return
} else if snapshot.SourceKind == deb.SourceLocalRepo {
var repo *deb.LocalRepo
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
if err != nil {
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
resources = append(resources, string(repo.Key()))
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
// FIXME: implement
err := errors.New("not implemented: deb.SourceRemoteRepo")
AbortWithJSONError(c, http.StatusNotFound, err)
return
}
}
} }
} else { } else {
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type")) AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
@@ -515,7 +556,6 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.Version = *b.Version published.Version = *b.Version
} }
resources := []string{string(published.Key())}
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
err = collection.LoadComplete(published, collectionFactory) err = collection.LoadComplete(published, collectionFactory)
+5
View File
@@ -65,6 +65,7 @@ func (list *List) consumer() {
task.State = SUCCEEDED task.State = SUCCEEDED
} }
fmt.Printf("RACE DEBUG: Task %s done, freeing %s\n", task.Name, task.resources)
list.usedResources.Free(task.resources) list.usedResources.Free(task.resources)
task.wgTask.Done() task.wgTask.Done()
@@ -77,6 +78,8 @@ func (list *List) consumer() {
blockingTasks := list.usedResources.UsedBy(t.resources) blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 { if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t) list.usedResources.MarkInUse(t.resources, t)
fmt.Printf("RACE DEBUG: Starting queued task %s, using %s\n", t.Name, t.resources)
// unlock list since queueing may block // unlock list since queueing may block
list.Unlock() list.Unlock()
unlocked = true unlocked = true
@@ -209,10 +212,12 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
tasks := list.usedResources.UsedBy(resources) tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 { if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task) list.usedResources.MarkInUse(task.resources, task)
fmt.Printf("RACE DEBUG: Starting task %s, using %s\n", name, resources)
// queueing task might block if channel not ready, unlock list before queueing // queueing task might block if channel not ready, unlock list before queueing
list.Unlock() list.Unlock()
list.queue <- task list.queue <- task
} else { } else {
fmt.Printf("RACE DEBUG: Queued task %s, locked %s\n", name, resources)
list.Unlock() list.Unlock()
} }