mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-01 04:40:38 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ddcdc79091 | |||
| 08a2ebdc0a | |||
| 42f8aa2393 | |||
| 43d9cea041 |
@@ -182,6 +182,13 @@ func apiSnapshotsCreate(c *gin.Context) {
|
|||||||
resources = append(resources, string(sources[i].ResourceKey()))
|
resources = append(resources, string(sources[i].ResourceKey()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pre-task check for destination snapshot name
|
||||||
|
_, err = snapshotCollection.ByName(b.Name)
|
||||||
|
if err == nil {
|
||||||
|
AbortWithJSONError(c, 409, fmt.Errorf("unable to create: snapshot %s already exists", b.Name))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
maybeRunTaskInBackground(c, "Create snapshot "+b.Name, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
|
||||||
// Phase 2: Inside task lock - create fresh factory
|
// Phase 2: Inside task lock - create fresh factory
|
||||||
taskCollectionFactory := context.NewCollectionFactory()
|
taskCollectionFactory := context.NewCollectionFactory()
|
||||||
|
|||||||
+34
-21
@@ -44,25 +44,27 @@ func (list *List) consumer() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case task := <-list.queue:
|
case task := <-list.queue:
|
||||||
|
// Set task state to RUNNING before processing
|
||||||
list.Lock()
|
list.Lock()
|
||||||
{
|
task.State = RUNNING
|
||||||
task.State = RUNNING
|
|
||||||
}
|
|
||||||
list.Unlock()
|
list.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
retValue, err := task.process(aptly.Progress(task.output), task.detail)
|
||||||
|
|
||||||
|
// Update task completion state and cleanup with list lock held
|
||||||
list.Lock()
|
list.Lock()
|
||||||
{
|
{
|
||||||
task.processReturnValue = retValue
|
|
||||||
task.err = err
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
task.output.Printf("Task failed with error: %v", err)
|
task.output.Printf("Task failed with error: %v", err)
|
||||||
task.State = FAILED
|
task.State = FAILED
|
||||||
|
task.err = err
|
||||||
|
task.processReturnValue = retValue
|
||||||
} else {
|
} else {
|
||||||
task.output.Print("Task succeeded")
|
task.output.Print("Task succeeded")
|
||||||
task.State = SUCCEEDED
|
task.State = SUCCEEDED
|
||||||
|
task.err = nil
|
||||||
|
task.processReturnValue = retValue
|
||||||
}
|
}
|
||||||
|
|
||||||
list.usedResources.Free(task.resources)
|
list.usedResources.Free(task.resources)
|
||||||
@@ -105,13 +107,15 @@ func (list *List) Stop() {
|
|||||||
|
|
||||||
// GetTasks gets complete list of tasks
|
// GetTasks gets complete list of tasks
|
||||||
func (list *List) GetTasks() []Task {
|
func (list *List) GetTasks() []Task {
|
||||||
tasks := []Task{}
|
|
||||||
list.Lock()
|
list.Lock()
|
||||||
|
defer list.Unlock()
|
||||||
|
|
||||||
|
tasks := []Task{}
|
||||||
for _, task := range list.tasks {
|
for _, task := range list.tasks {
|
||||||
|
// Copy task while holding list lock
|
||||||
tasks = append(tasks, *task)
|
tasks = append(tasks, *task)
|
||||||
}
|
}
|
||||||
|
|
||||||
list.Unlock()
|
|
||||||
return tasks
|
return tasks
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -139,11 +143,11 @@ func (list *List) DeleteTaskByID(ID int) (Task, error) {
|
|||||||
// GetTaskByID returns task with given id
|
// GetTaskByID returns task with given id
|
||||||
func (list *List) GetTaskByID(ID int) (Task, error) {
|
func (list *List) GetTaskByID(ID int) (Task, error) {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
tasks := list.tasks
|
defer list.Unlock()
|
||||||
list.Unlock()
|
|
||||||
|
|
||||||
for _, task := range tasks {
|
for _, task := range list.tasks {
|
||||||
if task.ID == ID {
|
if task.ID == ID {
|
||||||
|
// Copy task while holding list lock
|
||||||
return *task, nil
|
return *task, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -180,13 +184,16 @@ func (list *List) GetTaskDetailByID(ID int) (interface{}, error) {
|
|||||||
|
|
||||||
// GetTaskReturnValueByID returns process return value of task with given id
|
// GetTaskReturnValueByID returns process return value of task with given id
|
||||||
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
func (list *List) GetTaskReturnValueByID(ID int) (*ProcessReturnValue, error) {
|
||||||
task, err := list.GetTaskByID(ID)
|
list.Lock()
|
||||||
|
defer list.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
for _, task := range list.tasks {
|
||||||
return nil, err
|
if task.ID == ID {
|
||||||
|
return task.processReturnValue, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return task.processReturnValue, nil
|
return nil, fmt.Errorf("could not find task with id %v", ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
// RunTaskInBackground creates task and runs it in background. This will block until the necessary resources
|
||||||
@@ -204,6 +211,10 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
|||||||
list.wg.Add(1)
|
list.wg.Add(1)
|
||||||
task.wgTask.Add(1)
|
task.wgTask.Add(1)
|
||||||
|
|
||||||
|
// Copy task while still holding the lock to avoid racing with consumer
|
||||||
|
// setting State=RUNNING after receiving from queue
|
||||||
|
taskCopy := *task
|
||||||
|
|
||||||
// add task to queue for processing if resources are available
|
// add task to queue for processing if resources are available
|
||||||
// if not, task will be queued by the consumer once resources are available
|
// if not, task will be queued by the consumer once resources are available
|
||||||
tasks := list.usedResources.UsedBy(resources)
|
tasks := list.usedResources.UsedBy(resources)
|
||||||
@@ -216,12 +227,13 @@ func (list *List) RunTaskInBackground(name string, resources []string, process P
|
|||||||
list.Unlock()
|
list.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return *task, nil
|
return taskCopy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear removes finished tasks from list
|
// Clear removes finished tasks from list
|
||||||
func (list *List) Clear() {
|
func (list *List) Clear() {
|
||||||
list.Lock()
|
list.Lock()
|
||||||
|
defer list.Unlock()
|
||||||
|
|
||||||
var tasks []*Task
|
var tasks []*Task
|
||||||
for _, task := range list.tasks {
|
for _, task := range list.tasks {
|
||||||
@@ -230,8 +242,6 @@ func (list *List) Clear() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
list.tasks = tasks
|
list.tasks = tasks
|
||||||
|
|
||||||
list.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait waits till all tasks are processed
|
// Wait waits till all tasks are processed
|
||||||
@@ -254,11 +264,14 @@ func (list *List) WaitForTaskByID(ID int) (Task, error) {
|
|||||||
|
|
||||||
// GetTaskErrorByID returns the Task error for a given id
|
// GetTaskErrorByID returns the Task error for a given id
|
||||||
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
func (list *List) GetTaskErrorByID(ID int) (error, error) {
|
||||||
task, err := list.GetTaskByID(ID)
|
list.Lock()
|
||||||
|
defer list.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
for _, task := range list.tasks {
|
||||||
return nil, err
|
if task.ID == ID {
|
||||||
|
return task.err, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return task.err, nil
|
return nil, fmt.Errorf("could not find task with id %v", ID)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Task represents as task in a queue encapsulates process code
|
// Task represents as task in a queue encapsulates process code
|
||||||
|
// All fields are protected by List.Mutex - access task fields only while holding list.Lock()
|
||||||
type Task struct {
|
type Task struct {
|
||||||
output *Output
|
output *Output
|
||||||
detail *Detail
|
detail *Detail
|
||||||
|
|||||||
Reference in New Issue
Block a user