Merge pull request #765 from aptly-dev/761-lazy-iteration

Implement lazy iteration (ForEach) over collections
This commit is contained in:
Andrey Smirnov
2018-08-16 16:44:05 +03:00
committed by GitHub
5 changed files with 170 additions and 53 deletions

View File

@@ -99,28 +99,34 @@ type LocalRepoCollection struct {
// NewLocalRepoCollection loads LocalRepos from DB and makes up collection
func NewLocalRepoCollection(db database.Storage) *LocalRepoCollection {
result := &LocalRepoCollection{
return &LocalRepoCollection{
RWMutex: &sync.RWMutex{},
db: db,
}
}
blobs := db.FetchByPrefix([]byte("L"))
result.list = make([]*LocalRepo, 0, len(blobs))
func (collection *LocalRepoCollection) loadList() {
if collection.list != nil {
return
}
blobs := collection.db.FetchByPrefix([]byte("L"))
collection.list = make([]*LocalRepo, 0, len(blobs))
for _, blob := range blobs {
r := &LocalRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding repo: %s\n", err)
} else {
result.list = append(result.list, r)
collection.list = append(collection.list, r)
}
}
return result
}
// Add appends new repo to collection and saves it
func (collection *LocalRepoCollection) Add(repo *LocalRepo) error {
collection.loadList()
for _, r := range collection.list {
if r.Name == repo.Name {
return fmt.Errorf("local repo with name %s already exists", repo.Name)
@@ -153,6 +159,8 @@ func (collection *LocalRepoCollection) Update(repo *LocalRepo) error {
// LoadComplete loads additional information for local repo
func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
collection.loadList()
encoded, err := collection.db.Get(repo.RefKey())
if err == database.ErrNotFound {
return nil
@@ -167,6 +175,8 @@ func (collection *LocalRepoCollection) LoadComplete(repo *LocalRepo) error {
// ByName looks up repository by name
func (collection *LocalRepoCollection) ByName(name string) (*LocalRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.Name == name {
return r, nil
@@ -177,6 +187,8 @@ func (collection *LocalRepoCollection) ByName(name string) (*LocalRepo, error) {
// ByUUID looks up repository by uuid
func (collection *LocalRepoCollection) ByUUID(uuid string) (*LocalRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.UUID == uuid {
return r, nil
@@ -187,23 +199,28 @@ func (collection *LocalRepoCollection) ByUUID(uuid string) (*LocalRepo, error) {
// ForEach runs method for each repository
func (collection *LocalRepoCollection) ForEach(handler func(*LocalRepo) error) error {
var err error
for _, r := range collection.list {
err = handler(r)
if err != nil {
return err
return collection.db.ProcessByPrefix([]byte("L"), func(key, blob []byte) error {
r := &LocalRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding repo: %s\n", err)
return nil
}
}
return err
return handler(r)
})
}
// Len returns number of remote repos
func (collection *LocalRepoCollection) Len() int {
collection.loadList()
return len(collection.list)
}
// Drop removes remote repo from collection
func (collection *LocalRepoCollection) Drop(repo *LocalRepo) error {
collection.loadList()
repoPosition := -1
for i, r := range collection.list {

View File

@@ -852,28 +852,34 @@ type PublishedRepoCollection struct {
// NewPublishedRepoCollection loads PublishedRepos from DB and makes up collection
func NewPublishedRepoCollection(db database.Storage) *PublishedRepoCollection {
result := &PublishedRepoCollection{
return &PublishedRepoCollection{
RWMutex: &sync.RWMutex{},
db: db,
}
}
blobs := db.FetchByPrefix([]byte("U"))
result.list = make([]*PublishedRepo, 0, len(blobs))
func (collection *PublishedRepoCollection) loadList() {
if collection.list != nil {
return
}
blobs := collection.db.FetchByPrefix([]byte("U"))
collection.list = make([]*PublishedRepo, 0, len(blobs))
for _, blob := range blobs {
r := &PublishedRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding published repo: %s\n", err)
} else {
result.list = append(result.list, r)
collection.list = append(collection.list, r)
}
}
return result
}
// Add appends new repo to collection and saves it
func (collection *PublishedRepoCollection) Add(repo *PublishedRepo) error {
collection.loadList()
if collection.CheckDuplicate(repo) != nil {
return fmt.Errorf("published repo with storage/prefix/distribution %s/%s/%s already exists", repo.Storage, repo.Prefix, repo.Distribution)
}
@@ -889,6 +895,8 @@ func (collection *PublishedRepoCollection) Add(repo *PublishedRepo) error {
// CheckDuplicate verifies that there's no published repo with the same name
func (collection *PublishedRepoCollection) CheckDuplicate(repo *PublishedRepo) *PublishedRepo {
collection.loadList()
for _, r := range collection.list {
if r.Prefix == repo.Prefix && r.Distribution == repo.Distribution && r.Storage == repo.Storage {
return r
@@ -978,6 +986,8 @@ func (collection *PublishedRepoCollection) LoadComplete(repo *PublishedRepo, col
// ByStoragePrefixDistribution looks up repository by storage, prefix & distribution
func (collection *PublishedRepoCollection) ByStoragePrefixDistribution(storage, prefix, distribution string) (*PublishedRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.Prefix == prefix && r.Distribution == distribution && r.Storage == storage {
return r, nil
@@ -991,6 +1001,8 @@ func (collection *PublishedRepoCollection) ByStoragePrefixDistribution(storage,
// ByUUID looks up repository by uuid
func (collection *PublishedRepoCollection) ByUUID(uuid string) (*PublishedRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.UUID == uuid {
return r, nil
@@ -1001,6 +1013,8 @@ func (collection *PublishedRepoCollection) ByUUID(uuid string) (*PublishedRepo,
// BySnapshot looks up repository by snapshot source
func (collection *PublishedRepoCollection) BySnapshot(snapshot *Snapshot) []*PublishedRepo {
collection.loadList()
var result []*PublishedRepo
for _, r := range collection.list {
if r.SourceKind == SourceSnapshot {
@@ -1021,6 +1035,8 @@ func (collection *PublishedRepoCollection) BySnapshot(snapshot *Snapshot) []*Pub
// ByLocalRepo looks up repository by local repo source
func (collection *PublishedRepoCollection) ByLocalRepo(repo *LocalRepo) []*PublishedRepo {
collection.loadList()
var result []*PublishedRepo
for _, r := range collection.list {
if r.SourceKind == SourceLocalRepo {
@@ -1041,18 +1057,21 @@ func (collection *PublishedRepoCollection) ByLocalRepo(repo *LocalRepo) []*Publi
// ForEach runs method for each repository
func (collection *PublishedRepoCollection) ForEach(handler func(*PublishedRepo) error) error {
var err error
for _, r := range collection.list {
err = handler(r)
if err != nil {
return err
return collection.db.ProcessByPrefix([]byte("U"), func(key, blob []byte) error {
r := &PublishedRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding published repo: %s\n", err)
return nil
}
}
return err
return handler(r)
})
}
// Len returns number of remote repos
func (collection *PublishedRepoCollection) Len() int {
collection.loadList()
return len(collection.list)
}
@@ -1060,6 +1079,8 @@ func (collection *PublishedRepoCollection) Len() int {
func (collection *PublishedRepoCollection) CleanupPrefixComponentFiles(prefix string, components []string,
publishedStorage aptly.PublishedStorage, collectionFactory *CollectionFactory, progress aptly.Progress) error {
collection.loadList()
var err error
referencedFiles := map[string][]string{}
@@ -1141,6 +1162,9 @@ func (collection *PublishedRepoCollection) CleanupPrefixComponentFiles(prefix st
func (collection *PublishedRepoCollection) Remove(publishedStorageProvider aptly.PublishedStorageProvider,
storage, prefix, distribution string, collectionFactory *CollectionFactory, progress aptly.Progress,
force, skipCleanup bool) error {
collection.loadList()
repo, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
if err != nil {
return err

View File

@@ -660,28 +660,34 @@ type RemoteRepoCollection struct {
// NewRemoteRepoCollection loads RemoteRepos from DB and makes up collection
func NewRemoteRepoCollection(db database.Storage) *RemoteRepoCollection {
result := &RemoteRepoCollection{
return &RemoteRepoCollection{
RWMutex: &sync.RWMutex{},
db: db,
}
}
blobs := db.FetchByPrefix([]byte("R"))
result.list = make([]*RemoteRepo, 0, len(blobs))
func (collection *RemoteRepoCollection) loadList() {
if collection.list != nil {
return
}
blobs := collection.db.FetchByPrefix([]byte("R"))
collection.list = make([]*RemoteRepo, 0, len(blobs))
for _, blob := range blobs {
r := &RemoteRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding mirror: %s\n", err)
} else {
result.list = append(result.list, r)
collection.list = append(collection.list, r)
}
}
return result
}
// Add appends new repo to collection and saves it
func (collection *RemoteRepoCollection) Add(repo *RemoteRepo) error {
collection.loadList()
for _, r := range collection.list {
if r.Name == repo.Name {
return fmt.Errorf("mirror with name %s already exists", repo.Name)
@@ -728,6 +734,8 @@ func (collection *RemoteRepoCollection) LoadComplete(repo *RemoteRepo) error {
// ByName looks up repository by name
func (collection *RemoteRepoCollection) ByName(name string) (*RemoteRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.Name == name {
return r, nil
@@ -738,6 +746,8 @@ func (collection *RemoteRepoCollection) ByName(name string) (*RemoteRepo, error)
// ByUUID looks up repository by uuid
func (collection *RemoteRepoCollection) ByUUID(uuid string) (*RemoteRepo, error) {
collection.loadList()
for _, r := range collection.list {
if r.UUID == uuid {
return r, nil
@@ -748,23 +758,28 @@ func (collection *RemoteRepoCollection) ByUUID(uuid string) (*RemoteRepo, error)
// ForEach runs method for each repository
func (collection *RemoteRepoCollection) ForEach(handler func(*RemoteRepo) error) error {
var err error
for _, r := range collection.list {
err = handler(r)
if err != nil {
return err
return collection.db.ProcessByPrefix([]byte("R"), func(key, blob []byte) error {
r := &RemoteRepo{}
if err := r.Decode(blob); err != nil {
log.Printf("Error decoding mirror: %s\n", err)
return nil
}
}
return err
return handler(r)
})
}
// Len returns number of remote repos
func (collection *RemoteRepoCollection) Len() int {
collection.loadList()
return len(collection.list)
}
// Drop removes remote repo from collection
func (collection *RemoteRepoCollection) Drop(repo *RemoteRepo) error {
collection.loadList()
repoPosition := -1
for i, r := range collection.list {

View File

@@ -179,28 +179,34 @@ type SnapshotCollection struct {
// NewSnapshotCollection loads Snapshots from DB and makes up collection
func NewSnapshotCollection(db database.Storage) *SnapshotCollection {
result := &SnapshotCollection{
return &SnapshotCollection{
RWMutex: &sync.RWMutex{},
db: db,
}
}
blobs := db.FetchByPrefix([]byte("S"))
result.list = make([]*Snapshot, 0, len(blobs))
func (collection *SnapshotCollection) loadList() {
if collection.list != nil {
return
}
blobs := collection.db.FetchByPrefix([]byte("S"))
collection.list = make([]*Snapshot, 0, len(blobs))
for _, blob := range blobs {
s := &Snapshot{}
if err := s.Decode(blob); err != nil {
log.Printf("Error decoding snapshot: %s\n", err)
} else {
result.list = append(result.list, s)
collection.list = append(collection.list, s)
}
}
return result
}
// Add appends new repo to collection and saves it
func (collection *SnapshotCollection) Add(snapshot *Snapshot) error {
collection.loadList()
for _, s := range collection.list {
if s.Name == snapshot.Name {
return fmt.Errorf("snapshot with name %s already exists", snapshot.Name)
@@ -216,7 +222,7 @@ func (collection *SnapshotCollection) Add(snapshot *Snapshot) error {
return nil
}
// Update stores updated information about repo in DB
// Update stores updated information about snapshot in DB
func (collection *SnapshotCollection) Update(snapshot *Snapshot) error {
err := collection.db.Put(snapshot.Key(), snapshot.Encode())
if err != nil {
@@ -241,6 +247,8 @@ func (collection *SnapshotCollection) LoadComplete(snapshot *Snapshot) error {
// ByName looks up snapshot by name
func (collection *SnapshotCollection) ByName(name string) (*Snapshot, error) {
collection.loadList()
for _, s := range collection.list {
if s.Name == name {
return s, nil
@@ -251,6 +259,8 @@ func (collection *SnapshotCollection) ByName(name string) (*Snapshot, error) {
// ByUUID looks up snapshot by UUID
func (collection *SnapshotCollection) ByUUID(uuid string) (*Snapshot, error) {
collection.loadList()
for _, s := range collection.list {
if s.UUID == uuid {
return s, nil
@@ -261,6 +271,8 @@ func (collection *SnapshotCollection) ByUUID(uuid string) (*Snapshot, error) {
// ByRemoteRepoSource looks up snapshots that have specified RemoteRepo as a source
func (collection *SnapshotCollection) ByRemoteRepoSource(repo *RemoteRepo) []*Snapshot {
collection.loadList()
var result []*Snapshot
for _, s := range collection.list {
@@ -273,6 +285,8 @@ func (collection *SnapshotCollection) ByRemoteRepoSource(repo *RemoteRepo) []*Sn
// ByLocalRepoSource looks up snapshots that have specified LocalRepo as a source
func (collection *SnapshotCollection) ByLocalRepoSource(repo *LocalRepo) []*Snapshot {
collection.loadList()
var result []*Snapshot
for _, s := range collection.list {
@@ -285,6 +299,8 @@ func (collection *SnapshotCollection) ByLocalRepoSource(repo *LocalRepo) []*Snap
// BySnapshotSource looks up snapshots that have specified snapshot as a source
func (collection *SnapshotCollection) BySnapshotSource(snapshot *Snapshot) []*Snapshot {
collection.loadList()
var result []*Snapshot
for _, s := range collection.list {
@@ -297,18 +313,21 @@ func (collection *SnapshotCollection) BySnapshotSource(snapshot *Snapshot) []*Sn
// ForEach runs method for each snapshot
func (collection *SnapshotCollection) ForEach(handler func(*Snapshot) error) error {
var err error
for _, s := range collection.list {
err = handler(s)
if err != nil {
return err
return collection.db.ProcessByPrefix([]byte("S"), func(key, blob []byte) error {
s := &Snapshot{}
if err := s.Decode(blob); err != nil {
log.Printf("Error decoding snapshot: %s\n", err)
return nil
}
}
return err
return handler(s)
})
}
// ForEachSorted runs method for each snapshot following some sort order
func (collection *SnapshotCollection) ForEachSorted(sortMethod string, handler func(*Snapshot) error) error {
collection.loadList()
sorter, err := newSnapshotSorter(sortMethod, collection)
if err != nil {
return err
@@ -327,11 +346,15 @@ func (collection *SnapshotCollection) ForEachSorted(sortMethod string, handler f
// Len returns number of snapshots in collection
// ForEach runs method for each snapshot
func (collection *SnapshotCollection) Len() int {
collection.loadList()
return len(collection.list)
}
// Drop removes snapshot from collection
func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error {
collection.loadList()
snapshotPosition := -1
for i, s := range collection.list {

View File

@@ -0,0 +1,38 @@
package deb
import (
"fmt"
"os"
"testing"
"github.com/aptly-dev/aptly/database"
)
func BenchmarkSnapshotCollectionForEach(b *testing.B) {
const count = 1024
tmpDir := os.TempDir()
defer os.RemoveAll(tmpDir)
db, _ := database.NewOpenDB(tmpDir)
defer db.Close()
collection := NewSnapshotCollection(db)
for i := 0; i < count; i++ {
snapshot := NewSnapshotFromRefList(fmt.Sprintf("snapshot%d", i), nil, NewPackageRefList(), fmt.Sprintf("Snapshot number %d", i))
if collection.Add(snapshot) != nil {
b.FailNow()
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
collection = NewSnapshotCollection(db)
collection.ForEach(func(s *Snapshot) error {
return nil
})
}
}