Method Pause/Resume for Downloader.

This commit is contained in:
Andrey Smirnov
2014-01-14 14:11:28 +04:00
parent e63bbe839b
commit 7afcc93507
3 changed files with 38 additions and 0 deletions
+22
View File
@@ -15,6 +15,8 @@ import (
// Downloader is parallel HTTP fetcher // Downloader is parallel HTTP fetcher
type Downloader interface { type Downloader interface {
Download(url string, destination string, result chan<- error) Download(url string, destination string, result chan<- error)
Pause()
Resume()
Shutdown() Shutdown()
} }
@@ -28,6 +30,8 @@ type downloaderImpl struct {
queue chan *downloadTask queue chan *downloadTask
stop chan bool stop chan bool
stopped chan bool stopped chan bool
pause chan bool
unpause chan bool
threads int threads int
} }
@@ -45,6 +49,8 @@ func NewDownloader(threads int) Downloader {
queue: make(chan *downloadTask, 1000), queue: make(chan *downloadTask, 1000),
stop: make(chan bool), stop: make(chan bool),
stopped: make(chan bool), stopped: make(chan bool),
pause: make(chan bool),
unpause: make(chan bool),
threads: threads, threads: threads,
} }
@@ -67,6 +73,20 @@ func (downloader *downloaderImpl) Shutdown() {
} }
} }
// Pause pauses task processing
func (downloader *downloaderImpl) Pause() {
for i := 0; i < downloader.threads; i++ {
downloader.pause <- true
}
}
// Resume resumes task processing
func (downloader *downloaderImpl) Resume() {
for i := 0; i < downloader.threads; i++ {
downloader.unpause <- true
}
}
// Download starts new download task // Download starts new download task
func (downloader *downloaderImpl) Download(url string, destination string, result chan<- error) { func (downloader *downloaderImpl) Download(url string, destination string, result chan<- error) {
downloader.queue <- &downloadTask{url: url, destination: destination, result: result} downloader.queue <- &downloadTask{url: url, destination: destination, result: result}
@@ -127,6 +147,8 @@ func (downloader *downloaderImpl) process() {
case <-downloader.stop: case <-downloader.stop:
downloader.stopped <- true downloader.stopped <- true
return return
case <-downloader.pause:
<-downloader.unpause
case task := <-downloader.queue: case task := <-downloader.queue:
downloader.handleTask(task) downloader.handleTask(task)
} }
+8
View File
@@ -45,6 +45,14 @@ func (s *DownloaderSuite) TestStartupShutdown(c *C) {
} }
} }
func (s *DownloaderSuite) TestPauseResume(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
d.Pause()
d.Resume()
}
func (s *DownloaderSuite) TestDownloadOK(c *C) { func (s *DownloaderSuite) TestDownloadOK(c *C) {
d := NewDownloader(2) d := NewDownloader(2)
defer d.Shutdown() defer d.Shutdown()
+8
View File
@@ -88,3 +88,11 @@ func (f *FakeDownloader) Download(url string, filename string, result chan<- err
// Shutdown does nothing // Shutdown does nothing
func (f *FakeDownloader) Shutdown() { func (f *FakeDownloader) Shutdown() {
} }
// Pause does nothing
func (f *FakeDownloader) Pause() {
}
// Resume does nothing
func (f *FakeDownloader) Resume() {
}