Skip to content

Commit

Permalink
Docker version: mov caching logic to docker client
Browse files Browse the repository at this point in the history
Moved the logic of caching Docker daemon version string out of the
DockerTaskEngine to dockerGoClient struct. This makes it much more
reusable across the agent.

Also, got rid of the unused SetDockerClient() method in the
DockerTaskEngine.
  • Loading branch information
aaithal committed Apr 26, 2018
1 parent b57ed65 commit 7e78571
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 41 deletions.
27 changes: 26 additions & 1 deletion agent/dockerclient/dockerapi/docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ type dockerGoClient struct {

_time ttime.Time
_timeOnce sync.Once

daemonVersionUnsafe string
lock sync.Mutex
}

func (dg *dockerGoClient) WithVersion(version dockerclient.DockerVersion) DockerClient {
Expand Down Expand Up @@ -931,6 +934,11 @@ func (dg *dockerGoClient) KnownVersions() []dockerclient.DockerVersion {
}

func (dg *dockerGoClient) Version(ctx context.Context, timeout time.Duration) (string, error) {
version := dg.getDaemonVersion()
if version != "" {
return version, nil
}

derivedCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

Expand All @@ -942,7 +950,24 @@ func (dg *dockerGoClient) Version(ctx context.Context, timeout time.Duration) (s
if err != nil {
return "", err
}
return info.Get("Version"), nil

version = info.Get("Version")
dg.setDaemonVersion(version)
return version, nil
}

func (dg *dockerGoClient) getDaemonVersion() string {
dg.lock.Lock()
defer dg.lock.Unlock()

return dg.daemonVersionUnsafe
}

func (dg *dockerGoClient) setDaemonVersion(version string) {
dg.lock.Lock()
defer dg.lock.Unlock()

dg.daemonVersionUnsafe = version
}

// APIVersion returns the client api version
Expand Down
18 changes: 18 additions & 0 deletions agent/dockerclient/dockerapi/docker_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,24 @@ func TestDockerVersion(t *testing.T) {
}
}

func TestDockerVersionCached(t *testing.T) {
_, client, _, _, _, done := dockerClientSetup(t)
defer done()

// Explicitly set daemon version so that mockDocker (the docker client)
// is not invoked again
client.setDaemonVersion("1.6.0")
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
str, err := client.Version(ctx, dockerclient.VersionTimeout)
if err != nil {
t.Error(err)
}
if str != "1.6.0" {
t.Error("Got unexpected version string: " + str)
}
}

func TestListContainers(t *testing.T) {
mockDocker, client, _, _, _, done := dockerClientSetup(t)
defer done()
Expand Down
58 changes: 18 additions & 40 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,18 @@ type DockerTaskEngine struct {
stateChangeEvents chan statechange.Event
saver statemanager.Saver

client dockerapi.DockerClient
clientLock sync.Mutex
cniClient ecscni.CNIClient
client dockerapi.DockerClient
cniClient ecscni.CNIClient

containerChangeEventStream *eventstream.EventStream

stopEngine context.CancelFunc

// processTasks is a mutex that the task engine must acquire before changing
// tasksLock is a mutex that the task engine must acquire before changing
// any task's state which it manages. Since this is a lock that encompasses
// all tasks, it must not acquire it for any significant duration
// The write mutex should be taken when adding and removing tasks from managedTasks.
processTasks sync.RWMutex
tasksLock sync.RWMutex

enableConcurrentPull bool
credentialsManager credentials.Manager
Expand All @@ -121,9 +120,6 @@ type DockerTaskEngine struct {
// This can be used by tests that are looking to ensure that the steady state
// verification logic gets executed to set it to a low interval
taskSteadyStatePollInterval time.Duration

// dockerVersion is used to store the Docker version string
dockerVersion string
}

// NewDockerTaskEngine returns a created, but uninitialized, DockerTaskEngine.
Expand Down Expand Up @@ -221,13 +217,6 @@ func (engine *DockerTaskEngine) Init(ctx context.Context) error {
return nil
}

// SetDockerClient provides a way to override the client used for communication with docker as a testing hook.
func (engine *DockerTaskEngine) SetDockerClient(client dockerapi.DockerClient) {
engine.clientLock.Lock()
engine.clientLock.Unlock()
engine.client = client
}

// MustInit blocks and retries until an engine can be initialized.
func (engine *DockerTaskEngine) MustInit(ctx context.Context) {
if engine.initialized {
Expand Down Expand Up @@ -266,15 +255,15 @@ func (engine *DockerTaskEngine) Shutdown() {

// Disable prevents this engine from managing any additional tasks.
func (engine *DockerTaskEngine) Disable() {
engine.processTasks.Lock()
engine.tasksLock.Lock()
}

// synchronizeState explicitly goes through each docker container stored in
// "state" and updates its KnownStatus appropriately, as well as queueing up
// events to push upstream.
func (engine *DockerTaskEngine) synchronizeState() {
engine.processTasks.Lock()
defer engine.processTasks.Unlock()
engine.tasksLock.Lock()
defer engine.tasksLock.Unlock()
imageStates := engine.state.AllImageStates()
if len(imageStates) != 0 {
engine.imageManager.AddAllImageStates(imageStates)
Expand Down Expand Up @@ -411,9 +400,9 @@ func (engine *DockerTaskEngine) checkTaskState(task *api.Task) {
continue
}
status, metadata := engine.client.DescribeContainer(engine.ctx, dockerContainer.DockerID)
engine.processTasks.RLock()
engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
engine.processTasks.RUnlock()
engine.tasksLock.RUnlock()

if ok {
managedTask.emitDockerContainerChange(dockerContainerChange{
Expand Down Expand Up @@ -466,7 +455,7 @@ func (engine *DockerTaskEngine) deleteTask(task *api.Task) {
}

// Now remove ourselves from the global state and cleanup channels
engine.processTasks.Lock()
engine.tasksLock.Lock()
engine.state.RemoveTask(task)
eni := task.GetTaskENI()
if eni == nil {
Expand All @@ -477,7 +466,7 @@ func (engine *DockerTaskEngine) deleteTask(task *api.Task) {
}
seelog.Debugf("Task engine [%s]: finished removing task data, removing task from managed tasks", task.Arn)
delete(engine.managedTasks, task.Arn)
engine.processTasks.Unlock()
engine.tasksLock.Unlock()
engine.saver.Save()
}

Expand Down Expand Up @@ -571,10 +560,10 @@ func (engine *DockerTaskEngine) handleDockerEvent(event dockerapi.DockerContaine
return
}

engine.processTasks.RLock()
engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
// hold the lock until the message is sent so we don't send on a closed channel
defer engine.processTasks.RUnlock()
defer engine.tasksLock.RUnlock()
if !ok {
seelog.Criticalf("Task engine: could not find managed task [%s] corresponding to a docker event: %s",
task.Arn, event.String())
Expand All @@ -598,8 +587,8 @@ func (engine *DockerTaskEngine) StateChangeEvents() chan statechange.Event {
func (engine *DockerTaskEngine) AddTask(task *api.Task) error {
task.PostUnmarshalTask(engine.cfg, engine.credentialsManager)

engine.processTasks.Lock()
defer engine.processTasks.Unlock()
engine.tasksLock.Lock()
defer engine.tasksLock.Unlock()

existingTask, exists := engine.state.TaskByArn(task.Arn)
if !exists {
Expand Down Expand Up @@ -1078,9 +1067,9 @@ func (engine *DockerTaskEngine) transitionContainer(task *api.Task, container *a
// This is safe because 'applyContainerState' will not mutate the task
metadata := engine.applyContainerState(task, container, to)

engine.processTasks.RLock()
engine.tasksLock.RLock()
managedTask, ok := engine.managedTasks[task.Arn]
engine.processTasks.RUnlock()
engine.tasksLock.RUnlock()
if ok {
managedTask.emitDockerContainerChange(dockerContainerChange{
container: container,
Expand Down Expand Up @@ -1131,18 +1120,7 @@ func (engine *DockerTaskEngine) State() dockerstate.TaskEngineState {

// Version returns the underlying docker version.
func (engine *DockerTaskEngine) Version() (string, error) {
if engine.dockerVersion != "" {
// If the version string has already been populated, return it
return engine.dockerVersion, nil
}
// Version string is empty. Try getting it from Docker client
version, err := engine.client.Version(engine.ctx, dockerclient.VersionTimeout)
if err != nil {
return "", err
}
// Successfully retrieved the version string. Save it for future re-use
engine.dockerVersion = version
return engine.dockerVersion, nil
return engine.client.Version(engine.ctx, dockerclient.VersionTimeout)
}

func (engine *DockerTaskEngine) updateMetadataFile(task *api.Task, cont *api.DockerContainer) {
Expand Down

0 comments on commit 7e78571

Please sign in to comment.