diff --git a/http/download.go b/http/download.go index f739ae36..c84947e4 100644 --- a/http/download.go +++ b/http/download.go @@ -1,6 +1,7 @@ package http import ( + "code.google.com/p/mxk/go1/flowcontrol" "compress/bzip2" "compress/gzip" "fmt" @@ -21,14 +22,15 @@ var ( // downloaderImpl is implementation of Downloader interface type downloaderImpl struct { - queue chan *downloadTask - stop chan bool - stopped chan bool - pause chan bool - unpause chan bool - progress aptly.Progress - threads int - client *http.Client + queue chan *downloadTask + stop chan bool + stopped chan bool + pause chan bool + unpause chan bool + progress aptly.Progress + aggWriter io.Writer + threads int + client *http.Client } // downloadTask represents single item in queue @@ -41,8 +43,8 @@ type downloadTask struct { } // NewDownloader creates new instance of Downloader which specified number -// of threads -func NewDownloader(threads int, progress aptly.Progress) aptly.Downloader { +// of threads and download limit in bytes/sec +func NewDownloader(threads int, downLimit int64, progress aptly.Progress) aptly.Downloader { downloader := &downloaderImpl{ queue: make(chan *downloadTask, 1000), stop: make(chan bool), @@ -59,6 +61,13 @@ func NewDownloader(threads int, progress aptly.Progress) aptly.Downloader { }, } + fmt.Printf("downLimit = %v\n", downLimit) + if downLimit > 0 { + downloader.aggWriter = flowcontrol.NewWriter(progress, downLimit) + } else { + downloader.aggWriter = progress + } + for i := 0; i < downloader.threads; i++ { go downloader.process() } @@ -140,7 +149,7 @@ func (downloader *downloaderImpl) handleTask(task *downloadTask) { defer outfile.Close() checksummer := utils.NewChecksumWriter() - writers := []io.Writer{outfile, downloader.progress} + writers := []io.Writer{outfile, downloader.aggWriter} if task.expected.Size != -1 { writers = append(writers, checksummer) diff --git a/http/download_test.go b/http/download_test.go index 9e6db6ea..cfe51c02 100644 --- a/http/download_test.go +++ b/http/download_test.go @@ -60,7 +60,7 @@ func (s *DownloaderSuite) TearDownTest(c *C) { func (s *DownloaderSuite) TestStartupShutdown(c *C) { goroutines := runtime.NumGoroutine() - d := NewDownloader(10, s.progress) + d := NewDownloader(10, 100, s.progress) d.Shutdown() // wait for goroutines to shutdown @@ -72,7 +72,7 @@ func (s *DownloaderSuite) TestStartupShutdown(c *C) { } func (s *DownloaderSuite) TestPauseResume(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() d.Pause() @@ -80,7 +80,7 @@ func (s *DownloaderSuite) TestPauseResume(c *C) { } func (s *DownloaderSuite) TestDownloadOK(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() ch := make(chan error) @@ -90,7 +90,7 @@ func (s *DownloaderSuite) TestDownloadOK(c *C) { } func (s *DownloaderSuite) TestDownloadWithChecksum(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() ch := make(chan error) @@ -131,7 +131,7 @@ func (s *DownloaderSuite) TestDownloadWithChecksum(c *C) { } func (s *DownloaderSuite) TestDownload404(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() ch := make(chan error) @@ -141,7 +141,7 @@ func (s *DownloaderSuite) TestDownload404(c *C) { } func (s *DownloaderSuite) TestDownloadConnectError(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() ch := make(chan error) @@ -151,7 +151,7 @@ func (s *DownloaderSuite) TestDownloadConnectError(c *C) { } func (s *DownloaderSuite) TestDownloadFileError(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() ch := make(chan error) @@ -161,7 +161,7 @@ func (s *DownloaderSuite) TestDownloadFileError(c *C) { } func (s *DownloaderSuite) TestDownloadTemp(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() f, err := DownloadTemp(d, s.url+"/test") @@ -178,7 +178,7 @@ func (s *DownloaderSuite) TestDownloadTemp(c *C) { } func (s *DownloaderSuite) TestDownloadTempWithChecksum(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() f, err := DownloadTempWithChecksum(d, s.url+"/test", utils.ChecksumInfo{Size: 12, MD5: "a1acb0fe91c7db45ec4d775192ec5738", @@ -191,7 +191,7 @@ func (s *DownloaderSuite) TestDownloadTempWithChecksum(c *C) { } func (s *DownloaderSuite) TestDownloadTempError(c *C) { - d := NewDownloader(2, s.progress) + d := NewDownloader(2, 0, s.progress) defer d.Shutdown() f, err := DownloadTemp(d, s.url+"/doesntexist")