From f2dc4eeec90850cbf1b1e8dec55c3c176333b168 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Tue, 21 Feb 2017 19:09:51 +0300 Subject: [PATCH] Generating contents indexes via temporary LevelDB --- deb/collections.go | 10 +++- deb/contents.go | 120 +++++++++++++++++++++++++++++++++------------ deb/publish.go | 9 +++- 3 files changed, 107 insertions(+), 32 deletions(-) diff --git a/deb/collections.go b/deb/collections.go index a34363b4..b7cbf2ea 100644 --- a/deb/collections.go +++ b/deb/collections.go @@ -1,8 +1,9 @@ package deb import ( - "github.com/smira/aptly/database" "sync" + + "github.com/smira/aptly/database" ) // CollectionFactory is a single place to generate all desired collections @@ -21,6 +22,13 @@ func NewCollectionFactory(db database.Storage) *CollectionFactory { return &CollectionFactory{Mutex: &sync.Mutex{}, db: db} } +// TemporaryDB creates new temporary DB +// +// DB should be closed/droped after being used +func (factory *CollectionFactory) TemporaryDB() (database.Storage, error) { + return factory.db.CreateTemporary() +} + // PackageCollection returns (or creates) new PackageCollection func (factory *CollectionFactory) PackageCollection() *PackageCollection { factory.Lock() diff --git a/deb/contents.go b/deb/contents.go index 514516e6..30dc3e0e 100644 --- a/deb/contents.go +++ b/deb/contents.go @@ -1,75 +1,135 @@ package deb import ( + "bytes" + "errors" "fmt" - "github.com/smira/aptly/aptly" - "github.com/smira/aptly/utils" "io" - "sort" - "strings" + + "github.com/smira/aptly/aptly" + "github.com/smira/aptly/database" + "github.com/smira/go-uuid/uuid" ) // ContentsIndex calculates mapping from files to packages, with sorting and aggregation type ContentsIndex struct { - index map[string][]*Package + db database.Storage + prefix []byte } // NewContentsIndex creates empty ContentsIndex -func NewContentsIndex() *ContentsIndex { +func NewContentsIndex(db database.Storage) *ContentsIndex { return &ContentsIndex{ - index: make(map[string][]*Package), + db: db, + prefix: []byte(uuid.New()), } } // Push adds package to contents index, calculating package contents as required -func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) { +func (index *ContentsIndex) Push(p *Package, packagePool aptly.PackagePool) error { contents := p.Contents(packagePool) + qualifiedName := []byte(p.QualifiedName()) for _, path := range contents { - index.index[path] = append(index.index[path], p) + // for performance reasons we only write to leveldb during push. + // merging of qualified names per path will be done in WriteTo + err := index.db.Put(append(append(append(index.prefix, []byte(path)...), byte(0)), qualifiedName...), nil) + if err != nil { + return err + } } + + return nil } // Empty checks whether index contains no packages func (index *ContentsIndex) Empty() bool { - return len(index.index) == 0 + return !index.db.HasPrefix(index.prefix) } // WriteTo dumps sorted mapping of files to qualified package names func (index *ContentsIndex) WriteTo(w io.Writer) (int64, error) { + // For performance reasons push method wrote on key per path and package + // in this method we now need to merge all pkg with have the same path + // and write it to contents index file + var n int64 - paths := make([]string, len(index.index)) - - i := 0 - for path := range index.index { - paths[i] = path - i++ - } - - sort.Strings(paths) - nn, err := fmt.Fprintf(w, "%s %s\n", "FILE", "LOCATION") n += int64(nn) if err != nil { return n, err } - for _, path := range paths { - packages := index.index[path] - parts := make([]string, 0, len(packages)) - for i := range packages { - name := packages[i].QualifiedName() - if !utils.StrSliceHasItem(parts, name) { - parts = append(parts, name) - } + prefixLen := len(index.prefix) + + var ( + currentPath []byte + currentPkgs [][]byte + ) + + err = index.db.ProcessByPrefix(index.prefix, func(key []byte, value []byte) error { + // cut prefix + key = key[prefixLen:] + + i := bytes.Index(key, []byte{0}) + if i == -1 { + return errors.New("corrupted index entry") } - nn, err = fmt.Fprintf(w, "%s %s\n", path, strings.Join(parts, ",")) + + path := key[:i] + pkg := key[i+1:] + + if !bytes.Equal(path, currentPath) { + if currentPath != nil { + nn, err = w.Write(append(currentPath, ' ')) + n += int64(nn) + if err != nil { + return err + } + + nn, err = w.Write(bytes.Join(currentPkgs, []byte{','})) + n += int64(nn) + if err != nil { + return err + } + + nn, err = w.Write([]byte{'\n'}) + n += int64(nn) + if err != nil { + return err + } + } + + currentPath = append([]byte(nil), path...) + currentPkgs = nil + } + + currentPkgs = append(currentPkgs, append([]byte(nil), pkg...)) + + return nil + }) + + if err != nil { + return n, err + } + + if currentPath != nil { + nn, err = w.Write(append(currentPath, ' ')) n += int64(nn) if err != nil { return n, err } + + nn, err = w.Write(bytes.Join(currentPkgs, []byte{','})) + n += int64(nn) + if err != nil { + return n, err + } + + nn, err = w.Write([]byte{'\n'}) + n += int64(nn) } - return n, nil + return n, err } diff --git a/deb/publish.go b/deb/publish.go index 04dd221c..7f8ee3c4 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -469,6 +469,13 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP return err } + tempDB, err := collectionFactory.TemporaryDB() + if err != nil { + return err + } + defer tempDB.Close() + defer tempDB.Drop() + if progress != nil { progress.Printf("Loading packages...\n") } @@ -563,7 +570,7 @@ func (p *PublishedRepo) Publish(packagePool aptly.PackagePool, publishedStorageP contentIndex := contentIndexes[key] if contentIndex == nil { - contentIndex = NewContentsIndex() + contentIndex = NewContentsIndex(tempDB) contentIndexes[key] = contentIndex }