From 5a8799f8c10d2fbbb232d2774558b97fa3fb771f Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Mon, 16 Dec 2013 00:53:38 +0400 Subject: [PATCH] Move downloader to separate package. --- utils/download.go | 125 +++++++++++++++++++++++++++++++++++++++++ utils/download_test.go | 103 +++++++++++++++++++++++++++++++++ 2 files changed, 228 insertions(+) create mode 100644 utils/download.go create mode 100644 utils/download_test.go diff --git a/utils/download.go b/utils/download.go new file mode 100644 index 00000000..0de58089 --- /dev/null +++ b/utils/download.go @@ -0,0 +1,125 @@ +package utils + +import ( + "fmt" + "io" + "io/ioutil" + "net/http" + "os" +) + +// Downloader is parallel HTTP fetcher +type Downloader struct { + queue chan *downloadTask + stop chan bool + stopped chan bool + threads int +} + +// downloadTask represents single item in queue +type downloadTask struct { + url string + destination string + result chan<- error +} + +// NewDownloader creates new instance of Downloader which specified number +// of threads +func NewDownloader(threads int) (downloader *Downloader) { + downloader = &Downloader{ + queue: make(chan *downloadTask, 1000), + stop: make(chan bool), + stopped: make(chan bool), + threads: threads, + } + + for i := 0; i < downloader.threads; i++ { + go downloader.process() + } + + return +} + +// Shutdown stops downloader after current tasks are finished, +// but doesn't process rest of queue +func (downloader *Downloader) Shutdown() { + for i := 0; i < downloader.threads; i++ { + downloader.stop <- true + } + + for i := 0; i < downloader.threads; i++ { + <-downloader.stopped + } +} + +// Download starts new download task +func (downloader *Downloader) Download(url string, destination string) <-chan error { + ch := make(chan error, 1) + + downloader.queue <- &downloadTask{url: url, destination: destination, result: ch} + + return ch +} + +// DownloadTemp starts new download to temporary file and returns File +// +// Temporary file would be already removed, so no need to cleanup +func (downloader *Downloader) DownloadTemp(url string) (*os.File, error) { + ch := make(chan error, 1) + + tempfile, err := ioutil.TempFile(os.TempDir(), "aptly") + if err != nil { + return nil, err + } + + defer os.Remove(tempfile.Name()) + + downloader.queue <- &downloadTask{url: url, destination: tempfile.Name(), result: ch} + + err = <-ch + if err != nil { + tempfile.Close() + return nil, err + } + + return tempfile, nil +} + +// handleTask processes single download task +func (downloader *Downloader) handleTask(task *downloadTask) { + resp, err := http.Get(task.url) + if err != nil { + task.result <- err + return + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + task.result <- fmt.Errorf("HTTP code %d while fetching %s", resp.StatusCode, task.url) + return + } + + outfile, err := os.Create(task.destination) + if err != nil { + task.result <- err + return + } + defer outfile.Close() + + io.Copy(outfile, resp.Body) + + task.result <- nil +} + +// process implements download thread in goroutine +func (downloader *Downloader) process() { + for { + select { + case <-downloader.stop: + downloader.stopped <- true + return + case task := <-downloader.queue: + downloader.handleTask(task) + } + } +} diff --git a/utils/download_test.go b/utils/download_test.go new file mode 100644 index 00000000..9a9fa2ce --- /dev/null +++ b/utils/download_test.go @@ -0,0 +1,103 @@ +package utils + +import ( + "io/ioutil" + . "launchpad.net/gocheck" + "os" + "runtime" + "testing" + "time" +) + +// Launch gocheck tests +func Test(t *testing.T) { + TestingT(t) +} + +type DownloaderSuite struct { + tempfile *os.File +} + +var _ = Suite(&DownloaderSuite{}) + +func (s *DownloaderSuite) SetUpTest(c *C) { + s.tempfile, _ = ioutil.TempFile(os.TempDir(), "aptly-test") +} + +func (s *DownloaderSuite) TearDownTest(c *C) { + os.Remove(s.tempfile.Name()) + s.tempfile.Close() +} + +func (s *DownloaderSuite) TestStartupShutdown(c *C) { + goroutines := runtime.NumGoroutine() + + d := NewDownloader(10) + d.Shutdown() + + // wait for goroutines to shutdown + time.Sleep(100 * time.Millisecond) + + if runtime.NumGoroutine()-goroutines > 1 { + c.Errorf("Number of goroutines %d, expected %d", runtime.NumGoroutine(), goroutines) + } +} + +func (s *DownloaderSuite) TestDownloadOK(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/", s.tempfile.Name()) + c.Assert(res, IsNil) +} + +func (s *DownloaderSuite) TestDownload404(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/doesntexist", s.tempfile.Name()) + c.Assert(res, ErrorMatches, "HTTP code 404.*") +} + +func (s *DownloaderSuite) TestDownloadConnectError(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://nosuch.smira.ru/", s.tempfile.Name()) + c.Assert(res, ErrorMatches, ".*no such host") +} + +func (s *DownloaderSuite) TestDownloadFileError(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/", "/no/such/file") + c.Assert(res, ErrorMatches, ".*no such file or directory") +} + +func (s *DownloaderSuite) TestDownloadTemp(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + f, err := d.DownloadTemp("http://smira.ru/") + c.Assert(err, IsNil) + defer f.Close() + + buf := make([]byte, 1) + + f.Read(buf) + c.Assert(buf, DeepEquals, []byte("<")) + + _, err = os.Stat(f.Name()) + c.Assert(os.IsNotExist(err), Equals, true) +} + +func (s *DownloaderSuite) TestDownloadTempError(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + f, err := d.DownloadTemp("http://smira.ru/doesntexist") + c.Assert(err, NotNil) + c.Assert(f, IsNil) + c.Assert(err, ErrorMatches, "HTTP code 404.*") +}