mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-05 05:20:34 +00:00
Merge pull request #490 from smira/contents-low-footprint
Use temporary LevelDB to store contents index
This commit is contained in:
+65
-3
@@ -4,6 +4,9 @@ package database
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
@@ -16,11 +19,17 @@ var (
|
||||
ErrNotFound = errors.New("key not found")
|
||||
)
|
||||
|
||||
// StorageProcessor is a function to process one single storage entry,
// given as its key and value. ProcessByPrefix stops iterating and returns
// the error as soon as a processor returns a non-nil error.
|
||||
type StorageProcessor func(key []byte, value []byte) error
|
||||
|
||||
// Storage is an interface to KV storage
|
||||
type Storage interface {
|
||||
CreateTemporary() (Storage, error)
|
||||
Get(key []byte) ([]byte, error)
|
||||
Put(key []byte, value []byte) error
|
||||
Delete(key []byte) error
|
||||
HasPrefix(prefix []byte) bool
|
||||
ProcessByPrefix(prefix []byte, proc StorageProcessor) error
|
||||
KeysByPrefix(prefix []byte) [][]byte
|
||||
FetchByPrefix(prefix []byte) [][]byte
|
||||
Close() error
|
||||
@@ -28,6 +37,7 @@ type Storage interface {
|
||||
StartBatch()
|
||||
FinishBatch() error
|
||||
CompactDB() error
|
||||
Drop() error
|
||||
}
|
||||
|
||||
type levelDB struct {
|
||||
@@ -41,18 +51,24 @@ var (
|
||||
_ Storage = &levelDB{}
|
||||
)
|
||||
|
||||
func internalOpen(path string) (*leveldb.DB, error) {
|
||||
func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) {
|
||||
o := &opt.Options{
|
||||
Filter: filter.NewBloomFilter(10),
|
||||
OpenFilesCacheCapacity: 256,
|
||||
}
|
||||
|
||||
if throttleCompaction {
|
||||
o.CompactionL0Trigger = 32
|
||||
o.WriteL0PauseTrigger = 96
|
||||
o.WriteL0SlowdownTrigger = 64
|
||||
}
|
||||
|
||||
return leveldb.OpenFile(path, o)
|
||||
}
|
||||
|
||||
// OpenDB opens (creates) LevelDB database
|
||||
func OpenDB(path string) (Storage, error) {
|
||||
db, err := internalOpen(path)
|
||||
db, err := internalOpen(path, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -77,6 +93,20 @@ func RecoverDB(path string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateTemporary creates new DB of the same type in temp dir
|
||||
func (l *levelDB) CreateTemporary() (Storage, error) {
|
||||
tempdir, err := ioutil.TempDir("", "aptly")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := internalOpen(tempdir, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &levelDB{db: db, path: tempdir}, nil
|
||||
}
|
||||
|
||||
// Get key value from database
|
||||
func (l *levelDB) Get(key []byte) ([]byte, error) {
|
||||
value, err := l.db.Get(key, nil)
|
||||
@@ -152,6 +182,29 @@ func (l *levelDB) FetchByPrefix(prefix []byte) [][]byte {
|
||||
return result
|
||||
}
|
||||
|
||||
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
|
||||
func (l *levelDB) HasPrefix(prefix []byte) bool {
|
||||
iterator := l.db.NewIterator(nil, nil)
|
||||
defer iterator.Release()
|
||||
return iterator.Seek(prefix) && bytes.HasPrefix(iterator.Key(), prefix)
|
||||
}
|
||||
|
||||
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
|
||||
// StorageProcessor on key value pair
|
||||
func (l *levelDB) ProcessByPrefix(prefix []byte, proc StorageProcessor) error {
|
||||
iterator := l.db.NewIterator(nil, nil)
|
||||
defer iterator.Release()
|
||||
|
||||
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
|
||||
err := proc(iterator.Key(), iterator.Value())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close finishes DB work
|
||||
func (l *levelDB) Close() error {
|
||||
if l.db == nil {
|
||||
@@ -169,7 +222,7 @@ func (l *levelDB) ReOpen() error {
|
||||
}
|
||||
|
||||
var err error
|
||||
l.db, err = internalOpen(l.path)
|
||||
l.db, err = internalOpen(l.path, false)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -197,3 +250,12 @@ func (l *levelDB) FinishBatch() error {
|
||||
func (l *levelDB) CompactDB() error {
|
||||
return l.db.CompactRange(util.Range{})
|
||||
}
|
||||
|
||||
// Drop removes all the DB files (DANGEROUS!)
|
||||
func (l *levelDB) Drop() error {
|
||||
if l.db != nil {
|
||||
return errors.New("DB is still open")
|
||||
}
|
||||
|
||||
return os.RemoveAll(l.path)
|
||||
}
|
||||
|
||||
@@ -71,6 +71,29 @@ func (s *LevelDBSuite) TestGetPut(c *C) {
|
||||
c.Assert(result, DeepEquals, value)
|
||||
}
|
||||
|
||||
func (s *LevelDBSuite) TestTemporaryDelete(c *C) {
|
||||
var (
|
||||
key = []byte("key")
|
||||
value = []byte("value")
|
||||
)
|
||||
|
||||
err := s.db.Put(key, value)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
temp, err := s.db.CreateTemporary()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
c.Check(s.db.HasPrefix([]byte(nil)), Equals, true)
|
||||
c.Check(temp.HasPrefix([]byte(nil)), Equals, false)
|
||||
|
||||
err = temp.Put(key, value)
|
||||
c.Assert(err, IsNil)
|
||||
c.Check(temp.HasPrefix([]byte(nil)), Equals, true)
|
||||
|
||||
c.Assert(temp.Close(), IsNil)
|
||||
c.Assert(temp.Drop(), IsNil)
|
||||
}
|
||||
|
||||
func (s *LevelDBSuite) TestDelete(c *C) {
|
||||
var (
|
||||
key = []byte("key")
|
||||
@@ -107,10 +130,41 @@ func (s *LevelDBSuite) TestByPrefix(c *C) {
|
||||
c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
|
||||
c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
|
||||
|
||||
keys := [][]byte{}
|
||||
values := [][]byte{}
|
||||
|
||||
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
|
||||
keys = append(keys, append([]byte(nil), k...))
|
||||
values = append(values, append([]byte(nil), v...))
|
||||
return nil
|
||||
}), IsNil)
|
||||
|
||||
c.Check(values, DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
|
||||
c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
|
||||
|
||||
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
|
||||
return ErrNotFound
|
||||
}), Equals, ErrNotFound)
|
||||
|
||||
c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
|
||||
return ErrNotFound
|
||||
}), IsNil)
|
||||
|
||||
c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
|
||||
c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
|
||||
}
|
||||
|
||||
func (s *LevelDBSuite) TestHasPrefix(c *C) {
|
||||
c.Check(s.db.HasPrefix([]byte(nil)), Equals, false)
|
||||
c.Check(s.db.HasPrefix([]byte{0x80}), Equals, false)
|
||||
|
||||
s.db.Put([]byte{0x80, 0x01}, []byte{0x01})
|
||||
|
||||
c.Check(s.db.HasPrefix([]byte(nil)), Equals, true)
|
||||
c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true)
|
||||
c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false)
|
||||
}
|
||||
|
||||
func (s *LevelDBSuite) TestBatch(c *C) {
|
||||
var (
|
||||
key = []byte("key")
|
||||
|
||||
+9
-1
@@ -1,8 +1,9 @@
|
||||
package deb
|
||||
|
||||
import (
|
||||
"github.com/smira/aptly/database"
|
||||
"sync"
|
||||
|
||||
"github.com/smira/aptly/database"
|
||||
)
|
||||
|
||||
// CollectionFactory is a single place to generate all desired collections
|
||||
@@ -21,6 +22,13 @@ func NewCollectionFactory(db database.Storage) *CollectionFactory {
|
||||
return &CollectionFactory{Mutex: &sync.Mutex{}, db: db}
|
||||
}
|
||||
|
||||
// TemporaryDB creates new temporary DB
|
||||
//
|
||||
// DB should be closed/droped after being used
|
||||
func (factory *CollectionFactory) TemporaryDB() (database.Storage, error) {
|
||||
return factory.db.CreateTemporary()
|
||||
}
|
||||
|
||||
// PackageCollection returns (or creates) new PackageCollection
|
||||
func (factory *CollectionFactory) PackageCollection() *PackageCollection {
|
||||
factory.Lock()
|
||||
|
||||
+90
-30
@@ -1,75 +1,135 @@
|
||||
package deb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/smira/aptly/aptly"
|
||||
"github.com/smira/aptly/utils"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/smira/aptly/aptly"
|
||||
"github.com/smira/aptly/database"
|
||||
"github.com/smira/go-uuid/uuid"
|
||||
)
|
||||
|
||||
// ContentsIndex calculates mapping from files to packages, with sorting and aggregation
|
||||
type ContentsIndex struct {
|
||||
index map[string][]*Package
|
||||
db database.Storage
|
||||
prefix []byte
|
||||
}
|
||||
|
||||
// NewContentsIndex creates empty ContentsIndex
|
||||
func NewContentsIndex() *ContentsIndex {
|
||||
func NewContentsIndex(db database.Storage) *ContentsIndex {
|
||||
return &ContentsIndex{
|
||||
index: make(map[string][]*Package),
|
||||
db: db,
|
||||
prefix: []byte(uuid.New()),
|
||||
}
|
||||
}
|
||||
|
||||
// Push adds package to contents index, calculating package contents as required
|
||||
func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) {
|
||||
func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) error {
|
||||
contents := p.Contents(packagePool)
|
||||
qualifiedName := []byte(p.QualifiedName())
|
||||
|
||||
for _, path := range contents {
|
||||
index.index[path] = append(index.index[path], p)
|
||||
// for performance reasons we only write to leveldb during push.
|
||||
// merging of qualified names per path will be done in WriteTo
|
||||
err := index.db.Put(append(append(append(index.prefix, []byte(path)...), byte(0)), qualifiedName...), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Empty checks whether index contains no packages
|
||||
func (index *ContentsIndex) Empty() bool {
|
||||
return len(index.index) == 0
|
||||
return !index.db.HasPrefix(index.prefix)
|
||||
}
|
||||
|
||||
// WriteTo dumps sorted mapping of files to qualified package names
|
||||
func (index *ContentsIndex) WriteTo(w io.Writer) (int64, error) {
|
||||
// For performance reasons push method wrote on key per path and package
|
||||
// in this method we now need to merge all packages which have the same path
|
||||
// and write it to contents index file
|
||||
|
||||
var n int64
|
||||
|
||||
paths := make([]string, len(index.index))
|
||||
|
||||
i := 0
|
||||
for path := range index.index {
|
||||
paths[i] = path
|
||||
i++
|
||||
}
|
||||
|
||||
sort.Strings(paths)
|
||||
|
||||
nn, err := fmt.Fprintf(w, "%s %s\n", "FILE", "LOCATION")
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
for _, path := range paths {
|
||||
packages := index.index[path]
|
||||
parts := make([]string, 0, len(packages))
|
||||
for i := range packages {
|
||||
name := packages[i].QualifiedName()
|
||||
if !utils.StrSliceHasItem(parts, name) {
|
||||
parts = append(parts, name)
|
||||
}
|
||||
prefixLen := len(index.prefix)
|
||||
|
||||
var (
|
||||
currentPath []byte
|
||||
currentPkgs [][]byte
|
||||
)
|
||||
|
||||
err = index.db.ProcessByPrefix(index.prefix, func(key []byte, value []byte) error {
|
||||
// cut prefix
|
||||
key = key[prefixLen:]
|
||||
|
||||
i := bytes.Index(key, []byte{0})
|
||||
if i == -1 {
|
||||
return errors.New("corrupted index entry")
|
||||
}
|
||||
nn, err = fmt.Fprintf(w, "%s %s\n", path, strings.Join(parts, ","))
|
||||
|
||||
path := key[:i]
|
||||
pkg := key[i+1:]
|
||||
|
||||
if !bytes.Equal(path, currentPath) {
|
||||
if currentPath != nil {
|
||||
nn, err = w.Write(append(currentPath, ' '))
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nn, err = w.Write(bytes.Join(currentPkgs, []byte{','}))
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
nn, err = w.Write([]byte{'\n'})
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
currentPath = append([]byte(nil), path...)
|
||||
currentPkgs = nil
|
||||
}
|
||||
|
||||
currentPkgs = append(currentPkgs, append([]byte(nil), pkg...))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
if currentPath != nil {
|
||||
nn, err = w.Write(append(currentPath, ' '))
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
nn, err = w.Write(bytes.Join(currentPkgs, []byte{','}))
|
||||
n += int64(nn)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
nn, err = w.Write([]byte{'\n'})
|
||||
n += int64(nn)
|
||||
}
|
||||
|
||||
return n, nil
|
||||
return n, err
|
||||
}
|
||||
|
||||
+8
-1
@@ -469,6 +469,13 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP
|
||||
return err
|
||||
}
|
||||
|
||||
tempDB, err := collectionFactory.TemporaryDB()
if err != nil {
	return err
}
// Deferred calls run in LIFO order, so two separate defers (Close, then Drop)
// would execute Drop first while the DB is still open; Drop then fails and the
// temporary directory is left behind. Run both in one deferred function, in
// the required order. Cleanup stays best effort, as before.
defer func() {
	_ = tempDB.Close()
	_ = tempDB.Drop()
}()
|
||||
|
||||
if progress != nil {
|
||||
progress.Printf("Loading packages...\n")
|
||||
}
|
||||
@@ -563,7 +570,7 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP
|
||||
contentIndex := contentIndexes[key]
|
||||
|
||||
if contentIndex == nil {
|
||||
contentIndex = NewContentsIndex()
|
||||
contentIndex = NewContentsIndex(tempDB)
|
||||
contentIndexes[key] = contentIndex
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user