Skip to content

Commit

Permalink
Merge Sharanya's RCI idempotency changes
Browse files Browse the repository at this point in the history
  • Loading branch information
tommyhahn committed Nov 29, 2018
1 parent f683204 commit 25c2208
Show file tree
Hide file tree
Showing 12 changed files with 129 additions and 59 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.22.1
* Bug - Fixed a bug where the agent could register a container instance twice, back to back, and be
assigned two container instance ARNs [#1579](https://github.com/aws/amazon-ecs-agent/pull/1579)

## 1.22.0
* Feature - Add support for ECS Secrets integrating with AWS Systems Manager Parameter Store
* Feature - Support for `--pid`, `--ipc` Docker run flags. [#1584](https://github.com/aws/amazon-ecs-agent/pull/1584)
Expand Down
10 changes: 6 additions & 4 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (client *APIECSClient) CreateCluster(clusterName string) (string, error) {
// instance ARN allows a container instance to update its registered
// resources.
func (client *APIECSClient) RegisterContainerInstance(containerInstanceArn string,
attributes []*ecs.Attribute, tags []*ecs.Tag) (string, string, error) {
attributes []*ecs.Attribute, tags []*ecs.Tag, registrationToken string) (string, string, error) {
clusterRef := client.config.Cluster
// If our clusterRef is empty, we should try to create the default
if clusterRef == "" {
Expand All @@ -120,7 +120,7 @@ func (client *APIECSClient) RegisterContainerInstance(containerInstanceArn strin
}()
// Attempt to register without checking existence of the cluster so we don't require
// excess permissions in the case where the cluster already exists and is active
containerInstanceArn, availabilityzone, err := client.registerContainerInstance(clusterRef, containerInstanceArn, attributes, tags)
containerInstanceArn, availabilityzone, err := client.registerContainerInstance(clusterRef, containerInstanceArn, attributes, tags, registrationToken)
if err == nil {
return containerInstanceArn, availabilityzone, nil
}
Expand All @@ -131,11 +131,11 @@ func (client *APIECSClient) RegisterContainerInstance(containerInstanceArn strin
return "", "", err
}
}
return client.registerContainerInstance(clusterRef, containerInstanceArn, attributes, tags)
return client.registerContainerInstance(clusterRef, containerInstanceArn, attributes, tags, registrationToken)
}

func (client *APIECSClient) registerContainerInstance(clusterRef string, containerInstanceArn string,
attributes []*ecs.Attribute, tags []*ecs.Tag) (string, string, error) {
attributes []*ecs.Attribute, tags []*ecs.Tag, registrationToken string) (string, string, error) {
registerRequest := ecs.RegisterContainerInstanceInput{Cluster: &clusterRef}
var registrationAttributes []*ecs.Attribute
if containerInstanceArn != "" {
Expand Down Expand Up @@ -169,6 +169,8 @@ func (client *APIECSClient) registerContainerInstance(clusterRef string, contain
}

registerRequest.TotalResources = resources

registerRequest.ClientToken = &registrationToken
resp, err := client.standardClient.RegisterContainerInstance(&registerRequest)
if err != nil {
seelog.Errorf("Unable to register as a container instance with ECS: %v", err)
Expand Down
25 changes: 13 additions & 12 deletions agent/api/ecsclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
configuredCluster = "mycluster"
iid = "instanceIdentityDocument"
iidSignature = "signature"
registrationToken = "clientToken"
)

var (
Expand Down Expand Up @@ -343,6 +344,7 @@ func TestReRegisterContainerInstance(t *testing.T) {
mc.EXPECT().RegisterContainerInstance(gomock.Any()).Do(func(req *ecs.RegisterContainerInstanceInput) {
assert.Equal(t, "arn:test", *req.ContainerInstanceArn, "Wrong container instance ARN")
assert.Equal(t, configuredCluster, *req.Cluster, "Wrong cluster")
assert.Equal(t, registrationToken, *req.ClientToken, "Wrong client token")
assert.Equal(t, iid, *req.InstanceIdentityDocument, "Wrong IID")
assert.Equal(t, iidSignature, *req.InstanceIdentityDocumentSignature, "Wrong IID sig")
assert.Equal(t, 4, len(req.TotalResources), "Wrong length of TotalResources")
Expand Down Expand Up @@ -376,13 +378,10 @@ func TestReRegisterContainerInstance(t *testing.T) {
nil),
)

arn, availabilityzone, err := client.RegisterContainerInstance("arn:test", capabilities, containerInstanceTags)
if err != nil {
t.Errorf("Should not be an error: %v", err)
}
if arn != "registerArn" {
t.Errorf("Wrong arn: %v", arn)
}
arn, availabilityzone, err := client.RegisterContainerInstance("arn:test", capabilities, containerInstanceTags, registrationToken)

assert.NoError(t, err)
assert.Equal(t, "registerArn", arn)
assert.Equal(t, "us-west-2b", availabilityzone, "availabilityZone is incorrect")
}

Expand Down Expand Up @@ -410,6 +409,7 @@ func TestRegisterContainerInstance(t *testing.T) {
mc.EXPECT().RegisterContainerInstance(gomock.Any()).Do(func(req *ecs.RegisterContainerInstanceInput) {
assert.Nil(t, req.ContainerInstanceArn)
assert.Equal(t, configuredCluster, *req.Cluster, "Wrong cluster")
assert.Equal(t, registrationToken, *req.ClientToken, "Wrong client token")
assert.Equal(t, iid, *req.InstanceIdentityDocument, "Wrong IID")
assert.Equal(t, iidSignature, *req.InstanceIdentityDocumentSignature, "Wrong IID sig")
assert.Equal(t, 4, len(req.TotalResources), "Wrong length of TotalResources")
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestRegisterContainerInstance(t *testing.T) {
nil),
)

arn, availabilityzone, err := client.RegisterContainerInstance("", capabilities, containerInstanceTags)
arn, availabilityzone, err := client.RegisterContainerInstance("", capabilities, containerInstanceTags, registrationToken)
assert.NoError(t, err)
assert.Equal(t, "registerArn", arn)
assert.Equal(t, "us-west-2b", availabilityzone)
Expand Down Expand Up @@ -472,6 +472,7 @@ func TestRegisterContainerInstanceNoIID(t *testing.T) {
mc.EXPECT().RegisterContainerInstance(gomock.Any()).Do(func(req *ecs.RegisterContainerInstanceInput) {
assert.Nil(t, req.ContainerInstanceArn)
assert.Equal(t, configuredCluster, *req.Cluster, "Wrong cluster")
assert.Equal(t, registrationToken, *req.ClientToken, "Wrong client token")
assert.Equal(t, "", *req.InstanceIdentityDocument, "Wrong IID")
assert.Equal(t, "", *req.InstanceIdentityDocumentSignature, "Wrong IID sig")
assert.Equal(t, 4, len(req.TotalResources), "Wrong length of TotalResources")
Expand Down Expand Up @@ -500,7 +501,7 @@ func TestRegisterContainerInstanceNoIID(t *testing.T) {
nil),
)

arn, availabilityzone, err := client.RegisterContainerInstance("", capabilities, containerInstanceTags)
arn, availabilityzone, err := client.RegisterContainerInstance("", capabilities, containerInstanceTags, registrationToken)
assert.NoError(t, err)
assert.Equal(t, "registerArn", arn)
assert.Equal(t, "us-west-2b", availabilityzone)
Expand All @@ -527,7 +528,7 @@ func TestRegisterContainerInstanceWithNegativeResource(t *testing.T) {
mockEC2Metadata.EXPECT().GetDynamicData(ec2.InstanceIdentityDocumentResource).Return("instanceIdentityDocument", nil),
mockEC2Metadata.EXPECT().GetDynamicData(ec2.InstanceIdentityDocumentSignatureResource).Return("signature", nil),
)
_, _, err := client.RegisterContainerInstance("", nil, nil)
_, _, err := client.RegisterContainerInstance("", nil, nil, "")
assert.Error(t, err, "Register resource with negative value should cause registration fail")
}

Expand Down Expand Up @@ -557,7 +558,7 @@ func TestRegisterContainerInstanceWithEmptyTags(t *testing.T) {
nil),
)

_, _, err := client.RegisterContainerInstance("", nil, make([]*ecs.Tag, 0))
_, _, err := client.RegisterContainerInstance("", nil, make([]*ecs.Tag, 0), "")
assert.NoError(t, err)
}

Expand Down Expand Up @@ -631,7 +632,7 @@ func TestRegisterBlankCluster(t *testing.T) {
nil),
)

arn, availabilityzone, err := client.RegisterContainerInstance("", nil, nil)
arn, availabilityzone, err := client.RegisterContainerInstance("", nil, nil, "")
if err != nil {
t.Errorf("Should not be an error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion agent/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ECSClient interface {
// instance ARN allows a container instance to update its registered
// resources.
RegisterContainerInstance(existingContainerInstanceArn string,
attributes []*ecs.Attribute, tags []*ecs.Tag) (string, string, error)
attributes []*ecs.Attribute, tags []*ecs.Tag, registrationToken string) (string, string, error)
// SubmitTaskStateChange sends a state change and returns an error
// indicating if it was submitted
SubmitTaskStateChange(change TaskStateChange) error
Expand Down
8 changes: 4 additions & 4 deletions agent/api/mocks/api_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 17 additions & 8 deletions agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
aws_credentials "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/cihub/seelog"
"github.com/pborman/uuid"
)

const (
Expand Down Expand Up @@ -107,6 +108,7 @@ type ecsAgent struct {
terminationHandler sighandlers.TerminationHandler
mobyPlugins mobypkgwrapper.Plugins
resourceFields *taskresource.ResourceFields
registrationToken string
availabilityZone string
}

Expand Down Expand Up @@ -241,7 +243,7 @@ func (agent *ecsAgent) doStart(containerChangeEventStream *eventstream.EventStre

// Initialize the state manager
stateManager, err := agent.newStateManager(taskEngine,
&agent.cfg.Cluster, &agent.containerInstanceARN, &currentEC2InstanceID, &agent.availabilityZone)
&agent.cfg.Cluster, &agent.containerInstanceARN, &currentEC2InstanceID, &agent.availabilityZone, &agent.registrationToken)
if err != nil {
seelog.Criticalf("Error creating state manager: %v", err)
return exitcodes.ExitTerminal
Expand Down Expand Up @@ -323,15 +325,15 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve
}

// We try to set these values by loading the existing state file first
var previousCluster, previousEC2InstanceID, previousContainerInstanceArn, previousAZ string
var previousCluster, previousEC2InstanceID, previousContainerInstanceArn, previousAZ, previousRegistrationToken string
previousTaskEngine := engine.NewTaskEngine(agent.cfg, agent.dockerClient,
credentialsManager, containerChangeEventStream, imageManager, state,
agent.metadataManager, agent.resourceFields)

// previousStateManager is used to verify that our current runtime configuration is
// compatible with our past configuration as reflected by our state-file
previousStateManager, err := agent.newStateManager(previousTaskEngine, &previousCluster,
&previousContainerInstanceArn, &previousEC2InstanceID, &previousAZ)
&previousContainerInstanceArn, &previousEC2InstanceID, &previousAZ, &previousRegistrationToken)
if err != nil {
seelog.Criticalf("Error creating state manager: %v", err)
return nil, "", err
Expand Down Expand Up @@ -370,6 +372,7 @@ func (agent *ecsAgent) newTaskEngine(containerChangeEventStream *eventstream.Eve

// Use the values we loaded if there's no issue
agent.containerInstanceARN = previousContainerInstanceArn
agent.registrationToken = previousRegistrationToken

return previousTaskEngine, currentEC2InstanceID, nil
}
Expand Down Expand Up @@ -416,7 +419,7 @@ func (agent *ecsAgent) newStateManager(
cluster *string,
containerInstanceArn *string,
savedInstanceID *string,
availabilityZone *string) (statemanager.StateManager, error) {
availabilityZone *string, registrationToken *string) (statemanager.StateManager, error) {

if !agent.cfg.Checkpoint {
return statemanager.NewNoopStateManager(), nil
Expand All @@ -431,6 +434,7 @@ func (agent *ecsAgent) newStateManager(
// This is for making testing easier as we can mock this
agent.saveableOptionFactory.AddSaveable("EC2InstanceID", savedInstanceID),
agent.saveableOptionFactory.AddSaveable("availabilityZone", availabilityZone),
agent.saveableOptionFactory.AddSaveable("RegistrationToken", registrationToken),
)
}

Expand Down Expand Up @@ -479,13 +483,18 @@ func (agent *ecsAgent) registerContainerInstance(
tags = mergeTags(tags, ec2Tags)
}

if agent.registrationToken == "" {
agent.registrationToken = uuid.New()
stateManager.Save()
}

if agent.containerInstanceARN != "" {
seelog.Infof("Restored from checkpoint file. I am running as '%s' in cluster '%s'", agent.containerInstanceARN, agent.cfg.Cluster)
return agent.reregisterContainerInstance(client, capabilities, tags)
return agent.reregisterContainerInstance(client, capabilities, tags, agent.registrationToken)
}

seelog.Info("Registering Instance with ECS")
containerInstanceArn, availabilityZone, err := client.RegisterContainerInstance("", capabilities, tags)
containerInstanceArn, availabilityZone, err := client.RegisterContainerInstance("", capabilities, tags, agent.registrationToken)
if err != nil {
seelog.Errorf("Error registering: %v", err)
if retriable, ok := err.(apierrors.Retriable); ok && !retriable.Retry() {
Expand Down Expand Up @@ -513,8 +522,8 @@ func (agent *ecsAgent) registerContainerInstance(
// registered with ECS. This is for cases where the ECS Agent is being restored
// from a checkpoint.
func (agent *ecsAgent) reregisterContainerInstance(client api.ECSClient,
capabilities []*ecs.Attribute, tags []*ecs.Tag) error {
_, _, err := client.RegisterContainerInstance(agent.containerInstanceARN, capabilities, tags)
capabilities []*ecs.Attribute, tags []*ecs.Tag, registrationToken string) error {
_, _, err := client.RegisterContainerInstance(agent.containerInstanceARN, capabilities, tags, registrationToken)

if err == nil {
return nil
Expand Down
6 changes: 3 additions & 3 deletions agent/app/agent_compatibility_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCompatibilityEnabledSuccess(t *testing.T) {

gomock.InOrder(
saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManager.EXPECT().Load().AnyTimes(),
state.EXPECT().AllTasks().Return([]*apitask.Task{}),
)
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestCompatibilityDefaultEnabledFail(t *testing.T) {
}
gomock.InOrder(
saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManager.EXPECT().Load().AnyTimes(),
state.EXPECT().AllTasks().Return(getTaskListWithOneBadTask()),
)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestCompatibilityExplicitlyEnabledFail(t *testing.T) {
}
gomock.InOrder(
saveableOptionFactory.EXPECT().AddSaveable(gomock.Any(), gomock.Any()).AnyTimes(),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManagerFactory.EXPECT().NewStateManager(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(stateManager, nil),
stateManager.EXPECT().Load().AnyTimes(),
state.EXPECT().AllTasks().Return(getTaskListWithOneBadTask()),
)
Expand Down
Loading

0 comments on commit 25c2208

Please sign in to comment.