Skip to content

Commit

Permalink
Fix leaking tasks.db
Browse files Browse the repository at this point in the history
For a long time, it's been a known fault that tasks.db grows out of
control. The reason is that this database is only cleaned up, and old
tasks removed, during initialization.

When an assignment to a worker is removed, previously, the assignment
was just marked as removed in the tasks database. However, an assignment
is only removed from a worker when the task is removed from the manager.
The worker does not need to continue to keep track of the task.

Instead of marking a task as no longer assigned, when a task is removed
as an assignment, we'll simply delete it from the database.

I'm not 100% sure of what all the task database is responsible for, or
why it needs to be persisted, so this change is targeted to have the
minimal impact on the system.

Signed-off-by: Drew Erny <[email protected]>
(cherry picked from commit 585521d)
Signed-off-by: Sebastiaan van Stijn <[email protected]>
  • Loading branch information
dperny authored and thaJeztah committed Apr 2, 2020
1 parent 062b694 commit 875d503
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 15 deletions.
4 changes: 3 additions & 1 deletion agent/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ func PutTask(tx *bolt.Tx, task *api.Task) error {

// PutTaskStatus updates the status for the task with id.
func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
return withCreateTaskBucketIfNotExists(tx, id, func(bkt *bolt.Bucket) error {
// this used to be withCreateTaskBucketIfNotExists, but that could lead
// to weird race conditions, and was not necessary.
return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
p, err := proto.Marshal(status)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion agent/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestStoragePutGetStatusAssigned(t *testing.T) {
// set task, status and assignment for all tasks.
assert.NoError(t, db.Update(func(tx *bolt.Tx) error {
for _, task := range tasks {
assert.NoError(t, PutTaskStatus(tx, task.ID, &task.Status))
assert.NoError(t, PutTask(tx, task))
assert.NoError(t, PutTaskStatus(tx, task.ID, &task.Status))
assert.NoError(t, SetTaskAssignment(tx, task.ID, true))
}

Expand Down
26 changes: 23 additions & 3 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,15 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig

removeTaskAssignment := func(taskID string) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
if err := SetTaskAssignment(tx, taskID, false); err != nil {
log.G(ctx).WithError(err).Error("error setting task assignment in database")
// if a task is no longer assigned, then we do not have to keep track
// of it. a task will only be unassigned when it is deleted on the
// manager. instead of SetTaskAssginment to true, we'll just remove the
// task now.
if err := DeleteTask(tx, taskID); err != nil {
log.G(ctx).WithError(err).Error("error removing de-assigned task")
return err
}
return err
return nil
}

// If this was a complete set of assignments, we're going to remove all the remaining
Expand Down Expand Up @@ -500,6 +505,21 @@ func (w *worker) newTaskManager(ctx context.Context, tx *bolt.Tx, task *api.Task
// updateTaskStatus reports statuses to listeners, read lock must be held.
func (w *worker) updateTaskStatus(ctx context.Context, tx *bolt.Tx, taskID string, status *api.TaskStatus) error {
if err := PutTaskStatus(tx, taskID, status); err != nil {
// we shouldn't fail to put a task status. however, there exists the
// possibility of a race in which we try to put a task status after the
// task has been deleted. because this whole contraption is a careful
// dance of too-tightly-coupled concurrent parts, fixing tht race is
// fraught with hazards. instead, we'll recognize that it can occur,
// log the error, and then ignore it.
if err == errTaskUnknown {
// log at info level. debug logging in docker is already really
// verbose, so many people disable it. the race that causes this
// behavior should be very rare, but if it occurs, we should know
// about it, because if there is some case where it is _not_ rare,
// then knowing about it will go a long way toward debugging.
log.G(ctx).Info("attempted to update status for a task that has been removed")
return nil
}
log.G(ctx).WithError(err).Error("failed writing status to disk")
return err
}
Expand Down
14 changes: 4 additions & 10 deletions agent/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func TestWorkerAssign(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand All @@ -153,15 +152,14 @@ func TestWorkerAssign(t *testing.T) {
{ID: "config-2"},
},
expectedAssigned: []*api.Task{
// task-1 should be cleaned up and deleted.
{ID: "task-2"},
},
},
{
// remove assigned tasks, secret and config no longer present
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
// there should be no tasks in the tasks db after this.
expectedTasks: nil,
},

// TODO(stevvooe): There are a few more states here we need to get
Expand All @@ -173,6 +171,7 @@ func TestWorkerAssign(t *testing.T) {
tasks []*api.Task
assigned []*api.Task
)

assert.NoError(t, worker.db.View(func(tx *bolt.Tx) error {
return WalkTasks(tx, func(task *api.Task) error {
tasks = append(tasks, task)
Expand Down Expand Up @@ -491,7 +490,6 @@ func TestWorkerUpdate(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand Down Expand Up @@ -556,10 +554,6 @@ func TestWorkerUpdate(t *testing.T) {
Action: api.AssignmentChange_AssignmentActionRemove,
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
},
} {
assert.NoError(t, worker.Update(ctx, testcase.changeSet))
Expand Down

0 comments on commit 875d503

Please sign in to comment.