implement task queue waiting for resources

This commit is contained in:
André Roth
2024-06-06 17:10:39 +02:00
parent 2d97ba2bbd
commit 45035802be
5 changed files with 92 additions and 50 deletions

View File

@@ -17,6 +17,10 @@ type List struct {
// resources currently used by running tasks
usedResources *ResourcesSet
idCounter int
queue chan *Task
queueWg *sync.WaitGroup
queueDone chan bool
}
// NewList creates empty task list
@@ -27,10 +31,71 @@ func NewList() *List {
wgTasks: make(map[int]*sync.WaitGroup),
wg: &sync.WaitGroup{},
usedResources: NewResourcesSet(),
queue: make(chan *Task, 0),
queueWg: &sync.WaitGroup{},
queueDone: make(chan bool),
}
go list.consumer()
return list
}
// consumer is processing the queue
func (list *List) consumer() {
for {
select {
case task := <-list.queue:
list.Lock()
{
task.State = RUNNING
}
list.Unlock()
go func() {
retValue, err := task.process(aptly.Progress(task.output), task.detail)
list.Lock()
{
task.processReturnValue = retValue
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
}
list.usedResources.Free(task.resources)
task.wgTask.Done()
list.wg.Done()
for _, t := range list.tasks {
if t.State == IDLE {
// check resources
blockingTasks := list.usedResources.UsedBy(t.resources)
if len(blockingTasks) == 0 {
list.usedResources.MarkInUse(task.resources, task)
list.queue <- t
break
}
}
}
}
list.Unlock()
}()
case <-list.queueDone:
return
}
}
}
// Stop signals the consumer to stop processing tasks and waits for it to finish
func (list *List) Stop() {
close(list.queueDone)
list.queueWg.Wait()
}
// GetTasks gets complete list of tasks
func (list *List) GetTasks() []Task {
tasks := []Task{}
@@ -123,55 +188,23 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
list.Lock()
defer list.Unlock()
tasks := list.usedResources.UsedBy(resources)
for len(tasks) > 0 {
for _, task := range tasks {
list.Unlock()
list.wgTasks[task.ID].Wait()
list.Lock()
}
tasks = list.usedResources.UsedBy(resources)
}
list.idCounter++
wgTask := &sync.WaitGroup{}
task := NewTask(process, name, list.idCounter)
task := NewTask(process, name, list.idCounter, resources, wgTask)
list.tasks = append(list.tasks, task)
list.wgTasks[task.ID] = wgTask
list.usedResources.MarkInUse(resources, task)
list.wg.Add(1)
wgTask.Add(1)
task.wgTask.Add(1)
go func() {
list.Lock()
{
task.State = RUNNING
}
list.Unlock()
retValue, err := process(aptly.Progress(task.output), task.detail)
list.Lock()
{
task.processReturnValue = retValue
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
}
list.usedResources.Free(resources)
wgTask.Done()
list.wg.Done()
}
list.Unlock()
}()
// add task to queue for processing if resources are available
// if not, task will be queued by the consumer once resources are available
tasks := list.usedResources.UsedBy(resources)
if len(tasks) == 0 {
list.usedResources.MarkInUse(task.resources, task)
list.queue <- task
}
return *task, nil
}

View File

@@ -50,4 +50,5 @@ func (s *ListSuite) TestList(c *check.C) {
c.Check(detail, check.Equals, "Details")
_, deleteErr := list.DeleteTaskByID(task.ID)
c.Check(deleteErr, check.IsNil)
list.Stop()
}

View File

@@ -1,6 +1,7 @@
package task
import (
"sync"
"sync/atomic"
"github.com/aptly-dev/aptly/aptly"
@@ -49,17 +50,21 @@ type Task struct {
Name string
ID int
State State
resources []string
wgTask *sync.WaitGroup
}
// NewTask creates new task
func NewTask(process Process, name string, ID int) *Task {
func NewTask(process Process, name string, ID int, resources []string, wgTask *sync.WaitGroup) *Task {
task := &Task{
output: NewOutput(),
detail: &Detail{},
process: process,
Name: name,
ID: ID,
State: IDLE,
output: NewOutput(),
detail: &Detail{},
process: process,
Name: name,
ID: ID,
State: IDLE,
resources: resources,
wgTask: wgTask,
}
return task
}