Move downloader to separate package.

This commit is contained in:
Andrey Smirnov
2013-12-16 00:53:38 +04:00
parent c7cb2a9a83
commit 5a8799f8c1
2 changed files with 228 additions and 0 deletions

125
utils/download.go Normal file
View File

@@ -0,0 +1,125 @@
package utils
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
)
// Downloader is parallel HTTP fetcher
type Downloader struct {
queue chan *downloadTask
stop chan bool
stopped chan bool
threads int
}
// downloadTask represents single item in queue
type downloadTask struct {
url string
destination string
result chan<- error
}
// NewDownloader creates new instance of Downloader which specified number
// of threads
func NewDownloader(threads int) (downloader *Downloader) {
downloader = &Downloader{
queue: make(chan *downloadTask, 1000),
stop: make(chan bool),
stopped: make(chan bool),
threads: threads,
}
for i := 0; i < downloader.threads; i++ {
go downloader.process()
}
return
}
// Shutdown stops downloader after current tasks are finished,
// but doesn't process rest of queue
func (downloader *Downloader) Shutdown() {
for i := 0; i < downloader.threads; i++ {
downloader.stop <- true
}
for i := 0; i < downloader.threads; i++ {
<-downloader.stopped
}
}
// Download starts new download task
func (downloader *Downloader) Download(url string, destination string) <-chan error {
ch := make(chan error, 1)
downloader.queue <- &downloadTask{url: url, destination: destination, result: ch}
return ch
}
// DownloadTemp starts new download to temporary file and returns File
//
// Temporary file would be already removed, so no need to cleanup
func (downloader *Downloader) DownloadTemp(url string) (*os.File, error) {
ch := make(chan error, 1)
tempfile, err := ioutil.TempFile(os.TempDir(), "aptly")
if err != nil {
return nil, err
}
defer os.Remove(tempfile.Name())
downloader.queue <- &downloadTask{url: url, destination: tempfile.Name(), result: ch}
err = <-ch
if err != nil {
tempfile.Close()
return nil, err
}
return tempfile, nil
}
// handleTask processes single download task
func (downloader *Downloader) handleTask(task *downloadTask) {
resp, err := http.Get(task.url)
if err != nil {
task.result <- err
return
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
task.result <- fmt.Errorf("HTTP code %d while fetching %s", resp.StatusCode, task.url)
return
}
outfile, err := os.Create(task.destination)
if err != nil {
task.result <- err
return
}
defer outfile.Close()
io.Copy(outfile, resp.Body)
task.result <- nil
}
// process implements download thread in goroutine
func (downloader *Downloader) process() {
for {
select {
case <-downloader.stop:
downloader.stopped <- true
return
case task := <-downloader.queue:
downloader.handleTask(task)
}
}
}

103
utils/download_test.go Normal file
View File

@@ -0,0 +1,103 @@
package utils
import (
"io/ioutil"
. "launchpad.net/gocheck"
"os"
"runtime"
"testing"
"time"
)
// Launch gocheck tests
func Test(t *testing.T) {
TestingT(t)
}
type DownloaderSuite struct {
tempfile *os.File
}
var _ = Suite(&DownloaderSuite{})
func (s *DownloaderSuite) SetUpTest(c *C) {
s.tempfile, _ = ioutil.TempFile(os.TempDir(), "aptly-test")
}
func (s *DownloaderSuite) TearDownTest(c *C) {
os.Remove(s.tempfile.Name())
s.tempfile.Close()
}
func (s *DownloaderSuite) TestStartupShutdown(c *C) {
goroutines := runtime.NumGoroutine()
d := NewDownloader(10)
d.Shutdown()
// wait for goroutines to shutdown
time.Sleep(100 * time.Millisecond)
if runtime.NumGoroutine()-goroutines > 1 {
c.Errorf("Number of goroutines %d, expected %d", runtime.NumGoroutine(), goroutines)
}
}
func (s *DownloaderSuite) TestDownloadOK(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
res := <-d.Download("http://smira.ru/", s.tempfile.Name())
c.Assert(res, IsNil)
}
func (s *DownloaderSuite) TestDownload404(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
res := <-d.Download("http://smira.ru/doesntexist", s.tempfile.Name())
c.Assert(res, ErrorMatches, "HTTP code 404.*")
}
func (s *DownloaderSuite) TestDownloadConnectError(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
res := <-d.Download("http://nosuch.smira.ru/", s.tempfile.Name())
c.Assert(res, ErrorMatches, ".*no such host")
}
func (s *DownloaderSuite) TestDownloadFileError(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
res := <-d.Download("http://smira.ru/", "/no/such/file")
c.Assert(res, ErrorMatches, ".*no such file or directory")
}
func (s *DownloaderSuite) TestDownloadTemp(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
f, err := d.DownloadTemp("http://smira.ru/")
c.Assert(err, IsNil)
defer f.Close()
buf := make([]byte, 1)
f.Read(buf)
c.Assert(buf, DeepEquals, []byte("<"))
_, err = os.Stat(f.Name())
c.Assert(os.IsNotExist(err), Equals, true)
}
func (s *DownloaderSuite) TestDownloadTempError(c *C) {
d := NewDownloader(2)
defer d.Shutdown()
f, err := d.DownloadTemp("http://smira.ru/doesntexist")
c.Assert(err, NotNil)
c.Assert(f, IsNil)
c.Assert(err, ErrorMatches, "HTTP code 404.*")
}