mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-06 22:18:28 +00:00
Consistently use transactions to update database
For any action which is multi-step (requires updating more than 1 DB key), use transaction to make update atomic. Also pack big chunks of updates (importing packages for importing and mirror updates) into single transaction to improve aptly performance and get some isolation. Note that still layers up (Collections) provide some level of isolation, so this is going to shine with the future PRs to remove collection locks. Spin-off of #459
This commit is contained in:
committed by
Andrey Smirnov
parent
67e38955ae
commit
77d7c3871a
+2
-2
@@ -327,7 +327,7 @@ func apiReposPackageFromDir(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
||||||
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection())
|
context.CollectionFactory().PackageCollection(), reporter, nil, context.CollectionFactory().ChecksumCollection)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
|
|
||||||
processedFiles = append(processedFiles, otherFiles...)
|
processedFiles = append(processedFiles, otherFiles...)
|
||||||
@@ -420,7 +420,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
|
|||||||
_, failedFiles2, err = deb.ImportChangesFiles(
|
_, failedFiles2, err = deb.ImportChangesFiles(
|
||||||
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
|
||||||
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
|
repoTemplateString, context.Progress(), localRepoCollection, context.CollectionFactory().PackageCollection(),
|
||||||
context.PackagePool(), context.CollectionFactory().ChecksumCollection(), nil, query.Parse)
|
context.PackagePool(), context.CollectionFactory().ChecksumCollection, nil, query.Parse)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
|
"github.com/aptly-dev/aptly/database"
|
||||||
"github.com/aptly-dev/aptly/utils"
|
"github.com/aptly-dev/aptly/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -134,6 +135,9 @@ type Downloader interface {
|
|||||||
GetLength(ctx context.Context, url string) (int64, error)
|
GetLength(ctx context.Context, url string) (int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ChecksumStorageProvider creates ChecksumStorage based on DB
|
||||||
|
type ChecksumStorageProvider func(db database.ReaderWriter) ChecksumStorage
|
||||||
|
|
||||||
// ChecksumStorage is stores checksums in some (persistent) storage
|
// ChecksumStorage is stores checksums in some (persistent) storage
|
||||||
type ChecksumStorage interface {
|
type ChecksumStorage interface {
|
||||||
// Get finds checksums in DB by path
|
// Get finds checksums in DB by path
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
|
|||||||
|
|
||||||
context.Progress().Printf("Building download queue...\n")
|
context.Progress().Printf("Building download queue...\n")
|
||||||
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(),
|
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool(), context.CollectionFactory().PackageCollection(),
|
||||||
context.CollectionFactory().ChecksumCollection(), skipExistingPackages)
|
context.CollectionFactory().ChecksumCollection(nil), skipExistingPackages)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to update: %s", err)
|
return fmt.Errorf("unable to update: %s", err)
|
||||||
@@ -210,7 +210,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// and import it back to the pool
|
// and import it back to the pool
|
||||||
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
|
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection(nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to import file: %s", err)
|
return fmt.Errorf("unable to import file: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -49,7 +49,7 @@ func aptlyRepoAdd(cmd *commander.Command, args []string) error {
|
|||||||
|
|
||||||
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
processedFiles, failedFiles2, err = deb.ImportPackageFiles(list, packageFiles, forceReplace, verifier, context.PackagePool(),
|
||||||
context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil,
|
context.CollectionFactory().PackageCollection(), &aptly.ConsoleResultReporter{Progress: context.Progress()}, nil,
|
||||||
context.CollectionFactory().ChecksumCollection())
|
context.CollectionFactory().ChecksumCollection)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to import package files: %s", err)
|
return fmt.Errorf("unable to import package files: %s", err)
|
||||||
|
|||||||
+1
-1
@@ -56,7 +56,7 @@ func aptlyRepoInclude(cmd *commander.Command, args []string) error {
|
|||||||
_, failedFiles2, err = deb.ImportChangesFiles(
|
_, failedFiles2, err = deb.ImportChangesFiles(
|
||||||
changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString,
|
changesFiles, reporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles, verifier, repoTemplateString,
|
||||||
context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(),
|
context.Progress(), context.CollectionFactory().LocalRepoCollection(), context.CollectionFactory().PackageCollection(),
|
||||||
context.PackagePool(), context.CollectionFactory().ChecksumCollection(),
|
context.PackagePool(), context.CollectionFactory().ChecksumCollection,
|
||||||
uploaders, query.Parse)
|
uploaders, query.Parse)
|
||||||
failedFiles = append(failedFiles, failedFiles2...)
|
failedFiles = append(failedFiles, failedFiles2...)
|
||||||
|
|
||||||
|
|||||||
@@ -30,6 +30,12 @@ type Writer interface {
|
|||||||
Delete(key []byte) error
|
Delete(key []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReaderWriter combines Reader and Writer
|
||||||
|
type ReaderWriter interface {
|
||||||
|
Reader
|
||||||
|
Writer
|
||||||
|
}
|
||||||
|
|
||||||
// Storage is an interface to KV storage
|
// Storage is an interface to KV storage
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
Reader
|
Reader
|
||||||
|
|||||||
+2
-2
@@ -294,7 +294,7 @@ func CollectChangesFiles(locations []string, reporter aptly.ResultReporter) (cha
|
|||||||
// ImportChangesFiles imports referenced files in changes files into local repository
|
// ImportChangesFiles imports referenced files in changes files into local repository
|
||||||
func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool,
|
func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, acceptUnsigned, ignoreSignatures, forceReplace, noRemoveFiles bool,
|
||||||
verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection,
|
verifier pgp.Verifier, repoTemplateString string, progress aptly.Progress, localRepoCollection *LocalRepoCollection, packageCollection *PackageCollection,
|
||||||
pool aptly.PackagePool, checksumStorage aptly.ChecksumStorage, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
|
pool aptly.PackagePool, checksumStorageProvider aptly.ChecksumStorageProvider, uploaders *Uploaders, parseQuery parseQuery) (processedFiles []string, failedFiles []string, err error) {
|
||||||
|
|
||||||
var repoTemplate *template.Template
|
var repoTemplate *template.Template
|
||||||
repoTemplate, err = template.New("repo").Parse(repoTemplateString)
|
repoTemplate, err = template.New("repo").Parse(repoTemplateString)
|
||||||
@@ -384,7 +384,7 @@ func ImportChangesFiles(changesFiles []string, reporter aptly.ResultReporter, ac
|
|||||||
var processedFiles2, failedFiles2 []string
|
var processedFiles2, failedFiles2 []string
|
||||||
|
|
||||||
processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool,
|
processedFiles2, failedFiles2, err = ImportPackageFiles(list, packageFiles, forceReplace, verifier, pool,
|
||||||
packageCollection, reporter, restriction, checksumStorage)
|
packageCollection, reporter, restriction, checksumStorageProvider)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("unable to import package files: %s", err)
|
return nil, nil, fmt.Errorf("unable to import package files: %s", err)
|
||||||
|
|||||||
+1
-1
@@ -123,7 +123,7 @@ func (s *ChangesSuite) TestImportChangesFiles(c *C) {
|
|||||||
processedFiles, failedFiles, err := ImportChangesFiles(
|
processedFiles, failedFiles, err := ImportChangesFiles(
|
||||||
append(changesFiles, "testdata/changes/notexistent.changes"),
|
append(changesFiles, "testdata/changes/notexistent.changes"),
|
||||||
s.Reporter, true, true, false, false, &NullVerifier{},
|
s.Reporter, true, true, false, false, &NullVerifier{},
|
||||||
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, s.checksumStorage,
|
"test", s.progress, s.localRepoCollection, s.packageCollection, s.packagePool, func(database.ReaderWriter) aptly.ChecksumStorage { return s.checksumStorage },
|
||||||
nil, nil)
|
nil, nil)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes"))
|
c.Check(failedFiles, DeepEquals, append(expectedFailedFiles, "testdata/changes/notexistent.changes"))
|
||||||
|
|||||||
@@ -11,12 +11,12 @@ import (
|
|||||||
|
|
||||||
// ChecksumCollection does management of ChecksumInfo in DB
|
// ChecksumCollection does management of ChecksumInfo in DB
|
||||||
type ChecksumCollection struct {
|
type ChecksumCollection struct {
|
||||||
db database.Storage
|
db database.ReaderWriter
|
||||||
codecHandle *codec.MsgpackHandle
|
codecHandle *codec.MsgpackHandle
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChecksumCollection creates new ChecksumCollection and binds it to database
|
// NewChecksumCollection creates new ChecksumCollection and binds it to database
|
||||||
func NewChecksumCollection(db database.Storage) *ChecksumCollection {
|
func NewChecksumCollection(db database.ReaderWriter) *ChecksumCollection {
|
||||||
return &ChecksumCollection{
|
return &ChecksumCollection{
|
||||||
db: db,
|
db: db,
|
||||||
codecHandle: &codec.MsgpackHandle{},
|
codecHandle: &codec.MsgpackHandle{},
|
||||||
|
|||||||
+6
-1
@@ -3,6 +3,7 @@ package deb
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
"github.com/aptly-dev/aptly/database"
|
"github.com/aptly-dev/aptly/database"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -91,10 +92,14 @@ func (factory *CollectionFactory) PublishedRepoCollection() *PublishedRepoCollec
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ChecksumCollection returns (or creates) new ChecksumCollection
|
// ChecksumCollection returns (or creates) new ChecksumCollection
|
||||||
func (factory *CollectionFactory) ChecksumCollection() *ChecksumCollection {
|
func (factory *CollectionFactory) ChecksumCollection(db database.ReaderWriter) aptly.ChecksumStorage {
|
||||||
factory.Lock()
|
factory.Lock()
|
||||||
defer factory.Unlock()
|
defer factory.Unlock()
|
||||||
|
|
||||||
|
if db != nil {
|
||||||
|
return NewChecksumCollection(db)
|
||||||
|
}
|
||||||
|
|
||||||
if factory.checksums == nil {
|
if factory.checksums == nil {
|
||||||
factory.checksums = NewChecksumCollection(factory.db)
|
factory.checksums = NewChecksumCollection(factory.db)
|
||||||
}
|
}
|
||||||
|
|||||||
+11
-3
@@ -66,11 +66,19 @@ func CollectPackageFiles(locations []string, reporter aptly.ResultReporter) (pac
|
|||||||
// ImportPackageFiles imports files into local repository
|
// ImportPackageFiles imports files into local repository
|
||||||
func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier,
|
func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace bool, verifier pgp.Verifier,
|
||||||
pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery,
|
pool aptly.PackagePool, collection *PackageCollection, reporter aptly.ResultReporter, restriction PackageQuery,
|
||||||
checksumStorage aptly.ChecksumStorage) (processedFiles []string, failedFiles []string, err error) {
|
checksumStorageProvider aptly.ChecksumStorageProvider) (processedFiles []string, failedFiles []string, err error) {
|
||||||
if forceReplace {
|
if forceReplace {
|
||||||
list.PrepareIndex()
|
list.PrepareIndex()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
checksumStorage := checksumStorageProvider(transaction)
|
||||||
|
|
||||||
for _, file := range packageFiles {
|
for _, file := range packageFiles {
|
||||||
var (
|
var (
|
||||||
stanza Stanza
|
stanza Stanza
|
||||||
@@ -193,7 +201,7 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.Update(p)
|
err = collection.UpdateInTransaction(p, transaction)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reporter.Warning("Unable to save package %s: %s", p, err)
|
reporter.Warning("Unable to save package %s: %s", p, err)
|
||||||
failedFiles = append(failedFiles, file)
|
failedFiles = append(failedFiles, file)
|
||||||
@@ -219,6 +227,6 @@ func ImportPackageFiles(list *PackageList, packageFiles []string, forceReplace b
|
|||||||
processedFiles = append(processedFiles, candidateProcessedFiles...)
|
processedFiles = append(processedFiles, candidateProcessedFiles...)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = nil
|
err = transaction.Commit()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
+29
-11
@@ -161,17 +161,23 @@ func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {
|
|||||||
|
|
||||||
// Update stores updated information about repo in DB
|
// Update stores updated information about repo in DB
|
||||||
func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
|
func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
|
||||||
err := collection.db.Put(repo.Key(), repo.Encode())
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
err = transaction.Put(repo.Key(), repo.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if repo.packageRefs != nil {
|
if repo.packageRefs != nil {
|
||||||
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
|
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadComplete loads additional information for local repo
|
// LoadComplete loads additional information for local repo
|
||||||
@@ -245,16 +251,28 @@ func (collection *LocalRepoCollection) Len() int {
|
|||||||
|
|
||||||
// Drop removes remote repo from collection
|
// Drop removes remote repo from collection
|
||||||
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
|
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
|
||||||
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
|
transaction, err := collection.db.OpenTransaction()
|
||||||
panic("local repo not found!")
|
|
||||||
}
|
|
||||||
|
|
||||||
delete(collection.cache, repo.UUID)
|
|
||||||
|
|
||||||
err := collection.db.Delete(repo.Key())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
return collection.db.Delete(repo.RefKey())
|
delete(collection.cache, repo.UUID)
|
||||||
|
|
||||||
|
if _, err = transaction.Get(repo.Key()); err != nil {
|
||||||
|
if err == database.ErrNotFound {
|
||||||
|
return errors.New("local repo not found")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = transaction.Delete(repo.Key()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = transaction.Delete(repo.RefKey()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -199,5 +199,5 @@ func (s *LocalRepoCollectionSuite) TestDrop(c *C) {
|
|||||||
r2, _ := collection.ByName("local2")
|
r2, _ := collection.ByName("local2")
|
||||||
c.Check(r2.String(), Equals, repo2.String())
|
c.Check(r2.String(), Equals, repo2.String())
|
||||||
|
|
||||||
c.Check(func() { s.collection.Drop(repo1) }, Panics, "local repo not found!")
|
c.Check(s.collection.Drop(repo1), ErrorMatches, "local repo not found")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -201,8 +201,23 @@ func (collection *PackageCollection) loadContents(p *Package, packagePool aptly.
|
|||||||
return contents
|
return contents
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update adds or updates information about package in DB checking for conficts first
|
// Update adds or updates information about package in DB
|
||||||
func (collection *PackageCollection) Update(p *Package) error {
|
func (collection *PackageCollection) Update(p *Package) error {
|
||||||
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
if err = collection.UpdateInTransaction(p, transaction); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return transaction.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateInTransaction updates/creates package info in the context of the outer transaction
|
||||||
|
func (collection *PackageCollection) UpdateInTransaction(p *Package, transaction database.Transaction) error {
|
||||||
var encodeBuffer bytes.Buffer
|
var encodeBuffer bytes.Buffer
|
||||||
|
|
||||||
encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle)
|
encoder := codec.NewEncoder(&encodeBuffer, collection.codecHandle)
|
||||||
@@ -210,12 +225,11 @@ func (collection *PackageCollection) Update(p *Package) error {
|
|||||||
encodeBuffer.Reset()
|
encodeBuffer.Reset()
|
||||||
encodeBuffer.WriteByte(0xc1)
|
encodeBuffer.WriteByte(0xc1)
|
||||||
encodeBuffer.WriteByte(0x1)
|
encodeBuffer.WriteByte(0x1)
|
||||||
err := encoder.Encode(p)
|
if err := encoder.Encode(p); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.db.Put(p.Key(""), encodeBuffer.Bytes())
|
err := transaction.Put(p.Key(""), encodeBuffer.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -228,7 +242,7 @@ func (collection *PackageCollection) Update(p *Package) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.db.Put(p.Key("xF"), encodeBuffer.Bytes())
|
err = transaction.Put(p.Key("xF"), encodeBuffer.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -241,7 +255,7 @@ func (collection *PackageCollection) Update(p *Package) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.db.Put(p.Key("xD"), encodeBuffer.Bytes())
|
err = transaction.Put(p.Key("xD"), encodeBuffer.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -256,7 +270,7 @@ func (collection *PackageCollection) Update(p *Package) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.db.Put(p.Key("xE"), encodeBuffer.Bytes())
|
err = transaction.Put(p.Key("xE"), encodeBuffer.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
+22
-9
@@ -914,21 +914,27 @@ func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) *
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update stores updated information about repo in DB
|
// Update stores updated information about repo in DB
|
||||||
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) (err error) {
|
func (collection *PublishedRepoCollection) Update(repo *PublishedRepo) error {
|
||||||
err = collection.db.Put(repo.Key(), repo.Encode())
|
transaction, err := collection.db.OpenTransaction()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
err = transaction.Put(repo.Key(), repo.Encode())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if repo.SourceKind == SourceLocalRepo {
|
if repo.SourceKind == SourceLocalRepo {
|
||||||
for component, item := range repo.sourceItems {
|
for component, item := range repo.sourceItems {
|
||||||
err = collection.db.Put(repo.RefKey(component), item.packageRefs.Encode())
|
err = transaction.Put(repo.RefKey(component), item.packageRefs.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadComplete loads additional information for remote repo
|
// LoadComplete loads additional information for remote repo
|
||||||
@@ -1170,6 +1176,13 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
|
|||||||
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
|
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
|
||||||
force, skipCleanup bool) error {
|
force, skipCleanup bool) error {
|
||||||
|
|
||||||
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
// TODO: load via transaction
|
||||||
collection.loadList()
|
collection.loadList()
|
||||||
|
|
||||||
repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
|
||||||
@@ -1221,17 +1234,17 @@ func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collection.db.Delete(repo.Key())
|
err = transaction.Delete(repo.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, component := range repo.Components() {
|
for _, component := range repo.Components() {
|
||||||
err = collection.db.Delete(repo.RefKey(component))
|
err = transaction.Delete(repo.RefKey(component))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|||||||
+43
-13
@@ -617,34 +617,44 @@ func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, packag
|
|||||||
|
|
||||||
// FinalizeDownload swaps for final value of package refs
|
// FinalizeDownload swaps for final value of package refs
|
||||||
func (repo *RemoteRepo) FinalizeDownload(collectionFactory *CollectionFactory, progress aptly.Progress) error {
|
func (repo *RemoteRepo) FinalizeDownload(collectionFactory *CollectionFactory, progress aptly.Progress) error {
|
||||||
|
transaction, err := collectionFactory.PackageCollection().db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
repo.LastDownloadDate = time.Now()
|
repo.LastDownloadDate = time.Now()
|
||||||
|
|
||||||
if progress != nil {
|
if progress != nil {
|
||||||
progress.InitBar(int64(repo.packageList.Len()), true)
|
progress.InitBar(int64(repo.packageList.Len()), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
var i int
|
var i int
|
||||||
|
|
||||||
// update all the packages in collection
|
// update all the packages in collection
|
||||||
err := repo.packageList.ForEach(func(p *Package) error {
|
err = repo.packageList.ForEach(func(p *Package) error {
|
||||||
i++
|
i++
|
||||||
if progress != nil {
|
if progress != nil {
|
||||||
progress.SetBar(i)
|
progress.SetBar(i)
|
||||||
}
|
}
|
||||||
// download process might have updated checksums
|
// download process might have updated checksums
|
||||||
p.UpdateFiles(p.Files())
|
p.UpdateFiles(p.Files())
|
||||||
return collectionFactory.PackageCollection().Update(p)
|
return collectionFactory.PackageCollection().UpdateInTransaction(p, transaction)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList)
|
repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList)
|
||||||
|
repo.packageList = nil
|
||||||
|
}
|
||||||
|
|
||||||
if progress != nil {
|
if progress != nil {
|
||||||
progress.ShutdownBar()
|
progress.ShutdownBar()
|
||||||
}
|
}
|
||||||
|
|
||||||
repo.packageList = nil
|
if err != nil {
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encode does msgpack encoding of RemoteRepo
|
// Encode does msgpack encoding of RemoteRepo
|
||||||
@@ -795,17 +805,24 @@ func (collection *RemoteRepoCollection) Add(repo *RemoteRepo) error {
|
|||||||
|
|
||||||
// Update stores updated information about repo in DB
|
// Update stores updated information about repo in DB
|
||||||
func (collection *RemoteRepoCollection) Update(repo *RemoteRepo) error {
|
func (collection *RemoteRepoCollection) Update(repo *RemoteRepo) error {
|
||||||
err := collection.db.Put(repo.Key(), repo.Encode())
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
err = transaction.Put(repo.Key(), repo.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if repo.packageRefs != nil {
|
if repo.packageRefs != nil {
|
||||||
err = collection.db.Put(repo.RefKey(), repo.packageRefs.Encode())
|
err = transaction.Put(repo.RefKey(), repo.packageRefs.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadComplete loads additional information for remote repo
|
// LoadComplete loads additional information for remote repo
|
||||||
@@ -878,16 +895,29 @@ func (collection *RemoteRepoCollection) Len() int {
|
|||||||
|
|
||||||
// Drop removes remote repo from collection
|
// Drop removes remote repo from collection
|
||||||
func (collection *RemoteRepoCollection) Drop(repo *RemoteRepo) error {
|
func (collection *RemoteRepoCollection) Drop(repo *RemoteRepo) error {
|
||||||
if _, err := collection.db.Get(repo.Key()); err == database.ErrNotFound {
|
transaction, err := collection.db.OpenTransaction()
|
||||||
panic("repo not found!")
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
if _, err = transaction.Get(repo.Key()); err != nil {
|
||||||
|
if err == database.ErrNotFound {
|
||||||
|
return errors.New("repo not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(collection.cache, repo.UUID)
|
delete(collection.cache, repo.UUID)
|
||||||
|
|
||||||
err := collection.db.Delete(repo.Key())
|
if err = transaction.Delete(repo.Key()); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return collection.db.Delete(repo.RefKey())
|
if err = transaction.Delete(repo.RefKey()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -774,7 +774,7 @@ func (s *RemoteRepoCollectionSuite) TestDrop(c *C) {
|
|||||||
r2, _ := collection.ByName("tyndex")
|
r2, _ := collection.ByName("tyndex")
|
||||||
c.Check(r2.String(), Equals, repo2.String())
|
c.Check(r2.String(), Equals, repo2.String())
|
||||||
|
|
||||||
c.Check(func() { s.collection.Drop(repo1) }, Panics, "repo not found!")
|
c.Check(s.collection.Drop(repo1), ErrorMatches, "repo not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
const exampleReleaseFile = `Origin: LP-PPA-agenda-developers-daily
|
const exampleReleaseFile = `Origin: LP-PPA-agenda-developers-daily
|
||||||
|
|||||||
+29
-8
@@ -216,14 +216,22 @@ func (collection *SnapshotCollection) Add(snapshot *Snapshot) error {
|
|||||||
|
|
||||||
// Update stores updated information about snapshot in DB
|
// Update stores updated information about snapshot in DB
|
||||||
func (collection *SnapshotCollection) Update(snapshot *Snapshot) error {
|
func (collection *SnapshotCollection) Update(snapshot *Snapshot) error {
|
||||||
err := collection.db.Put(snapshot.Key(), snapshot.Encode())
|
transaction, err := collection.db.OpenTransaction()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
err = transaction.Put(snapshot.Key(), snapshot.Encode())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if snapshot.packageRefs != nil {
|
if snapshot.packageRefs != nil {
|
||||||
return collection.db.Put(snapshot.RefKey(), snapshot.packageRefs.Encode())
|
if err = transaction.Put(snapshot.RefKey(), snapshot.packageRefs.Encode()); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadComplete loads additional information about snapshot
|
// LoadComplete loads additional information about snapshot
|
||||||
@@ -379,18 +387,31 @@ func (collection *SnapshotCollection) Len() int {
|
|||||||
|
|
||||||
// Drop removes snapshot from collection
|
// Drop removes snapshot from collection
|
||||||
func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error {
|
func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error {
|
||||||
if _, err := collection.db.Get(snapshot.Key()); err == database.ErrNotFound {
|
transaction, err := collection.db.OpenTransaction()
|
||||||
panic("snapshot not found!")
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer transaction.Discard()
|
||||||
|
|
||||||
|
if _, err = transaction.Get(snapshot.Key()); err != nil {
|
||||||
|
if err == database.ErrNotFound {
|
||||||
|
return errors.New("snapshot not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(collection.cache, snapshot.UUID)
|
delete(collection.cache, snapshot.UUID)
|
||||||
|
|
||||||
err := collection.db.Delete(snapshot.Key())
|
if err = transaction.Delete(snapshot.Key()); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return collection.db.Delete(snapshot.RefKey())
|
if err = transaction.Delete(snapshot.RefKey()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return transaction.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot sorting methods
|
// Snapshot sorting methods
|
||||||
|
|||||||
@@ -280,5 +280,5 @@ func (s *SnapshotCollectionSuite) TestDrop(c *C) {
|
|||||||
_, err = collection.ByUUID(s.snapshot1.UUID)
|
_, err = collection.ByUUID(s.snapshot1.UUID)
|
||||||
c.Check(err, ErrorMatches, "snapshot .* not found")
|
c.Check(err, ErrorMatches, "snapshot .* not found")
|
||||||
|
|
||||||
c.Check(func() { s.collection.Drop(s.snapshot1) }, Panics, "snapshot not found!")
|
c.Check(s.collection.Drop(s.snapshot1), ErrorMatches, "snapshot not found")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user