Merge pull request #683 from smira/545-download-contxt

Use Go context to abort gracefully mirror updates
This commit is contained in:
Andrey Smirnov
2017-12-01 00:27:26 +03:00
committed by GitHub
12 changed files with 123 additions and 80 deletions

View File

@@ -2,8 +2,6 @@ package cmd
import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
@@ -113,17 +111,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
return fmt.Errorf("unable to update: %s", err)
}
// Catch ^C
sigch := make(chan os.Signal)
signal.Notify(sigch, os.Interrupt)
defer signal.Stop(sigch)
abort := make(chan struct{})
go func() {
<-sigch
signal.Stop(sigch)
close(abort)
}()
context.GoContextHandleSignals()
count := len(queue)
context.Progress().Printf("Download queue: %d items (%s)\n", count, utils.HumanBytes(downloadSize))
@@ -148,7 +136,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
for idx := range queue {
select {
case downloadQueue <- idx:
case <-abort:
case <-context.Done():
return
}
}
@@ -181,6 +169,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
// download file...
e = context.Downloader().DownloadWithChecksum(
context,
repo.PackageURL(task.File.DownloadURL()).String(),
task.TempDownPath,
&task.File.Checksums,
@@ -190,28 +179,20 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
pushError(e)
continue
}
case <-abort:
task.Done = true
case <-context.Done():
return
}
}
}()
}
// Wait for all downloads to finish
// Wait for all download goroutines to finish
wg.Wait()
select {
case <-abort:
return fmt.Errorf("unable to update: interrupted")
default:
}
context.Progress().ShutdownBar()
if len(errors) > 0 {
return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
err = context.ReOpenDatabase()
if err != nil {
return fmt.Errorf("unable to update: %s", err)
@@ -221,11 +202,15 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
context.Progress().InitBar(int64(len(queue)), false)
for idx := range queue {
context.Progress().AddBar(1)
task := &queue[idx]
if !task.Done {
// download not finished yet
continue
}
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
if err != nil {
@@ -237,16 +222,20 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
additionalTask.File.PoolPath = task.File.PoolPath
additionalTask.File.Checksums = task.File.Checksums
}
select {
case <-abort:
return fmt.Errorf("unable to update: interrupted")
default:
}
}
context.Progress().ShutdownBar()
select {
case <-context.Done():
return fmt.Errorf("unable to update: interrupted")
default:
}
if len(errors) > 0 {
return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
repo.FinalizeDownload(context.CollectionFactory(), context.Progress())
err = context.CollectionFactory().RemoteRepoCollection().Update(repo)
if err != nil {