mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-31 04:30:44 +00:00
publish: fix locking of snapshots
snapshots come in 3 kinds: local, remote and snapshot find resources to be locked for each kind, recursively for the snapshot kind
This commit is contained in:
+58
-48
@@ -4,7 +4,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/aptly-dev/aptly/aptly"
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
"github.com/aptly-dev/aptly/deb"
|
"github.com/aptly-dev/aptly/deb"
|
||||||
@@ -386,6 +385,46 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return resources to be locked for a Snapshot name
|
||||||
|
func getSnapshotResources(snapshotCollection *deb.SnapshotCollection, snapshotName string) (resources []string, err error) {
|
||||||
|
snapshot, err := snapshotCollection.ByName(snapshotName)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(snapshot.ResourceKey()))
|
||||||
|
|
||||||
|
for _, sourceID := range snapshot.SourceIDs {
|
||||||
|
if snapshot.SourceKind == deb.SourceSnapshot {
|
||||||
|
snapshot2, err2 := snapshotCollection.ByUUID(sourceID)
|
||||||
|
if err2 != nil {
|
||||||
|
err = err2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
res, err3 := getSnapshotResources(snapshotCollection, snapshot2.Name)
|
||||||
|
if err3 != nil {
|
||||||
|
err = err3
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, res...)
|
||||||
|
} else if snapshot.SourceKind == deb.SourceLocalRepo {
|
||||||
|
var repo *deb.LocalRepo
|
||||||
|
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(repo.Key()))
|
||||||
|
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
|
||||||
|
var mirror *deb.RemoteRepo
|
||||||
|
mirror, err = context.NewCollectionFactory().RemoteRepoCollection().ByUUID(sourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resources = append(resources, string(mirror.Key()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
type publishedRepoUpdateSwitchParams struct {
|
type publishedRepoUpdateSwitchParams struct {
|
||||||
// when publishing, overwrite files in pool/ directory without notice
|
// when publishing, overwrite files in pool/ directory without notice
|
||||||
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
|
ForceOverwrite bool ` json:"ForceOverwrite" example:"false"`
|
||||||
@@ -405,12 +444,12 @@ type publishedRepoUpdateSwitchParams struct {
|
|||||||
SignedBy *string ` json:"SignedBy" example:""`
|
SignedBy *string ` json:"SignedBy" example:""`
|
||||||
// Enable multiple packages with the same filename in different distributions
|
// Enable multiple packages with the same filename in different distributions
|
||||||
MultiDist *bool ` json:"MultiDist" example:"false"`
|
MultiDist *bool ` json:"MultiDist" example:"false"`
|
||||||
// Value of Label: field in published repository stanza
|
// Value of Label: field in published repository stanza
|
||||||
Label *string ` json:"Label" example:"Debian"`
|
Label *string ` json:"Label" example:"Debian"`
|
||||||
// Value of Origin: field in published repository stanza
|
// Value of Origin: field in published repository stanza
|
||||||
Origin *string ` json:"Origin" example:"Debian"`
|
Origin *string ` json:"Origin" example:"Debian"`
|
||||||
// Version of the release: Optional
|
// Version of the release: Optional
|
||||||
Version *string ` json:"Version" example:"13.3"`
|
Version *string ` json:"Version" example:"13.3"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Summary Update Published Repository
|
// @Summary Update Published Repository
|
||||||
@@ -471,53 +510,24 @@ func apiPublishUpdateSwitch(c *gin.Context) {
|
|||||||
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
|
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("snapshots shouldn't be given when updating local repo"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fmt.Printf("RACE DEBUG: deb.SourceLocalRepo\n")
|
|
||||||
|
|
||||||
// FIXME: lock repo ?
|
localCollection := collectionFactory.LocalRepoCollection()
|
||||||
// localCollection := collectionFactory.LocalRepoCollection()
|
for _, sourceID := range published.Sources {
|
||||||
// for _, source := range b.Sources {
|
localRepo, err2 := localCollection.ByUUID(sourceID)
|
||||||
// components = append(components, source.Component)
|
|
||||||
// names = append(names, source.Name)
|
|
||||||
|
|
||||||
// localRepo, err = localCollection.ByName(source.Name)
|
|
||||||
// if err != nil {
|
|
||||||
// AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to publish: %s", err))
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// resources = append(resources, string(localRepo.Key()))
|
|
||||||
// }
|
|
||||||
} else if published.SourceKind == deb.SourceSnapshot {
|
|
||||||
fmt.Printf("RACE DEBUG: deb.SourceSnapshot: %s\n", b.Snapshots)
|
|
||||||
for _, snapshotInfo := range b.Snapshots {
|
|
||||||
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
|
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err2)
|
AbortWithJSONError(c, http.StatusNotFound, err2)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resources = append(resources, string(snapshot.ResourceKey()))
|
resources = append(resources, string(localRepo.Key()))
|
||||||
|
}
|
||||||
for _, sourceID := range snapshot.SourceIDs {
|
} else if published.SourceKind == deb.SourceSnapshot {
|
||||||
if snapshot.SourceKind == deb.SourceSnapshot {
|
for _, snapshotInfo := range b.Snapshots {
|
||||||
// FIXME: implement
|
res, err2 := getSnapshotResources(snapshotCollection, snapshotInfo.Name)
|
||||||
err := errors.New("not implemented deb.SourceSnapshot")
|
if err2 != nil {
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err)
|
AbortWithJSONError(c, http.StatusNotFound, err2)
|
||||||
return
|
return
|
||||||
} else if snapshot.SourceKind == deb.SourceLocalRepo {
|
|
||||||
var repo *deb.LocalRepo
|
|
||||||
repo, err = context.NewCollectionFactory().LocalRepoCollection().ByUUID(sourceID)
|
|
||||||
if err != nil {
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
resources = append(resources, string(repo.Key()))
|
|
||||||
} else if snapshot.SourceKind == deb.SourceRemoteRepo {
|
|
||||||
// FIXME: implement
|
|
||||||
err := errors.New("not implemented: deb.SourceRemoteRepo")
|
|
||||||
AbortWithJSONError(c, http.StatusNotFound, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
resources = append(resources, res...)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
|
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unknown published repository type"))
|
||||||
|
|||||||
@@ -992,6 +992,232 @@ class PublishSwitchAPITestRepo(APITest):
|
|||||||
self.check_not_exists("public/" + prefix + "dists/")
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
|
class PublishSwitchAPITestMirror(APITest):
|
||||||
|
"""
|
||||||
|
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
||||||
|
"""
|
||||||
|
fixtureGpg = True
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
mirror_name = self.random_name()
|
||||||
|
mirror_desc = {'Name': mirror_name,
|
||||||
|
'ArchiveURL': 'http://repo.aptly.info/system-tests/packagecloud.io/varnishcache/varnish30/debian/',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Keyrings': ["aptlytest.gpg"],
|
||||||
|
'Architectures': ["amd64"],
|
||||||
|
'Components': ['main']}
|
||||||
|
mirror_desc['IgnoreSignatures'] = True
|
||||||
|
|
||||||
|
# Create Mirror
|
||||||
|
resp = self.post("/api/mirrors", json=mirror_desc)
|
||||||
|
self.check_equal(resp.status_code, 201)
|
||||||
|
|
||||||
|
# Get Mirror
|
||||||
|
resp = self.get("/api/mirrors/" + mirror_name + "/packages")
|
||||||
|
self.check_equal(resp.status_code, 404)
|
||||||
|
|
||||||
|
# Update Mirror
|
||||||
|
resp = self.put_task("/api/mirrors/" + mirror_name, json=mirror_desc)
|
||||||
|
self.check_task(resp)
|
||||||
|
|
||||||
|
# Snapshot Mirror
|
||||||
|
snapshot1_name = self.random_name()
|
||||||
|
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot1_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
# Publish Snapshot
|
||||||
|
prefix = self.random_name()
|
||||||
|
task = self.post_task(
|
||||||
|
"/api/publish/" + prefix,
|
||||||
|
json={
|
||||||
|
"Architectures": ["i386", "source"],
|
||||||
|
"SourceKind": "snapshot",
|
||||||
|
"Sources": [{"Name": snapshot1_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': '',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Origin': 'packagecloud.io/varnishcache/varnish30',
|
||||||
|
'Version': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': False,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
# Snapshot Mirror 2
|
||||||
|
snapshot2_name = self.random_name()
|
||||||
|
task = self.post_task("/api/mirrors/" + mirror_name + '/snapshots', json={'Name': snapshot2_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
task = self.put_task(
|
||||||
|
"/api/publish/" + prefix + "/wheezy",
|
||||||
|
json={
|
||||||
|
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
"SkipContents": True,
|
||||||
|
"Label": "fun",
|
||||||
|
"Origin": "earth",
|
||||||
|
"Version": "13.3",
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': 'fun',
|
||||||
|
'Origin': 'earth',
|
||||||
|
'Version': '13.3',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': True,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
|
||||||
|
self.check_task(task)
|
||||||
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
|
class PublishSwitchAPITestSnapshot(APITest):
|
||||||
|
"""
|
||||||
|
publish snapshot of snapshot
|
||||||
|
"""
|
||||||
|
fixtureGpg = True
|
||||||
|
|
||||||
|
def check(self):
|
||||||
|
repo_name = self.random_name()
|
||||||
|
self.check_equal(self.post(
|
||||||
|
"/api/repos", json={"Name": repo_name, "DefaultDistribution": "wheezy"}).status_code, 201)
|
||||||
|
|
||||||
|
d = self.random_name()
|
||||||
|
self.check_equal(
|
||||||
|
self.upload("/api/files/" + d,
|
||||||
|
"pyspi_0.6.1-1.3.dsc",
|
||||||
|
"pyspi_0.6.1-1.3.diff.gz", "pyspi_0.6.1.orig.tar.gz",
|
||||||
|
"pyspi-0.6.1-1.3.stripped.dsc").status_code, 200)
|
||||||
|
task = self.post_task("/api/repos/" + repo_name + "/file/" + d)
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
snapshot1_name = self.random_name()
|
||||||
|
task = self.post_task("/api/repos/" + repo_name + '/snapshots', json={'Name': snapshot1_name})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
prefix = self.random_name()
|
||||||
|
task = self.post_task(
|
||||||
|
"/api/publish/" + prefix,
|
||||||
|
json={
|
||||||
|
"Architectures": ["i386", "source"],
|
||||||
|
"SourceKind": "snapshot",
|
||||||
|
"Sources": [{"Name": snapshot1_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': '',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Origin': '',
|
||||||
|
'Version': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': False,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot1_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
self.check_not_exists(
|
||||||
|
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
|
||||||
|
self.check_exists("public/" + prefix +
|
||||||
|
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
|
||||||
|
|
||||||
|
snapshot2_name = self.random_name()
|
||||||
|
task = self.post_task("/api/snapshots", json={"Name": snapshot2_name, 'SourceSnapshots': [snapshot1_name]})
|
||||||
|
self.check_task(task)
|
||||||
|
|
||||||
|
task = self.put_task(
|
||||||
|
"/api/publish/" + prefix + "/wheezy",
|
||||||
|
json={
|
||||||
|
"Snapshots": [{"Component": "main", "Name": snapshot2_name}],
|
||||||
|
"Signing": DefaultSigningOptions,
|
||||||
|
"SkipContents": True,
|
||||||
|
"Label": "fun",
|
||||||
|
"Origin": "earth",
|
||||||
|
"Version": "13.3",
|
||||||
|
})
|
||||||
|
self.check_task(task)
|
||||||
|
repo_expected = {
|
||||||
|
'AcquireByHash': False,
|
||||||
|
'Architectures': ['i386', 'source'],
|
||||||
|
'Codename': '',
|
||||||
|
'Distribution': 'wheezy',
|
||||||
|
'Label': 'fun',
|
||||||
|
'Origin': 'earth',
|
||||||
|
'Version': '13.3',
|
||||||
|
'NotAutomatic': '',
|
||||||
|
'ButAutomaticUpgrades': '',
|
||||||
|
'Path': prefix + '/' + 'wheezy',
|
||||||
|
'Prefix': prefix,
|
||||||
|
'SignedBy': '',
|
||||||
|
'SkipContents': True,
|
||||||
|
'MultiDist': False,
|
||||||
|
'SourceKind': 'snapshot',
|
||||||
|
'Sources': [{'Component': 'main', 'Name': snapshot2_name}],
|
||||||
|
'Storage': '',
|
||||||
|
'Suite': ''}
|
||||||
|
|
||||||
|
all_repos = self.get("/api/publish")
|
||||||
|
self.check_equal(all_repos.status_code, 200)
|
||||||
|
self.check_in(repo_expected, all_repos.json())
|
||||||
|
|
||||||
|
# FIXME: what should exist here ? publish snapshot of snapshot
|
||||||
|
self.check_not_exists(
|
||||||
|
"public/" + prefix + "/pool/main/b/boost-defaults/libboost-program-options-dev_1.49.0.1_i386.deb")
|
||||||
|
self.check_not_exists("public/" + prefix +
|
||||||
|
"/pool/main/p/pyspi/pyspi-0.6.1-1.3.stripped.dsc")
|
||||||
|
|
||||||
|
task = self.delete_task("/api/publish/" + prefix + "/wheezy")
|
||||||
|
self.check_task(task)
|
||||||
|
self.check_not_exists("public/" + prefix + "dists/")
|
||||||
|
|
||||||
|
|
||||||
class PublishSwitchAPITestRepoSignedBy(APITest):
|
class PublishSwitchAPITestRepoSignedBy(APITest):
|
||||||
"""
|
"""
|
||||||
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
PUT /publish/:prefix/:distribution (snapshots), DELETE /publish/:prefix/:distribution
|
||||||
|
|||||||
+4
-4
@@ -65,7 +65,7 @@ func (list *List) consumer() {
|
|||||||
task.State = SUCCEEDED
|
task.State = SUCCEEDED
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("RACE DEBUG: Task %s done, freeing %s\n", task.Name, task.resources)
|
fmt.Printf("RACE DEBUG: Task Done '%s', freeing %s\n", task.Name, task.resources)
|
||||||
list.usedResources.Free(task.resources)
|
list.usedResources.Free(task.resources)
|
||||||
|
|
||||||
task.wgTask.Done()
|
task.wgTask.Done()
|
||||||
@@ -79,7 +79,7 @@ func (list *List) consumer() {
|
|||||||
if len(blockingTasks) == 0 {
|
if len(blockingTasks) == 0 {
|
||||||
list.usedResources.MarkInUse(t.resources, t)
|
list.usedResources.MarkInUse(t.resources, t)
|
||||||
|
|
||||||
fmt.Printf("RACE DEBUG: Starting queued task %s, using %s\n", t.Name, t.resources)
|
fmt.Printf("RACE DEBUG: Task Resuming '%s', locking %s\n", t.Name, t.resources)
|
||||||
// unlock list since queueing may block
|
// unlock list since queueing may block
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
unlocked = true
|
unlocked = true
|
||||||
@@ -212,12 +212,12 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
|||||||
tasks := list.usedResources.UsedBy(resources)
|
tasks := list.usedResources.UsedBy(resources)
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
list.usedResources.MarkInUse(task.resources, task)
|
list.usedResources.MarkInUse(task.resources, task)
|
||||||
fmt.Printf("RACE DEBUG: Starting task %s, using %s\n", name, resources)
|
fmt.Printf("RACE DEBUG: Task Starting '%s', locking %s\n", name, resources)
|
||||||
// queueing task might block if channel not ready, unlock list before queueing
|
// queueing task might block if channel not ready, unlock list before queueing
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
list.queue <- task
|
list.queue <- task
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("RACE DEBUG: Queued task %s, locked %s\n", name, resources)
|
fmt.Printf("RACE DEBUG: Task Queued '%s', waiting on %s\n", name, resources)
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user