diff --git a/task/list.go b/task/list.go index 5b9e9395..6a1a720d 100644 --- a/task/list.go +++ b/task/list.go @@ -44,28 +44,30 @@ func (list *List) consumer() { for { select { case task := <-list.queue: + // Set task state to RUNNING before processing list.Lock() - { - task.State = RUNNING - } + task.State = RUNNING list.Unlock() go func() { retValue, err := task.process(aptly.Progress(task.output), task.detail) + // Update task completion state and cleanup with list lock held list.Lock() { - task.processReturnValue = retValue - task.err = err if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED + task.err = err + task.processReturnValue = retValue } else { task.output.Print("Task succeeded") task.State = SUCCEEDED + task.err = nil + task.processReturnValue = retValue } - list.usedResources.Free(task.resources) + list.usedResources.Free(task.Resources) task.wgTask.Done() list.wg.Done() @@ -74,9 +76,9 @@ func (list *List) consumer() { for _, t := range list.tasks { if t.State == IDLE { // check resources - blockingTasks := list.usedResources.UsedBy(t.resources) + blockingTasks := list.usedResources.UsedBy(t.Resources) if len(blockingTasks) == 0 { - list.usedResources.MarkInUse(t.resources, t) + list.usedResources.MarkInUse(t.Resources, t) // unlock list since queueing may block list.Unlock() unlocked = true @@ -105,13 +107,15 @@ func (list *List) Stop() { // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { - tasks := []Task{} list.Lock() + defer list.Unlock() + + tasks := []Task{} for _, task := range list.tasks { + // Copy task while holding list lock tasks = append(tasks, *task) } - list.Unlock() return tasks } @@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) { // GetTaskByID returns task with given id func (list *List) GetTaskByID(ID int) (Task, error) { list.Lock() - tasks := list.tasks - list.Unlock() + defer list.Unlock() - for _, task := range tasks { + for _, task := range list.tasks { if task.ID == ID { + // Copy task while holding list lock return *task, nil } } @@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { // GetTaskReturnValueByID returns process return value of task with given id func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.processReturnValue, nil + } } - return task.processReturnValue, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } // RunTaskInBackground creates task and runs it in background. This will block until the necessary resources @@ -204,11 +211,15 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.wg.Add(1) task.wgTask.Add(1) + // Copy task while still holding the lock to avoid racing with consumer + // setting State=RUNNING after receiving from queue + taskCopy := *task + // add task to queue for processing if resources are available // if not, task will be queued by the consumer once resources are available tasks := list.usedResources.UsedBy(resources) if len(tasks) == 0 { - list.usedResources.MarkInUse(task.resources, task) + list.usedResources.MarkInUse(task.Resources, task) // queueing task might block if channel not ready, unlock list before queueing list.Unlock() list.queue <- task @@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Unlock() } - return *task, nil + return taskCopy, nil } // Clear removes finished tasks from list func (list *List) Clear() { list.Lock() + defer list.Unlock() var tasks []*Task for _, task := range list.tasks { @@ -230,8 +242,6 @@ func (list *List) Clear() { } } list.tasks = tasks - - list.Unlock() } // Wait waits till all tasks are processed @@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) { // GetTaskErrorByID returns the Task error for a given id func (list *List) GetTaskErrorByID(ID int) (error, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.err, nil + } } - return task.err, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } diff --git a/task/task.go b/task/task.go index 02aa7037..72f60699 100644 --- a/task/task.go +++ b/task/task.go @@ -42,6 +42,7 @@ const ( ) // Task represents as task in a queue encapsulates process code +// All fields are protected by List.Mutex - access task fields only while holding list.Lock() type Task struct { output *Output detail *Detail @@ -51,7 +52,7 @@ type Task struct { Name string ID int State State - resources []string + Resources []string wgTask *sync.WaitGroup } @@ -64,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s Name: name, ID: ID, State: IDLE, - resources: resources, + Resources: resources, wgTask: wgTask, } return task