Consistently use transactions to update database

For any action which is multi-step (requires updating more than 1 DB
key), use transaction to make update atomic.

Also pack big chunks of updates (importing packages for importing and
mirror updates) into single transaction to improve aptly performance and
get some isolation.

Note that still layers up (Collections) provide some level of isolation,
so this is going to shine with the future PRs to remove collection
locks.

Spin-off of #459
This commit is contained in:
Andrey Smirnov
2019-08-09 22:34:55 +03:00
committed by Andrey Smirnov
parent 67e38955ae
commit 77d7c3871a
19 changed files with 187 additions and 68 deletions

View File

@@ -327,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) {
}
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
processedFiles = append(processedFiles, otherFiles...)
@@ -420,7 +420,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse)
context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"io"
"os"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/utils"
)
@@ -134,6 +135,9 @@ type Downloader interface {
GetLength(ctx context.Context, url string) (int64, error)
}
// ChecksumStorageProvider creates ChecksumStorage based on DB
type ChecksumStorageProvider func(db database.ReaderWriter) ChecksumStorage
// ChecksumStorage is stores checksums in some (persistent) storage
type ChecksumStorage interface {
// Get finds checksums in DB by path

View File

@@ -84,7 +84,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
context.Progress().Printf("Building download queue...\n")
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(),
context.CollectionFactory().ChecksumCollection(), skipExistingPackages)
context.CollectionFactory().ChecksumCollection(nil), skipExistingPackages)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
@@ -210,7 +210,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection(nil))
if err != nil {
return fmt.Errorf("unable to import file: %s", err)
}

View File

@@ -49,7 +49,7 @@ func aptlyRepoAdd(cmd *commander.Command, args []string) error {
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil,
context.CollectionFactory().ChecksumCollection())
context.CollectionFactory().ChecksumCollection)
failedFiles = append(failedFiles, failedFiles2...)
if err != nil {
return fmt.Errorf("unable to import package files: %s", err)

View File

@@ -56,7 +56,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error {
_, failedFiles2, err = deb.ImportChangesFiles(
changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString,
context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection(),
context.PackagePool(), context.CollectionFactory().ChecksumCollection,
uploaders, query.Parse)
failedFiles = append(failedFiles, failedFiles2...)

View File

@@ -30,6 +30,12 @@ type Writer interface {
Delete(key []byte) error
}
// ReaderWriter combines Reader and Writer
type ReaderWriter interface {
Reader
Writer
}
// Storage is an interface to KV storage
type Storage interface {
Reader

View File

@@ -294,7 +294,7 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha
// ImportChangesFiles imports referenced files in changes files into local repository
func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool,
verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection,
pool aptly.PackagePool, checksumStorage aptly.ChecksumStorage, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
var repoTemplate *template.Template
repoTemplate, err = template.New("repo").Parse(repoTemplateString)
@@ -384,7 +384,7 @@ func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, ac
var processedFiles2, failedFiles2 []string
processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool,
packageCollection, reporter, restriction, checksumStorage)
packageCollection, reporter, restriction, checksumStorageProvider)
if err != nil {
return nil, nil, fmt.Errorf("unable to import package files: %s", err)

View File

@@ -123,7 +123,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) {
processedFiles, failedFiles, err := ImportChangesFiles(
append(changesFiles, "testdata/changes/notexistent.changes"),
s.Reporter, true, true, false, false, &NullVerifier{},
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, s.checksumStorage,
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage },
nil, nil)
c.Assert(err, IsNil)
c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes"))

View File

@@ -11,12 +11,12 @@ import (
// ChecksumCollection does management of ChecksumInfo in DB
type ChecksumCollection struct {
db database.Storage
db database.ReaderWriter
codecHandle *codec.MsgpackHandle
}
// NewChecksumCollection creates new ChecksumCollection and binds it to database
func NewChecksumCollection(db database.Storage) *ChecksumCollection {
func NewChecksumCollection(db database.ReaderWriter) *ChecksumCollection {
return &ChecksumCollection{
db: db,
codecHandle: &codec.MsgpackHandle{},

View File

@@ -3,6 +3,7 @@ package deb
import (
"sync"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/database"
)
@@ -91,10 +92,14 @@ func (factory *CollectionFactory) PublishedRepoCollection() *PublishedRepoCollec
}
// ChecksumCollection returns (or creates) new ChecksumCollection
func (factory *CollectionFactory) ChecksumCollection() *ChecksumCollection {
func (factory *CollectionFactory) ChecksumCollection(db database.ReaderWriter) aptly.ChecksumStorage {
factory.Lock()
defer factory.Unlock()
if db != nil {
return NewChecksumCollection(db)
}
if factory.checksums == nil {
factory.checksums = NewChecksumCollection(factory.db)
}

View File

@@ -66,11 +66,19 @@ func CollectPackageFiles(locations []string, reporter aptly.ResultReporter) (pac
// ImportPackageFiles imports files into local repository
func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier,
pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery,
checksumStorage aptly.ChecksumStorage) (processedFiles []string, failedFiles []string, err error) {
checksumStorageProvider aptly.ChecksumStorageProvider) (processedFiles []string, failedFiles []string, err error) {
if forceReplace {
list.PrepareIndex()
}
transaction, err := collection.db.OpenTransaction()
if err != nil {
return nil, nil, err
}
defer transaction.Discard()
checksumStorage := checksumStorageProvider(transaction)
for _, file := range packageFiles {
var (
stanza Stanza
@@ -193,7 +201,7 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
continue
}
err = collection.Update(p)
err = collection.UpdateInTransaction(p, transaction)
if err != nil {
reporter.Warning("Unable to save package %s: %s", p, err)
failedFiles = append(failedFiles, file)
@@ -219,6 +227,6 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
processedFiles = append(processedFiles, candidateProcessedFiles...)
}
err = nil
err = transaction.Commit()
return
}

View File

@@ -161,17 +161,23 @@ func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {
// Update stores updated information about repo in DB
func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
err := collection.db.Put(repo.Key(), repo.Encode())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}
if repo.packageRefs != nil {
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
if err != nil {
return err
}
}
return nil
return transaction.Commit()
}
// LoadComplete loads additional information for local repo
@@ -245,16 +251,28 @@ func (collection *LocalRepoCollection) Len() int {
// Drop removes remote repo from collection
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
panic("local repo not found!")
}
delete(collection.cache, repo.UUID)
err := collection.db.Delete(repo.Key())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
return collection.db.Delete(repo.RefKey())
delete(collection.cache, repo.UUID)
if _, err = transaction.Get(repo.Key()); err != nil {
if err == database.ErrNotFound {
return errors.New("local repo not found")
}
return err
}
if err = transaction.Delete(repo.Key()); err != nil {
return err
}
if err = transaction.Delete(repo.RefKey()); err != nil {
return err
}
return transaction.Commit()
}

View File

@@ -199,5 +199,5 @@ func (s *LocalRepoCollectionSuite) TestDrop(c *C) {
r2, _ := collection.ByName("local2")
c.Check(r2.String(), Equals, repo2.String())
c.Check(func() { s.collection.Drop(repo1) }, Panics, "local repo not found!")
c.Check(s.collection.Drop(repo1), ErrorMatches, "local repo not found")
}

View File

@@ -201,8 +201,23 @@ func (collection *PackageCollection) loadContents(p *Package, packagePool aptly.
return contents
}
// Update adds or updates information about package in DB checking for conficts first
// Update adds or updates information about package in DB
func (collection *PackageCollection) Update(p *Package) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
if err = collection.UpdateInTransaction(p, transaction); err != nil {
return err
}
return transaction.Commit()
}
// UpdateInTransaction updates/creates package info in the context of the outer transaction
func (collection *PackageCollection) UpdateInTransaction(p *Package, transaction database.Transaction) error {
var encodeBuffer bytes.Buffer
encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle)
@@ -210,12 +225,11 @@ func (collection *PackageCollection) Update(p *Package) error {
encodeBuffer.Reset()
encodeBuffer.WriteByte(0xc1)
encodeBuffer.WriteByte(0x1)
err := encoder.Encode(p)
if err != nil {
if err := encoder.Encode(p); err != nil {
return err
}
err = collection.db.Put(p.Key(""), encodeBuffer.Bytes())
err := transaction.Put(p.Key(""), encodeBuffer.Bytes())
if err != nil {
return err
}
@@ -228,7 +242,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}
err = collection.db.Put(p.Key("xF"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xF"), encodeBuffer.Bytes())
if err != nil {
return err
}
@@ -241,7 +255,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}
err = collection.db.Put(p.Key("xD"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xD"), encodeBuffer.Bytes())
if err != nil {
return err
}
@@ -256,7 +270,7 @@ func (collection *PackageCollection) Update(p *Package) error {
return err
}
err = collection.db.Put(p.Key("xE"), encodeBuffer.Bytes())
err = transaction.Put(p.Key("xE"), encodeBuffer.Bytes())
if err != nil {
return err
}

View File

@@ -914,21 +914,27 @@ func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) *
}
// Update stores updated information about repo in DB
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) (err error) {
err = collection.db.Put(repo.Key(), repo.Encode())
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return
return err
}
defer transaction.Discard()
err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}
if repo.SourceKind == SourceLocalRepo {
for component, item := range repo.sourceItems {
err = collection.db.Put(repo.RefKey(component), item.packageRefs.Encode())
err = transaction.Put(repo.RefKey(component), item.packageRefs.Encode())
if err != nil {
return
return err
}
}
}
return
return transaction.Commit()
}
// LoadComplete loads additional information for remote repo
@@ -1170,6 +1176,13 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
force, skipCleanup bool) error {
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
// TODO: load via transaction
collection.loadList()
repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
@@ -1221,17 +1234,17 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
}
}
err = collection.db.Delete(repo.Key())
err = transaction.Delete(repo.Key())
if err != nil {
return err
}
for _, component := range repo.Components() {
err = collection.db.Delete(repo.RefKey(component))
err = transaction.Delete(repo.RefKey(component))
if err != nil {
return err
}
}
return nil
return transaction.Commit()
}

View File

@@ -617,34 +617,44 @@ func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, packag
// FinalizeDownload swaps for final value of package refs
func (repo *RemoteRepo) FinalizeDownload(collectionFactory *CollectionFactory, progress aptly.Progress) error {
transaction, err := collectionFactory.PackageCollection().db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
repo.LastDownloadDate = time.Now()
if progress != nil {
progress.InitBar(int64(repo.packageList.Len()), true)
progress.InitBar(int64(repo.packageList.Len()), false)
}
var i int
// update all the packages in collection
err := repo.packageList.ForEach(func(p *Package) error {
err = repo.packageList.ForEach(func(p *Package) error {
i++
if progress != nil {
progress.SetBar(i)
}
// download process might have updated checksums
p.UpdateFiles(p.Files())
return collectionFactory.PackageCollection().Update(p)
return collectionFactory.PackageCollection().UpdateInTransaction(p, transaction)
})
repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList)
if err == nil {
repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList)
repo.packageList = nil
}
if progress != nil {
progress.ShutdownBar()
}
repo.packageList = nil
return err
if err != nil {
return err
}
return transaction.Commit()
}
// Encode does msgpack encoding of RemoteRepo
@@ -795,17 +805,24 @@ func (collection *RemoteRepoCollection) Add(repo *RemoteRepo) error {
// Update stores updated information about repo in DB
func (collection *RemoteRepoCollection) Update(repo *RemoteRepo) error {
err := collection.db.Put(repo.Key(), repo.Encode())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
err = transaction.Put(repo.Key(), repo.Encode())
if err != nil {
return err
}
if repo.packageRefs != nil {
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
if err != nil {
return err
}
}
return nil
return transaction.Commit()
}
// LoadComplete loads additional information for remote repo
@@ -878,16 +895,29 @@ func (collection *RemoteRepoCollection) Len() int {
// Drop removes remote repo from collection
func (collection *RemoteRepoCollection) Drop(repo *RemoteRepo) error {
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
panic("repo not found!")
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
if _, err = transaction.Get(repo.Key()); err != nil {
if err == database.ErrNotFound {
return errors.New("repo not found")
}
return err
}
delete(collection.cache, repo.UUID)
err := collection.db.Delete(repo.Key())
if err != nil {
if err = transaction.Delete(repo.Key()); err != nil {
return err
}
return collection.db.Delete(repo.RefKey())
if err = transaction.Delete(repo.RefKey()); err != nil {
return err
}
return transaction.Commit()
}

View File

@@ -774,7 +774,7 @@ func (s *RemoteRepoCollectionSuite) TestDrop(c *C) {
r2, _ := collection.ByName("tyndex")
c.Check(r2.String(), Equals, repo2.String())
c.Check(func() { s.collection.Drop(repo1) }, Panics, "repo not found!")
c.Check(s.collection.Drop(repo1), ErrorMatches, "repo not found")
}
const exampleReleaseFile = `Origin: LP-PPA-agenda-developers-daily

View File

@@ -216,14 +216,22 @@ func (collection *SnapshotCollection) Add(snapshot *Snapshot) error {
// Update stores updated information about snapshot in DB
func (collection *SnapshotCollection) Update(snapshot *Snapshot) error {
err := collection.db.Put(snapshot.Key(), snapshot.Encode())
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
err = transaction.Put(snapshot.Key(), snapshot.Encode())
if err != nil {
return err
}
if snapshot.packageRefs != nil {
return collection.db.Put(snapshot.RefKey(), snapshot.packageRefs.Encode())
if err = transaction.Put(snapshot.RefKey(), snapshot.packageRefs.Encode()); err != nil {
return err
}
}
return nil
return transaction.Commit()
}
// LoadComplete loads additional information about snapshot
@@ -379,18 +387,31 @@ func (collection *SnapshotCollection) Len() int {
// Drop removes snapshot from collection
func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error {
if _, err := collection.db.Get(snapshot.Key()); err == database.ErrNotFound {
panic("snapshot not found!")
transaction, err := collection.db.OpenTransaction()
if err != nil {
return err
}
defer transaction.Discard()
if _, err = transaction.Get(snapshot.Key()); err != nil {
if err == database.ErrNotFound {
return errors.New("snapshot not found")
}
return err
}
delete(collection.cache, snapshot.UUID)
err := collection.db.Delete(snapshot.Key())
if err != nil {
if err = transaction.Delete(snapshot.Key()); err != nil {
return err
}
return collection.db.Delete(snapshot.RefKey())
if err = transaction.Delete(snapshot.RefKey()); err != nil {
return err
}
return transaction.Commit()
}
// Snapshot sorting methods

View File

@@ -280,5 +280,5 @@ func (s *SnapshotCollectionSuite) TestDrop(c *C) {
_, err = collection.ByUUID(s.snapshot1.UUID)
c.Check(err, ErrorMatches, "snapshot .* not found")
c.Check(func() { s.collection.Drop(s.snapshot1) }, Panics, "snapshot not found!")
c.Check(s.collection.Drop(s.snapshot1), ErrorMatches, "snapshot not found")
}