From 11456e89595cbf901ef16a08985b453dac38a7e0 Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Fri, 13 Dec 2013 21:51:47 +0400 Subject: [PATCH] Downloader. --- .gitignore | 25 +++++++++++++ utils.go | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++ utils_test.go | 76 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+) create mode 100644 .gitignore create mode 100644 utils.go create mode 100644 utils_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..4f2d8b7a --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test + +coverage.html \ No newline at end of file diff --git a/utils.go b/utils.go new file mode 100644 index 00000000..8b59751a --- /dev/null +++ b/utils.go @@ -0,0 +1,100 @@ +package main + +import ( + "fmt" + "io" + "net/http" + "os" +) + +// Downloader is parallel HTTP fetcher +type Downloader struct { + queue chan *downloadTask + stop chan bool + stopped chan bool + threads int +} + +// downloadTask represents single item in queue +type downloadTask struct { + url string + destination string + result chan<- error +} + +// NewDownloader creates new instance of Downloader which specified number +// of threads +func NewDownloader(threads int) (downloader *Downloader) { + downloader = &Downloader{ + queue: make(chan *downloadTask, 1000), + stop: make(chan bool), + stopped: make(chan bool), + threads: threads, + } + + for i := 0; i < downloader.threads; i++ { + go downloader.process() + } + + return +} + +// Shutdown stops downloader after current tasks are finished, +// but doesn't process rest of queue +func (downloader *Downloader) Shutdown() { + for i := 0; i < downloader.threads; i++ { + downloader.stop <- true + } + + for i := 0; i < downloader.threads; i++ { + <-downloader.stopped + } +} + +// Download starts new download task +func (downloader *Downloader) Download(url string, destination string) <-chan error { + ch := make(chan error, 1) + + downloader.queue <- &downloadTask{url: url, destination: destination, result: ch} + + return ch +} + +// handleTask processes single download task +func (downloader *Downloader) handleTask(task *downloadTask) { + resp, err := http.Get(task.url) + if err != nil { + task.result <- err + return + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + task.result <- fmt.Errorf("HTTP code %d while fetching %s", resp.StatusCode, task.url) + return + } + + outfile, err := os.Create(task.destination) + if err != nil { + task.result <- err + return + } + defer outfile.Close() + + io.Copy(outfile, resp.Body) + + task.result <- nil +} + +// process implements download thread in goroutine +func (downloader *Downloader) process() { + for { + select { + case <-downloader.stop: + downloader.stopped <- true + return + case task := <-downloader.queue: + downloader.handleTask(task) + } + } +} diff --git a/utils_test.go b/utils_test.go new file mode 100644 index 00000000..3a85fafe --- /dev/null +++ b/utils_test.go @@ -0,0 +1,76 @@ +package main + +import ( + "io/ioutil" + . "launchpad.net/gocheck" + "os" + "runtime" + "testing" + "time" +) + +// Launch gocheck tests +func Test(t *testing.T) { + TestingT(t) +} + +type DownloaderSuite struct { + tempfile *os.File +} + +var _ = Suite(&DownloaderSuite{}) + +func (s *DownloaderSuite) SetUpTest(c *C) { + s.tempfile, _ = ioutil.TempFile(os.TempDir(), "aptly-test") +} + +func (s *DownloaderSuite) TearDownTest(c *C) { + os.Remove(s.tempfile.Name()) + s.tempfile.Close() +} + +func (s *DownloaderSuite) TestStartupShutdown(c *C) { + goroutines := runtime.NumGoroutine() + + d := NewDownloader(10) + d.Shutdown() + + // wait for goroutines to shutdown + time.Sleep(100 * time.Millisecond) + + if runtime.NumGoroutine()-goroutines > 1 { + c.Errorf("Number of goroutines %d, expected %d", runtime.NumGoroutine(), goroutines) + } +} + +func (s *DownloaderSuite) TestDownloadOK(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/", s.tempfile.Name()) + c.Assert(res, IsNil) +} + +func (s *DownloaderSuite) TestDownload404(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/doesntexist", s.tempfile.Name()) + c.Assert(res, ErrorMatches, "HTTP code 404.*") +} + +func (s *DownloaderSuite) TestDownloadConnectError(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://nosuch.smira.ru/", s.tempfile.Name()) + c.Assert(res, ErrorMatches, ".*no such host") +} + +func (s *DownloaderSuite) TestDownloadFileError(c *C) { + d := NewDownloader(2) + defer d.Shutdown() + + res := <-d.Download("http://smira.ru/", "/no/such/file") + c.Assert(res, ErrorMatches, ".*no such file or directory") +}