mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-08 05:50:47 +00:00
tasks: fix task state locking
Race condition iexisted where task State, err, and processReturnValue fields
were written by consumer goroutine and read by concurrent accessors without
proper synchronization, causing torn reads and data races.
Implemented single-lock model with optimal lock scope:
- Removed 8 accessor methods (direct field access is simpler)
- Lock only during brief state transitions (IDLE→RUNNING, RUNNING→SUCCEEDED/FAILED)
- Release lock during task.process() execution to enable full concurrency
- Readers hold list.Lock() only during atomic struct copy
- Moved State = RUNNING before goroutine spawn for clearer semantics
- task/list.go: RunTaskInBackground() copies *task before unlock,
returns the pre-made copy instead of dereferencing after unlock
This commit is contained in:
+34
-21
@@ -44,25 +44,27 @@ func (list *List) consumer() {
|
||||
for {
|
||||
select {
|
||||
case task := <-list.queue:
|
||||
// Set task state to RUNNING before processing
|
||||
list.Lock()
|
||||
{
|
||||
task.State = RUNNING
|
||||
}
|
||||
task.State = RUNNING
|
||||
list.Unlock()
|
||||
|
||||
go func() {
|
||||
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
||||
|
||||
// Update task completion state and cleanup with list lock held
|
||||
list.Lock()
|
||||
{
|
||||
task.processReturnValue = retValue
|
||||
task.err = err
|
||||
if err != nil {
|
||||
task.output.Printf("Task failed with error: %v", err)
|
||||
task.State = FAILED
|
||||
task.err = err
|
||||
task.processReturnValue = retValue
|
||||
} else {
|
||||
task.output.Print("Task succeeded")
|
||||
task.State = SUCCEEDED
|
||||
task.err = nil
|
||||
task.processReturnValue = retValue
|
||||
}
|
||||
|
||||
list.usedResources.Free(task.resources)
|
||||
@@ -105,13 +107,15 @@ func (list *List) Stop() {
|
||||
|
||||
// GetTasks gets complete list of tasks
|
||||
func (list *List) GetTasks() []Task {
|
||||
tasks := []Task{}
|
||||
list.Lock()
|
||||
defer list.Unlock()
|
||||
|
||||
tasks := []Task{}
|
||||
for _, task := range list.tasks {
|
||||
// Copy task while holding list lock
|
||||
tasks = append(tasks, *task)
|
||||
}
|
||||
|
||||
list.Unlock()
|
||||
return tasks
|
||||
}
|
||||
|
||||
@@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) {
|
||||
// GetTaskByID returns task with given id
|
||||
func (list *List) GetTaskByID(ID int) (Task, error) {
|
||||
list.Lock()
|
||||
tasks := list.tasks
|
||||
list.Unlock()
|
||||
defer list.Unlock()
|
||||
|
||||
for _, task := range tasks {
|
||||
for _, task := range list.tasks {
|
||||
if task.ID == ID {
|
||||
// Copy task while holding list lock
|
||||
return *task, nil
|
||||
}
|
||||
}
|
||||
@@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
|
||||
|
||||
// GetTaskReturnValueByID returns process return value of task with given id
|
||||
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
||||
task, err := list.GetTaskByID(ID)
|
||||
list.Lock()
|
||||
defer list.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, task := range list.tasks {
|
||||
if task.ID == ID {
|
||||
return task.processReturnValue, nil
|
||||
}
|
||||
}
|
||||
|
||||
return task.processReturnValue, nil
|
||||
return nil, fmt.Errorf("could not find task with id %v", ID)
|
||||
}
|
||||
|
||||
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
||||
@@ -204,6 +211,10 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
||||
list.wg.Add(1)
|
||||
task.wgTask.Add(1)
|
||||
|
||||
// Copy task while still holding the lock to avoid racing with consumer
|
||||
// setting State=RUNNING after receiving from queue
|
||||
taskCopy := *task
|
||||
|
||||
// add task to queue for processing if resources are available
|
||||
// if not, task will be queued by the consumer once resources are available
|
||||
tasks := list.usedResources.UsedBy(resources)
|
||||
@@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
||||
list.Unlock()
|
||||
}
|
||||
|
||||
return *task, nil
|
||||
return taskCopy, nil
|
||||
}
|
||||
|
||||
// Clear removes finished tasks from list
|
||||
func (list *List) Clear() {
|
||||
list.Lock()
|
||||
defer list.Unlock()
|
||||
|
||||
var tasks []*Task
|
||||
for _, task := range list.tasks {
|
||||
@@ -230,8 +242,6 @@ func (list *List) Clear() {
|
||||
}
|
||||
}
|
||||
list.tasks = tasks
|
||||
|
||||
list.Unlock()
|
||||
}
|
||||
|
||||
// Wait waits till all tasks are processed
|
||||
@@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) {
|
||||
|
||||
// GetTaskErrorByID returns the Task error for a given id
|
||||
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
||||
task, err := list.GetTaskByID(ID)
|
||||
list.Lock()
|
||||
defer list.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for _, task := range list.tasks {
|
||||
if task.ID == ID {
|
||||
return task.err, nil
|
||||
}
|
||||
}
|
||||
|
||||
return task.err, nil
|
||||
return nil, fmt.Errorf("could not find task with id %v", ID)
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ const (
|
||||
)
|
||||
|
||||
// Task represents as task in a queue encapsulates process code
|
||||
// All fields are protected by List.Mutex - access task fields only while holding list.Lock()
|
||||
type Task struct {
|
||||
output *Output
|
||||
detail *Detail
|
||||
|
||||
Reference in New Issue
Block a user