Refactor mirror download code, split it into separate methods. #45 #114

This commit is contained in:
Andrey Smirnov
2014-10-02 19:30:37 +04:00
parent d45b456334
commit a0870f6726
4 changed files with 149 additions and 112 deletions

View File

@@ -4,8 +4,11 @@ import (
"fmt"
"github.com/smira/aptly/deb"
"github.com/smira/aptly/query"
"github.com/smira/aptly/utils"
"github.com/smira/commander"
"github.com/smira/flag"
"strings"
"time"
)
func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
@@ -39,21 +42,75 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
return fmt.Errorf("unable to update: %s", err)
}
var filterQuery deb.PackageQuery
context.Progress().Printf("Downloading & parsing package files...\n")
err = repo.DownloadPackageIndexes(context.Progress(), context.Downloader(), context.CollectionFactory(), ignoreMismatch)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
if repo.Filter != "" {
context.Progress().Printf("Applying filter...\n")
var filterQuery deb.PackageQuery
filterQuery, err = query.Parse(repo.Filter)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
var oldLen, newLen int
oldLen, newLen, err = repo.ApplyFilter(context.DependencyOptions(), filterQuery)
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
context.Progress().Printf("Packages filtered: %d -> %d.\n", oldLen, newLen)
}
err = repo.Download(context.Progress(), context.Downloader(), context.CollectionFactory(), context.PackagePool(), ignoreMismatch,
context.DependencyOptions(), filterQuery)
var (
downloadSize int64
queue []deb.PackageDownloadTask
)
context.Progress().Printf("Building download queue...\n")
queue, downloadSize, err = repo.BuildDownloadQueue(context.PackagePool())
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
count := len(queue)
context.Progress().Printf("Download queue: %d items (%s)\n", count, utils.HumanBytes(downloadSize))
// Download from the queue
context.Progress().InitBar(downloadSize, true)
// Download all package files
ch := make(chan error, count)
for _, task := range queue {
context.Downloader().DownloadWithChecksum(repo.PackageURL(task.RepoURI).String(), task.DestinationPath, ch, task.Checksums, ignoreMismatch)
}
// We don't need queued after this point
queue = nil
// Wait for all downloads to finish
errors := make([]string, 0)
for count > 0 {
err = <-ch
if err != nil {
errors = append(errors, err.Error())
}
count--
}
context.Progress().ShutdownBar()
if len(errors) > 0 {
return fmt.Errorf("unable to update: download errors:\n %s\n", strings.Join(errors, "\n "))
}
repo.LastDownloadDate = time.Now()
err = context.CollectionFactory().RemoteRepoCollection().Update(repo)
if err != nil {
return fmt.Errorf("unable to update: %s", err)

View File

@@ -30,6 +30,7 @@ type Storage interface {
}
type levelDB struct {
path string
db *leveldb.DB
batch *leveldb.Batch
}
@@ -49,7 +50,7 @@ func OpenDB(path string) (Storage, error) {
if err != nil {
return nil, err
}
return &levelDB{db: db}, nil
return &levelDB{db: db, path: path}, nil
}
// RecoverDB recovers LevelDB database from corruption
@@ -147,7 +148,9 @@ func (l *levelDB) FetchByPrefix(prefix []byte) [][]byte {
// Close finishes DB work
func (l *levelDB) Close() error {
return l.db.Close()
err := l.db.Close()
l.db = nil
return err
}
// StartBatch starts batch processing of keys

View File

@@ -53,6 +53,8 @@ type RemoteRepo struct {
packageRefs *PackageRefList
// Parsed archived root
archiveRootURL *url.URL
// Current list of packages (filled while updating mirror)
packageList *PackageList
}
// NewRemoteRepo creates new instance of Debian remote repository with specified params
@@ -339,12 +341,13 @@ ok:
return nil
}
// Download downloads all repo files
func (repo *RemoteRepo) Download(progress aptly.Progress, d aptly.Downloader, collectionFactory *CollectionFactory,
packagePool aptly.PackagePool, ignoreMismatch bool, dependencyOptions int, filterQuery PackageQuery) error {
list := NewPackageList()
progress.Printf("Downloading & parsing package files...\n")
// DownloadPackageIndexes downloads & parses package index files
func (repo *RemoteRepo) DownloadPackageIndexes(progress aptly.Progress, d aptly.Downloader, collectionFactory *CollectionFactory,
ignoreMismatch bool) error {
if repo.packageList != nil {
panic("packageList != nil")
}
repo.packageList = NewPackageList()
// Download and parse all Packages & Source files
packagesURLs := [][]string{}
@@ -405,7 +408,7 @@ func (repo *RemoteRepo) Download(progress aptly.Progress, d aptly.Downloader, co
return err
}
}
err = list.Add(p)
err = repo.packageList.Add(p)
if err != nil {
return err
}
@@ -419,33 +422,30 @@ func (repo *RemoteRepo) Download(progress aptly.Progress, d aptly.Downloader, co
progress.ShutdownBar()
}
var err error
return nil
}
if repo.Filter != "" {
progress.Printf("Applying filter...\n")
// ApplyFilter applies filtering to already built PackageList
func (repo *RemoteRepo) ApplyFilter(dependencyOptions int, filterQuery PackageQuery) (oldLen, newLen int, err error) {
repo.packageList.PrepareIndex()
list.PrepareIndex()
emptyList := NewPackageList()
emptyList.PrepareIndex()
emptyList := NewPackageList()
emptyList.PrepareIndex()
origPackages := list.Len()
list, err = list.Filter([]PackageQuery{filterQuery}, repo.FilterWithDeps, emptyList, dependencyOptions, repo.Architectures)
if err != nil {
return err
}
progress.Printf("Packages filtered: %d -> %d.\n", origPackages, list.Len())
oldLen = repo.packageList.Len()
repo.packageList, err = repo.packageList.Filter([]PackageQuery{filterQuery}, repo.FilterWithDeps, emptyList, dependencyOptions, repo.Architectures)
if repo.packageList != nil {
newLen = repo.packageList.Len()
}
return
}
progress.Printf("Building download queue...\n")
// BuildDownloadQueue builds queue, discards current PackageList
func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool) (queue []PackageDownloadTask, downloadSize int64, err error) {
queue = make([]PackageDownloadTask, 0, repo.packageList.Len())
seen := make(map[string]struct{}, repo.packageList.Len())
// Build download queue
queued := make(map[string]PackageDownloadTask, list.Len())
count := 0
downloadSize := int64(0)
err = list.ForEach(func(p *Package) error {
err = repo.packageList.ForEach(func(p *Package) error {
list, err2 := p.DownloadList(packagePool)
if err2 != nil {
return err2
@@ -454,58 +454,25 @@ func (repo *RemoteRepo) Download(progress aptly.Progress, d aptly.Downloader, co
for _, task := range list {
key := task.RepoURI + "-" + task.DestinationPath
_, found := queued[key]
_, found := seen[key]
if !found {
count++
queue = append(queue, task)
downloadSize += task.Checksums.Size
queued[key] = task
seen[key] = struct{}{}
}
}
return nil
})
if err != nil {
return fmt.Errorf("unable to build download queue: %s", err)
return
}
repo.packageRefs = NewPackageRefListFromPackageList(list)
repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList)
// free up package list, we don't need it after this point
list = nil
repo.packageList = nil
progress.Printf("Download queue: %d items (%s)\n", count, utils.HumanBytes(downloadSize))
progress.InitBar(downloadSize, true)
// Download all package files
ch := make(chan error, len(queued))
for _, task := range queued {
d.DownloadWithChecksum(repo.PackageURL(task.RepoURI).String(), task.DestinationPath, ch, task.Checksums, ignoreMismatch)
}
// We don't need queued after this point
queued = nil
// Wait for all downloads to finish
errors := make([]string, 0)
for count > 0 {
err = <-ch
if err != nil {
errors = append(errors, err.Error())
}
count--
}
progress.ShutdownBar()
if len(errors) > 0 {
return fmt.Errorf("download errors:\n %s\n", strings.Join(errors, "\n "))
}
repo.LastDownloadDate = time.Now()
return nil
return
}
// Encode does msgpack encoding of RemoteRepo

View File

@@ -12,6 +12,7 @@ import (
"io/ioutil"
. "launchpad.net/gocheck"
"os"
"sort"
)
type NullVerifier struct {
@@ -254,20 +255,20 @@ func (s *RemoteRepoSuite) TestDownload(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages.bz2", errors.New("HTTP 404"))
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages.gz", errors.New("HTTP 404"))
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/binary-i386/Packages", examplePackagesFile)
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb", "xyz")
err = s.repo.Download(s.progress, s.downloader, s.collectionFactory, s.packagePool, false, 0, nil)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)
queue, size, err := s.repo.BuildDownloadQueue(s.packagePool)
c.Assert(s.repo.packageRefs, NotNil)
c.Check(size, Equals, int64(3))
c.Check(queue, HasLen, 1)
c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb")
pkg, err := s.collectionFactory.PackageCollection().ByKey(s.repo.packageRefs.Refs[0])
c.Assert(err, IsNil)
result, err := pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "amanda-client")
}
@@ -284,32 +285,34 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) {
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources.bz2", errors.New("HTTP 404"))
s.downloader.ExpectError("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources.gz", errors.New("HTTP 404"))
s.downloader.ExpectResponse("http://mirror.yandex.ru/debian/dists/squeeze/main/source/Sources", exampleSourcesFile)
s.downloader.AnyExpectResponse("http://mirror.yandex.ru/debian/pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb", "xyz")
s.downloader.AnyExpectResponse("http://mirror.yandex.ru/debian/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.dsc", "abc")
s.downloader.AnyExpectResponse("http://mirror.yandex.ru/debian/pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz", "abcd")
s.downloader.AnyExpectResponse("http://mirror.yandex.ru/debian/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz", "abcde")
err = s.repo.Download(s.progress, s.downloader, s.collectionFactory, s.packagePool, false, 0, nil)
err = s.repo.DownloadPackageIndexes(s.progress, s.downloader, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(s.downloader.Empty(), Equals, true)
queue, size, err := s.repo.BuildDownloadQueue(s.packagePool)
c.Check(size, Equals, int64(15))
c.Check(queue, HasLen, 4)
q := make([]string, 4)
for i := range q {
q[i] = queue[i].RepoURI
}
sort.Strings(q)
c.Check(q[3], Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb")
c.Check(q[1], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.dsc")
c.Check(q[2], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz")
c.Check(q[0], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz")
c.Assert(s.repo.packageRefs, NotNil)
pkg, err := s.collectionFactory.PackageCollection().ByKey(s.repo.packageRefs.Refs[0])
c.Assert(err, IsNil)
result, err := pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "amanda-client")
pkg, err = s.collectionFactory.PackageCollection().ByKey(s.repo.packageRefs.Refs[1])
c.Assert(err, IsNil)
result, err = pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "access-modifier-checker")
}
@@ -319,23 +322,23 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) {
downloader.ExpectError("http://repos.express42.com/virool/precise/Packages.bz2", errors.New("HTTP 404"))
downloader.ExpectError("http://repos.express42.com/virool/precise/Packages.gz", errors.New("HTTP 404"))
downloader.ExpectResponse("http://repos.express42.com/virool/precise/Packages", examplePackagesFile)
downloader.ExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb", "xyz")
err := s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)
err = s.flat.Download(s.progress, downloader, s.collectionFactory, s.packagePool, false, 0, nil)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)
queue, size, err := s.flat.BuildDownloadQueue(s.packagePool)
c.Check(size, Equals, int64(3))
c.Check(queue, HasLen, 1)
c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb")
c.Assert(s.flat.packageRefs, NotNil)
pkg, err := s.collectionFactory.PackageCollection().ByKey(s.flat.packageRefs.Refs[0])
c.Assert(err, IsNil)
result, err := pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "amanda-client")
}
@@ -350,35 +353,42 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) {
downloader.ExpectError("http://repos.express42.com/virool/precise/Sources.bz2", errors.New("HTTP 404"))
downloader.ExpectError("http://repos.express42.com/virool/precise/Sources.gz", errors.New("HTTP 404"))
downloader.ExpectResponse("http://repos.express42.com/virool/precise/Sources", exampleSourcesFile)
downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb", "xyz")
downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.dsc", "abc")
downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz", "abcd")
downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz", "abcde")
// downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb", "xyz")
// downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.dsc", "abc")
// downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz", "abcd")
// downloader.AnyExpectResponse("http://repos.express42.com/virool/precise/pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz", "abcde")
err := s.flat.Fetch(downloader, nil)
c.Assert(err, IsNil)
err = s.flat.Download(s.progress, downloader, s.collectionFactory, s.packagePool, false, 0, nil)
err = s.flat.DownloadPackageIndexes(s.progress, downloader, s.collectionFactory, false)
c.Assert(err, IsNil)
c.Assert(downloader.Empty(), Equals, true)
queue, size, err := s.flat.BuildDownloadQueue(s.packagePool)
c.Check(size, Equals, int64(15))
c.Check(queue, HasLen, 4)
q := make([]string, 4)
for i := range q {
q[i] = queue[i].RepoURI
}
sort.Strings(q)
c.Check(q[3], Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb")
c.Check(q[1], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.dsc")
c.Check(q[2], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz")
c.Check(q[0], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz")
c.Assert(s.flat.packageRefs, NotNil)
pkg, err := s.collectionFactory.PackageCollection().ByKey(s.flat.packageRefs.Refs[0])
c.Assert(err, IsNil)
result, err := pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "amanda-client")
pkg, err = s.collectionFactory.PackageCollection().ByKey(s.flat.packageRefs.Refs[1])
c.Assert(err, IsNil)
result, err = pkg.VerifyFiles(s.packagePool)
c.Check(result, Equals, true)
c.Check(err, IsNil)
c.Check(pkg.Name, Equals, "access-modifier-checker")
}