diff --git a/database/leveldb.go b/database/leveldb.go index 18b8b93f..8b065725 100644 --- a/database/leveldb.go +++ b/database/leveldb.go @@ -4,6 +4,9 @@ package database import ( "bytes" "errors" + "io/ioutil" + "os" + "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/opt" @@ -16,11 +19,17 @@ var ( ErrNotFound = errors.New("key not found") ) +// StorageProcessor is a function to process one single storage entry +type StorageProcessor func(key []byte, value []byte) error + // Storage is an interface to KV storage type Storage interface { + CreateTemporary() (Storage, error) Get(key []byte) ([]byte, error) Put(key []byte, value []byte) error Delete(key []byte) error + HasPrefix(prefix []byte) bool + ProcessByPrefix(prefix []byte, proc StorageProcessor) error KeysByPrefix(prefix []byte) [][]byte FetchByPrefix(prefix []byte) [][]byte Close() error @@ -28,6 +37,7 @@ type Storage interface { StartBatch() FinishBatch() error CompactDB() error + Drop() error } type levelDB struct { @@ -41,18 +51,24 @@ var ( _ Storage = &levelDB{} ) -func internalOpen(path string) (*leveldb.DB, error) { +func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) { o := &opt.Options{ Filter: filter.NewBloomFilter(10), OpenFilesCacheCapacity: 256, } + if throttleCompaction { + o.CompactionL0Trigger = 32 + o.WriteL0PauseTrigger = 96 + o.WriteL0SlowdownTrigger = 64 + } + return leveldb.OpenFile(path, o) } // OpenDB opens (creates) LevelDB database func OpenDB(path string) (Storage, error) { - db, err := internalOpen(path) + db, err := internalOpen(path, false) if err != nil { return nil, err } @@ -77,6 +93,20 @@ func RecoverDB(path string) error { return nil } +// CreateTemporary creates new DB of the same type in temp dir +func (l *levelDB) CreateTemporary() (Storage, error) { + tempdir, err := ioutil.TempDir("", "aptly") + if err != nil { + return nil, err + } + + db, err := internalOpen(tempdir, true) + if err != nil { + return nil, err + } + return &levelDB{db: db, path: tempdir}, nil +} + // Get key value from database func (l *levelDB) Get(key []byte) ([]byte, error) { value, err := l.db.Get(key, nil) @@ -152,6 +182,29 @@ func (l *levelDB) FetchByPrefix(prefix []byte) [][]byte { return result } +// HasPrefix checks whether it can find any key with given prefix and returns true if one exists +func (l *levelDB) HasPrefix(prefix []byte) bool { + iterator := l.db.NewIterator(nil, nil) + defer iterator.Release() + return iterator.Seek(prefix) && bytes.HasPrefix(iterator.Key(), prefix) +} + +// ProcessByPrefix iterates through all entries where key starts with prefix and calls +// StorageProcessor on key value pair +func (l *levelDB) ProcessByPrefix(prefix []byte, proc StorageProcessor) error { + iterator := l.db.NewIterator(nil, nil) + defer iterator.Release() + + for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() { + err := proc(iterator.Key(), iterator.Value()) + if err != nil { + return err + } + } + + return nil +} + // Close finishes DB work func (l *levelDB) Close() error { if l.db == nil { @@ -169,7 +222,7 @@ func (l *levelDB) ReOpen() error { } var err error - l.db, err = internalOpen(l.path) + l.db, err = internalOpen(l.path, false) return err } @@ -197,3 +250,12 @@ func (l *levelDB) FinishBatch() error { func (l *levelDB) CompactDB() error { return l.db.CompactRange(util.Range{}) } + +// Drop removes all the DB files (DANGEROUS!) +func (l *levelDB) Drop() error { + if l.db != nil { + return errors.New("DB is still open") + } + + return os.RemoveAll(l.path) +} diff --git a/database/leveldb_test.go b/database/leveldb_test.go index 2119c388..155a3120 100644 --- a/database/leveldb_test.go +++ b/database/leveldb_test.go @@ -71,6 +71,29 @@ func (s *LevelDBSuite) TestGetPut(c *C) { c.Assert(result, DeepEquals, value) } +func (s *LevelDBSuite) TestTemporaryDelete(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + + err := s.db.Put(key, value) + c.Assert(err, IsNil) + + temp, err := s.db.CreateTemporary() + c.Assert(err, IsNil) + + c.Check(s.db.HasPrefix([]byte(nil)), Equals, true) + c.Check(temp.HasPrefix([]byte(nil)), Equals, false) + + err = temp.Put(key, value) + c.Assert(err, IsNil) + c.Check(temp.HasPrefix([]byte(nil)), Equals, true) + + c.Assert(temp.Close(), IsNil) + c.Assert(temp.Drop(), IsNil) +} + func (s *LevelDBSuite) TestDelete(c *C) { var ( key = []byte("key") @@ -107,10 +130,41 @@ func (s *LevelDBSuite) TestByPrefix(c *C) { c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + keys := [][]byte{} + values := [][]byte{} + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + keys = append(keys, append([]byte(nil), k...)) + values = append(values, append([]byte(nil), v...)) + return nil + }), IsNil) + + c.Check(values, DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) + c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + return ErrNotFound + }), Equals, ErrNotFound) + + c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error { + return ErrNotFound + }), IsNil) + c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) } +func (s *LevelDBSuite) TestHasPrefix(c *C) { + c.Check(s.db.HasPrefix([]byte(nil)), Equals, false) + c.Check(s.db.HasPrefix([]byte{0x80}), Equals, false) + + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + + c.Check(s.db.HasPrefix([]byte(nil)), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false) +} + func (s *LevelDBSuite) TestBatch(c *C) { var ( key = []byte("key") diff --git a/deb/collections.go b/deb/collections.go index a34363b4..b7cbf2ea 100644 --- a/deb/collections.go +++ b/deb/collections.go @@ -1,8 +1,9 @@ package deb import ( - "github.com/smira/aptly/database" "sync" + + "github.com/smira/aptly/database" ) // CollectionFactory is a single place to generate all desired collections @@ -21,6 +22,13 @@ func NewCollectionFactory(db database.Storage) *CollectionFactory { return &CollectionFactory{Mutex: &sync.Mutex{}, db: db} } +// TemporaryDB creates new temporary DB +// +// DB should be closed/droped after being used +func (factory *CollectionFactory) TemporaryDB() (database.Storage, error) { + return factory.db.CreateTemporary() +} + // PackageCollection returns (or creates) new PackageCollection func (factory *CollectionFactory) PackageCollection() *PackageCollection { factory.Lock() diff --git a/deb/contents.go b/deb/contents.go index 514516e6..e3dc8eb7 100644 --- a/deb/contents.go +++ b/deb/contents.go @@ -1,75 +1,135 @@ package deb import ( + "bytes" + "errors" "fmt" - "github.com/smira/aptly/aptly" - "github.com/smira/aptly/utils" "io" - "sort" - "strings" + + "github.com/smira/aptly/aptly" + "github.com/smira/aptly/database" + "github.com/smira/go-uuid/uuid" ) // ContentsIndex calculates mapping from files to packages, with sorting and aggregation type ContentsIndex struct { - index map[string][]*Package + db database.Storage + prefix []byte } // NewContentsIndex creates empty ContentsIndex -func NewContentsIndex() *ContentsIndex { +func NewContentsIndex(db database.Storage) *ContentsIndex { return &ContentsIndex{ - index: make(map[string][]*Package), + db: db, + prefix: []byte(uuid.New()), } } // Push adds package to contents index, calculating package contents as required -func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) { +func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) error { contents := p.Contents(packagePool) + qualifiedName := []byte(p.QualifiedName()) for _, path := range contents { - index.index[path] = append(index.index[path], p) + // for performance reasons we only write to leveldb during push. + // merging of qualified names per path will be done in WriteTo + err := index.db.Put(append(append(append(index.prefix, []byte(path)...), byte(0)), qualifiedName...), nil) + if err != nil { + return err + } } + + return nil } // Empty checks whether index contains no packages func (index *ContentsIndex) Empty() bool { - return len(index.index) == 0 + return !index.db.HasPrefix(index.prefix) } // WriteTo dumps sorted mapping of files to qualified package names func (index *ContentsIndex) WriteTo(w io.Writer) (int64, error) { + // For performance reasons push method wrote on key per path and package + // in this method we now need to merge all packages which have the same path + // and write it to contents index file + var n int64 - paths := make([]string, len(index.index)) - - i := 0 - for path := range index.index { - paths[i] = path - i++ - } - - sort.Strings(paths) - nn, err := fmt.Fprintf(w, "%s %s\n", "FILE", "LOCATION") n += int64(nn) if err != nil { return n, err } - for _, path := range paths { - packages := index.index[path] - parts := make([]string, 0, len(packages)) - for i := range packages { - name := packages[i].QualifiedName() - if !utils.StrSliceHasItem(parts, name) { - parts = append(parts, name) - } + prefixLen := len(index.prefix) + + var ( + currentPath []byte + currentPkgs [][]byte + ) + + err = index.db.ProcessByPrefix(index.prefix, func(key []byte, value []byte) error { + // cut prefix + key = key[prefixLen:] + + i := bytes.Index(key, []byte{0}) + if i == -1 { + return errors.New("corrupted index entry") } - nn, err = fmt.Fprintf(w, "%s %s\n", path, strings.Join(parts, ",")) + + path := key[:i] + pkg := key[i+1:] + + if !bytes.Equal(path, currentPath) { + if currentPath != nil { + nn, err = w.Write(append(currentPath, ' ')) + n += int64(nn) + if err != nil { + return err + } + + nn, err = w.Write(bytes.Join(currentPkgs, []byte{','})) + n += int64(nn) + if err != nil { + return err + } + + nn, err = w.Write([]byte{'\n'}) + n += int64(nn) + if err != nil { + return err + } + } + + currentPath = append([]byte(nil), path...) + currentPkgs = nil + } + + currentPkgs = append(currentPkgs, append([]byte(nil), pkg...)) + + return nil + }) + + if err != nil { + return n, err + } + + if currentPath != nil { + nn, err = w.Write(append(currentPath, ' ')) n += int64(nn) if err != nil { return n, err } + + nn, err = w.Write(bytes.Join(currentPkgs, []byte{','})) + n += int64(nn) + if err != nil { + return n, err + } + + nn, err = w.Write([]byte{'\n'}) + n += int64(nn) } - return n, nil + return n, err } diff --git a/deb/publish.go b/deb/publish.go index 04dd221c..7f8ee3c4 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -469,6 +469,13 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP return err } + tempDB, err := collectionFactory.TemporaryDB() + if err != nil { + return err + } + defer tempDB.Close() + defer tempDB.Drop() + if progress != nil { progress.Printf("Loading packages...\n") } @@ -563,7 +570,7 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP contentIndex := contentIndexes[key] if contentIndex == nil { - contentIndex = NewContentsIndex() + contentIndex = NewContentsIndex(tempDB) contentIndexes[key] = contentIndex }