mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-12 06:30:35 +00:00
2a99fdfcf1
* show resources in task details * fix task state locking * return task object consistently Race condition existed where task State, err, and processReturnValue fields were written by consumer goroutine and read by concurrent accessors without proper synchronization, causing torn reads and data races.
278 lines
6.3 KiB
Go
278 lines
6.3 KiB
Go
package task
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/aptly-dev/aptly/aptly"
|
|
)
|
|
|
|
// List is handling list of processes and makes sure
|
|
// only one process is executed at the time
|
|
type List struct {
|
|
*sync.Mutex
|
|
tasks []*Task
|
|
wgTasks map[int]*sync.WaitGroup
|
|
wg *sync.WaitGroup
|
|
// resources currently used by running tasks
|
|
usedResources *ResourcesSet
|
|
idCounter int
|
|
|
|
queue chan *Task
|
|
queueWg *sync.WaitGroup
|
|
queueDone chan bool
|
|
}
|
|
|
|
// NewList creates empty task list
|
|
func NewList() *List {
|
|
list := &List{
|
|
Mutex: &sync.Mutex{},
|
|
tasks: make([]*Task, 0),
|
|
wgTasks: make(map[int]*sync.WaitGroup),
|
|
wg: &sync.WaitGroup{},
|
|
usedResources: NewResourcesSet(),
|
|
queue: make(chan *Task),
|
|
queueWg: &sync.WaitGroup{},
|
|
queueDone: make(chan bool),
|
|
}
|
|
go list.consumer()
|
|
return list
|
|
}
|
|
|
|
// consumer is processing the queue
|
|
func (list *List) consumer() {
|
|
for {
|
|
select {
|
|
case task := <-list.queue:
|
|
// Set task state to RUNNING before processing
|
|
list.Lock()
|
|
task.State = RUNNING
|
|
list.Unlock()
|
|
|
|
go func() {
|
|
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
|
|
|
// Update task completion state and cleanup with list lock held
|
|
list.Lock()
|
|
{
|
|
if err != nil {
|
|
task.output.Printf("Task failed with error: %v", err)
|
|
task.State = FAILED
|
|
task.err = err
|
|
task.processReturnValue = retValue
|
|
} else {
|
|
task.output.Print("Task succeeded")
|
|
task.State = SUCCEEDED
|
|
task.err = nil
|
|
task.processReturnValue = retValue
|
|
}
|
|
|
|
list.usedResources.Free(task.Resources)
|
|
|
|
task.wgTask.Done()
|
|
list.wg.Done()
|
|
|
|
unlocked := false
|
|
for _, t := range list.tasks {
|
|
if t.State == IDLE {
|
|
// check resources
|
|
blockingTasks := list.usedResources.UsedBy(t.Resources)
|
|
if len(blockingTasks) == 0 {
|
|
list.usedResources.MarkInUse(t.Resources, t)
|
|
// unlock list since queueing may block
|
|
list.Unlock()
|
|
unlocked = true
|
|
list.queue <- t
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if !unlocked {
|
|
list.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
|
|
case <-list.queueDone:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop signals the consumer to stop processing tasks and waits for it to finish
|
|
func (list *List) Stop() {
|
|
close(list.queueDone)
|
|
list.queueWg.Wait()
|
|
}
|
|
|
|
// GetTasks gets complete list of tasks
|
|
func (list *List) GetTasks() []Task {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
tasks := []Task{}
|
|
for _, task := range list.tasks {
|
|
// Copy task while holding list lock
|
|
tasks = append(tasks, *task)
|
|
}
|
|
|
|
return tasks
|
|
}
|
|
|
|
// DeleteTaskByID deletes given task from list. Only finished
|
|
// tasks can be deleted.
|
|
func (list *List) DeleteTaskByID(ID int) (Task, error) {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
tasks := list.tasks
|
|
for i, task := range tasks {
|
|
if task.ID == ID {
|
|
if task.State == SUCCEEDED || task.State == FAILED {
|
|
list.tasks = append(tasks[:i], tasks[i+1:]...)
|
|
return *task, nil
|
|
}
|
|
|
|
return *task, fmt.Errorf("task with id %v is still in state=%d", ID, task.State)
|
|
}
|
|
}
|
|
|
|
return Task{}, fmt.Errorf("could not find task with id %v", ID)
|
|
}
|
|
|
|
// GetTaskByID returns task with given id
|
|
func (list *List) GetTaskByID(ID int) (Task, error) {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
for _, task := range list.tasks {
|
|
if task.ID == ID {
|
|
// Copy task while holding list lock
|
|
return *task, nil
|
|
}
|
|
}
|
|
|
|
return Task{}, fmt.Errorf("could not find task with id %v", ID)
|
|
}
|
|
|
|
// GetTaskOutputByID returns standard output of task with given id
|
|
func (list *List) GetTaskOutputByID(ID int) (string, error) {
|
|
task, err := list.GetTaskByID(ID)
|
|
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return task.output.String(), nil
|
|
}
|
|
|
|
// GetTaskDetailByID returns detail of task with given id
|
|
func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
|
|
task, err := list.GetTaskByID(ID)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
detail := task.detail.Load()
|
|
if detail == nil {
|
|
return struct{}{}, nil
|
|
}
|
|
|
|
return detail, nil
|
|
}
|
|
|
|
// GetTaskReturnValueByID returns process return value of task with given id
|
|
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
for _, task := range list.tasks {
|
|
if task.ID == ID {
|
|
return task.processReturnValue, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("could not find task with id %v", ID)
|
|
}
|
|
|
|
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
|
// become available.
|
|
func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) {
|
|
list.Lock()
|
|
|
|
list.idCounter++
|
|
wgTask := &sync.WaitGroup{}
|
|
task := NewTask(process, name, list.idCounter, resources, wgTask)
|
|
|
|
list.tasks = append(list.tasks, task)
|
|
list.wgTasks[task.ID] = wgTask
|
|
|
|
list.wg.Add(1)
|
|
task.wgTask.Add(1)
|
|
|
|
// Copy task while still holding the lock to avoid racing with consumer
|
|
// setting State=RUNNING after receiving from queue
|
|
taskCopy := *task
|
|
|
|
// add task to queue for processing if resources are available
|
|
// if not, task will be queued by the consumer once resources are available
|
|
tasks := list.usedResources.UsedBy(resources)
|
|
if len(tasks) == 0 {
|
|
list.usedResources.MarkInUse(task.Resources, task)
|
|
// queueing task might block if channel not ready, unlock list before queueing
|
|
list.Unlock()
|
|
list.queue <- task
|
|
} else {
|
|
list.Unlock()
|
|
}
|
|
|
|
return taskCopy, nil
|
|
}
|
|
|
|
// Clear removes finished tasks from list
|
|
func (list *List) Clear() {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
var tasks []*Task
|
|
for _, task := range list.tasks {
|
|
if task.State == IDLE || task.State == RUNNING {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
list.tasks = tasks
|
|
}
|
|
|
|
// Wait waits till all tasks are processed
|
|
func (list *List) Wait() {
|
|
list.wg.Wait()
|
|
}
|
|
|
|
// WaitForTaskByID waits for task with given id to be processed
|
|
func (list *List) WaitForTaskByID(ID int) (Task, error) {
|
|
list.Lock()
|
|
wgTask, ok := list.wgTasks[ID]
|
|
list.Unlock()
|
|
if !ok {
|
|
return Task{}, fmt.Errorf("could not find task with id %v", ID)
|
|
}
|
|
|
|
wgTask.Wait()
|
|
return list.GetTaskByID(ID)
|
|
}
|
|
|
|
// GetTaskErrorByID returns the Task error for a given id
|
|
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
|
list.Lock()
|
|
defer list.Unlock()
|
|
|
|
for _, task := range list.tasks {
|
|
if task.ID == ID {
|
|
return task.err, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("could not find task with id %v", ID)
|
|
}
|