Generating contents indexes via temporary LevelDB

This commit is contained in:
Andrey Smirnov
2017-02-21 19:09:51 +03:00
parent f58d2627c1
commit f2dc4eeec9
3 changed files with 107 additions and 32 deletions

View File

@@ -1,8 +1,9 @@
package deb
import (
"github.com/smira/aptly/database"
"sync"
"github.com/smira/aptly/database"
)
// CollectionFactory is a single place to generate all desired collections
@@ -21,6 +22,13 @@ func NewCollectionFactory(db database.Storage) *CollectionFactory {
return &CollectionFactory{Mutex: &sync.Mutex{}, db: db}
}
// TemporaryDB creates new temporary DB
//
// DB should be closed/droped after being used
func (factory *CollectionFactory) TemporaryDB() (database.Storage, error) {
return factory.db.CreateTemporary()
}
// PackageCollection returns (or creates) new PackageCollection
func (factory *CollectionFactory) PackageCollection() *PackageCollection {
factory.Lock()

View File

@@ -1,75 +1,135 @@
package deb
import (
"bytes"
"errors"
"fmt"
"github.com/smira/aptly/aptly"
"github.com/smira/aptly/utils"
"io"
"sort"
"strings"
"github.com/smira/aptly/aptly"
"github.com/smira/aptly/database"
"github.com/smira/go-uuid/uuid"
)
// ContentsIndex calculates mapping from files to packages, with sorting and aggregation
type ContentsIndex struct {
index map[string][]*Package
db database.Storage
prefix []byte
}
// NewContentsIndex creates empty ContentsIndex
func NewContentsIndex() *ContentsIndex {
func NewContentsIndex(db database.Storage) *ContentsIndex {
return &ContentsIndex{
index: make(map[string][]*Package),
db: db,
prefix: []byte(uuid.New()),
}
}
// Push adds package to contents index, calculating package contents as required
func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) {
func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) error {
contents := p.Contents(packagePool)
qualifiedName := []byte(p.QualifiedName())
for _, path := range contents {
index.index[path] = append(index.index[path], p)
// for performance reasons we only write to leveldb during push.
// merging of qualified names per path will be done in WriteTo
err := index.db.Put(append(append(append(index.prefix, []byte(path)...), byte(0)), qualifiedName...), nil)
if err != nil {
return err
}
}
return nil
}
// Empty checks whether index contains no packages
func (index *ContentsIndex) Empty() bool {
return len(index.index) == 0
return !index.db.HasPrefix(index.prefix)
}
// WriteTo dumps sorted mapping of files to qualified package names
func (index *ContentsIndex) WriteTo(w io.Writer) (int64, error) {
// For performance reasons push method wrote on key per path and package
// in this method we now need to merge all pkg with have the same path
// and write it to contents index file
var n int64
paths := make([]string, len(index.index))
i := 0
for path := range index.index {
paths[i] = path
i++
}
sort.Strings(paths)
nn, err := fmt.Fprintf(w, "%s %s\n", "FILE", "LOCATION")
n += int64(nn)
if err != nil {
return n, err
}
for _, path := range paths {
packages := index.index[path]
parts := make([]string, 0, len(packages))
for i := range packages {
name := packages[i].QualifiedName()
if !utils.StrSliceHasItem(parts, name) {
parts = append(parts, name)
}
prefixLen := len(index.prefix)
var (
currentPath []byte
currentPkgs [][]byte
)
err = index.db.ProcessByPrefix(index.prefix, func(key []byte, value []byte) error {
// cut prefix
key = key[prefixLen:]
i := bytes.Index(key, []byte{0})
if i == -1 {
return errors.New("corrupted index entry")
}
nn, err = fmt.Fprintf(w, "%s %s\n", path, strings.Join(parts, ","))
path := key[:i]
pkg := key[i+1:]
if !bytes.Equal(path, currentPath) {
if currentPath != nil {
nn, err = w.Write(append(currentPath, ' '))
n += int64(nn)
if err != nil {
return err
}
nn, err = w.Write(bytes.Join(currentPkgs, []byte{','}))
n += int64(nn)
if err != nil {
return err
}
nn, err = w.Write([]byte{'\n'})
n += int64(nn)
if err != nil {
return err
}
}
currentPath = append([]byte(nil), path...)
currentPkgs = nil
}
currentPkgs = append(currentPkgs, append([]byte(nil), pkg...))
return nil
})
if err != nil {
return n, err
}
if currentPath != nil {
nn, err = w.Write(append(currentPath, ' '))
n += int64(nn)
if err != nil {
return n, err
}
nn, err = w.Write(bytes.Join(currentPkgs, []byte{','}))
n += int64(nn)
if err != nil {
return n, err
}
nn, err = w.Write([]byte{'\n'})
n += int64(nn)
}
return n, nil
return n, err
}

View File

@@ -469,6 +469,13 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP
return err
}
tempDB, err := collectionFactory.TemporaryDB()
if err != nil {
return err
}
defer tempDB.Close()
defer tempDB.Drop()
if progress != nil {
progress.Printf("Loading packages...\n")
}
@@ -563,7 +570,7 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP
contentIndex := contentIndexes[key]
if contentIndex == nil {
contentIndex = NewContentsIndex()
contentIndex = NewContentsIndex(tempDB)
contentIndexes[key] = contentIndex
}