mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-04-20 19:38:39 +00:00
In current aptly, each repository and snapshot has its own reflist in the database. This brings a few problems with it: - Given a sufficiently large repositories and snapshots, these lists can get enormous, reaching >1MB. This is a problem for LevelDB's overall performance, as it tends to prefer values around the confiruged block size (defaults to just 4KiB). - When you take these large repositories and snapshot them, you have a full, new copy of the reflist, even if only a few packages changed. This means that having a lot of snapshots with a few changes causes the database to basically be full of largely duplicate reflists. - All the duplication also means that many of the same refs are being loaded repeatedly, which can cause some slowdown but, more notably, eats up huge amounts of memory. - Adding on more and more new repositories and snapshots will cause the time and memory spent on things like cleanup and publishing to grow roughly linearly. At the core, there are two problems here: - Reflists get very big because there are just a lot of packages. - Different reflists can tend to duplicate much of the same contents. *Split reflists* aim at solving this by separating reflists into 64 *buckets*. Package refs are sorted into individual buckets according to the following system: - Take the first 3 letters of the package name, after dropping a `lib` prefix. (Using only the first 3 letters will cause packages with similar prefixes to end up in the same bucket, under the assumption that packages with similar names tend to be updated together.) - Take the 64-bit xxhash of these letters. (xxhash was chosen because it relatively good distribution across the individual bits, which is important for the next step.) - Use the first 6 bits of the hash (range [0:63]) as an index into the buckets. Once refs are placed in buckets, a sha256 digest of all the refs in the bucket is taken. These buckets are then stored in the database, split into roughly block-sized segments, and all the repositories and snapshots simply store an array of bucket digests. This approach means that *repositories and snapshots can share their reflist buckets*. If a snapshot is taken of a repository, it will have the same contents, so its split reflist will point to the same buckets as the base repository, and only one copy of each bucket is stored in the database. When some packages in the repository change, only the buckets containing those packages will be modified; all the other buckets will remain unchanged, and thus their contents will still be shared. Later on, when these reflists are loaded, each bucket is only loaded once, short-cutting loaded many megabytes of data. In effect, split reflists are essentially copy-on-write, with only the changed buckets stored individually. Changing the disk format means that a migration needs to take place, so that task is moved into the database cleanup step, which will migrate reflists over to split reflists, as well as delete any unused reflist buckets. All the reflist tests are also changed to additionally test out split reflists; although the internal logic is all shared (since buckets are, themselves, just normal reflists), some special additions are needed to have native versions of the various reflist helper methods. In our tests, we've observed the following improvements: - Memory usage during publish and database cleanup, with `GOMEMLIMIT=2GiB`, goes down from ~3.2GiB (larger than the memory limit!) to ~0.7GiB, a decrease of ~4.5x. - Database size decreases from 1.3GB to 367MB. *In my local tests*, publish times had also decreased down to mere seconds but the same effect wasn't observed on the server, with the times staying around the same. My suspicions are that this is due to I/O performance: my local system is an M1 MBP, which almost certainly has much faster disk speeds than our DigitalOcean block volumes. Split reflists include a side effect of requiring more random accesses from reading all the buckets by their keys, so if your random I/O performance is slower, it might cancel out the benefits. That being said, even in that case, the memory usage and database size advantages still persist. Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
1160 lines
31 KiB
Go
1160 lines
31 KiB
Go
package deb
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
|
|
"github.com/AlekSi/pointer"
|
|
"github.com/aptly-dev/aptly/database"
|
|
"github.com/cespare/xxhash/v2"
|
|
"github.com/ugorji/go/codec"
|
|
)
|
|
|
|
// PackageRefList is a list of keys of packages, this is basis for snapshot
|
|
// and similar stuff
|
|
//
|
|
// Refs are sorted in lexicographical order
|
|
type PackageRefList struct {
|
|
// List of package keys
|
|
Refs [][]byte
|
|
}
|
|
|
|
// Verify interface
|
|
var (
|
|
_ sort.Interface = &PackageRefList{}
|
|
)
|
|
|
|
// NewPackageRefList creates empty PackageRefList
|
|
func NewPackageRefList() *PackageRefList {
|
|
return &PackageRefList{}
|
|
}
|
|
|
|
// NewPackageRefListFromPackageList creates PackageRefList from PackageList
|
|
func NewPackageRefListFromPackageList(list *PackageList) *PackageRefList {
|
|
reflist := &PackageRefList{}
|
|
reflist.Refs = make([][]byte, list.Len())
|
|
|
|
i := 0
|
|
for _, p := range list.packages {
|
|
reflist.Refs[i] = p.Key("")
|
|
i++
|
|
}
|
|
|
|
sort.Sort(reflist)
|
|
|
|
return reflist
|
|
}
|
|
|
|
func (l *PackageRefList) Clone() *PackageRefList {
|
|
clone := &PackageRefList{}
|
|
clone.Refs = make([][]byte, l.Len())
|
|
copy(clone.Refs, l.Refs)
|
|
return clone
|
|
}
|
|
|
|
// Len returns number of refs
|
|
func (l *PackageRefList) Len() int {
|
|
return len(l.Refs)
|
|
}
|
|
|
|
// Swap swaps two refs
|
|
func (l *PackageRefList) Swap(i, j int) {
|
|
l.Refs[i], l.Refs[j] = l.Refs[j], l.Refs[i]
|
|
}
|
|
|
|
// Compare compares two refs in lexographical order
|
|
func (l *PackageRefList) Less(i, j int) bool {
|
|
return bytes.Compare(l.Refs[i], l.Refs[j]) < 0
|
|
}
|
|
|
|
// Encode does msgpack encoding of PackageRefList
|
|
func (l *PackageRefList) Encode() []byte {
|
|
var buf bytes.Buffer
|
|
|
|
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
|
|
encoder.Encode(l)
|
|
|
|
return buf.Bytes()
|
|
}
|
|
|
|
// Decode decodes msgpack representation into PackageRefLit
|
|
func (l *PackageRefList) Decode(input []byte) error {
|
|
handle := &codec.MsgpackHandle{}
|
|
handle.ZeroCopy = true
|
|
decoder := codec.NewDecoderBytes(input, handle)
|
|
return decoder.Decode(l)
|
|
}
|
|
|
|
// ForEach calls handler for each package ref in list
|
|
func (l *PackageRefList) ForEach(handler func([]byte) error) error {
|
|
var err error
|
|
for _, p := range l.Refs {
|
|
err = handler(p)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Has checks whether package is part of reflist
|
|
func (l *PackageRefList) Has(p *Package) bool {
|
|
key := p.Key("")
|
|
|
|
i := sort.Search(len(l.Refs), func(j int) bool { return bytes.Compare(l.Refs[j], key) >= 0 })
|
|
return i < len(l.Refs) && bytes.Equal(l.Refs[i], key)
|
|
}
|
|
|
|
// Strings builds list of strings with package keys
|
|
func (l *PackageRefList) Strings() []string {
|
|
if l == nil {
|
|
return []string{}
|
|
}
|
|
|
|
result := make([]string, l.Len())
|
|
|
|
for i := 0; i < l.Len(); i++ {
|
|
result[i] = string(l.Refs[i])
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// Subtract returns all packages in l that are not in r
|
|
func (l *PackageRefList) Subtract(r *PackageRefList) *PackageRefList {
|
|
result := &PackageRefList{Refs: make([][]byte, 0, 128)}
|
|
|
|
// pointer to left and right reflists
|
|
il, ir := 0, 0
|
|
// length of reflists
|
|
ll, lr := l.Len(), r.Len()
|
|
|
|
for il < ll || ir < lr {
|
|
if il == ll {
|
|
// left list exhausted, we got the result
|
|
break
|
|
}
|
|
if ir == lr {
|
|
// right list exhausted, append what is left to result
|
|
result.Refs = append(result.Refs, l.Refs[il:]...)
|
|
break
|
|
}
|
|
|
|
rel := bytes.Compare(l.Refs[il], r.Refs[ir])
|
|
if rel == 0 {
|
|
// r contains entry from l, so we skip it
|
|
il++
|
|
ir++
|
|
} else if rel < 0 {
|
|
// item il is not in r, append
|
|
result.Refs = append(result.Refs, l.Refs[il])
|
|
il++
|
|
} else {
|
|
// skip over to next item in r
|
|
ir++
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// PackageDiff is a difference between two packages in a list.
|
|
//
|
|
// If left & right are present, difference is in package version
|
|
// If left is nil, package is present only in right
|
|
// If right is nil, package is present only in left
|
|
type PackageDiff struct {
|
|
Left, Right *Package
|
|
}
|
|
|
|
// Check interface
|
|
var (
|
|
_ json.Marshaler = PackageDiff{}
|
|
)
|
|
|
|
// MarshalJSON implements json.Marshaler interface
|
|
func (d PackageDiff) MarshalJSON() ([]byte, error) {
|
|
serialized := struct {
|
|
Left, Right *string
|
|
}{}
|
|
|
|
if d.Left != nil {
|
|
serialized.Left = pointer.ToString(string(d.Left.Key("")))
|
|
}
|
|
if d.Right != nil {
|
|
serialized.Right = pointer.ToString(string(d.Right.Key("")))
|
|
}
|
|
|
|
return json.Marshal(serialized)
|
|
}
|
|
|
|
// PackageDiffs is a list of PackageDiff records
|
|
type PackageDiffs []PackageDiff
|
|
|
|
// Diff calculates difference between two reflists
|
|
func (l *PackageRefList) Diff(r *PackageRefList, packageCollection *PackageCollection, result PackageDiffs) (PackageDiffs, error) {
|
|
var err error
|
|
|
|
if result == nil {
|
|
result = make(PackageDiffs, 0, 128)
|
|
}
|
|
|
|
// pointer to left and right reflists
|
|
il, ir := 0, 0
|
|
// length of reflists
|
|
ll, lr := l.Len(), r.Len()
|
|
// cached loaded packages on the left & right
|
|
pl, pr := (*Package)(nil), (*Package)(nil)
|
|
|
|
// until we reached end of both lists
|
|
for il < ll || ir < lr {
|
|
var rl, rr []byte
|
|
if il < ll {
|
|
rl = l.Refs[il]
|
|
}
|
|
if ir < lr {
|
|
rr = r.Refs[ir]
|
|
}
|
|
|
|
// compare refs
|
|
rel := bytes.Compare(rl, rr)
|
|
// an unset ref is less than all others, but since it represents the end
|
|
// of a reflist, it should be *greater*, so flip the comparison result
|
|
if rl == nil || rr == nil {
|
|
rel = -rel
|
|
}
|
|
|
|
if rel == 0 {
|
|
// refs are identical, so are packages, advance pointer
|
|
il++
|
|
ir++
|
|
pl, pr = nil, nil
|
|
} else {
|
|
// load pl & pr if they haven't been loaded before
|
|
if pl == nil && rl != nil {
|
|
pl, err = packageCollection.ByKey(rl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if pr == nil && rr != nil {
|
|
pr, err = packageCollection.ByKey(rr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// otherwise pl or pr is missing on one of the sides
|
|
if rel < 0 {
|
|
// compaction: +(,A) -(B,) --> !(A,B)
|
|
if len(result) > 0 && result[len(result)-1].Left == nil && result[len(result)-1].Right.Name == pl.Name &&
|
|
result[len(result)-1].Right.Architecture == pl.Architecture {
|
|
result[len(result)-1] = PackageDiff{Left: pl, Right: result[len(result)-1].Right}
|
|
} else {
|
|
result = append(result, PackageDiff{Left: pl, Right: nil})
|
|
}
|
|
il++
|
|
pl = nil
|
|
} else {
|
|
// compaction: -(A,) +(,B) --> !(A,B)
|
|
if len(result) > 0 && result[len(result)-1].Right == nil && result[len(result)-1].Left.Name == pr.Name &&
|
|
result[len(result)-1].Left.Architecture == pr.Architecture {
|
|
result[len(result)-1] = PackageDiff{Left: result[len(result)-1].Left, Right: pr}
|
|
} else {
|
|
result = append(result, PackageDiff{Left: nil, Right: pr})
|
|
}
|
|
ir++
|
|
pr = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Merge merges reflist r into current reflist. If overrideMatching, merge
|
|
// replaces matching packages (by architecture/name) with reference from r.
|
|
// If ignoreConflicting is set, all packages are preserved, otherwise conflicting
|
|
// packages are overwritten with packages from "right" snapshot.
|
|
func (l *PackageRefList) Merge(r *PackageRefList, overrideMatching, ignoreConflicting bool) (result *PackageRefList) {
|
|
var overriddenArch, overridenName []byte
|
|
|
|
// pointer to left and right reflists
|
|
il, ir := 0, 0
|
|
// length of reflists
|
|
ll, lr := l.Len(), r.Len()
|
|
|
|
result = &PackageRefList{}
|
|
result.Refs = make([][]byte, 0, ll+lr)
|
|
|
|
// until we reached end of both lists
|
|
for il < ll || ir < lr {
|
|
// if we've exhausted left list, pull the rest from the right
|
|
if il == ll {
|
|
result.Refs = append(result.Refs, r.Refs[ir:]...)
|
|
break
|
|
}
|
|
// if we've exhausted right list, pull the rest from the left
|
|
if ir == lr {
|
|
result.Refs = append(result.Refs, l.Refs[il:]...)
|
|
break
|
|
}
|
|
|
|
// refs on both sides are present, load them
|
|
rl, rr := l.Refs[il], r.Refs[ir]
|
|
// compare refs
|
|
rel := bytes.Compare(rl, rr)
|
|
|
|
if rel == 0 {
|
|
// refs are identical, so are packages, advance pointer
|
|
result.Refs = append(result.Refs, l.Refs[il])
|
|
il++
|
|
ir++
|
|
overridenName = nil
|
|
overriddenArch = nil
|
|
} else {
|
|
if !ignoreConflicting || overrideMatching {
|
|
partsL := bytes.Split(rl, []byte(" "))
|
|
archL, nameL, versionL := partsL[0][1:], partsL[1], partsL[2]
|
|
|
|
partsR := bytes.Split(rr, []byte(" "))
|
|
archR, nameR, versionR := partsR[0][1:], partsR[1], partsR[2]
|
|
|
|
if !ignoreConflicting && bytes.Equal(archL, archR) &&
|
|
bytes.Equal(nameL, nameR) && bytes.Equal(versionL, versionR) {
|
|
// conflicting duplicates with same arch, name, version, but different file hash
|
|
result.Refs = append(result.Refs, r.Refs[ir])
|
|
il++
|
|
ir++
|
|
overridenName = nil
|
|
overriddenArch = nil
|
|
continue
|
|
}
|
|
|
|
if overrideMatching {
|
|
if bytes.Equal(archL, overriddenArch) && bytes.Equal(nameL, overridenName) {
|
|
// this package has already been overridden on the right
|
|
il++
|
|
continue
|
|
}
|
|
|
|
if bytes.Equal(archL, archR) && bytes.Equal(nameL, nameR) {
|
|
// override with package from the right
|
|
result.Refs = append(result.Refs, r.Refs[ir])
|
|
il++
|
|
ir++
|
|
overriddenArch = archL
|
|
overridenName = nameL
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// otherwise append smallest of two
|
|
if rel < 0 {
|
|
result.Refs = append(result.Refs, l.Refs[il])
|
|
il++
|
|
} else {
|
|
result.Refs = append(result.Refs, r.Refs[ir])
|
|
ir++
|
|
overridenName = nil
|
|
overriddenArch = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// FilterLatestRefs takes in a reflist with potentially multiples of the same
|
|
// packages and reduces it to only the latest of each package. The operations
|
|
// are done in-place. This implements a "latest wins" approach which can be used
|
|
// while merging two or more snapshots together.
|
|
func (l *PackageRefList) FilterLatestRefs() {
|
|
var (
|
|
lastArch, lastName, lastVer []byte
|
|
arch, name, ver []byte
|
|
parts [][]byte
|
|
)
|
|
|
|
for i := 0; i < len(l.Refs); i++ {
|
|
parts = bytes.Split(l.Refs[i][1:], []byte(" "))
|
|
arch, name, ver = parts[0], parts[1], parts[2]
|
|
|
|
if bytes.Equal(arch, lastArch) && bytes.Equal(name, lastName) {
|
|
// Two packages are identical, check version and only one wins
|
|
vres := CompareVersions(string(ver), string(lastVer))
|
|
|
|
// Remove the older refs from the result
|
|
if vres > 0 {
|
|
// ver[i] > ver[i-1], remove element i-1
|
|
l.Refs = append(l.Refs[:i-1], l.Refs[i:]...)
|
|
} else {
|
|
// ver[i] < ver[i-1], remove element i
|
|
l.Refs = append(l.Refs[:i], l.Refs[i+1:]...)
|
|
arch, name, ver = lastArch, lastName, lastVer
|
|
}
|
|
|
|
// Compensate for the reduced set
|
|
i--
|
|
}
|
|
|
|
lastArch, lastName, lastVer = arch, name, ver
|
|
}
|
|
}
|
|
|
|
const (
|
|
reflistBucketCount = 1 << 6
|
|
reflistBucketMask = reflistBucketCount - 1
|
|
)
|
|
|
|
type reflistDigestArray [sha256.Size]byte
|
|
|
|
func bucketRefPrefix(ref []byte) []byte {
|
|
const maxPrefixLen = 3
|
|
|
|
// Cut out the arch, leaving behind the package name and subsequent info.
|
|
_, ref, _ = bytes.Cut(ref, []byte{' '})
|
|
|
|
// Strip off the lib prefix, so that "libxyz" and "xyz", which are likely
|
|
// to be updated together, go in the same bucket.
|
|
libPrefix := []byte("lib")
|
|
if bytes.HasPrefix(ref, libPrefix) {
|
|
ref = ref[len(libPrefix):]
|
|
}
|
|
|
|
prefixLen := min(maxPrefixLen, len(ref))
|
|
prefix, _, _ := bytes.Cut(ref[:prefixLen], []byte{' '})
|
|
return prefix
|
|
}
|
|
|
|
func bucketIdxForRef(ref []byte) int {
|
|
return int(xxhash.Sum64(bucketRefPrefix(ref))) & reflistBucketMask
|
|
}
|
|
|
|
// SplitRefList is a list of package refs, similar to a PackageRefList. However,
|
|
// instead of storing a linear array of refs, SplitRefList splits the refs into
|
|
// PackageRefList "buckets", based on a hash of the package name inside the ref.
|
|
// Each bucket has a digest of its contents that serves as its key in the database.
|
|
//
|
|
// When serialized, a SplitRefList just becomes an array of bucket digests, and
|
|
// the buckets themselves are stored separately. Because the buckets are then
|
|
// referenced by their digests, multiple independent reflists can share buckets,
|
|
// if their buckets have matching digests.
|
|
//
|
|
// Buckets themselves may not be confirmed to a single database value; instead,
|
|
// they're split into "segments", based on the database's preferred maximum
|
|
// value size. This prevents large buckets from slowing down the database.
|
|
type SplitRefList struct {
|
|
Buckets [][]byte
|
|
|
|
bucketRefs []*PackageRefList
|
|
}
|
|
|
|
// NewSplitRefList creates empty SplitRefList
|
|
func NewSplitRefList() *SplitRefList {
|
|
sl := &SplitRefList{}
|
|
sl.reset()
|
|
return sl
|
|
}
|
|
|
|
// NewSplitRefListFromRefList creates SplitRefList from PackageRefList
|
|
func NewSplitRefListFromRefList(reflist *PackageRefList) *SplitRefList {
|
|
sl := NewSplitRefList()
|
|
sl.Replace(reflist)
|
|
return sl
|
|
}
|
|
|
|
// NewSplitRefListFromRefList creates SplitRefList from PackageList
|
|
func NewSplitRefListFromPackageList(list *PackageList) *SplitRefList {
|
|
return NewSplitRefListFromRefList(NewPackageRefListFromPackageList(list))
|
|
}
|
|
|
|
func (sl *SplitRefList) reset() {
|
|
sl.Buckets = make([][]byte, reflistBucketCount)
|
|
sl.bucketRefs = make([]*PackageRefList, reflistBucketCount)
|
|
}
|
|
|
|
// Has checks whether package is part of reflist
|
|
func (sl *SplitRefList) Has(p *Package) bool {
|
|
idx := bucketIdxForRef(p.Key(""))
|
|
if bucket := sl.bucketRefs[idx]; bucket != nil {
|
|
return bucket.Has(p)
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Len returns number of refs
|
|
func (sl *SplitRefList) Len() int {
|
|
total := 0
|
|
for _, bucket := range sl.bucketRefs {
|
|
if bucket != nil {
|
|
total += bucket.Len()
|
|
}
|
|
}
|
|
return total
|
|
}
|
|
|
|
func reflistDigest(l *PackageRefList) []byte {
|
|
// Different algorithms on PackageRefLists will sometimes return a nil slice
|
|
// of refs and other times return an empty slice. Regardless, they should
|
|
// both be treated identically and be given an empty digest.
|
|
if len(l.Refs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
h := sha256.New()
|
|
for _, ref := range l.Refs {
|
|
h.Write(ref)
|
|
h.Write([]byte{0})
|
|
}
|
|
return h.Sum(nil)
|
|
}
|
|
|
|
// Removes all the refs inside and replaces them with those in the given reflist
|
|
func (sl *SplitRefList) Replace(reflist *PackageRefList) {
|
|
sl.reset()
|
|
|
|
for _, ref := range reflist.Refs {
|
|
idx := bucketIdxForRef(ref)
|
|
|
|
bucket := sl.bucketRefs[idx]
|
|
if bucket == nil {
|
|
bucket = NewPackageRefList()
|
|
sl.bucketRefs[idx] = bucket
|
|
}
|
|
|
|
bucket.Refs = append(bucket.Refs, ref)
|
|
}
|
|
|
|
for idx, bucket := range sl.bucketRefs {
|
|
if bucket != nil {
|
|
sort.Sort(bucket)
|
|
sl.Buckets[idx] = reflistDigest(bucket)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Merge merges reflist r into current reflist (see PackageRefList.Merge)
|
|
func (sl *SplitRefList) Merge(r *SplitRefList, overrideMatching, ignoreConflicting bool) (result *SplitRefList) {
|
|
result = NewSplitRefList()
|
|
|
|
var empty PackageRefList
|
|
for idx, lbucket := range sl.bucketRefs {
|
|
rbucket := r.bucketRefs[idx]
|
|
if lbucket == nil && rbucket == nil {
|
|
continue
|
|
}
|
|
|
|
if lbucket == nil {
|
|
lbucket = &empty
|
|
} else if rbucket == nil {
|
|
rbucket = &empty
|
|
}
|
|
|
|
result.bucketRefs[idx] = lbucket.Merge(rbucket, overrideMatching, ignoreConflicting)
|
|
result.Buckets[idx] = reflistDigest(result.bucketRefs[idx])
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Subtract returns all packages in l that are not in r
|
|
func (sl *SplitRefList) Subtract(r *SplitRefList) (result *SplitRefList) {
|
|
result = NewSplitRefList()
|
|
|
|
for idx, lbucket := range sl.bucketRefs {
|
|
rbucket := r.bucketRefs[idx]
|
|
if lbucket != nil {
|
|
if rbucket != nil {
|
|
result.bucketRefs[idx] = lbucket.Subtract(rbucket)
|
|
result.Buckets[idx] = reflistDigest(result.bucketRefs[idx])
|
|
} else {
|
|
result.bucketRefs[idx] = lbucket.Clone()
|
|
result.Buckets[idx] = sl.Buckets[idx]
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Diff calculates difference between two reflists
|
|
func (sl *SplitRefList) Diff(r *SplitRefList, packageCollection *PackageCollection, result PackageDiffs) (PackageDiffs, error) {
|
|
var err error
|
|
|
|
if result == nil {
|
|
result = make(PackageDiffs, 0, 128)
|
|
}
|
|
|
|
var empty PackageRefList
|
|
for idx, lbucket := range sl.bucketRefs {
|
|
rbucket := r.bucketRefs[idx]
|
|
if lbucket != nil {
|
|
if rbucket != nil {
|
|
result, err = lbucket.Diff(rbucket, packageCollection, result)
|
|
} else {
|
|
result, err = lbucket.Diff(&empty, packageCollection, result)
|
|
}
|
|
} else if rbucket != nil {
|
|
result, err = empty.Diff(rbucket, packageCollection, result)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
sort.Slice(result, func(i, j int) bool {
|
|
var ri, rj []byte
|
|
if result[i].Left != nil {
|
|
ri = result[i].Left.Key("")
|
|
} else {
|
|
ri = result[i].Right.Key("")
|
|
}
|
|
if result[j].Left != nil {
|
|
rj = result[j].Left.Key("")
|
|
} else {
|
|
rj = result[j].Right.Key("")
|
|
}
|
|
|
|
return bytes.Compare(ri, rj) < 0
|
|
})
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// FilterLatestRefs reduces a reflist to the latest of each package (see PackageRefList.FilterLatestRefs)
|
|
func (sl *SplitRefList) FilterLatestRefs() {
|
|
for idx, bucket := range sl.bucketRefs {
|
|
if bucket != nil {
|
|
bucket.FilterLatestRefs()
|
|
sl.Buckets[idx] = reflistDigest(bucket)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Flatten creates a flat PackageRefList containing all the refs in this reflist
|
|
func (sl *SplitRefList) Flatten() *PackageRefList {
|
|
reflist := NewPackageRefList()
|
|
sl.ForEach(func(ref []byte) error {
|
|
reflist.Refs = append(reflist.Refs, ref)
|
|
return nil
|
|
})
|
|
sort.Sort(reflist)
|
|
return reflist
|
|
}
|
|
|
|
// ForEachBucket calls handler for each bucket in list
|
|
func (sl *SplitRefList) ForEachBucket(handler func(digest []byte, bucket *PackageRefList) error) error {
|
|
for idx, digest := range sl.Buckets {
|
|
if len(digest) == 0 {
|
|
continue
|
|
}
|
|
|
|
bucket := sl.bucketRefs[idx]
|
|
if bucket != nil {
|
|
if err := handler(digest, bucket); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ForEach calls handler for each package ref in list
|
|
//
|
|
// IMPORTANT: unlike PackageRefList.ForEach, the order of handler invocations
|
|
// is *not* guaranteed to be sorted.
|
|
func (sl *SplitRefList) ForEach(handler func([]byte) error) error {
|
|
for idx, digest := range sl.Buckets {
|
|
if len(digest) == 0 {
|
|
continue
|
|
}
|
|
|
|
bucket := sl.bucketRefs[idx]
|
|
if bucket != nil {
|
|
if err := bucket.ForEach(handler); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RefListDigestSet is a set of SplitRefList bucket digests
|
|
type RefListDigestSet struct {
|
|
items map[reflistDigestArray]struct{}
|
|
}
|
|
|
|
// NewRefListDigestSet creates empty RefListDigestSet
|
|
func NewRefListDigestSet() *RefListDigestSet {
|
|
return &RefListDigestSet{items: map[reflistDigestArray]struct{}{}}
|
|
}
|
|
|
|
// Len returns number of digests in the set
|
|
func (set *RefListDigestSet) Len() int {
|
|
return len(set.items)
|
|
}
|
|
|
|
// ForEach calls handler for each digest in the set
|
|
func (set *RefListDigestSet) ForEach(handler func(digest []byte) error) error {
|
|
for digest := range set.items {
|
|
if err := handler(digest[:]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Add adds digest to set, doing nothing if the digest was already present
|
|
func (set *RefListDigestSet) Add(digest []byte) {
|
|
set.items[reflistDigestArray(digest)] = struct{}{}
|
|
}
|
|
|
|
// AddAllInRefList adds all the bucket digests in a SplitRefList to the set
|
|
func (set *RefListDigestSet) AddAllInRefList(sl *SplitRefList) {
|
|
for _, digest := range sl.Buckets {
|
|
if len(digest) > 0 {
|
|
set.Add(digest)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Has checks whether a digest is part of set
|
|
func (set *RefListDigestSet) Has(digest []byte) bool {
|
|
_, ok := set.items[reflistDigestArray(digest)]
|
|
return ok
|
|
}
|
|
|
|
// Remove removes a digest from set
|
|
func (set *RefListDigestSet) Remove(digest []byte) {
|
|
delete(set.items, reflistDigestArray(digest))
|
|
}
|
|
|
|
// RemoveAll removes all the digests in other from the current set
|
|
func (set *RefListDigestSet) RemoveAll(other *RefListDigestSet) {
|
|
for digest := range other.items {
|
|
delete(set.items, digest)
|
|
}
|
|
}
|
|
|
|
// RefListCollection does listing, updating/adding/deleting of SplitRefLists
|
|
type RefListCollection struct {
|
|
db database.Storage
|
|
|
|
cache map[reflistDigestArray]*PackageRefList
|
|
}
|
|
|
|
// NewRefListCollection creates a RefListCollection
|
|
func NewRefListCollection(db database.Storage) *RefListCollection {
|
|
return &RefListCollection{db: db, cache: make(map[reflistDigestArray]*PackageRefList)}
|
|
}
|
|
|
|
type reflistStorageFormat int
|
|
|
|
const (
|
|
// (legacy format) all the refs are stored inline in a single value
|
|
reflistStorageFormatInline reflistStorageFormat = iota
|
|
// the refs are split into buckets that are stored externally from the value
|
|
reflistStorageFormatSplit
|
|
)
|
|
|
|
// NoPadding is used because all digests are the same length, so the padding
|
|
// is useless and only serves to muddy the output.
|
|
var bucketDigestEncoding = base64.StdEncoding.WithPadding(base64.NoPadding)
|
|
|
|
func segmentPrefix(encodedDigest string) []byte {
|
|
return []byte(fmt.Sprintf("F%s-", encodedDigest))
|
|
}
|
|
|
|
func segmentIndexKey(prefix []byte, idx int) []byte {
|
|
// Assume most buckets won't have more than 0xFFFF = ~65k segments (which
|
|
// would be an extremely large bucket!).
|
|
return append(bytes.Clone(prefix), []byte(fmt.Sprintf("%04x", idx))...)
|
|
}
|
|
|
|
// AllBucketDigests returns a set of all the bucket digests in the database
|
|
func (collection *RefListCollection) AllBucketDigests() (*RefListDigestSet, error) {
|
|
digests := NewRefListDigestSet()
|
|
|
|
err := collection.db.ProcessByPrefix([]byte("F"), func(key []byte, value []byte) error {
|
|
if !bytes.HasSuffix(key, []byte("-0000")) {
|
|
// Ignore additional segments for the same digest.
|
|
return nil
|
|
}
|
|
|
|
encodedDigest, _, foundDash := bytes.Cut(key[1:], []byte("-"))
|
|
if !foundDash {
|
|
return fmt.Errorf("invalid key: %s", string(key))
|
|
}
|
|
digest := make([]byte, bucketDigestEncoding.DecodedLen(len(encodedDigest)))
|
|
if _, err := bucketDigestEncoding.Decode(digest, encodedDigest); err != nil {
|
|
return fmt.Errorf("decoding key %s: %w", string(key), err)
|
|
}
|
|
|
|
digests.Add(digest)
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return digests, nil
|
|
}
|
|
|
|
// UnsafeDropBucket drops the bucket associated with digest from the database,
|
|
// doing so inside batch
|
|
//
|
|
// This is considered "unsafe" because no checks are performed to ensure that
|
|
// the bucket is no longer referenced by any saved reflists.
|
|
func (collection *RefListCollection) UnsafeDropBucket(digest []byte, batch database.Batch) error {
|
|
prefix := segmentPrefix(bucketDigestEncoding.EncodeToString(digest))
|
|
return collection.db.ProcessByPrefix(prefix, func(key []byte, value []byte) error {
|
|
return batch.Delete(key)
|
|
})
|
|
}
|
|
|
|
func (collection *RefListCollection) load(sl *SplitRefList, key []byte) (reflistStorageFormat, error) {
|
|
sl.reset()
|
|
|
|
data, err := collection.db.Get(key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var splitOrInlineRefList struct {
|
|
*SplitRefList
|
|
*PackageRefList
|
|
}
|
|
handle := &codec.MsgpackHandle{}
|
|
handle.ZeroCopy = true
|
|
decoder := codec.NewDecoderBytes(data, handle)
|
|
if err := decoder.Decode(&splitOrInlineRefList); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if splitOrInlineRefList.SplitRefList != nil {
|
|
sl.Buckets = splitOrInlineRefList.Buckets
|
|
} else if splitOrInlineRefList.PackageRefList != nil {
|
|
sl.Replace(splitOrInlineRefList.PackageRefList)
|
|
return reflistStorageFormatInline, nil
|
|
}
|
|
|
|
return reflistStorageFormatSplit, nil
|
|
}
|
|
|
|
func (collection *RefListCollection) loadBuckets(sl *SplitRefList) error {
|
|
for idx := range sl.Buckets {
|
|
if sl.bucketRefs[idx] != nil {
|
|
continue
|
|
}
|
|
|
|
var bucket *PackageRefList
|
|
|
|
if digest := sl.Buckets[idx]; len(digest) > 0 {
|
|
cacheKey := reflistDigestArray(digest)
|
|
bucket = collection.cache[cacheKey]
|
|
if bucket == nil {
|
|
bucket = NewPackageRefList()
|
|
prefix := segmentPrefix(bucketDigestEncoding.EncodeToString(digest))
|
|
err := collection.db.ProcessByPrefix(prefix, func(digest []byte, value []byte) error {
|
|
var l PackageRefList
|
|
if err := l.Decode(append([]byte{}, value...)); err != nil {
|
|
return err
|
|
}
|
|
|
|
bucket.Refs = append(bucket.Refs, l.Refs...)
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The segments may not have been iterated in order, so make sure to re-sort
|
|
// here.
|
|
sort.Sort(bucket)
|
|
collection.cache[cacheKey] = bucket
|
|
}
|
|
|
|
actualDigest := reflistDigest(bucket)
|
|
if !bytes.Equal(actualDigest, digest) {
|
|
return fmt.Errorf("corrupt reflist bucket %d: expected digest %s, got %s",
|
|
idx,
|
|
bucketDigestEncoding.EncodeToString(digest),
|
|
bucketDigestEncoding.EncodeToString(actualDigest))
|
|
}
|
|
}
|
|
|
|
sl.bucketRefs[idx] = bucket
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// LoadComplete loads the reflist stored at the given key, as well as all the
|
|
// buckets referenced by a split reflist
|
|
func (collection *RefListCollection) LoadComplete(sl *SplitRefList, key []byte) error {
|
|
if _, err := collection.load(sl, key); err != nil {
|
|
return err
|
|
}
|
|
|
|
return collection.loadBuckets(sl)
|
|
}
|
|
|
|
// RefListBatch is a wrapper over a database.Batch that tracks already-written
|
|
// reflists to avoid writing them multiple times
|
|
//
|
|
// It is *not* safe to use the same underlying database.Batch that has already
|
|
// been given to UnsafeDropBucket.
|
|
type RefListBatch struct {
|
|
batch database.Batch
|
|
|
|
alreadyWritten *RefListDigestSet
|
|
}
|
|
|
|
// NewBatch creates a new RefListBatch wrapping the given database.Batch
|
|
func (collection *RefListCollection) NewBatch(batch database.Batch) *RefListBatch {
|
|
return &RefListBatch{
|
|
batch: batch,
|
|
alreadyWritten: NewRefListDigestSet(),
|
|
}
|
|
}
|
|
|
|
type reflistUpdateContext struct {
|
|
rb *RefListBatch
|
|
stats *RefListMigrationStats
|
|
}
|
|
|
|
func clearSegmentRefs(reflist *PackageRefList, recommendedMaxKVSize int) {
|
|
avgRefsInSegment := recommendedMaxKVSize / 70
|
|
reflist.Refs = make([][]byte, 0, avgRefsInSegment)
|
|
}
|
|
|
|
func flushSegmentRefs(uctx *reflistUpdateContext, prefix []byte, segment int, reflist *PackageRefList) error {
|
|
encoded := reflist.Encode()
|
|
err := uctx.rb.batch.Put(segmentIndexKey(prefix, segment), encoded)
|
|
if err == nil && uctx.stats != nil {
|
|
uctx.stats.Segments++
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (collection *RefListCollection) updateWithContext(sl *SplitRefList, key []byte, uctx *reflistUpdateContext) error {
|
|
if sl != nil {
|
|
recommendedMaxKVSize := collection.db.GetRecommendedMaxKVSize()
|
|
|
|
for idx, digest := range sl.Buckets {
|
|
if len(digest) == 0 {
|
|
continue
|
|
}
|
|
|
|
if uctx.rb.alreadyWritten.Has(digest) {
|
|
continue
|
|
}
|
|
|
|
prefix := segmentPrefix(bucketDigestEncoding.EncodeToString(digest))
|
|
if collection.db.HasPrefix(prefix) {
|
|
continue
|
|
}
|
|
|
|
// All the sizing information taken from the msgpack spec:
|
|
// https://github.com/msgpack/msgpack/blob/master/spec.md
|
|
|
|
// Assume that a segment will have [16,2^16) elements, which would
|
|
// fit into an array 16 and thus have 3 bytes of overhead.
|
|
// (A database would need a massive recommendedMaxKVSize to pass
|
|
// that limit.)
|
|
size := len(segmentIndexKey(prefix, 0)) + 3
|
|
segment := 0
|
|
|
|
var reflist PackageRefList
|
|
clearSegmentRefs(&reflist, recommendedMaxKVSize)
|
|
for _, ref := range sl.bucketRefs[idx].Refs {
|
|
// In order to determine the size of the ref in the database,
|
|
// we need to know how much overhead will be added with by msgpack
|
|
// encoding.
|
|
requiredSize := len(ref)
|
|
if requiredSize < 1<<5 {
|
|
requiredSize++
|
|
} else if requiredSize < 1<<8 {
|
|
requiredSize += 2
|
|
} else if requiredSize < 1<<16 {
|
|
requiredSize += 3
|
|
} else {
|
|
requiredSize += 4
|
|
}
|
|
if size+requiredSize > recommendedMaxKVSize {
|
|
if err := flushSegmentRefs(uctx, prefix, segment, &reflist); err != nil {
|
|
return err
|
|
}
|
|
clearSegmentRefs(&reflist, recommendedMaxKVSize)
|
|
segment++
|
|
}
|
|
|
|
reflist.Refs = append(reflist.Refs, ref)
|
|
size += requiredSize
|
|
}
|
|
|
|
if len(reflist.Refs) > 0 {
|
|
if err := flushSegmentRefs(uctx, prefix, segment, &reflist); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
uctx.rb.alreadyWritten.Add(digest)
|
|
if uctx.stats != nil {
|
|
uctx.stats.Buckets++
|
|
}
|
|
}
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
encoder := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
|
|
encoder.Encode(sl)
|
|
err := uctx.rb.batch.Put(key, buf.Bytes())
|
|
if err == nil && uctx.stats != nil {
|
|
uctx.stats.Reflists++
|
|
}
|
|
return err
|
|
}
|
|
|
|
// UpdateInBatch will save or update the SplitRefList at key, as well as save the buckets inside,
|
|
// as part of the given batch
|
|
func (collection *RefListCollection) UpdateInBatch(sl *SplitRefList, key []byte, batch *RefListBatch) error {
|
|
return collection.updateWithContext(sl, key, &reflistUpdateContext{rb: batch})
|
|
}
|
|
|
|
// Update will save or update the SplitRefList at key, as well as save the buckets inside
|
|
func (collection *RefListCollection) Update(sl *SplitRefList, key []byte) error {
|
|
rb := collection.NewBatch(collection.db.CreateBatch())
|
|
err := collection.UpdateInBatch(sl, key, rb)
|
|
if err == nil {
|
|
err = rb.batch.Write()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// RefListMigrationStats counts a number of reflists, buckets, and segments
|
|
type RefListMigrationStats struct {
|
|
Reflists, Buckets, Segments int
|
|
}
|
|
|
|
// RefListMigration wraps a RefListBatch for the purpose of migrating inline format
|
|
// reflists to split reflists
|
|
//
|
|
// Once the batch gets too large, it will automatically be flushed to the database,
|
|
// and a new batch will be created in its place.
|
|
type RefListMigration struct {
|
|
rb *RefListBatch
|
|
|
|
dryRun bool
|
|
|
|
// current number of reflists/buckets/segments queued in the current, unwritten batch
|
|
batchStats RefListMigrationStats
|
|
flushStats RefListMigrationStats
|
|
}
|
|
|
|
// NewMigration creates an empty RefListMigration
|
|
func (collection *RefListCollection) NewMigration() *RefListMigration {
|
|
return &RefListMigration{}
|
|
}
|
|
|
|
// NewMigrationDryRun creates an empty RefListMigration that will track the
|
|
// changes to make as usual but avoid actually writing to the db
|
|
func (collection *RefListCollection) NewMigrationDryRun() *RefListMigration {
|
|
return &RefListMigration{dryRun: true}
|
|
}
|
|
|
|
// Stats returns statistics on the written values in the current migration
|
|
func (migration *RefListMigration) Stats() RefListMigrationStats {
|
|
return migration.flushStats
|
|
}
|
|
|
|
// Flush will flush the current batch in the migration to the database
|
|
func (migration *RefListMigration) Flush() error {
|
|
if migration.batchStats.Segments > 0 {
|
|
if !migration.dryRun {
|
|
if err := migration.rb.batch.Write(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// It's important that we don't clear the batch on dry runs, because
|
|
// the batch is what contains the list of already-written buckets.
|
|
// If we're not writing to the database, and we clear that list,
|
|
// duplicate "writes" will occur.
|
|
migration.rb = nil
|
|
}
|
|
|
|
migration.flushStats.Reflists += migration.batchStats.Reflists
|
|
migration.flushStats.Buckets += migration.batchStats.Buckets
|
|
migration.flushStats.Segments += migration.batchStats.Segments
|
|
migration.batchStats = RefListMigrationStats{}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// LoadCompleteAndMigrate will load the reflist and its buckets as RefListCollection.LoadComplete,
|
|
// migrating any inline reflists to split ones along the way
|
|
func (collection *RefListCollection) LoadCompleteAndMigrate(sl *SplitRefList, key []byte, migration *RefListMigration) error {
|
|
// Given enough reflists, the memory used by a batch starts to become massive, so
|
|
// make sure to flush the written segments periodically. Note that this is only
|
|
// checked *after* a migration of a full bucket (and all the segments inside)
|
|
// takes place, as splitting a single bucket write into multiple batches would
|
|
// be unsafe if an interruption occurs midway.
|
|
const maxMigratorBatch = 50000
|
|
|
|
format, err := collection.load(sl, key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch format {
|
|
case reflistStorageFormatInline:
|
|
if migration.rb == nil {
|
|
migration.rb = collection.NewBatch(collection.db.CreateBatch())
|
|
}
|
|
|
|
collection.updateWithContext(sl, key, &reflistUpdateContext{
|
|
rb: migration.rb,
|
|
stats: &migration.batchStats,
|
|
})
|
|
|
|
if migration.batchStats.Segments > maxMigratorBatch {
|
|
if err := migration.Flush(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
case reflistStorageFormatSplit:
|
|
return collection.loadBuckets(sl)
|
|
default:
|
|
panic(fmt.Sprintf("unexpected format %v", format))
|
|
}
|
|
}
|
|
|
|
// AnyRefList is implemented by both PackageRefList and SplitRefList
|
|
type AnyRefList interface {
|
|
Has(p *Package) bool
|
|
Len() int
|
|
ForEach(handler func([]byte) error) error
|
|
FilterLatestRefs()
|
|
}
|
|
|
|
// Check interface
|
|
var (
|
|
_ AnyRefList = (*PackageRefList)(nil)
|
|
_ AnyRefList = (*SplitRefList)(nil)
|
|
)
|