Rework mirror update to support closing/reoping DB for the download duration

This requires splitting up import file phase as separate step in then end,
it should be pretty fast, as it only does file move (hardlink) and
DB update for new checksums.
This commit is contained in:
Andrey Smirnov
2017-04-26 00:14:13 +03:00
parent 7dcc0d597d
commit 01512df853
2 changed files with 49 additions and 30 deletions
+46 -28
View File
@@ -108,10 +108,10 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
return fmt.Errorf("unable to update: %s", err)
}
// err = context.CloseDatabase()
// if err != nil {
// return fmt.Errorf("unable to update: %s", err)
// }
err = context.CloseDatabase()
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
// Catch ^C
sigch := make(chan os.Signal)
@@ -131,7 +131,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
// Download from the queue
context.Progress().InitBar(downloadSize, true)
downloadQueue := make(chan deb.PackageDownloadTask)
downloadQueue := make(chan int)
var (
errors []string
@@ -145,14 +145,13 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
}
go func() {
for _, task := range queue {
for idx := range queue {
select {
case downloadQueue <- task:
case downloadQueue <- idx:
case <-abort:
return
}
}
queue = nil
close(downloadQueue)
}()
@@ -164,15 +163,17 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
defer wg.Done()
for {
select {
case task, ok := <-downloadQueue:
case idx, ok := <-downloadQueue:
if !ok {
return
}
task := &queue[idx]
var e error
// provision download location
tempDownPath, e := context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename)
task.TempDownPath, e = context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename)
if e != nil {
pushError(e)
continue
@@ -181,7 +182,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
// download file...
e = context.Downloader().DownloadWithChecksum(
repo.PackageURL(task.File.DownloadURL()).String(),
tempDownPath,
task.TempDownPath,
&task.File.Checksums,
ignoreMismatch,
maxTries)
@@ -189,19 +190,6 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
pushError(e)
continue
}
// and import it back to the pool
task.File.PoolPath, e = context.PackagePool().Import(tempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
if e != nil {
pushError(e)
continue
}
// update "attached" files if any
for _, additionalTask := range task.Additional {
additionalTask.File.PoolPath = task.File.PoolPath
additionalTask.File.Checksums = task.File.Checksums
}
case <-abort:
return
}
@@ -224,10 +212,40 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error {
return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n "))
}
// err = context.ReOpenDatabase()
// if err != nil {
// return fmt.Errorf("unable to update: %s", err)
// }
err = context.ReOpenDatabase()
if err != nil {
return fmt.Errorf("unable to update: %s", err)
}
// Import downloaded files
context.Progress().InitBar(int64(len(queue)), false)
for idx := range queue {
context.Progress().AddBar(1)
task := &queue[idx]
// and import it back to the pool
task.File.PoolPath, err = context.PackagePool().Import(task.TempDownPath, task.File.Filename, &task.File.Checksums, true, context.CollectionFactory().ChecksumCollection())
if err != nil {
return fmt.Errorf("unable to import file: %s", err)
}
// update "attached" files if any
for _, additionalTask := range task.Additional {
additionalTask.File.PoolPath = task.File.PoolPath
additionalTask.File.Checksums = task.File.Checksums
}
select {
case <-abort:
return fmt.Errorf("unable to update: interrupted")
default:
}
}
context.Progress().ShutdownBar()
repo.FinalizeDownload(context.CollectionFactory(), context.Progress())
err = context.CollectionFactory().RemoteRepoCollection().Update(repo)