Setting for downloader to limit download speed to specified level. #62

This commit is contained in:
Andrey Smirnov
2014-07-12 23:56:32 +04:00
parent 86206df58d
commit fb1e28b91b
2 changed files with 30 additions and 21 deletions
+20 -11
View File
@@ -1,6 +1,7 @@
package http package http
import ( import (
"code.google.com/p/mxk/go1/flowcontrol"
"compress/bzip2" "compress/bzip2"
"compress/gzip" "compress/gzip"
"fmt" "fmt"
@@ -21,14 +22,15 @@ var (
// downloaderImpl is implementation of Downloader interface // downloaderImpl is implementation of Downloader interface
type downloaderImpl struct { type downloaderImpl struct {
queue chan *downloadTask queue chan *downloadTask
stop chan bool stop chan bool
stopped chan bool stopped chan bool
pause chan bool pause chan bool
unpause chan bool unpause chan bool
progress aptly.Progress progress aptly.Progress
threads int aggWriter io.Writer
client *http.Client threads int
client *http.Client
} }
// downloadTask represents single item in queue // downloadTask represents single item in queue
@@ -41,8 +43,8 @@ type downloadTask struct {
} }
// NewDownloader creates new instance of Downloader which specified number // NewDownloader creates new instance of Downloader which specified number
// of threads // of threads and download limit in bytes/sec
func NewDownloader(threads int, progress aptly.Progress) aptly.Downloader { func NewDownloader(threads int, downLimit int64, progress aptly.Progress) aptly.Downloader {
downloader := &downloaderImpl{ downloader := &downloaderImpl{
queue: make(chan *downloadTask, 1000), queue: make(chan *downloadTask, 1000),
stop: make(chan bool), stop: make(chan bool),
@@ -59,6 +61,13 @@ func NewDownloader(threads int, progress aptly.Progress) aptly.Downloader {
}, },
} }
fmt.Printf("downLimit = %v\n", downLimit)
if downLimit > 0 {
downloader.aggWriter = flowcontrol.NewWriter(progress, downLimit)
} else {
downloader.aggWriter = progress
}
for i := 0; i < downloader.threads; i++ { for i := 0; i < downloader.threads; i++ {
go downloader.process() go downloader.process()
} }
@@ -140,7 +149,7 @@ func (downloader *downloaderImpl) handleTask(task *downloadTask) {
defer outfile.Close() defer outfile.Close()
checksummer := utils.NewChecksumWriter() checksummer := utils.NewChecksumWriter()
writers := []io.Writer{outfile, downloader.progress} writers := []io.Writer{outfile, downloader.aggWriter}
if task.expected.Size != -1 { if task.expected.Size != -1 {
writers = append(writers, checksummer) writers = append(writers, checksummer)
+10 -10
View File
@@ -60,7 +60,7 @@ func (s *DownloaderSuite) TearDownTest(c *C) {
func (s *DownloaderSuite) TestStartupShutdown(c *C) { func (s *DownloaderSuite) TestStartupShutdown(c *C) {
goroutines := runtime.NumGoroutine() goroutines := runtime.NumGoroutine()
d := NewDownloader(10, s.progress) d := NewDownloader(10, 100, s.progress)
d.Shutdown() d.Shutdown()
// wait for goroutines to shutdown // wait for goroutines to shutdown
@@ -72,7 +72,7 @@ func (s *DownloaderSuite) TestStartupShutdown(c *C) {
} }
func (s *DownloaderSuite) TestPauseResume(c *C) { func (s *DownloaderSuite) TestPauseResume(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
d.Pause() d.Pause()
@@ -80,7 +80,7 @@ func (s *DownloaderSuite) TestPauseResume(c *C) {
} }
func (s *DownloaderSuite) TestDownloadOK(c *C) { func (s *DownloaderSuite) TestDownloadOK(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
ch := make(chan error) ch := make(chan error)
@@ -90,7 +90,7 @@ func (s *DownloaderSuite) TestDownloadOK(c *C) {
} }
func (s *DownloaderSuite) TestDownloadWithChecksum(c *C) { func (s *DownloaderSuite) TestDownloadWithChecksum(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
ch := make(chan error) ch := make(chan error)
@@ -131,7 +131,7 @@ func (s *DownloaderSuite) TestDownloadWithChecksum(c *C) {
} }
func (s *DownloaderSuite) TestDownload404(c *C) { func (s *DownloaderSuite) TestDownload404(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
ch := make(chan error) ch := make(chan error)
@@ -141,7 +141,7 @@ func (s *DownloaderSuite) TestDownload404(c *C) {
} }
func (s *DownloaderSuite) TestDownloadConnectError(c *C) { func (s *DownloaderSuite) TestDownloadConnectError(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
ch := make(chan error) ch := make(chan error)
@@ -151,7 +151,7 @@ func (s *DownloaderSuite) TestDownloadConnectError(c *C) {
} }
func (s *DownloaderSuite) TestDownloadFileError(c *C) { func (s *DownloaderSuite) TestDownloadFileError(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
ch := make(chan error) ch := make(chan error)
@@ -161,7 +161,7 @@ func (s *DownloaderSuite) TestDownloadFileError(c *C) {
} }
func (s *DownloaderSuite) TestDownloadTemp(c *C) { func (s *DownloaderSuite) TestDownloadTemp(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
f, err := DownloadTemp(d, s.url+"/test") f, err := DownloadTemp(d, s.url+"/test")
@@ -178,7 +178,7 @@ func (s *DownloaderSuite) TestDownloadTemp(c *C) {
} }
func (s *DownloaderSuite) TestDownloadTempWithChecksum(c *C) { func (s *DownloaderSuite) TestDownloadTempWithChecksum(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
f, err := DownloadTempWithChecksum(d, s.url+"/test", utils.ChecksumInfo{Size: 12, MD5: "a1acb0fe91c7db45ec4d775192ec5738", f, err := DownloadTempWithChecksum(d, s.url+"/test", utils.ChecksumInfo{Size: 12, MD5: "a1acb0fe91c7db45ec4d775192ec5738",
@@ -191,7 +191,7 @@ func (s *DownloaderSuite) TestDownloadTempWithChecksum(c *C) {
} }
func (s *DownloaderSuite) TestDownloadTempError(c *C) { func (s *DownloaderSuite) TestDownloadTempError(c *C) {
d := NewDownloader(2, s.progress) d := NewDownloader(2, 0, s.progress)
defer d.Shutdown() defer d.Shutdown()
f, err := DownloadTemp(d, s.url+"/doesntexist") f, err := DownloadTemp(d, s.url+"/doesntexist")