Add task api and resource locking ability

This commit is contained in:
Oliver Sauder
2017-05-22 11:51:35 +02:00
committed by Lorenzo Bolla
parent e63d74dff2
commit 6ab5e60833
24 changed files with 1519 additions and 620 deletions

195
task/list.go Normal file
View File

@@ -0,0 +1,195 @@
package task
import (
"fmt"
"sync"
)
// List is handling list of processes and makes sure
// only one process is executed at the time
type List struct {
*sync.Mutex
tasks []*Task
wgTasks map[int]*sync.WaitGroup
wg *sync.WaitGroup
// resources currently used by running tasks
usedResources *ResourcesSet
idCounter int
}
// NewList creates empty task list
func NewList() *List {
list := &List{
Mutex: &sync.Mutex{},
tasks: make([]*Task, 0),
wgTasks: make(map[int]*sync.WaitGroup),
wg: &sync.WaitGroup{},
usedResources: NewResourcesSet(),
}
return list
}
// GetTasks gets complete list of tasks
func (list *List) GetTasks() []Task {
var tasks []Task
list.Lock()
for _, task := range list.tasks {
tasks = append(tasks, *task)
}
list.Unlock()
return tasks
}
// DeleteTaskByID deletes given task from list. Only finished
// tasks can be deleted.
func (list *List) DeleteTaskByID(ID int) (Task, error) {
list.Lock()
defer list.Unlock()
tasks := list.tasks
for i, task := range tasks {
if task.ID == ID {
if task.State == SUCCEEDED || task.State == FAILED {
list.tasks = append(tasks[:i], tasks[i+1:]...)
return *task, nil
}
return *task, fmt.Errorf("Task with id %v is still running", ID)
}
}
return Task{}, fmt.Errorf("Could not find task with id %v", ID)
}
// GetTaskByID returns task with given id
func (list *List) GetTaskByID(ID int) (Task, error) {
list.Lock()
tasks := list.tasks
list.Unlock()
for _, task := range tasks {
if task.ID == ID {
return *task, nil
}
}
return Task{}, fmt.Errorf("Could not find task with id %v", ID)
}
// GetTaskOutputByID returns standard output of task with given id
func (list *List) GetTaskOutputByID(ID int) (string, error) {
task, err := list.GetTaskByID(ID)
if err != nil {
return "", err
}
return task.output.String(), nil
}
// GetTaskDetailByID returns detail of task with given id
func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
task, err := list.GetTaskByID(ID)
if err != nil {
return nil, err
}
detail := task.detail.Load()
if detail == nil {
return struct{}{}, nil
}
return detail, nil
}
// RunTaskInBackground creates task and runs it in background. It won't be run and an error
// returned if there are running tasks which are using needed resources already.
func (list *List) RunTaskInBackground(name string, resources []string, process Process) (Task, *ResourceConflictError) {
list.Lock()
defer list.Unlock()
tasks := list.usedResources.UsedBy(resources)
if len(tasks) > 0 {
conflictError := &ResourceConflictError{
Tasks: tasks,
Message: "Needed resources are used by other tasks.",
}
return Task{}, conflictError
}
list.idCounter++
wgTask := &sync.WaitGroup{}
task := NewTask(process, name, list.idCounter)
list.tasks = append(list.tasks, task)
list.wgTasks[task.ID] = wgTask
list.usedResources.MarkInUse(resources, task)
list.wg.Add(1)
wgTask.Add(1)
go func() {
list.Lock()
{
task.State = RUNNING
}
list.Unlock()
err := process(task.output, task.detail)
list.Lock()
{
if err != nil {
task.output.Printf("Task failed with error: %v", err)
task.State = FAILED
} else {
task.output.Print("Task succeeded")
task.State = SUCCEEDED
}
list.usedResources.Free(resources)
wgTask.Done()
list.wg.Done()
}
list.Unlock()
}()
return *task, nil
}
// Clear removes finished tasks from list
func (list *List) Clear() {
list.Lock()
var tasks []*Task
for _, task := range list.tasks {
if task.State == IDLE || task.State == RUNNING {
tasks = append(tasks, task)
}
}
list.tasks = tasks
list.Unlock()
}
// Wait waits till all tasks are processed
func (list *List) Wait() {
list.wg.Wait()
}
// WaitForTaskByID waits for task with given id to be processed
func (list *List) WaitForTaskByID(ID int) (Task, error) {
list.Lock()
wgTask, ok := list.wgTasks[ID]
list.Unlock()
if !ok {
return Task{}, fmt.Errorf("Could not find task with id %v", ID)
}
wgTask.Wait()
return list.GetTaskByID(ID)
}

51
task/list_test.go Normal file
View File

@@ -0,0 +1,51 @@
package task
import (
"errors"
// need to import as check as otherwise List is redeclared
check "gopkg.in/check.v1"
)
type ListSuite struct{}
var _ = check.Suite(&ListSuite{})
func (s *ListSuite) TestList(c *check.C) {
list := NewList()
c.Assert(len(list.GetTasks()), check.Equals, 0)
task, err := list.RunTaskInBackground("Successful task", nil, func(out *Output, detail *Detail) error {
return nil
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)
tasks := list.GetTasks()
c.Assert(len(tasks), check.Equals, 1)
task, _ = list.GetTaskByID(task.ID)
c.Check(task.State, check.Equals, SUCCEEDED)
output, _ := list.GetTaskOutputByID(task.ID)
c.Check(output, check.Equals, "Task succeeded")
detail, _ := list.GetTaskDetailByID(task.ID)
c.Check(detail, check.Equals, struct{}{})
task, err = list.RunTaskInBackground("Faulty task", nil, func(out *Output, detail *Detail) error {
detail.Store("Details")
out.WriteString("Test Progress\n")
return errors.New("Task failed")
})
c.Assert(err, check.IsNil)
list.WaitForTaskByID(task.ID)
tasks = list.GetTasks()
c.Assert(len(tasks), check.Equals, 2)
task, _ = list.GetTaskByID(task.ID)
c.Check(task.State, check.Equals, FAILED)
output, _ = list.GetTaskOutputByID(task.ID)
c.Check(output, check.Equals, "Test Progress\nTask failed with error: Task failed")
detail, _ = list.GetTaskDetailByID(task.ID)
c.Check(detail, check.Equals, "Details")
_, deleteErr := list.DeleteTaskByID(task.ID)
c.Check(deleteErr, check.IsNil)
}

95
task/output.go Normal file
View File

@@ -0,0 +1,95 @@
package task
import (
"bytes"
"fmt"
"sync"
)
// Output represents a safe standard output of task
// which is compatbile to AptlyProgress.
type Output struct {
mu *sync.Mutex
output *bytes.Buffer
}
// NewOutput creates new output
func NewOutput() *Output {
return &Output{mu: &sync.Mutex{}, output: &bytes.Buffer{}}
}
func (t *Output) String() string {
t.mu.Lock()
defer t.mu.Unlock()
return t.output.String()
}
// Write is used to determine how many bytes have been written
// not needed in our case.
func (t *Output) Write(p []byte) (n int, err error) {
return len(p), err
}
// WriteString writes string to output
func (t *Output) WriteString(s string) (n int, err error) {
t.mu.Lock()
defer t.mu.Unlock()
return t.output.WriteString(s)
}
// Start is needed for progress compatibility
func (t *Output) Start() {
// Not implemented
}
// Shutdown is needed for progress compatibility
func (t *Output) Shutdown() {
// Not implemented
}
// Flush is needed for progress compatibility
func (t *Output) Flush() {
// Not implemented
}
// InitBar is needed for progress compatibility
func (t *Output) InitBar(count int64, isBytes bool) {
// Not implemented
}
// ShutdownBar is needed for progress compatibility
func (t *Output) ShutdownBar() {
// Not implemented
}
// AddBar is needed for progress compatibility
func (t *Output) AddBar(count int) {
// Not implemented
}
// SetBar sets current position for progress bar
func (t *Output) SetBar(count int) {
// Not implemented
}
// Printf does printf in a safe manner
func (t *Output) Printf(msg string, a ...interface{}) {
t.WriteString(fmt.Sprintf(msg, a...))
}
// Print does printf in a safe manner
func (t *Output) Print(msg string) {
t.WriteString(msg)
}
// ColoredPrintf does printf in a safe manner + newline
// currently are no colors supported.
func (t *Output) ColoredPrintf(msg string, a ...interface{}) {
t.WriteString(fmt.Sprintf(msg+"\n", a...))
}
// PrintfStdErr does printf but in safe manner to output
func (t *Output) PrintfStdErr(msg string, a ...interface{}) {
t.WriteString(msg)
}

93
task/resources.go Normal file
View File

@@ -0,0 +1,93 @@
package task
import (
"strings"
)
// AllLocalReposResourcesKey to be used as resource key when all local repos are needed
const AllLocalReposResourcesKey = "__alllocalrepos__"
// ResourceConflictError represents a list tasks
// using conflicitng resources
type ResourceConflictError struct {
Tasks []Task
Message string
}
func (e *ResourceConflictError) Error() string {
return e.Message
}
// ResourcesSet represents a set of task resources.
// A resource is represented by its unique key
type ResourcesSet struct {
set map[string]*Task
}
// NewResourcesSet creates new instance of resources set
func NewResourcesSet() *ResourcesSet {
return &ResourcesSet{make(map[string]*Task)}
}
// MarkInUse given resources as used by given task
func (r *ResourcesSet) MarkInUse(resources []string, task *Task) {
for _, resource := range resources {
r.set[resource] = task
}
}
// UsedBy checks whether one of given resources
// is used by a task and if yes returns slice of such task
func (r *ResourcesSet) UsedBy(resources []string) []Task {
var tasks []Task
var task *Task
var found bool
for _, resource := range resources {
if resource == AllLocalReposResourcesKey {
for taskResource, task := range r.set {
if strings.HasPrefix(taskResource, "L") {
tasks = appendTask(tasks, task)
}
}
}
task, found = r.set[resource]
if found {
tasks = appendTask(tasks, task)
}
}
task, found = r.set[AllLocalReposResourcesKey]
if found {
tasks = appendTask(tasks, task)
}
return tasks
}
// appendTask only appends task to tasks slice if not already
// on slice
func appendTask(tasks []Task, task *Task) []Task {
needsAppending := true
for _, givenTask := range tasks {
if givenTask.ID == task.ID {
needsAppending = false
break
}
}
if needsAppending {
return append(tasks, *task)
}
return tasks
}
// Free removes given resources from dependency set
func (r *ResourcesSet) Free(resources []string) {
for _, resource := range resources {
delete(r.set, resource)
}
}

50
task/task.go Normal file
View File

@@ -0,0 +1,50 @@
package task
import (
"sync/atomic"
)
// State task is in
type State int
// Detail represents custom task details
type Detail struct {
atomic.Value
}
// Process is a function implementing the actual task logic
type Process func(out *Output, detail *Detail) error
const (
// IDLE when task is waiting
IDLE State = iota
// RUNNING when task is running
RUNNING
// SUCCEEDED when task is successfully finished
SUCCEEDED
// FAILED when task failed
FAILED
)
// Task represents as task in a queue encapsulates process code
type Task struct {
output *Output
detail *Detail
process Process
Name string
ID int
State State
}
// NewTask creates new task
func NewTask(process Process, name string, ID int) *Task {
task := &Task{
output: NewOutput(),
detail: &Detail{},
process: process,
Name: name,
ID: ID,
State: IDLE,
}
return task
}

12
task/task_test.go Normal file
View File

@@ -0,0 +1,12 @@
package task
import (
"testing"
check "gopkg.in/check.v1"
)
// Launch gocheck tests
func Test(t *testing.T) {
check.TestingT(t)
}