From 2a99fdfcf1321af1cadd0470232383fb6cc0764a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Mon, 25 May 2026 15:39:48 +0000 Subject: [PATCH] tasks: fix race conditions * show resources in task details * fix task state locking * return task object consistently Race condition iexisted where task State, err, and processReturnValue fields were written by consumer goroutine and read by concurrent accessors without proper synchronization, causing torn reads and data races. --- task/list.go | 63 +++++++++++++++++++++++++++++++--------------------- task/task.go | 5 +++-- 2 files changed, 41 insertions(+), 27 deletions(-) diff --git a/task/list.go b/task/list.go index 5b9e9395..6a1a720d 100644 --- a/task/list.go +++ b/task/list.go @@ -44,28 +44,30 @@ func (list *List) consumer() { for { select { case task := <-list.queue: + // Set task state to RUNNING before processing list.Lock() - { - task.State = RUNNING - } + task.State = RUNNING list.Unlock() go func() { retValue, err := task.process(aptly.Progress(task.output), task.detail) + // Update task completion state and cleanup with list lock held list.Lock() { - task.processReturnValue = retValue - task.err = err if err != nil { task.output.Printf("Task failed with error: %v", err) task.State = FAILED + task.err = err + task.processReturnValue = retValue } else { task.output.Print("Task succeeded") task.State = SUCCEEDED + task.err = nil + task.processReturnValue = retValue } - list.usedResources.Free(task.resources) + list.usedResources.Free(task.Resources) task.wgTask.Done() list.wg.Done() @@ -74,9 +76,9 @@ func (list *List) consumer() { for _, t := range list.tasks { if t.State == IDLE { // check resources - blockingTasks := list.usedResources.UsedBy(t.resources) + blockingTasks := list.usedResources.UsedBy(t.Resources) if len(blockingTasks) == 0 { - list.usedResources.MarkInUse(t.resources, t) + list.usedResources.MarkInUse(t.Resources, t) // unlock list since queueing may block list.Unlock() unlocked = true @@ -105,13 +107,15 @@ func (list *List) Stop() { // GetTasks gets complete list of tasks func (list *List) GetTasks() []Task { - tasks := []Task{} list.Lock() + defer list.Unlock() + + tasks := []Task{} for _, task := range list.tasks { + // Copy task while holding list lock tasks = append(tasks, *task) } - list.Unlock() return tasks } @@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) { // GetTaskByID returns task with given id func (list *List) GetTaskByID(ID int) (Task, error) { list.Lock() - tasks := list.tasks - list.Unlock() + defer list.Unlock() - for _, task := range tasks { + for _, task := range list.tasks { if task.ID == ID { + // Copy task while holding list lock return *task, nil } } @@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) { // GetTaskReturnValueByID returns process return value of task with given id func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.processReturnValue, nil + } } - return task.processReturnValue, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } // RunTaskInBackground creates task and runs it in background. This will block until the necessary resources @@ -204,11 +211,15 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.wg.Add(1) task.wgTask.Add(1) + // Copy task while still holding the lock to avoid racing with consumer + // setting State=RUNNING after receiving from queue + taskCopy := *task + // add task to queue for processing if resources are available // if not, task will be queued by the consumer once resources are available tasks := list.usedResources.UsedBy(resources) if len(tasks) == 0 { - list.usedResources.MarkInUse(task.resources, task) + list.usedResources.MarkInUse(task.Resources, task) // queueing task might block if channel not ready, unlock list before queueing list.Unlock() list.queue <- task @@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P list.Unlock() } - return *task, nil + return taskCopy, nil } // Clear removes finished tasks from list func (list *List) Clear() { list.Lock() + defer list.Unlock() var tasks []*Task for _, task := range list.tasks { @@ -230,8 +242,6 @@ func (list *List) Clear() { } } list.tasks = tasks - - list.Unlock() } // Wait waits till all tasks are processed @@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) { // GetTaskErrorByID returns the Task error for a given id func (list *List) GetTaskErrorByID(ID int) (error, error) { - task, err := list.GetTaskByID(ID) + list.Lock() + defer list.Unlock() - if err != nil { - return nil, err + for _, task := range list.tasks { + if task.ID == ID { + return task.err, nil + } } - return task.err, nil + return nil, fmt.Errorf("could not find task with id %v", ID) } diff --git a/task/task.go b/task/task.go index 02aa7037..72f60699 100644 --- a/task/task.go +++ b/task/task.go @@ -42,6 +42,7 @@ const ( ) // Task represents as task in a queue encapsulates process code +// All fields are protected by List.Mutex - access task fields only while holding list.Lock() type Task struct { output *Output detail *Detail @@ -51,7 +52,7 @@ type Task struct { Name string ID int State State - resources []string + Resources []string wgTask *sync.WaitGroup } @@ -64,7 +65,7 @@ func NewTask(process Process, name string, ID int, resources []string, wgTask *s Name: name, ID: ID, State: IDLE, - resources: resources, + Resources: resources, wgTask: wgTask, } return task