Files
aptly/task/list.go
T
André Roth 4f339be879 fix(task): Eliminate data race in RunTaskInBackground return value
RunTaskInBackground() previously returned *task AFTER releasing list.Lock()
and sending the task to the consumer queue. This created a data race:

  1. list.queue <- task  (consumer receives)
  2. Consumer: list.Lock() → task.State = RUNNING → list.Unlock()
  3. RunTaskInBackground: return *task  (struct copy WITHOUT lock)

Steps 2 and 3 can execute concurrently — consumer writes task.State
while RunTaskInBackground reads the entire struct via copy.

Fix: Copy the task struct BEFORE unlocking, while list.Lock() is still
held. At this point the task was just created and no other goroutine can
access it, so the copy is guaranteed consistent (always State=IDLE).

The returned copy is a snapshot of the initial task state, which is what
callers expect — the task ID and name for tracking purposes.

Safety invariant maintained:
  - I4: All struct copies happen while list.Lock() is held

Changes:
  - task/list.go: RunTaskInBackground() copies *task before unlock,
    returns the pre-made copy instead of dereferencing after unlock
2026-05-26 00:29:46 +02:00

278 lines
6.3 KiB
Go

package task
import (
"fmt"
"sync"
"github.com/aptly-dev/aptly/aptly"
)
// List keeps track of all tasks and hands them to the consumer goroutine
// for execution. A task is only dispatched when usedResources.UsedBy reports
// no blocking task for the resources it asks for.
type List struct {
// Embedded mutex; the methods of List hold it while touching tasks,
// wgTasks, usedResources, idCounter and the mutable fields of a Task.
*sync.Mutex
// all known tasks, in creation order
tasks []*Task
// per-task wait groups, keyed by task ID; used by WaitForTaskByID
wgTasks map[int]*sync.WaitGroup
// incremented for every created task and decremented when a task finishes;
// used by Wait
wg *sync.WaitGroup
// resources currently used by running tasks
usedResources *ResourcesSet
// ID given to the most recently created task
idCounter int
// unbuffered channel over which tasks are handed to the consumer
queue chan *Task
// waited on by Stop after queueDone has been closed
queueWg *sync.WaitGroup
// closed by Stop to make the consumer loop return
queueDone chan bool
}
// NewList creates an empty task list and starts the consumer goroutine that
// dispatches queued tasks. Call Stop to terminate that goroutine.
func NewList() *List {
	list := &List{
		Mutex:         &sync.Mutex{},
		tasks:         make([]*Task, 0),
		wgTasks:       make(map[int]*sync.WaitGroup),
		wg:            &sync.WaitGroup{},
		usedResources: NewResourcesSet(),
		queue:         make(chan *Task),
		queueWg:       &sync.WaitGroup{},
		queueDone:     make(chan bool),
	}

	// Register the consumer with queueWg before starting it. Without this
	// the counter stays at zero and Stop's queueWg.Wait() returns
	// immediately instead of waiting for the consumer loop to exit.
	list.queueWg.Add(1)
	go func() {
		defer list.queueWg.Done()
		list.consumer()
	}()

	return list
}
// consumer is the dispatch loop started by NewList. It receives tasks from
// list.queue, marks each one RUNNING and executes it in a goroutine of its
// own, so the loop goes straight back to waiting for the next task. The loop
// returns once list.queueDone is closed.
//
// NOTE(review): nothing else in this file receives from list.queue, so a
// send on it after this loop has returned would block forever — verify that
// no task is enqueued after Stop.
func (list *List) consumer() {
for {
select {
case task := <-list.queue:
// Set task state to RUNNING before processing
list.Lock()
task.State = RUNNING
list.Unlock()
go func() {
// Run the task itself without holding the list lock.
retValue, err := task.process(aptly.Progress(task.output), task.detail)
// Update task completion state and cleanup with list lock held
list.Lock()
{
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
task.err = err
task.processReturnValue = retValue
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
task.err = nil
task.processReturnValue = retValue
}
// Release the task's resources and signal everybody waiting for it
// (WaitForTaskByID through wgTask, Wait through list.wg).
list.usedResources.Free(task.resources)
task.wgTask.Done()
list.wg.Done()
// Look for the first idle task that is no longer blocked and dispatch it.
// At most one task is dispatched per finished task (note the break).
// unlocked records whether the lock was already released inside the loop.
unlocked := false
for _, t := range list.tasks {
if t.State == IDLE {
// check resources
blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(t.resources, t)
// unlock list since queueing may block
list.Unlock()
unlocked = true
list.queue <- t
break
}
}
}
// No idle task was dispatched, so the lock is still held here.
if !unlocked {
list.Unlock()
}
}
}()
case <-list.queueDone:
// Stop was requested.
return
}
}
}
// Stop tells the consumer loop to exit by closing queueDone and then waits
// on queueWg.
func (list *List) Stop() {
	// Closing the channel makes the consumer's receive on queueDone succeed.
	close(list.queueDone)

	list.queueWg.Wait()
}
// GetTasks returns copies of all tasks currently in the list. The copies
// are made while the list lock is held; the result is never nil.
func (list *List) GetTasks() []Task {
	list.Lock()
	defer list.Unlock()

	snapshot := make([]Task, 0, len(list.tasks))
	for i := range list.tasks {
		snapshot = append(snapshot, *list.tasks[i])
	}

	return snapshot
}
// DeleteTaskByID removes the task with the given ID from the list and
// returns a copy of it. Only finished tasks (SUCCEEDED or FAILED) can be
// deleted; for a task in any other state the copy is returned together with
// an error and the list is left untouched. An error is also returned when
// no task with that ID exists.
func (list *List) DeleteTaskByID(ID int) (Task, error) {
	list.Lock()
	defer list.Unlock()

	for i, task := range list.tasks {
		if task.ID != ID {
			continue
		}

		if task.State != SUCCEEDED && task.State != FAILED {
			return *task, fmt.Errorf("task with id %v is still in state=%d", ID, task.State)
		}

		// Shift the tail down and clear the vacated slot so the backing
		// array does not keep the removed task reachable.
		last := len(list.tasks) - 1
		copy(list.tasks[i:], list.tasks[i+1:])
		list.tasks[last] = nil
		list.tasks = list.tasks[:last]

		// Drop the task's wait group as well; otherwise wgTasks grows
		// forever as tasks are created and deleted.
		delete(list.wgTasks, ID)

		return *task, nil
	}

	return Task{}, fmt.Errorf("could not find task with id %v", ID)
}
// GetTaskByID looks up the task with the given ID and returns a copy of it.
// The copy is made while the list lock is held. An error is returned when
// no task with that ID is in the list.
func (list *List) GetTaskByID(ID int) (Task, error) {
	list.Lock()
	defer list.Unlock()

	for _, candidate := range list.tasks {
		if candidate.ID != ID {
			continue
		}
		return *candidate, nil
	}

	return Task{}, fmt.Errorf("could not find task with id %v", ID)
}
// GetTaskOutputByID returns the output collected so far for the task with
// the given ID, or an error when no such task exists.
func (list *List) GetTaskOutputByID(ID int) (string, error) {
	found, lookupErr := list.GetTaskByID(ID)
	if lookupErr != nil {
		return "", lookupErr
	}

	text := found.output.String()
	return text, nil
}
// GetTaskDetailByID returns the detail value stored for the task with the
// given ID. When no detail has been stored an empty struct is returned
// instead of nil. An error is returned when no such task exists.
func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
	found, lookupErr := list.GetTaskByID(ID)
	if lookupErr != nil {
		return nil, lookupErr
	}

	if stored := found.detail.Load(); stored != nil {
		return stored, nil
	}

	return struct{}{}, nil
}
// GetTaskReturnValueByID returns the value recorded from the process
// function of the task with the given ID, or an error when no such task
// exists.
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
	list.Lock()
	defer list.Unlock()

	for _, candidate := range list.tasks {
		if candidate.ID != ID {
			continue
		}
		return candidate.processReturnValue, nil
	}

	return nil, fmt.Errorf("could not find task with id %v", ID)
}
// RunTaskInBackground creates a new task, registers it in the list and
// returns a copy of it taken before the task is handed over for execution.
//
// If no task is reported as blocking the requested resources, the task is
// marked as using them and sent to the consumer right away; that send may
// block until the consumer receives it. Otherwise the task stays in the list
// and is dispatched later, when a finishing task finds its resources free.
//
// The returned error is always nil.
func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) {
	list.Lock()

	list.idCounter++
	done := &sync.WaitGroup{}
	task := NewTask(process, name, list.idCounter, resources, done)

	list.tasks = append(list.tasks, task)
	list.wgTasks[task.ID] = done
	list.wg.Add(1)
	task.wgTask.Add(1)

	// Take the copy while the lock is still held: once the consumer has
	// received the task it updates task.State under the lock.
	snapshot := *task

	// Reserve the resources under the lock when nothing blocks them.
	runNow := len(list.usedResources.UsedBy(resources)) == 0
	if runNow {
		list.usedResources.MarkInUse(task.resources, task)
	}

	// The send below may block, so the lock must be released first.
	list.Unlock()

	if runNow {
		list.queue <- task
	}

	return snapshot, nil
}
// Clear removes all finished tasks from the list; tasks that are still
// IDLE or RUNNING are kept.
func (list *List) Clear() {
	list.Lock()
	defer list.Unlock()

	var remaining []*Task
	for _, task := range list.tasks {
		if task.State == IDLE || task.State == RUNNING {
			remaining = append(remaining, task)
			continue
		}

		// Drop the wait group of every removed task; otherwise wgTasks
		// keeps growing even though the tasks themselves are gone.
		delete(list.wgTasks, task.ID)
	}

	list.tasks = remaining
}
// Wait blocks until every task created so far has finished.
func (list *List) Wait() {
	// list.wg is incremented for each created task and decremented when
	// that task completes.
	list.wg.Wait()
}
// WaitForTaskByID blocks until the task with the given ID has finished and
// then returns a copy of it. An error is returned when the ID is unknown.
func (list *List) WaitForTaskByID(ID int) (Task, error) {
	// Only the map lookup is done under the lock. Waiting has to happen
	// without it, because a finishing task takes the lock before it
	// signals its wait group.
	list.Lock()
	taskDone, known := list.wgTasks[ID]
	list.Unlock()

	if known {
		taskDone.Wait()
		return list.GetTaskByID(ID)
	}

	return Task{}, fmt.Errorf("could not find task with id %v", ID)
}
// GetTaskErrorByID returns the error recorded for the task with the given
// ID as its first result. The second result is non-nil only when no task
// with that ID exists.
func (list *List) GetTaskErrorByID(ID int) (error, error) {
	list.Lock()
	defer list.Unlock()

	for _, candidate := range list.tasks {
		if candidate.ID != ID {
			continue
		}
		return candidate.err, nil
	}

	return nil, fmt.Errorf("could not find task with id %v", ID)
}