Rework the way database is open/re-open in aptly

Allow database to be initialized without opening, unify all the
open paths to retry on failure.

In API router make sure open requests are matched with acks in explicit
way.

This also enables re-open attempts in all the aptly commands, so it
should make running aptly CLI much easier now hopefully.

Fix up system tests for oldoldstable ;)
This commit is contained in:
Andrey Smirnov
2017-07-05 00:17:48 +03:00
parent af2f7baf63
commit 211ac0501f
36 changed files with 138 additions and 106 deletions

View File

@@ -23,11 +23,18 @@ func apiVersion(c *gin.Context) {
c.JSON(200, gin.H{"Version": aptly.Version})
}
type dbRequestKind int
const (
acquiredb = iota
acquiredb dbRequestKind = iota
releasedb
)
type dbRequest struct {
kind dbRequestKind
err chan<- error
}
// Flushes all collections which cache in-memory objects
func flushColections() {
// lock everything to eliminate in-progress calls
@@ -52,50 +59,48 @@ func flushColections() {
}
// Periodically flushes CollectionFactory to free up memory used by
// collections, flushing caches. If the two channels are provided,
// they are used to acquire and release the database.
// collections, flushing caches.
//
// Should be run in goroutine!
func cacheFlusher(requests chan int, acks chan error) {
func cacheFlusher() {
ticker := time.Tick(15 * time.Minute)
for {
<-ticker
// if aptly API runs in -no-lock mode,
// caches are flushed when DB is closed anyway, no need
// to flush them here
if requests == nil {
flushColections()
}
flushColections()
}
}
// Acquire database lock and release it when not needed anymore. Two
// channels must be provided. The first one is to receive requests to
// acquire/release the database and the second one is to send acks.
// Acquire database lock and release it when not needed anymore.
//
// Should be run in a goroutine!
func acquireDatabase(requests chan int, acks chan error) {
func acquireDatabase(requests <-chan dbRequest) {
clients := 0
for {
request := <-requests
switch request {
for request := range requests {
var err error
switch request.kind {
case acquiredb:
if clients == 0 {
acks <- context.ReOpenDatabase()
} else {
acks <- nil
err = context.ReOpenDatabase()
}
request.err <- err
if err == nil {
clients++
}
clients++
case releasedb:
clients--
if clients == 0 {
flushColections()
acks <- context.CloseDatabase()
err = context.CloseDatabase()
} else {
acks <- nil
err = nil
}
request.err <- err
}
}
}

View File

@@ -20,35 +20,35 @@ func Router(c *ctx.AptlyContext) http.Handler {
// We use a goroutine to count the number of
// concurrent requests. When no more requests are
// running, we close the database to free the lock.
requests := make(chan int)
acks := make(chan error)
requests := make(chan dbRequest)
go acquireDatabase(requests, acks)
go cacheFlusher(requests, acks)
go acquireDatabase(requests)
router.Use(func(c *gin.Context) {
var err error
requests <- acquiredb
errCh := make(chan error)
requests <- dbRequest{acquiredb, errCh}
err = <-errCh
if err != nil {
c.Fail(500, err)
return
}
defer func() {
requests <- releasedb
err = <-acks
requests <- dbRequest{releasedb, errCh}
err = <-errCh
if err != nil {
c.Fail(500, err)
}
}()
err = <-acks
if err != nil {
c.Fail(500, err)
return
}
c.Next()
})
} else {
go cacheFlusher(nil, nil)
go cacheFlusher()
}
root := router.Group("/api")