mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-01-11 03:11:50 +00:00
Rework downloader to be interface + implementation.
This commit is contained in:
@@ -10,7 +10,19 @@ import (
|
||||
)
|
||||
|
||||
// Downloader is parallel HTTP fetcher
|
||||
type Downloader struct {
|
||||
type Downloader interface {
|
||||
Download(url string, destination string) <-chan error
|
||||
DownloadTemp(url string) (*os.File, error)
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// Check interface
|
||||
var (
|
||||
_ Downloader = &downloaderImpl{}
|
||||
)
|
||||
|
||||
// downloaderImpl is implementation of Downloader interface
|
||||
type downloaderImpl struct {
|
||||
queue chan *downloadTask
|
||||
stop chan bool
|
||||
stopped chan bool
|
||||
@@ -26,8 +38,8 @@ type downloadTask struct {
|
||||
|
||||
// NewDownloader creates new instance of Downloader which specified number
|
||||
// of threads
|
||||
func NewDownloader(threads int) (downloader *Downloader) {
|
||||
downloader = &Downloader{
|
||||
func NewDownloader(threads int) Downloader {
|
||||
downloader := &downloaderImpl{
|
||||
queue: make(chan *downloadTask, 1000),
|
||||
stop: make(chan bool),
|
||||
stopped: make(chan bool),
|
||||
@@ -38,12 +50,12 @@ func NewDownloader(threads int) (downloader *Downloader) {
|
||||
go downloader.process()
|
||||
}
|
||||
|
||||
return
|
||||
return downloader
|
||||
}
|
||||
|
||||
// Shutdown stops downloader after current tasks are finished,
|
||||
// but doesn't process rest of queue
|
||||
func (downloader *Downloader) Shutdown() {
|
||||
func (downloader *downloaderImpl) Shutdown() {
|
||||
for i := 0; i < downloader.threads; i++ {
|
||||
downloader.stop <- true
|
||||
}
|
||||
@@ -54,7 +66,7 @@ func (downloader *Downloader) Shutdown() {
|
||||
}
|
||||
|
||||
// Download starts new download task
|
||||
func (downloader *Downloader) Download(url string, destination string) <-chan error {
|
||||
func (downloader *downloaderImpl) Download(url string, destination string) <-chan error {
|
||||
ch := make(chan error, 1)
|
||||
|
||||
downloader.queue <- &downloadTask{url: url, destination: destination, result: ch}
|
||||
@@ -65,7 +77,7 @@ func (downloader *Downloader) Download(url string, destination string) <-chan er
|
||||
// DownloadTemp starts new download to temporary file and returns File
|
||||
//
|
||||
// Temporary file would be already removed, so no need to cleanup
|
||||
func (downloader *Downloader) DownloadTemp(url string) (*os.File, error) {
|
||||
func (downloader *downloaderImpl) DownloadTemp(url string) (*os.File, error) {
|
||||
ch := make(chan error, 1)
|
||||
|
||||
tempfile, err := ioutil.TempFile(os.TempDir(), "aptly")
|
||||
@@ -87,7 +99,7 @@ func (downloader *Downloader) DownloadTemp(url string) (*os.File, error) {
|
||||
}
|
||||
|
||||
// handleTask processes single download task
|
||||
func (downloader *Downloader) handleTask(task *downloadTask) {
|
||||
func (downloader *downloaderImpl) handleTask(task *downloadTask) {
|
||||
log.Printf("Downloading %s...\n", task.url)
|
||||
|
||||
resp, err := http.Get(task.url)
|
||||
@@ -115,7 +127,7 @@ func (downloader *Downloader) handleTask(task *downloadTask) {
|
||||
}
|
||||
|
||||
// process implements download thread in goroutine
|
||||
func (downloader *Downloader) process() {
|
||||
func (downloader *downloaderImpl) process() {
|
||||
for {
|
||||
select {
|
||||
case <-downloader.stop:
|
||||
|
||||
Reference in New Issue
Block a user