every go routine needs to have its own collection factory

this is needed so concurrent reads and writes are possible.
This commit is contained in:
Oliver Sauder
2016-11-18 15:46:49 +01:00
committed by Lorenzo Bolla
parent 4a6d53e16d
commit 208a2151c1
48 changed files with 305 additions and 387 deletions
+2 -41
View File
@@ -4,7 +4,6 @@ package api
import (
"fmt"
"sort"
"time"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/deb"
@@ -35,43 +34,6 @@ type dbRequest struct {
err chan<- error
}
// Flushes all collections which cache in-memory objects
func flushColections() {
// lock everything to eliminate in-progress calls
r := context.CollectionFactory().RemoteRepoCollection()
r.Lock()
defer r.Unlock()
l := context.CollectionFactory().LocalRepoCollection()
l.Lock()
defer l.Unlock()
s := context.CollectionFactory().SnapshotCollection()
s.Lock()
defer s.Unlock()
p := context.CollectionFactory().PublishedRepoCollection()
p.Lock()
defer p.Unlock()
// all collections locked, flush them
context.CollectionFactory().Flush()
}
// Periodically flushes CollectionFactory to free up memory used by
// collections, flushing caches.
//
// Should be run in goroutine!
func cacheFlusher() {
ticker := time.Tick(15 * time.Minute)
for {
<-ticker
flushColections()
}
}
// Acquire database lock and release it when not needed anymore.
//
// Should be run in a goroutine!
@@ -94,7 +56,6 @@ func acquireDatabase(requests <-chan dbRequest) {
case releasedb:
clients--
if clients == 0 {
flushColections()
err = context.CloseDatabase()
} else {
err = nil
@@ -107,10 +68,10 @@ func acquireDatabase(requests <-chan dbRequest) {
// Common piece of code to show list of packages,
// with searching & details if requested
func showPackages(c *gin.Context, reflist *deb.PackageRefList) {
func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {
result := []*deb.Package{}
list, err := deb.NewPackageListFromRefList(reflist, context.CollectionFactory().PackageCollection(), nil)
list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)
if err != nil {
c.AbortWithError(404, err)
return
+1 -11
View File
@@ -21,17 +21,7 @@ func apiGraph(c *gin.Context) {
ext := c.Params.ByName("ext")
layout := c.Request.URL.Query().Get("layout")
factory := context.CollectionFactory()
factory.RemoteRepoCollection().Lock()
defer factory.RemoteRepoCollection().Unlock()
factory.LocalRepoCollection().Lock()
defer factory.LocalRepoCollection().Unlock()
factory.SnapshotCollection().Lock()
defer factory.SnapshotCollection().Unlock()
factory.PublishedRepoCollection().Lock()
defer factory.PublishedRepoCollection().Unlock()
factory := context.NewCollectionFactory()
graph, err := deb.BuildGraph(factory, layout)
if err != nil {
+2 -1
View File
@@ -6,7 +6,8 @@ import (
// GET /api/packages/:key
func apiPackagesShow(c *gin.Context) {
p, err := context.CollectionFactory().PackageCollection().ByKey([]byte(c.Params.ByName("key")))
collectionFactory := context.NewCollectionFactory()
p, err := collectionFactory.PackageCollection().ByKey([]byte(c.Params.ByName("key")))
if err != nil {
c.AbortWithError(404, err)
return
+19 -46
View File
@@ -51,22 +51,13 @@ func parseEscapedPath(path string) string {
// GET /publish
func apiPublishList(c *gin.Context) {
localCollection := context.CollectionFactory().LocalRepoCollection()
localCollection.RLock()
defer localCollection.RUnlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.RLock()
defer snapshotCollection.RUnlock()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
result := make([]*deb.PublishedRepo, 0, collection.Len())
err := collection.ForEach(func(repo *deb.PublishedRepo) error {
err := collection.LoadComplete(repo, context.CollectionFactory())
err := collection.LoadComplete(repo, collectionFactory)
if err != nil {
return err
}
@@ -124,13 +115,12 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
var components []string
var sources []interface{}
collectionFactory := context.NewCollectionFactory()
if b.SourceKind == "snapshot" {
var snapshot *deb.Snapshot
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
snapshotCollection := collectionFactory.SnapshotCollection()
for _, source := range b.Sources {
components = append(components, source.Component)
@@ -152,9 +142,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
} else if b.SourceKind == deb.SourceLocalRepo {
var localRepo *deb.LocalRepo
localCollection := context.CollectionFactory().LocalRepoCollection()
localCollection.Lock()
defer localCollection.Unlock()
localCollection := collectionFactory.LocalRepoCollection()
for _, source := range b.Sources {
components = append(components, source.Component)
@@ -177,11 +165,11 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
return
}
collection := context.CollectionFactory().PublishedRepoCollection()
collection := collectionFactory.PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, context.CollectionFactory())
published, err := deb.NewPublishedRepo(storage, prefix, b.Distribution, b.Architectures, components, sources, collectionFactory)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
return
@@ -208,12 +196,12 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
duplicate := collection.CheckDuplicate(published)
if duplicate != nil {
context.CollectionFactory().PublishedRepoCollection().LoadComplete(duplicate, context.CollectionFactory())
collectionFactory.PublishedRepoCollection().LoadComplete(duplicate, collectionFactory)
c.AbortWithError(400, fmt.Errorf("prefix/distribution already used by another published repo: %s", duplicate))
return
}
err = published.Publish(context.PackagePool(), context, context.CollectionFactory(), signer, nil, b.ForceOverwrite)
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, nil, b.ForceOverwrite)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to publish: %s", err))
return
@@ -256,25 +244,15 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
// published.LoadComplete would touch local repo collection
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
c.AbortWithError(404, fmt.Errorf("unable to update: %s", err))
return
}
err = collection.LoadComplete(published, context.CollectionFactory())
err = collection.LoadComplete(published, collectionFactory)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
@@ -299,6 +277,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
snapshotCollection := collectionFactory.SnapshotCollection()
snapshot, err2 := snapshotCollection.ByName(snapshotInfo.Name)
if err2 != nil {
c.AbortWithError(404, err2)
@@ -327,7 +306,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
published.AcquireByHash = *b.AcquireByHash
}
err = published.Publish(context.PackagePool(), context, context.CollectionFactory(), signer, nil, b.ForceOverwrite)
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, nil, b.ForceOverwrite)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
@@ -341,7 +320,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
if b.SkipCleanup == nil || !*b.SkipCleanup {
err = collection.CleanupPrefixComponentFiles(published.Prefix, updatedComponents,
context.GetPublishedStorage(storage), context.CollectionFactory(), nil)
context.GetPublishedStorage(storage), collectionFactory, nil)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to update: %s", err))
return
@@ -360,17 +339,11 @@ func apiPublishDrop(c *gin.Context) {
storage, prefix := deb.ParsePrefix(param)
distribution := c.Params.ByName("distribution")
// published.LoadComplete would touch local repo collection
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
collection := context.CollectionFactory().PublishedRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.PublishedRepoCollection()
err := collection.Remove(context, storage, prefix, distribution,
context.CollectionFactory(), context.Progress(), force, skipCleanup)
collectionFactory, context.Progress(), force, skipCleanup)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to drop: %s", err))
return
+30 -49
View File
@@ -17,11 +17,9 @@ import (
func apiReposList(c *gin.Context) {
result := []*deb.LocalRepo{}
collection := context.CollectionFactory().LocalRepoCollection()
collection.RLock()
defer collection.RUnlock()
context.CollectionFactory().LocalRepoCollection().ForEach(func(r *deb.LocalRepo) error {
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
collection.ForEach(func(r *deb.LocalRepo) error {
result = append(result, r)
return nil
})
@@ -46,11 +44,9 @@ func apiReposCreate(c *gin.Context) {
repo.DefaultComponent = b.DefaultComponent
repo.DefaultDistribution = b.DefaultDistribution
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
err := context.CollectionFactory().LocalRepoCollection().Add(repo)
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
err := collection.Add(repo)
if err != nil {
c.AbortWithError(400, err)
return
@@ -71,9 +67,8 @@ func apiReposEdit(c *gin.Context) {
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -102,9 +97,8 @@ func apiReposEdit(c *gin.Context) {
// GET /api/repos/:name
func apiReposShow(c *gin.Context) {
collection := context.CollectionFactory().LocalRepoCollection()
collection.RLock()
defer collection.RUnlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -119,17 +113,10 @@ func apiReposShow(c *gin.Context) {
func apiReposDrop(c *gin.Context) {
force := c.Request.URL.Query().Get("force") == "1"
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.RLock()
defer snapshotCollection.RUnlock()
publishedCollection := context.CollectionFactory().PublishedRepoCollection()
publishedCollection.RLock()
defer publishedCollection.RUnlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -162,9 +149,8 @@ func apiReposDrop(c *gin.Context) {
// GET /api/repos/:name/packages
func apiReposPackagesShow(c *gin.Context) {
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -178,7 +164,7 @@ func apiReposPackagesShow(c *gin.Context) {
return
}
showPackages(c, repo.RefList())
showPackages(c, repo.RefList(), collectionFactory)
}
// Handler for both add and delete
@@ -191,9 +177,8 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -207,7 +192,7 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
return
}
list, err := deb.NewPackageListFromRefList(repo.RefList(), context.CollectionFactory().PackageCollection(), nil)
list, err := deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
c.AbortWithError(500, err)
return
@@ -217,7 +202,7 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = context.CollectionFactory().PackageCollection().ByKey([]byte(ref))
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err))
@@ -235,7 +220,7 @@ func apiReposPackagesAddDelete(c *gin.Context, cb func(list *deb.PackageList, p
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = context.CollectionFactory().LocalRepoCollection().Update(repo)
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save: %s", err))
return
@@ -281,9 +266,8 @@ func apiReposPackageFromDir(c *gin.Context) {
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
repo, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -320,14 +304,14 @@ func apiReposPackageFromDir(c *gin.Context) {
packageFiles, otherFiles, failedFiles = deb.CollectPackageFiles(sources, reporter)
list, err = deb.NewPackageListFromRefList(repo.RefList(), context.CollectionFactory().PackageCollection(), nil)
list, err = deb.NewPackageListFromRefList(repo.RefList(), collectionFactory.PackageCollection(), nil)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to load packages: %s", err))
return
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection)
collectionFactory.PackageCollection(), reporter, nil, collectionFactory.ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
@@ -339,7 +323,7 @@ func apiReposPackageFromDir(c *gin.Context) {
repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
err = context.CollectionFactory().LocalRepoCollection().Update(repo)
err = collectionFactory.LocalRepoCollection().Update(repo)
if err != nil {
c.AbortWithError(500, fmt.Errorf("unable to save: %s", err))
return
@@ -412,15 +396,12 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
sources = []string{filepath.Join(context.UploadPath(), c.Params.ByName("dir"), c.Params.ByName("file"))}
}
localRepoCollection := context.CollectionFactory().LocalRepoCollection()
localRepoCollection.Lock()
defer localRepoCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
changesFiles, failedFiles = deb.CollectChangesFiles(sources, reporter)
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse)
repoTemplateString, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
-3
View File
@@ -46,9 +46,6 @@ func Router(c *ctx.AptlyContext) http.Handler {
c.Next()
})
} else {
go cacheFlusher()
}
root := router.Group("/api")
+25 -43
View File
@@ -12,9 +12,8 @@ import (
func apiSnapshotsList(c *gin.Context) {
SortMethodString := c.Request.URL.Query().Get("sort")
collection := context.CollectionFactory().SnapshotCollection()
collection.RLock()
defer collection.RUnlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
if SortMethodString == "" {
SortMethodString = "name"
@@ -46,13 +45,9 @@ func apiSnapshotsCreateFromMirror(c *gin.Context) {
return
}
collection := context.CollectionFactory().RemoteRepoCollection()
collection.Lock()
defer collection.Unlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.RemoteRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err = collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -115,9 +110,8 @@ func apiSnapshotsCreate(c *gin.Context) {
}
}
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
sources := make([]*deb.Snapshot, len(b.SourceSnapshots))
@@ -141,7 +135,7 @@ func apiSnapshotsCreate(c *gin.Context) {
for _, ref := range b.PackageRefs {
var p *deb.Package
p, err = context.CollectionFactory().PackageCollection().ByKey([]byte(ref))
p, err = collectionFactory.PackageCollection().ByKey([]byte(ref))
if err != nil {
if err == database.ErrNotFound {
c.AbortWithError(404, fmt.Errorf("package %s: %s", ref, err))
@@ -185,13 +179,9 @@ func apiSnapshotsCreateFromRepository(c *gin.Context) {
return
}
collection := context.CollectionFactory().LocalRepoCollection()
collection.Lock()
defer collection.Unlock()
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.LocalRepoCollection()
snapshotCollection := collectionFactory.SnapshotCollection()
repo, err = collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -240,9 +230,8 @@ func apiSnapshotsUpdate(c *gin.Context) {
return
}
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshot, err = collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -264,7 +253,7 @@ func apiSnapshotsUpdate(c *gin.Context) {
snapshot.Description = b.Description
}
err = context.CollectionFactory().SnapshotCollection().Update(snapshot)
err = collectionFactory.SnapshotCollection().Update(snapshot)
if err != nil {
c.AbortWithError(500, err)
return
@@ -275,9 +264,8 @@ func apiSnapshotsUpdate(c *gin.Context) {
// GET /api/snapshots/:name
func apiSnapshotsShow(c *gin.Context) {
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshot, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -299,13 +287,9 @@ func apiSnapshotsDrop(c *gin.Context) {
name := c.Params.ByName("name")
force := c.Request.URL.Query().Get("force") == "1"
snapshotCollection := context.CollectionFactory().SnapshotCollection()
snapshotCollection.Lock()
defer snapshotCollection.Unlock()
publishedCollection := context.CollectionFactory().PublishedRepoCollection()
publishedCollection.RLock()
defer publishedCollection.RUnlock()
collectionFactory := context.NewCollectionFactory()
snapshotCollection := collectionFactory.SnapshotCollection()
publishedCollection := collectionFactory.PublishedRepoCollection()
snapshot, err := snapshotCollection.ByName(name)
if err != nil {
@@ -341,9 +325,8 @@ func apiSnapshotsDrop(c *gin.Context) {
func apiSnapshotsDiff(c *gin.Context) {
onlyMatching := c.Request.URL.Query().Get("onlyMatching") == "1"
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshotA, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -370,7 +353,7 @@ func apiSnapshotsDiff(c *gin.Context) {
}
// Calculate diff
diff, err := snapshotA.RefList().Diff(snapshotB.RefList(), context.CollectionFactory().PackageCollection())
diff, err := snapshotA.RefList().Diff(snapshotB.RefList(), collectionFactory.PackageCollection())
if err != nil {
c.AbortWithError(500, err)
return
@@ -391,9 +374,8 @@ func apiSnapshotsDiff(c *gin.Context) {
// GET /api/snapshots/:name/packages
func apiSnapshotsSearchPackages(c *gin.Context) {
collection := context.CollectionFactory().SnapshotCollection()
collection.Lock()
defer collection.Unlock()
collectionFactory := context.NewCollectionFactory()
collection := collectionFactory.SnapshotCollection()
snapshot, err := collection.ByName(c.Params.ByName("name"))
if err != nil {
@@ -407,5 +389,5 @@ func apiSnapshotsSearchPackages(c *gin.Context) {
return
}
showPackages(c, snapshot.RefList())
showPackages(c, snapshot.RefList(), collectionFactory)
}