diff --git a/cmd/mirror_update.go b/cmd/mirror_update.go index b176b1f1..47189060 100644 --- a/cmd/mirror_update.go +++ b/cmd/mirror_update.go @@ -5,7 +5,9 @@ import ( "os" "os/signal" "strings" + "sync" + "github.com/smira/aptly/aptly" "github.com/smira/aptly/deb" "github.com/smira/aptly/query" "github.com/smira/aptly/utils" @@ -112,6 +114,14 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error { // Catch ^C sigch := make(chan os.Signal) signal.Notify(sigch, os.Interrupt) + defer signal.Stop(sigch) + + abort := make(chan struct{}) + go func() { + <-sigch + signal.Stop(sigch) + close(abort) + }() count := len(queue) context.Progress().Printf("Download queue: %d items (%s)\n", count, utils.HumanBytes(downloadSize)) @@ -119,37 +129,94 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error { // Download from the queue context.Progress().InitBar(downloadSize, true) - // Download all package files - ch := make(chan error, count) + downloadQueue := make(chan deb.PackageDownloadTask) - // In separate goroutine (to avoid blocking main), push queue to downloader - /*go func() { + var ( + errors []string + errLock sync.Mutex + ) + + pushError := func(err error) { + errLock.Lock() + errors = append(errors, err.Error()) + errLock.Unlock() + } + + go func() { for _, task := range queue { - context.Downloader().DownloadWithChecksum(repo.PackageURL(task.RepoURI).String(), task.DestinationPath, ch, task.Checksums, ignoreMismatch, maxTries) + select { + case downloadQueue <- task: + case <-abort: + return + } } - - // We don't need queue after this point queue = nil - }()*/ + close(downloadQueue) + }() + + var wg sync.WaitGroup + + for i := 0; i < context.Config().DownloadConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case task, ok := <-downloadQueue: + if !ok { + return + } + + var e error + + // provision download location + tempDownPath, e := context.PackagePool().(aptly.LocalPackagePool).GenerateTempPath(task.File.Filename) + if e != nil { + pushError(e) + continue + } + + // download file... + e = context.Downloader().DownloadWithChecksum( + repo.PackageURL(task.File.DownloadURL()).String(), + tempDownPath, + &task.File.Checksums, + ignoreMismatch, + maxTries) + if e != nil { + pushError(e) + continue + } + + // and import it back to the pool + task.File.PoolPath, e = context.PackagePool().Import(tempDownPath, task.File.Filename, &task.File.Checksums, true) + if e != nil { + pushError(e) + continue + } + + // update "attached" files if any + for _, additionalTask := range task.Additional { + additionalTask.File.PoolPath = task.File.PoolPath + additionalTask.File.Checksums = task.File.Checksums + } + case <-abort: + return + } + } + }() + } // Wait for all downloads to finish - var errors []string + wg.Wait() - for count > 0 { - select { - case <-sigch: - signal.Stop(sigch) - return fmt.Errorf("unable to update: interrupted") - case err = <-ch: - if err != nil { - errors = append(errors, err.Error()) - } - count-- - } + select { + case <-abort: + return fmt.Errorf("unable to update: interrupted") + default: } context.Progress().ShutdownBar() - signal.Stop(sigch) if len(errors) > 0 { return fmt.Errorf("unable to update: download errors:\n %s", strings.Join(errors, "\n ")) @@ -160,7 +227,7 @@ func aptlyMirrorUpdate(cmd *commander.Command, args []string) error { return fmt.Errorf("unable to update: %s", err) } - repo.FinalizeDownload() + repo.FinalizeDownload(context.CollectionFactory()) err = context.CollectionFactory().RemoteRepoCollection().Update(repo) if err != nil { return fmt.Errorf("unable to update: %s", err) diff --git a/deb/package.go b/deb/package.go index d2463df7..2db88e7d 100644 --- a/deb/package.go +++ b/deb/package.go @@ -605,29 +605,27 @@ func (p *Package) PoolDirectory() (string, error) { // PackageDownloadTask is a element of download queue for the package type PackageDownloadTask struct { - RepoURI string - DestinationPath string - Checksums utils.ChecksumInfo + File *PackageFile + Additional []PackageDownloadTask } // DownloadList returns list of missing package files for download in format // [[srcpath, dstpath]] func (p *Package) DownloadList(packagePool aptly.PackagePool) (result []PackageDownloadTask, err error) { - /*result = make([]PackageDownloadTask, 0, 1) + result = make([]PackageDownloadTask, 0, 1) - for _, f := range p.Files() { + for idx, f := range p.Files() { verified, err := f.Verify(packagePool) if err != nil { return nil, err } if !verified { - result = append(result, PackageDownloadTask{RepoURI: f.DownloadURL(), DestinationPath: poolPath, Checksums: f.Checksums}) + result = append(result, PackageDownloadTask{File: &p.Files()[idx]}) } } - return result, nil*/ - panic("NEEDS REWORK") + return result, nil } // VerifyFiles verifies that all package files have neen correctly downloaded diff --git a/deb/package_test.go b/deb/package_test.go index 043b33bb..47a7bd89 100644 --- a/deb/package_test.go +++ b/deb/package_test.go @@ -392,35 +392,25 @@ func (s *PackageSuite) TestFilepathList(c *C) { } func (s *PackageSuite) TestDownloadList(c *C) { - c.Fail() - /* - packagePool := files.NewPackagePool(c.MkDir()) - p := NewPackageFromControlFile(s.stanza) - p.Files()[0].Checksums.Size = 5 - poolPath, _ := packagePool.Path(p.Files()[0].Filename, p.Files()[0].Checksums) + packagePool := files.NewPackagePool(c.MkDir()) + p := NewPackageFromControlFile(s.stanza) + p.Files()[0].Checksums.Size = 5 - list, err := p.DownloadList(packagePool) - c.Check(err, IsNil) - c.Check(list, DeepEquals, []PackageDownloadTask{ - { - RepoURI: "pool/contrib/a/alien-arena/alien-arena-common_7.40-2_i386.deb", - DestinationPath: poolPath, - Checksums: utils.ChecksumInfo{Size: 5, - MD5: "1e8cba92c41420aa7baa8a5718d67122", - SHA1: "46955e48cad27410a83740a21d766ce362364024", - SHA256: "eb4afb9885cba6dc70cccd05b910b2dbccc02c5900578be5e99f0d3dbf9d76a5"}}}) + list, err := p.DownloadList(packagePool) + c.Check(err, IsNil) + c.Check(list, DeepEquals, []PackageDownloadTask{ + { + File: &p.Files()[0], + }, + }) - err = os.MkdirAll(filepath.Dir(poolPath), 0755) - c.Assert(err, IsNil) + tmpFilepath := filepath.Join(c.MkDir(), "file") + c.Assert(ioutil.WriteFile(tmpFilepath, []byte("abcde"), 0777), IsNil) + p.Files()[0].PoolPath, _ = packagePool.Import(tmpFilepath, p.Files()[0].Filename, &p.Files()[0].Checksums, false) - file, err := os.Create(poolPath) - c.Assert(err, IsNil) - file.WriteString("abcde") - file.Close() - - list, err = p.DownloadList(packagePool) - c.Check(err, IsNil) - c.Check(list, DeepEquals, []PackageDownloadTask{})*/ + list, err = p.DownloadList(packagePool) + c.Check(err, IsNil) + c.Check(list, DeepEquals, []PackageDownloadTask{}) } func (s *PackageSuite) TestVerifyFiles(c *C) { diff --git a/deb/remote.go b/deb/remote.go index 9d80e9b1..3ddb5adc 100644 --- a/deb/remote.go +++ b/deb/remote.go @@ -477,11 +477,6 @@ func (repo *RemoteRepo) DownloadPackageIndexes(progress aptly.Progress, d aptly. return err } } - - err = collectionFactory.PackageCollection().Update(p) - if err != nil { - return err - } } progress.ShutdownBar() @@ -508,7 +503,7 @@ func (repo *RemoteRepo) ApplyFilter(dependencyOptions int, filterQuery PackageQu // BuildDownloadQueue builds queue, discards current PackageList func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, skipExistingPackages bool) (queue []PackageDownloadTask, downloadSize int64, err error) { queue = make([]PackageDownloadTask, 0, repo.packageList.Len()) - seen := make(map[string]struct{}, repo.packageList.Len()) + seen := make(map[string]int, repo.packageList.Len()) err = repo.packageList.ForEach(func(p *Package) error { download := true @@ -521,15 +516,17 @@ func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, skipEx if err2 != nil { return err2 } - p.files = nil for _, task := range list { - key := task.RepoURI + "-" + task.DestinationPath - _, found := seen[key] + key := task.File.DownloadURL() + idx, found := seen[key] if !found { queue = append(queue, task) - downloadSize += task.Checksums.Size - seen[key] = struct{}{} + downloadSize += task.File.Checksums.Size + seen[key] = len(queue) - 1 + } else { + // hook up the task to duplicate entry already on the list + queue[idx].Additional = append(queue[idx].Additional, task) } } } @@ -539,17 +536,22 @@ func (repo *RemoteRepo) BuildDownloadQueue(packagePool aptly.PackagePool, skipEx return } - repo.tempPackageRefs = NewPackageRefListFromPackageList(repo.packageList) - // free up package list, we don't need it after this point - repo.packageList = nil - return } // FinalizeDownload swaps for final value of package refs -func (repo *RemoteRepo) FinalizeDownload() { +func (repo *RemoteRepo) FinalizeDownload(collectionFactory *CollectionFactory) error { repo.LastDownloadDate = time.Now() - repo.packageRefs = repo.tempPackageRefs + repo.packageRefs = NewPackageRefListFromPackageList(repo.packageList) + + // update all the packages in collection + err := repo.packageList.ForEach(func(p *Package) error { + return collectionFactory.PackageCollection().Update(p) + }) + + repo.packageList = nil + + return err } // Encode does msgpack encoding of RemoteRepo diff --git a/deb/remote_test.go b/deb/remote_test.go index a4bb00b1..53925b4d 100644 --- a/deb/remote_test.go +++ b/deb/remote_test.go @@ -267,11 +267,12 @@ func (s *RemoteRepoSuite) TestDownload(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err := s.repo.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(3)) c.Check(queue, HasLen, 1) - c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") + c.Check(queue[0].File.DownloadURL(), Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) pkg, err := s.collectionFactory.PackageCollection().ByKey(s.repo.packageRefs.Refs[0]) @@ -293,10 +294,11 @@ func (s *RemoteRepoSuite) TestDownload(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err = s.repo.BuildDownloadQueue(s.packagePool, true) + c.Assert(err, IsNil) c.Check(size, Equals, int64(0)) c.Check(queue, HasLen, 0) - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) // Next call must return the download list without option "skip-existing-packages" @@ -313,11 +315,12 @@ func (s *RemoteRepoSuite) TestDownload(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err = s.repo.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(3)) c.Check(queue, HasLen, 1) - c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") + c.Check(queue[0].File.DownloadURL(), Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) } @@ -340,12 +343,13 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err := s.repo.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(15)) c.Check(queue, HasLen, 4) q := make([]string, 4) for i := range q { - q[i] = queue[i].RepoURI + q[i] = queue[i].File.DownloadURL() } sort.Strings(q) c.Check(q[3], Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") @@ -353,7 +357,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) { c.Check(q[2], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz") c.Check(q[0], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz") - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) pkg, err := s.collectionFactory.PackageCollection().ByKey(s.repo.packageRefs.Refs[0]) @@ -383,10 +387,11 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err = s.repo.BuildDownloadQueue(s.packagePool, true) + c.Assert(err, IsNil) c.Check(size, Equals, int64(0)) c.Check(queue, HasLen, 0) - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) // Next call must return the download list without option "skip-existing-packages" @@ -407,10 +412,11 @@ func (s *RemoteRepoSuite) TestDownloadWithSources(c *C) { c.Assert(s.downloader.Empty(), Equals, true) queue, size, err = s.repo.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(15)) c.Check(queue, HasLen, 4) - s.repo.FinalizeDownload() + s.repo.FinalizeDownload(s.collectionFactory) c.Assert(s.repo.packageRefs, NotNil) } @@ -430,11 +436,12 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err := s.flat.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(3)) c.Check(queue, HasLen, 1) - c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") + c.Check(queue[0].File.DownloadURL(), Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) pkg, err := s.collectionFactory.PackageCollection().ByKey(s.flat.packageRefs.Refs[0]) @@ -457,10 +464,11 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err = s.flat.BuildDownloadQueue(s.packagePool, true) + c.Assert(err, IsNil) c.Check(size, Equals, int64(0)) c.Check(queue, HasLen, 0) - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) // Next call must return the download list without option "skip-existing-packages" @@ -478,11 +486,12 @@ func (s *RemoteRepoSuite) TestDownloadFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err = s.flat.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(3)) c.Check(queue, HasLen, 1) - c.Check(queue[0].RepoURI, Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") + c.Check(queue[0].File.DownloadURL(), Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) } @@ -508,12 +517,13 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err := s.flat.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(15)) c.Check(queue, HasLen, 4) q := make([]string, 4) for i := range q { - q[i] = queue[i].RepoURI + q[i] = queue[i].File.DownloadURL() } sort.Strings(q) c.Check(q[3], Equals, "pool/main/a/amanda/amanda-client_3.3.1-3~bpo60+1_amd64.deb") @@ -521,7 +531,7 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) { c.Check(q[2], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0.orig.tar.gz") c.Check(q[0], Equals, "pool/main/a/access-modifier-checker/access-modifier-checker_1.0-4.debian.tar.gz") - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) pkg, err := s.collectionFactory.PackageCollection().ByKey(s.flat.packageRefs.Refs[0]) @@ -553,10 +563,11 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err = s.flat.BuildDownloadQueue(s.packagePool, true) + c.Assert(err, IsNil) c.Check(size, Equals, int64(0)) c.Check(queue, HasLen, 0) - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) // Next call must return the download list without option "skip-existing-packages" @@ -578,10 +589,11 @@ func (s *RemoteRepoSuite) TestDownloadWithSourcesFlat(c *C) { c.Assert(downloader.Empty(), Equals, true) queue, size, err = s.flat.BuildDownloadQueue(s.packagePool, false) + c.Assert(err, IsNil) c.Check(size, Equals, int64(15)) c.Check(queue, HasLen, 4) - s.flat.FinalizeDownload() + s.flat.FinalizeDownload(s.collectionFactory) c.Assert(s.flat.packageRefs, NotNil) } diff --git a/files/package_pool_test.go b/files/package_pool_test.go index 1734b370..30cef716 100644 --- a/files/package_pool_test.go +++ b/files/package_pool_test.go @@ -132,7 +132,7 @@ func (s *PackagePoolSuite) TestImportMove(c *C) { info, err := s.pool.Stat(path) c.Assert(err, IsNil) c.Check(info.Size(), Equals, int64(2738)) - c.Check(info.Sys().(*syscall.Stat_t).Nlink, Equals, uint16(1)) + c.Check(int(info.Sys().(*syscall.Stat_t).Nlink), Equals, 1) } func (s *PackagePoolSuite) TestImportNotExist(c *C) { @@ -204,7 +204,7 @@ func (s *PackagePoolSuite) TestSymlink(c *C) { info, err = os.Lstat(dstPath) c.Assert(err, IsNil) - c.Check(info.Sys().(*syscall.Stat_t).Mode&syscall.S_IFMT, Equals, uint16(syscall.S_IFLNK)) + c.Check(int(info.Sys().(*syscall.Stat_t).Mode&syscall.S_IFMT), Equals, int(syscall.S_IFLNK)) } func (s *PackagePoolSuite) TestGenerateRandomPath(c *C) {