diff --git a/context/context.go b/context/context.go index 5c4e9d5f..ea246f06 100644 --- a/context/context.go +++ b/context/context.go @@ -567,6 +567,9 @@ func (context *AptlyContext) Shutdown() { context.fileMemProfile = nil } } + if context.taskList != nil { + context.taskList.Stop() + } if context.database != nil { context.database.Close() context.database = nil diff --git a/system/t12_api/tasks.py b/system/t12_api/tasks.py index 5291689c..42b17c98 100644 --- a/system/t12_api/tasks.py +++ b/system/t12_api/tasks.py @@ -20,9 +20,9 @@ class TaskAPITestParallelTasks(APITest): resp = self.put("/api/mirrors/" + mirror_name, json=mirror_desc, params={'_async': True}) self.check_equal(resp.status_code, 202) - # check that two mirror updates cannot run at the same time + # check that two mirror updates are queuedd resp2 = self.put("/api/mirrors/" + mirror_name, json=mirror_desc, params={'_async': True}) - self.check_equal(resp2.status_code, 409) + self.check_equal(resp2.status_code, 202) return resp.json()['ID'], mirror_name diff --git a/task/list.go b/task/list.go index 453e4db4..e72faba6 100644 --- a/task/list.go +++ b/task/list.go @@ -17,6 +17,10 @@ type List struct { // resources currently used by running tasks usedResources *ResourcesSet idCounter int + + queue chan *Task + queueWg *sync.WaitGroup + queueDone chan bool } // NewList creates empty task list @@ -27,10 +31,71 @@ func NewList() *List { wgTasks: make(map[int]*sync.WaitGroup), wg: &sync.WaitGroup{}, usedResources: NewResourcesSet(), + queue: make(chan *Task, 0), + queueWg: &sync.WaitGroup{}, + queueDone: make(chan bool), } + go list.consumer() return list } +// consumer is processing the queue +func (list *List) consumer() { + for { + select { + case task := <-list.queue: + list.Lock() + { + task.State = RUNNING + } + list.Unlock() + + go func() { + retValue, err := task.process(aptly.Progress(task.output), task.detail) + + list.Lock() + { + task.processReturnValue = retValue + if err != nil { + task.output.Printf("Task failed with error: %v", err) + task.State = FAILED + } else { + task.output.Print("Task succeeded") + task.State = SUCCEEDED + } + + list.usedResources.Free(task.resources) + + task.wgTask.Done() + list.wg.Done() + + for _, t := range list.tasks { + if t.State == IDLE { + // check resources + blockingTasks := list.usedResources.UsedBy(t.resources) + if len(blockingTasks) == 0 { + list.usedResources.MarkInUse(task.resources, task) + list.queue <- t + break + } + } + } + } + list.Unlock() + }() + + case <-list.queueDone: + return + } + } +} + +// Stop signals the consumer to stop processing tasks and waits for it to finish +func (list *List) Stop() { + close(list.queueDone) + list.queueWg.Wait() +} + // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { tasks := []Task{} @@ -123,55 +188,23 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Lock() defer list.Unlock() - tasks := list.usedResources.UsedBy(resources) - for len(tasks) > 0 { - for _, task := range tasks { - list.Unlock() - list.wgTasks[task.ID].Wait() - list.Lock() - } - tasks = list.usedResources.UsedBy(resources) - } - list.idCounter++ wgTask := &sync.WaitGroup{} - task := NewTask(process, name, list.idCounter) + task := NewTask(process, name, list.idCounter, resources, wgTask) list.tasks = append(list.tasks, task) list.wgTasks[task.ID] = wgTask - list.usedResources.MarkInUse(resources, task) list.wg.Add(1) - wgTask.Add(1) + task.wgTask.Add(1) - go func() { - - list.Lock() - { - task.State = RUNNING - } - list.Unlock() - - retValue, err := process(aptly.Progress(task.output), task.detail) - - list.Lock() - { - task.processReturnValue = retValue - if err != nil { - task.output.Printf("Task failed with error: %v", err) - task.State = FAILED - } else { - task.output.Print("Task succeeded") - task.State = SUCCEEDED - } - - list.usedResources.Free(resources) - - wgTask.Done() - list.wg.Done() - } - list.Unlock() - }() + // add task to queue for processing if resources are available + // if not, task will be queued by the consumer once resources are available + tasks := list.usedResources.UsedBy(resources) + if len(tasks) == 0 { + list.usedResources.MarkInUse(task.resources, task) + list.queue <- task + } return *task, nil } diff --git a/task/list_test.go b/task/list_test.go index 297e97e2..bf5d3efc 100644 --- a/task/list_test.go +++ b/task/list_test.go @@ -50,4 +50,5 @@ func (s *ListSuite) TestList(c *check.C) { c.Check(detail, check.Equals, "Details") _, deleteErr := list.DeleteTaskByID(task.ID) c.Check(deleteErr, check.IsNil) + list.Stop() } diff --git a/task/task.go b/task/task.go index 54da0e3c..ed43d329 100644 --- a/task/task.go +++ b/task/task.go @@ -1,6 +1,7 @@ package task import ( + "sync" "sync/atomic" "github.com/aptly-dev/aptly/aptly" @@ -49,17 +50,21 @@ type Task struct { Name string ID int State State + resources []string + wgTask *sync.WaitGroup } // NewTask creates new task -func NewTask(process Process, name string, ID int) *Task { +func NewTask(process Process, name string, ID int, resources []string, wgTask *sync.WaitGroup) *Task { task := &Task{ - output: NewOutput(), - detail: &Detail{}, - process: process, - Name: name, - ID: ID, - State: IDLE, + output: NewOutput(), + detail: &Detail{}, + process: process, + Name: name, + ID: ID, + State: IDLE, + resources: resources, + wgTask: wgTask, } return task }