mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-10 06:14:22 +00:00
Reimplement DB collections for mirrors, repos and snapshots
See #765, #761 Collections were relying on keeping in-memory list of all the objects for any kind of operation which doesn't scale well the number of objects in the database. With this rewrite, objects are loaded only on demand which might be pessimization in some edge cases but should improve performance and memory footprint signifcantly.
This commit is contained in:
+100
-104
@@ -173,8 +173,8 @@ func (s *Snapshot) Decode(input []byte) error {
|
||||
// SnapshotCollection does listing, updating/adding/deleting of Snapshots
|
||||
type SnapshotCollection struct {
|
||||
*sync.RWMutex
|
||||
db database.Storage
|
||||
list []*Snapshot
|
||||
db database.Storage
|
||||
cache map[string]*Snapshot
|
||||
}
|
||||
|
||||
// NewSnapshotCollection loads Snapshots from DB and makes up collection
|
||||
@@ -182,43 +182,23 @@ func NewSnapshotCollection(db database.Storage) *SnapshotCollection {
|
||||
return &SnapshotCollection{
|
||||
RWMutex: &sync.RWMutex{},
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (collection *SnapshotCollection) loadList() {
|
||||
if collection.list != nil {
|
||||
return
|
||||
}
|
||||
|
||||
blobs := collection.db.FetchByPrefix([]byte("S"))
|
||||
collection.list = make([]*Snapshot, 0, len(blobs))
|
||||
|
||||
for _, blob := range blobs {
|
||||
s := &Snapshot{}
|
||||
if err := s.Decode(blob); err != nil {
|
||||
log.Printf("Error decoding snapshot: %s\n", err)
|
||||
} else {
|
||||
collection.list = append(collection.list, s)
|
||||
}
|
||||
cache: map[string]*Snapshot{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add appends new repo to collection and saves it
|
||||
func (collection *SnapshotCollection) Add(snapshot *Snapshot) error {
|
||||
collection.loadList()
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.Name == snapshot.Name {
|
||||
return fmt.Errorf("snapshot with name %s already exists", snapshot.Name)
|
||||
}
|
||||
_, err := collection.ByName(snapshot.Name)
|
||||
if err == nil {
|
||||
return fmt.Errorf("snapshot with name %s already exists", snapshot.Name)
|
||||
}
|
||||
|
||||
err := collection.Update(snapshot)
|
||||
err = collection.Update(snapshot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
collection.list = append(collection.list, snapshot)
|
||||
collection.cache[snapshot.UUID] = snapshot
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -245,70 +225,96 @@ func (collection *SnapshotCollection) LoadComplete(snapshot *Snapshot) error {
|
||||
return snapshot.packageRefs.Decode(encoded)
|
||||
}
|
||||
|
||||
// ByName looks up snapshot by name
|
||||
func (collection *SnapshotCollection) ByName(name string) (*Snapshot, error) {
|
||||
collection.loadList()
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.Name == name {
|
||||
return s, nil
|
||||
func (collection *SnapshotCollection) search(filter func(*Snapshot) bool, unique bool) []*Snapshot {
|
||||
result := []*Snapshot(nil)
|
||||
for _, s := range collection.cache {
|
||||
if filter(s) {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
|
||||
if unique && len(result) > 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
collection.db.ProcessByPrefix([]byte("S"), func(key, blob []byte) error {
|
||||
s := &Snapshot{}
|
||||
if err := s.Decode(blob); err != nil {
|
||||
log.Printf("Error decoding snapshot: %s\n", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if filter(s) {
|
||||
if _, exists := collection.cache[s.UUID]; !exists {
|
||||
collection.cache[s.UUID] = s
|
||||
result = append(result, s)
|
||||
if unique {
|
||||
return errors.New("abort")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// ByName looks up snapshot by name
|
||||
func (collection *SnapshotCollection) ByName(name string) (*Snapshot, error) {
|
||||
result := collection.search(func(s *Snapshot) bool { return s.Name == name }, true)
|
||||
if len(result) > 0 {
|
||||
return result[0], nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("snapshot with name %s not found", name)
|
||||
}
|
||||
|
||||
// ByUUID looks up snapshot by UUID
|
||||
func (collection *SnapshotCollection) ByUUID(uuid string) (*Snapshot, error) {
|
||||
collection.loadList()
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.UUID == uuid {
|
||||
return s, nil
|
||||
}
|
||||
if s, ok := collection.cache[uuid]; ok {
|
||||
return s, nil
|
||||
}
|
||||
return nil, fmt.Errorf("snapshot with uuid %s not found", uuid)
|
||||
|
||||
key := (&Snapshot{UUID: uuid}).Key()
|
||||
|
||||
value, err := collection.db.Get(key)
|
||||
if err == database.ErrNotFound {
|
||||
return nil, fmt.Errorf("snapshot with uuid %s not found", uuid)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &Snapshot{}
|
||||
err = s.Decode(value)
|
||||
|
||||
if err == nil {
|
||||
collection.cache[s.UUID] = s
|
||||
}
|
||||
|
||||
return s, err
|
||||
}
|
||||
|
||||
// ByRemoteRepoSource looks up snapshots that have specified RemoteRepo as a source
|
||||
func (collection *SnapshotCollection) ByRemoteRepoSource(repo *RemoteRepo) []*Snapshot {
|
||||
collection.loadList()
|
||||
|
||||
var result []*Snapshot
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.SourceKind == SourceRemoteRepo && utils.StrSliceHasItem(s.SourceIDs, repo.UUID) {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
return result
|
||||
return collection.search(func(s *Snapshot) bool {
|
||||
return s.SourceKind == SourceRemoteRepo && utils.StrSliceHasItem(s.SourceIDs, repo.UUID)
|
||||
}, false)
|
||||
}
|
||||
|
||||
// ByLocalRepoSource looks up snapshots that have specified LocalRepo as a source
|
||||
func (collection *SnapshotCollection) ByLocalRepoSource(repo *LocalRepo) []*Snapshot {
|
||||
collection.loadList()
|
||||
|
||||
var result []*Snapshot
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.SourceKind == SourceLocalRepo && utils.StrSliceHasItem(s.SourceIDs, repo.UUID) {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
return result
|
||||
return collection.search(func(s *Snapshot) bool {
|
||||
return s.SourceKind == SourceLocalRepo && utils.StrSliceHasItem(s.SourceIDs, repo.UUID)
|
||||
}, false)
|
||||
}
|
||||
|
||||
// BySnapshotSource looks up snapshots that have specified snapshot as a source
|
||||
func (collection *SnapshotCollection) BySnapshotSource(snapshot *Snapshot) []*Snapshot {
|
||||
collection.loadList()
|
||||
|
||||
var result []*Snapshot
|
||||
|
||||
for _, s := range collection.list {
|
||||
if s.SourceKind == "snapshot" && utils.StrSliceHasItem(s.SourceIDs, snapshot.UUID) {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
return result
|
||||
return collection.search(func(s *Snapshot) bool {
|
||||
return s.SourceKind == "snapshot" && utils.StrSliceHasItem(s.SourceIDs, snapshot.UUID)
|
||||
}, false)
|
||||
}
|
||||
|
||||
// ForEach runs method for each snapshot
|
||||
@@ -326,15 +332,25 @@ func (collection *SnapshotCollection) ForEach(handler func(*Snapshot) error) err
|
||||
|
||||
// ForEachSorted runs method for each snapshot following some sort order
|
||||
func (collection *SnapshotCollection) ForEachSorted(sortMethod string, handler func(*Snapshot) error) error {
|
||||
collection.loadList()
|
||||
blobs := collection.db.FetchByPrefix([]byte("S"))
|
||||
list := make([]*Snapshot, 0, len(blobs))
|
||||
|
||||
sorter, err := newSnapshotSorter(sortMethod, collection)
|
||||
for _, blob := range blobs {
|
||||
s := &Snapshot{}
|
||||
if err := s.Decode(blob); err != nil {
|
||||
log.Printf("Error decoding snapshot: %s\n", err)
|
||||
} else {
|
||||
list = append(list, s)
|
||||
}
|
||||
}
|
||||
|
||||
sorter, err := newSnapshotSorter(sortMethod, list)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, i := range sorter.list {
|
||||
err = handler(collection.list[i])
|
||||
for _, s := range sorter.list {
|
||||
err = handler(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -346,30 +362,16 @@ func (collection *SnapshotCollection) ForEachSorted(sortMethod string, handler f
|
||||
// Len returns number of snapshots in collection
|
||||
// ForEach runs method for each snapshot
|
||||
func (collection *SnapshotCollection) Len() int {
|
||||
collection.loadList()
|
||||
|
||||
return len(collection.list)
|
||||
return len(collection.db.KeysByPrefix([]byte("S")))
|
||||
}
|
||||
|
||||
// Drop removes snapshot from collection
|
||||
func (collection *SnapshotCollection) Drop(snapshot *Snapshot) error {
|
||||
collection.loadList()
|
||||
|
||||
snapshotPosition := -1
|
||||
|
||||
for i, s := range collection.list {
|
||||
if s == snapshot {
|
||||
snapshotPosition = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if snapshotPosition == -1 {
|
||||
if _, err := collection.db.Get(snapshot.Key()); err == database.ErrNotFound {
|
||||
panic("snapshot not found!")
|
||||
}
|
||||
|
||||
collection.list[len(collection.list)-1], collection.list[snapshotPosition], collection.list =
|
||||
nil, collection.list[len(collection.list)-1], collection.list[:len(collection.list)-1]
|
||||
delete(collection.cache, snapshot.UUID)
|
||||
|
||||
err := collection.db.Delete(snapshot.Key())
|
||||
if err != nil {
|
||||
@@ -386,13 +388,12 @@ const (
|
||||
)
|
||||
|
||||
type snapshotSorter struct {
|
||||
list []int
|
||||
collection *SnapshotCollection
|
||||
list []*Snapshot
|
||||
sortMethod int
|
||||
}
|
||||
|
||||
func newSnapshotSorter(sortMethod string, collection *SnapshotCollection) (*snapshotSorter, error) {
|
||||
s := &snapshotSorter{collection: collection}
|
||||
func newSnapshotSorter(sortMethod string, list []*Snapshot) (*snapshotSorter, error) {
|
||||
s := &snapshotSorter{list: list}
|
||||
|
||||
switch sortMethod {
|
||||
case "time", "Time":
|
||||
@@ -403,11 +404,6 @@ func newSnapshotSorter(sortMethod string, collection *SnapshotCollection) (*snap
|
||||
return nil, fmt.Errorf("sorting method \"%s\" unknown", sortMethod)
|
||||
}
|
||||
|
||||
s.list = make([]int, len(collection.list))
|
||||
for i := range s.list {
|
||||
s.list[i] = i
|
||||
}
|
||||
|
||||
sort.Sort(s)
|
||||
|
||||
return s, nil
|
||||
@@ -420,9 +416,9 @@ func (s *snapshotSorter) Swap(i, j int) {
|
||||
func (s *snapshotSorter) Less(i, j int) bool {
|
||||
switch s.sortMethod {
|
||||
case SortName:
|
||||
return s.collection.list[s.list[i]].Name < s.collection.list[s.list[j]].Name
|
||||
return s.list[i].Name < s.list[j].Name
|
||||
case SortTime:
|
||||
return s.collection.list[s.list[i]].CreatedAt.Before(s.collection.list[s.list[j]].CreatedAt)
|
||||
return s.list[i].CreatedAt.Before(s.list[j].CreatedAt)
|
||||
}
|
||||
panic("unknown sort method")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user