Rework mirror update (download packages) implementation

`PackageDownloadTask` is just a reference to file now. Whole process
was rewritten to follow pattern: download to temp location inside the pool,
verify/update checksums, import into pool as final step.

This removes a lot of edge cases when aptly internal state might be broken
if updating from rogue mirror.

Also this changes whole memory model: package list/files are kept in memory
now during the duration of `mirror update` command and saved to disk
only in the end.
This commit is contained in:
Andrey Smirnov
2017-04-10 23:40:01 +03:00
parent 72d233b587
commit bc7903f86e
6 changed files with 162 additions and 93 deletions
+89 -22
View File
@@ -5,7 +5,9 @@ import (
"os"
"os/signal"
"strings"
"sync"
"github.com/smira/aptly/aptly"
"github.com/smira/aptly/deb"
"github.com/smira/aptly/query"
"github.com/smira/aptly/utils"
@@ -112,6 +114,14 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
// Catch ^C
sigch := make(chan os.Signal)
signal.Notify(sigch, os.Interrupt)
defer signal.Stop(sigch)
abort := make(chan struct{})
go func() {
<-sigch
signal.Stop(sigch)
close(abort)
}()
count := len(queue)
context.Progress().Printf("Download queue: %d items (%s)\n", count, utils.HumanBytes(downloadSize))
@@ -119,37 +129,94 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
// Download from the queue
context.Progress().InitBar(downloadSize, true)
// Download all package files
ch := make(chan error, count)
downloadQueue := make(chan deb.PackageDownloadTask)
// In separate goroutine (to avoid blocking main), push queue to downloader
/*go func() {
var (
errors []string
errLock sync.Mutex
)
pushError := func(err error) {
errLock.Lock()
errors = append(errors, err.Error())
errLock.Unlock()
}
go func() {
for _, task := range queue {
context.Downloader().DownloadWithChecksum(repo.PackageURL(task.RepoURI).String(), task.DestinationPath, ch, task.Checksums, ignoreMismatch, maxTries)
select {
case downloadQueue <- task:
case <-abort:
return
}
}
// We don't need queue after this point
queue = nil
}()*/
close(downloadQueue)
}()
var wg sync.WaitGroup
for i := 0; i < context.Config().DownloadConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case task, ok := <-downloadQueue:
if !ok {
return
}
var e error
// provision download location
tempDownPath, e := context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename)
if e != nil {
pushError(e)
continue
}
// download file...
e = context.Downloader().DownloadWithChecksum(
repo.PackageURL(task.File.DownloadURL()).String(),
tempDownPath,
&task.File.Checksums,
ignoreMismatch,
maxTries)
if e != nil {
pushError(e)
continue
}
// and import it back to the pool
task.File.PoolPath, e = context.PackagePool().Import(tempDownPath, task.File.Filename, &task.File.Checksums, true)
if e != nil {
pushError(e)
continue
}
// update "attached" files if any
for _, additionalTask := range task.Additional {
additionalTask.File.PoolPath = task.File.PoolPath
additionalTask.File.Checksums = task.File.Checksums
}
case <-abort:
return
}
}
}()
}
// Wait for all downloads to finish
var errors []string
wg.Wait()
for count > 0 {
select {
case <-sigch:
signal.Stop(sigch)
return fmt.Errorf("unable to update: interrupted")
case err = <-ch:
if err != nil {
errors = append(errors, err.Error())
}
count--
}
select {
case <-abort:
return fmt.Errorf("unable to update: interrupted")
default:
}
context.Progress().ShutdownBar()
signal.Stop(sigch)
if len(errors) > 0 {
return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
@@ -160,7 +227,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
return fmt.Errorf("unable to update: %s", err)
}
repo.FinalizeDownload()
repo.FinalizeDownload(context.CollectionFactory())
err = context.CollectionFactory().RemoteRepoCollection().Update(repo)
if err != nil {
return fmt.Errorf("unable to update: %s", err)