Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge branch 'boltdb' into dev #2562

Merged
merged 22 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
64a8953
Added boltdb dependency to vendor.
fenxiong Jun 1, 2020
cce6787
Added new data interface and dummy implementation.
fenxiong Jun 16, 2020
b72826b
Implemented task/container related methods for boltdb data client.
fenxiong Jun 16, 2020
c6c4ea6
Save task and container data to boltdb.
fenxiong Jun 17, 2020
1d46a83
Implement metadata related methods and save agent metadata to boltdb.
fenxiong Jun 26, 2020
0a1b4e3
Added boltdb implementation to get, update and delete image state. Up…
mythri-garaga Jun 29, 2020
8f6c9b7
Add and save task local ip address in task struct.
fenxiong Jun 29, 2020
8e4f86b
Added boltdb implementation to get, update and delete eni attachments…
mythri-garaga Jul 2, 2020
bb836cc
Save all state to boltdb in termination handler
mythri-garaga Jul 13, 2020
a59d9ea
Add method for task engine to load state from boltdb.
fenxiong Jul 15, 2020
40e4277
Load data from boltdb upon startup.
fenxiong Jul 15, 2020
0209f31
Merge branch 'dev' into boltdb
fenxiong Jul 21, 2020
000f6e4
Merge pull request #2532 from fenxiong/boltdb-merge
fenxiong Jul 21, 2020
482fd46
Remove usages of state manager in acs and event handler.
fenxiong Jul 21, 2020
6bd4e0f
Add more unit tests to api/eni package.
fenxiong Jul 21, 2020
0f72765
Remove state manager usage in task engine.
fenxiong Jul 27, 2020
ce953c6
Make sure ip <-> task mapping in state is correctly persisted.
fenxiong Jul 27, 2020
cbb6d2f
Merge branch 'dev' into boltdb
fenxiong Aug 3, 2020
ca26790
Merge pull request #2553 from fenxiong/boltdb-merge
fenxiong Aug 4, 2020
876425a
Remove unnecessary db saves and use batch for db update.
fenxiong Aug 3, 2020
b196631
Merge pull request #2552 from fenxiong/boltdb-update
fenxiong Aug 4, 2020
ad61341
Merge branch 'boltdb' into dev
fenxiong Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions agent/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions agent/acs/handler/acs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/config"
rolecredentials "github.com/aws/amazon-ecs-agent/agent/credentials"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/engine"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/agent/eventstream"
"github.com/aws/amazon-ecs-agent/agent/statemanager"
"github.com/aws/amazon-ecs-agent/agent/utils/retry"
"github.com/aws/amazon-ecs-agent/agent/utils/ttime"
"github.com/aws/amazon-ecs-agent/agent/version"
Expand Down Expand Up @@ -83,7 +83,7 @@ type session struct {
taskEngine engine.TaskEngine
ecsClient api.ECSClient
state dockerstate.TaskEngineState
stateManager statemanager.StateManager
dataClient data.Client
credentialsManager rolecredentials.Manager
taskHandler *eventhandler.TaskHandler
ctx context.Context
Expand Down Expand Up @@ -141,7 +141,7 @@ func NewSession(ctx context.Context,
credentialsProvider *credentials.Credentials,
ecsClient api.ECSClient,
taskEngineState dockerstate.TaskEngineState,
stateManager statemanager.StateManager,
dataClient data.Client,
taskEngine engine.TaskEngine,
credentialsManager rolecredentials.Manager,
taskHandler *eventhandler.TaskHandler, latestSeqNumTaskManifest *int64) Session {
Expand All @@ -157,7 +157,7 @@ func NewSession(ctx context.Context,
credentialsProvider: credentialsProvider,
ecsClient: ecsClient,
state: taskEngineState,
stateManager: stateManager,
dataClient: dataClient,
taskEngine: taskEngine,
credentialsManager: credentialsManager,
taskHandler: taskHandler,
Expand Down Expand Up @@ -281,7 +281,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
acsSession.containerInstanceARN,
client,
acsSession.state,
acsSession.stateManager,
acsSession.dataClient,
)
eniAttachHandler.start()
defer eniAttachHandler.stop()
Expand All @@ -295,7 +295,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
acsSession.containerInstanceARN,
client,
acsSession.state,
acsSession.stateManager,
acsSession.dataClient,
)
instanceENIAttachHandler.start()
defer instanceENIAttachHandler.stop()
Expand All @@ -304,7 +304,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {

// Add TaskManifestHandler
taskManifestHandler := newTaskManifestHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
client, acsSession.stateManager, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest)
client, acsSession.dataClient, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest)

defer taskManifestHandler.clearAcks()
taskManifestHandler.start()
Expand All @@ -321,7 +321,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
cfg.Cluster,
acsSession.containerInstanceARN,
client,
acsSession.stateManager,
acsSession.dataClient,
refreshCredsHandler,
acsSession.credentialsManager,
acsSession.taskHandler, acsSession.latestSeqNumTaskManifest)
Expand All @@ -335,7 +335,7 @@ func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
// Ignore heartbeat messages; anyMessageHandler gets 'em
client.AddRequestHandler(func(*ecsacs.HeartbeatMessage) {})

updater.AddAgentUpdateHandlers(client, cfg, acsSession.stateManager, acsSession.taskEngine)
updater.AddAgentUpdateHandlers(client, cfg, acsSession.state, acsSession.dataClient, acsSession.taskEngine)

err := client.Connect()
if err != nil {
Expand Down
62 changes: 25 additions & 37 deletions agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (
"github.com/aws/amazon-ecs-agent/agent/config"
rolecredentials "github.com/aws/amazon-ecs-agent/agent/credentials"
mock_credentials "github.com/aws/amazon-ecs-agent/agent/credentials/mocks"
"github.com/aws/amazon-ecs-agent/agent/data"
"github.com/aws/amazon-ecs-agent/agent/engine/dockerstate"
mock_engine "github.com/aws/amazon-ecs-agent/agent/engine/mocks"
"github.com/aws/amazon-ecs-agent/agent/eventhandler"
"github.com/aws/amazon-ecs-agent/agent/eventstream"
"github.com/aws/amazon-ecs-agent/agent/statemanager"

"github.com/aws/amazon-ecs-agent/agent/utils/retry"
mock_retry "github.com/aws/amazon-ecs-agent/agent/utils/retry/mock"
Expand Down Expand Up @@ -190,9 +190,8 @@ func TestHandlerReconnectsOnConnectErrors(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
mockWsClient.EXPECT().SetAnyRequestHandler(gomock.Any()).AnyTimes()
Expand All @@ -215,7 +214,7 @@ func TestHandlerReconnectsOnConnectErrors(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -329,9 +328,8 @@ func TestHandlerReconnectsWithoutBackoffOnEOFError(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

deregisterInstanceEventStream := eventstream.NewEventStream("DeregisterContainerInstance", ctx)
deregisterInstanceEventStream.StartListening()
Expand Down Expand Up @@ -360,7 +358,7 @@ func TestHandlerReconnectsWithoutBackoffOnEOFError(t *testing.T) {
taskEngine: taskEngine,
ecsClient: ecsClient,
deregisterInstanceEventStream: deregisterInstanceEventStream,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: mockBackoff,
ctx: ctx,
Expand Down Expand Up @@ -393,9 +391,8 @@ func TestHandlerReconnectsWithBackoffOnNonEOFError(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

deregisterInstanceEventStream := eventstream.NewEventStream("DeregisterContainerInstance", ctx)
deregisterInstanceEventStream.StartListening()
Expand Down Expand Up @@ -424,7 +421,7 @@ func TestHandlerReconnectsWithBackoffOnNonEOFError(t *testing.T) {
taskEngine: taskEngine,
ecsClient: ecsClient,
deregisterInstanceEventStream: deregisterInstanceEventStream,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: mockBackoff,
ctx: ctx,
Expand Down Expand Up @@ -455,9 +452,8 @@ func TestHandlerGeneratesDeregisteredInstanceEvent(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

deregisterInstanceEventStream := eventstream.NewEventStream("DeregisterContainerInstance", ctx)

Expand All @@ -483,7 +479,7 @@ func TestHandlerGeneratesDeregisteredInstanceEvent(t *testing.T) {
taskEngine: taskEngine,
ecsClient: ecsClient,
deregisterInstanceEventStream: deregisterInstanceEventStream,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -515,9 +511,8 @@ func TestHandlerReconnectDelayForInactiveInstanceError(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

deregisterInstanceEventStream := eventstream.NewEventStream("DeregisterContainerInstance", ctx)
deregisterInstanceEventStream.StartListening()
Expand Down Expand Up @@ -552,7 +547,7 @@ func TestHandlerReconnectDelayForInactiveInstanceError(t *testing.T) {
taskEngine: taskEngine,
ecsClient: ecsClient,
deregisterInstanceEventStream: deregisterInstanceEventStream,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -583,9 +578,8 @@ func TestHandlerReconnectsOnServeErrors(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
mockWsClient.EXPECT().SetAnyRequestHandler(gomock.Any()).AnyTimes()
Expand All @@ -609,7 +603,7 @@ func TestHandlerReconnectsOnServeErrors(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -639,9 +633,8 @@ func TestHandlerStopsWhenContextIsCancelled(t *testing.T) {
ecsClient := mock_api.NewMockECSClient(ctrl)
ecsClient.EXPECT().DiscoverPollEndpoint(gomock.Any()).Return(acsURL, nil).AnyTimes()

stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
mockWsClient.EXPECT().SetAnyRequestHandler(gomock.Any()).AnyTimes()
Expand All @@ -660,7 +653,7 @@ func TestHandlerStopsWhenContextIsCancelled(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -688,9 +681,8 @@ func TestHandlerReconnectsOnDiscoverPollEndpointError(t *testing.T) {
taskEngine.EXPECT().Version().Return("Docker: 1.5.0", nil).AnyTimes()

ecsClient := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)
mockWsClient.EXPECT().SetAnyRequestHandler(gomock.Any()).AnyTimes()
Expand All @@ -714,7 +706,7 @@ func TestHandlerReconnectsOnDiscoverPollEndpointError(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
ctx: ctx,
Expand Down Expand Up @@ -756,9 +748,8 @@ func TestConnectionIsClosedOnIdle(t *testing.T) {
taskEngine.EXPECT().Version().Return("Docker: 1.5.0", nil).AnyTimes()

ecsClient := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)
defer cancel()

wait := sync.WaitGroup{}
Expand Down Expand Up @@ -786,7 +777,7 @@ func TestConnectionIsClosedOnIdle(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
ctx: context.Background(),
backoff: retry.NewExponentialBackoff(connectionBackoffMin, connectionBackoffMax, connectionBackoffJitter, connectionBackoffMultiplier),
Expand All @@ -806,9 +797,8 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
defer ctrl.Finish()
taskEngine := mock_engine.NewMockTaskEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

closeWS := make(chan bool)
server, serverIn, requests, errs, err := startMockAcsServer(t, closeWS)
Expand Down Expand Up @@ -840,7 +830,7 @@ func TestHandlerDoesntLeakGoroutines(t *testing.T) {
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
ctx: ctx,
_heartbeatTimeout: 1 * time.Second,
Expand Down Expand Up @@ -891,9 +881,8 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {
defer ctrl.Finish()
taskEngine := mock_engine.NewMockTaskEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)
closeWS := make(chan bool)
server, serverIn, requestsChan, errChan, err := startMockAcsServer(t, closeWS)
if err != nil {
Expand Down Expand Up @@ -927,7 +916,7 @@ func TestStartSessionHandlesRefreshCredentialsMessages(t *testing.T) {
testCreds,
ecsClient,
dockerstate.NewTaskEngineState(),
stateManager,
data.NewNoopClient(),
taskEngine,
credentialsManager,
taskHandler, &latestSeqNumberTaskManifest,
Expand Down Expand Up @@ -1013,9 +1002,8 @@ func TestHandlerReconnectsCorrectlySetsSendCredentialsURLParameter(t *testing.T)
defer ctrl.Finish()
taskEngine := mock_engine.NewMockTaskEngine(ctrl)
ecsClient := mock_api.NewMockECSClient(ctrl)
stateManager := statemanager.NewNoopStateManager()
ctx, cancel := context.WithCancel(context.Background())
taskHandler := eventhandler.NewTaskHandler(ctx, stateManager, nil, nil)
taskHandler := eventhandler.NewTaskHandler(ctx, data.NewNoopClient(), nil, nil)

mockWsClient := mock_wsclient.NewMockClientServer(ctrl)

Expand Down Expand Up @@ -1043,7 +1031,7 @@ func TestHandlerReconnectsCorrectlySetsSendCredentialsURLParameter(t *testing.T)
agentConfig: testConfig,
taskEngine: taskEngine,
ecsClient: ecsClient,
stateManager: stateManager,
dataClient: data.NewNoopClient(),
taskHandler: taskHandler,
ctx: ctx,
resources: resources,
Expand Down
Loading