Skip to content

Commit

Permalink
Merge pull request #2940 from thaJeztah/19.03_backport_fix_leaking_ta…
Browse files Browse the repository at this point in the history
…sk_db

[19.03 backport] Fix leaking tasks.db
  • Loading branch information
dperny authored Apr 3, 2020
2 parents 062b694 + 875d503 commit 0b8364e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
4 changes: 3 additions & 1 deletion agent/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func PutTask(tx *bolt.Tx, task *api.Task) error {

// PutTaskStatus updates the status for the task with id.
func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
return withCreateTaskBucketIfNotExists(tx, id, func(bkt *bolt.Bucket) error {
// this used to be withCreateTaskBucketIfNotExists, but that could lead
// to weird race conditions, and was not necessary.
return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
p, err := proto.Marshal(status)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion agent/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestStoragePutGetStatusAssigned(t *testing.T) {
// set task, status and assignment for all tasks.
assert.NoError(t, db.Update(func(tx *bolt.Tx) error {
for _, task := range tasks {
assert.NoError(t, PutTaskStatus(tx, task.ID, &task.Status))
assert.NoError(t, PutTask(tx, task))
assert.NoError(t, PutTaskStatus(tx, task.ID, &task.Status))
assert.NoError(t, SetTaskAssignment(tx, task.ID, true))
}

Expand Down
26 changes: 23 additions & 3 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,15 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig

removeTaskAssignment := func(taskID string) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
if err := SetTaskAssignment(tx, taskID, false); err != nil {
log.G(ctx).WithError(err).Error("error setting task assignment in database")
// if a task is no longer assigned, then we do not have to keep track
// of it. a task will only be unassigned when it is deleted on the
// manager. instead of calling SetTaskAssignment with false, we'll just
// remove the task now.
if err := DeleteTask(tx, taskID); err != nil {
log.G(ctx).WithError(err).Error("error removing de-assigned task")
return err
}
return err
return nil
}

// If this was a complete set of assignments, we're going to remove all the remaining
Expand Down Expand Up @@ -500,6 +505,21 @@ func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task
// updateTaskStatus reports statuses to listeners, read lock must be held.
func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID string, status *api.TaskStatus) error {
if err := PutTaskStatus(tx, taskID, status); err != nil {
// we shouldn't fail to put a task status. however, there exists the
// possibility of a race in which we try to put a task status after the
// task has been deleted. because this whole contraption is a careful
// dance of too-tightly-coupled concurrent parts, fixing that race is
// fraught with hazards. instead, we'll recognize that it can occur,
// log the error, and then ignore it.
if err == errTaskUnknown {
// log at info level. debug logging in docker is already really
// verbose, so many people disable it. the race that causes this
// behavior should be very rare, but if it occurs, we should know
// about it, because if there is some case where it is _not_ rare,
// then knowing about it will go a long way toward debugging.
log.G(ctx).Info("attempted to update status for a task that has been removed")
return nil
}
log.G(ctx).WithError(err).Error("failed writing status to disk")
return err
}
Expand Down
14 changes: 4 additions & 10 deletions agent/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func TestWorkerAssign(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand All @@ -153,15 +152,14 @@ func TestWorkerAssign(t *testing.T) {
{ID: "config-2"},
},
expectedAssigned: []*api.Task{
// task-1 should be cleaned up and deleted.
{ID: "task-2"},
},
},
{
// remove assigned tasks, secret and config no longer present
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
// there should be no tasks in the tasks db after this.
expectedTasks: nil,
},

// TODO(stevvooe): There are a few more states here we need to get
Expand All @@ -173,6 +171,7 @@ func TestWorkerAssign(t *testing.T) {
tasks []*api.Task
assigned []*api.Task
)

assert.NoError(t, worker.db.View(func(tx *bolt.Tx) error {
return WalkTasks(tx, func(task *api.Task) error {
tasks = append(tasks, task)
Expand Down Expand Up @@ -491,7 +490,6 @@ func TestWorkerUpdate(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand Down Expand Up @@ -556,10 +554,6 @@ func TestWorkerUpdate(t *testing.T) {
Action: api.AssignmentChange_AssignmentActionRemove,
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
},
} {
assert.NoError(t, worker.Update(ctx, testcase.changeSet))
Expand Down

0 comments on commit 0b8364e

Please sign in to comment.