From 9b1522334c8a781313688a0e2ee6972305c670ae Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Thu, 26 Oct 2023 14:25:42 -0700 Subject: [PATCH 01/22] write file package to track filesystem changes --- internal/pkg/cli/file/filetest/watchertest.go | 38 ++++++ internal/pkg/cli/file/hidden.go | 13 ++ internal/pkg/cli/file/hidden_windows.go | 21 +++ internal/pkg/cli/file/watch.go | 124 ++++++++++++++++++ 4 files changed, 196 insertions(+) create mode 100644 internal/pkg/cli/file/filetest/watchertest.go create mode 100644 internal/pkg/cli/file/hidden.go create mode 100644 internal/pkg/cli/file/hidden_windows.go create mode 100644 internal/pkg/cli/file/watch.go diff --git a/internal/pkg/cli/file/filetest/watchertest.go b/internal/pkg/cli/file/filetest/watchertest.go new file mode 100644 index 00000000000..28775d1830a --- /dev/null +++ b/internal/pkg/cli/file/filetest/watchertest.go @@ -0,0 +1,38 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package filetest + +import "github.com/fsnotify/fsnotify" + +// Double is a test double for file.RecursiveWatcher +type Double struct { + EventsFn func() <-chan fsnotify.Event + ErrorsFn func() <-chan error +} + +// Add is a no-op for Double. +func (d *Double) Add(string) error { + return nil +} + +// Close is a no-op for Double. +func (d *Double) Close() error { + return nil +} + +// Events calls the stubbed function. +func (d *Double) Events() <-chan fsnotify.Event { + if d.EventsFn == nil { + return nil + } + return d.EventsFn() +} + +// Errors calls the stubbed function. +func (d *Double) Errors() <-chan error { + if d.ErrorsFn == nil { + return nil + } + return d.ErrorsFn() +} diff --git a/internal/pkg/cli/file/hidden.go b/internal/pkg/cli/file/hidden.go new file mode 100644 index 00000000000..d43aca28e5b --- /dev/null +++ b/internal/pkg/cli/file/hidden.go @@ -0,0 +1,13 @@ +//go:build !windows + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package file + +import "path/filepath" + +// IsHiddenFile returns true if the file is hidden on non-windows. +func IsHiddenFile(filename string) (bool, error) { + return filepath.Base(filename)[0] == '.', nil +} diff --git a/internal/pkg/cli/file/hidden_windows.go b/internal/pkg/cli/file/hidden_windows.go new file mode 100644 index 00000000000..b8bec85eee4 --- /dev/null +++ b/internal/pkg/cli/file/hidden_windows.go @@ -0,0 +1,21 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package file + +import ( + "syscall" +) + +// IsHiddenFile returns true if the file is hidden on windows. +func IsHiddenFile(filename string) (bool, error) { + pointer, err := syscall.UTF16PtrFromString(filename) + if err != nil { + return false, err + } + attributes, err := syscall.GetFileAttributes(pointer) + if err != nil { + return false, err + } + return attributes&syscall.FILE_ATTRIBUTE_HIDDEN != 0, nil +} diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go new file mode 100644 index 00000000000..7e52ab23d0b --- /dev/null +++ b/internal/pkg/cli/file/watch.go @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package file + +import ( + "io/fs" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" +) + +// RecursiveWatcher wraps an fsnotify Watcher to recursively watch all files in a directory. +type RecursiveWatcher struct { + fsnotifyWatcher *fsnotify.Watcher + done chan struct{} + closed bool + events chan fsnotify.Event + errors chan error +} + +// NewRecursiveWatcher returns a RecursiveWatcher which notifies when changes are made to files inside a recursive directory tree. +func NewRecursiveWatcher(dir string) (*RecursiveWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + rw := &RecursiveWatcher{ + events: make(chan fsnotify.Event), + errors: make(chan error), + fsnotifyWatcher: watcher, + done: make(chan struct{}), + closed: false, + } + + go rw.start() + + return rw, nil +} + +// Add recursively adds a directory tree to the list of watched files. +func (rw *RecursiveWatcher) Add(path string) error { + if rw.closed { + return fsnotify.ErrClosed + } + if err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return rw.fsnotifyWatcher.Add(path) + } + return nil + }); err != nil { + return err + } + return nil +} + +// Remove recursively removes a directory tree from the list of watched files. +func (rw *RecursiveWatcher) Remove(path string) error { + if rw.closed { + return nil + } + if err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return rw.fsnotifyWatcher.Remove(path) + } + return nil + }); err != nil { + return err + } + return nil +} + +// Events returns the events channel. +func (rw *RecursiveWatcher) Events() <-chan fsnotify.Event { + return rw.events +} + +// Errors returns the errors channel. +func (rw *RecursiveWatcher) Errors() <-chan error { + return rw.errors +} + +// Close closes the RecursiveWatcher. +func (rw *RecursiveWatcher) Close() error { + rw.closed = true + close(rw.done) + return rw.fsnotifyWatcher.Close() +} + +func (rw *RecursiveWatcher) start() { + for { + select { + case event := <-rw.fsnotifyWatcher.Events: + info, err := os.Stat(event.Name) + if err != nil { + break + } + if info.IsDir() { + switch event.Op { + case fsnotify.Create: + rw.Add(event.Name) + case fsnotify.Remove: + rw.Remove(event.Name) + } + } else { + rw.events <- event + } + case err := <-rw.fsnotifyWatcher.Errors: + rw.errors <- err + case <-rw.done: + close(rw.events) + close(rw.errors) + return + } + } +} From 3bb3f2ffd8b93aa243df8bb6b1c341f3ef041a88 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Thu, 26 Oct 2023 14:27:12 -0700 Subject: [PATCH 02/22] run local --watch flag --- go.mod | 1 + go.sum | 2 + internal/pkg/cli/flag.go | 2 + internal/pkg/cli/run_local.go | 196 ++++++++++++++---- internal/pkg/cli/run_local_test.go | 195 +++++++++++++++++ .../pkg/docker/orchestrator/orchestrator.go | 5 +- 6 files changed, 354 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index e5fd4cd246f..f42d0cee432 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/fatih/color v1.15.0 github.com/fatih/structs v1.1.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/golang/mock v1.6.0 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/google/uuid v1.3.1 diff --git a/go.sum b/go.sum index 23b823d7782..b7bdeb7a5b9 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= diff --git a/internal/pkg/cli/flag.go b/internal/pkg/cli/flag.go index 7e65fbc93de..c12d6823bb5 100644 --- a/internal/pkg/cli/flag.go +++ b/internal/pkg/cli/flag.go @@ -69,6 +69,7 @@ const ( // Run local flags portOverrideFlag = "port-override" envVarOverrideFlag = "env-var-override" + watchFlag = "watch" // Flags for CI/CD. githubURLFlag = "github-url" @@ -320,6 +321,7 @@ Defaults to all logs. Only one of end-time / follow may be used.` Format: [container]:KEY=VALUE. Omit container name to apply to all containers.` portOverridesFlagDescription = `Optional. Override ports exposed by service. Format: :. Example: --port-override 5000:80 binds localhost:5000 to the service's port 80.` + watchFlagDescription = `Optional. Watch changes to local files and restart containers when updated.` svcManifestFlagDescription = `Optional. Name of the environment in which the service was deployed; output the manifest file used for that deployment.` diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 71e3c4ebecf..3cafdb1fb67 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" @@ -26,6 +27,7 @@ import ( "github.com/aws/copilot-cli/internal/pkg/aws/sessions" "github.com/aws/copilot-cli/internal/pkg/aws/ssm" clideploy "github.com/aws/copilot-cli/internal/pkg/cli/deploy" + "github.com/aws/copilot-cli/internal/pkg/cli/file" "github.com/aws/copilot-cli/internal/pkg/cli/group" "github.com/aws/copilot-cli/internal/pkg/config" "github.com/aws/copilot-cli/internal/pkg/deploy" @@ -43,6 +45,7 @@ import ( "github.com/aws/copilot-cli/internal/pkg/term/selector" "github.com/aws/copilot-cli/internal/pkg/term/syncbuffer" "github.com/aws/copilot-cli/internal/pkg/workspace" + "github.com/fsnotify/fsnotify" "github.com/spf13/afero" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" @@ -58,34 +61,44 @@ type containerOrchestrator interface { Stop() } +type recursiveWatcher interface { + Add(path string) error + Close() error + Events() <-chan fsnotify.Event + Errors() <-chan error +} + type runLocalVars struct { wkldName string wkldType string appName string envName string envOverrides map[string]string + watch bool portOverrides portOverrides } type runLocalOpts struct { runLocalVars - sel deploySelector - ecsLocalClient ecsLocalClient - ssm secretGetter - secretsManager secretGetter - sessProvider sessionProvider - sess *session.Session - envSess *session.Session - targetEnv *config.Environment - targetApp *config.Application - store store - ws wsWlDirReader - cmd execRunner - dockerEngine dockerEngineRunner - repository repositoryService - prog progress - newOrchestrator func() containerOrchestrator + sel deploySelector + ecsLocalClient ecsLocalClient + ssm secretGetter + secretsManager secretGetter + sessProvider sessionProvider + sess *session.Session + envSess *session.Session + targetEnv *config.Environment + targetApp *config.Application + store store + ws wsWlDirReader + cmd execRunner + dockerEngine dockerEngineRunner + repository repositoryService + prog progress + newOrchestrator func() containerOrchestrator + newRecursiveWatcher func(path string) (recursiveWatcher, error) + newDebounceTimer func() <-chan time.Time buildContainerImages func(mft manifest.DynamicWorkload) (map[string]string, error) configureClients func(o *runLocalOpts) error @@ -195,6 +208,12 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } }) } + opts.newRecursiveWatcher = func(path string) (recursiveWatcher, error) { + return file.NewRecursiveWatcher(path) + } + opts.newDebounceTimer = func() <-chan time.Time { + return time.After(5 * time.Second) + } return opts, nil } @@ -257,40 +276,11 @@ func (o *runLocalOpts) Execute() error { ctx := context.Background() - task, err := o.getTask(ctx) - if err != nil { - return fmt.Errorf("get task: %w", err) - } - - mft, _, err := workloadManifest(&workloadManifestInput{ - name: o.wkldName, - appName: o.appName, - envName: o.envName, - ws: o.ws, - interpolator: o.newInterpolator(o.appName, o.envName), - unmarshal: o.unmarshal, - sess: o.envSess, - }) + task, err := o.prepareTask(ctx) if err != nil { return err } - containerURIs, err := o.buildContainerImages(mft) - if err != nil { - return fmt.Errorf("build images: %w", err) - } - - // replace built images with the local built URI - for name, uri := range containerURIs { - ctr, ok := task.Containers[name] - if !ok { - return fmt.Errorf("built an image for %q, which doesn't exist in the task", name) - } - - ctr.ImageURI = uri - task.Containers[name] = ctr - } - orch := o.newOrchestrator() sigCh := make(chan os.Signal, 1) @@ -299,12 +289,20 @@ func (o *runLocalOpts) Execute() error { errCh := orch.Start() orch.RunTask(task) + stopCh := make(chan struct{}) + watchCh := make(chan interface{}) + watchErrCh := make(chan error) + if o.watch { + o.watchLocalFiles(watchCh, watchErrCh, stopCh) + } + for { select { case err, ok := <-errCh: // we loop until errCh closes, since Start() // closes errCh when the orchestrator is completely done. if !ok { + close(stopCh) return nil } @@ -313,6 +311,17 @@ func (o *runLocalOpts) Execute() error { case <-sigCh: signal.Stop(sigCh) orch.Stop() + case <-watchErrCh: + fmt.Printf("watch: %s\n", err) + orch.Stop() + case <-watchCh: + task, err = o.prepareTask(ctx) + if err != nil { + fmt.Printf("rerun task: %s\n", err) + orch.Stop() + break + } + orch.RunTask(task) } } } @@ -364,6 +373,100 @@ func (o *runLocalOpts) getTask(ctx context.Context) (orchestrator.Task, error) { return task, nil } +func (o *runLocalOpts) prepareTask(ctx context.Context) (orchestrator.Task, error) { + task, err := o.getTask(ctx) + if err != nil { + return orchestrator.Task{}, fmt.Errorf("get task: %w", err) + } + + mft, _, err := workloadManifest(&workloadManifestInput{ + name: o.wkldName, + appName: o.appName, + envName: o.envName, + ws: o.ws, + interpolator: o.newInterpolator(o.appName, o.envName), + unmarshal: o.unmarshal, + sess: o.envSess, + }) + if err != nil { + return orchestrator.Task{}, err + } + + containerURIs, err := o.buildContainerImages(mft) + if err != nil { + return orchestrator.Task{}, fmt.Errorf("build images: %w", err) + } + + // replace built images with the local built URI + for name, uri := range containerURIs { + ctr, ok := task.Containers[name] + if !ok { + return orchestrator.Task{}, fmt.Errorf("built an image for %q, which doesn't exist in the task", name) + } + + ctr.ImageURI = uri + task.Containers[name] = ctr + } + + return task, nil +} + +func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh chan<- error, stopCh <-chan struct{}) error { + copilotDir := o.ws.Path() + + watcher, err := o.newRecursiveWatcher(copilotDir) + if err != nil { + return err + } + + if err = watcher.Add(copilotDir); err != nil { + return err + } + + watcherEvents := watcher.Events() + watcherErrors := watcher.Errors() + + go func() { + debounceTimerActive := false + for { + select { + case <-stopCh: + watcher.Close() + return + case err, ok := <-watcherErrors: + watchErrCh <- err + if !ok { + watcher.Close() + return + } + case event, ok := <-watcherEvents: + if !ok { + watcher.Close() + return + } + + isHidden, err := file.IsHiddenFile(event.Name) + if err != nil { + break + } + // TODO(Aiden): implement dockerignore blacklist for update + // Only update on Write operations to non-hidden files + if event.Has(fsnotify.Write) && !isHidden { + debounceTimerActive = true + } + case <-o.newDebounceTimer(): + // If debounce timer is active latest update was 5 seconds ago so send rebuild signal + if debounceTimerActive { + debounceTimerActive = false + watchCh <- nil + } + } + } + }() + + return nil +} + type containerEnv map[string]envVarValue type envVarValue struct { @@ -582,6 +685,7 @@ func BuildRunLocalCmd() *cobra.Command { cmd.Flags().StringVarP(&vars.wkldName, nameFlag, nameFlagShort, "", workloadFlagDescription) cmd.Flags().StringVarP(&vars.envName, envFlag, envFlagShort, "", envFlagDescription) cmd.Flags().StringVarP(&vars.appName, appFlag, appFlagShort, tryReadingAppName(), appFlagDescription) + cmd.Flags().BoolVar(&vars.watch, watchFlag, false, watchFlagDescription) cmd.Flags().Var(&vars.portOverrides, portOverrideFlag, portOverridesFlagDescription) cmd.Flags().StringToStringVar(&vars.envOverrides, envVarOverrideFlag, nil, envVarOverrideFlagDescription) return cmd diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index e2bdb20b0a8..87b95c25697 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -9,18 +9,21 @@ import ( "fmt" "syscall" "testing" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" sdkecs "github.com/aws/aws-sdk-go/service/ecs" "github.com/aws/copilot-cli/internal/pkg/aws/ecs" + "github.com/aws/copilot-cli/internal/pkg/cli/file/filetest" "github.com/aws/copilot-cli/internal/pkg/cli/mocks" "github.com/aws/copilot-cli/internal/pkg/config" "github.com/aws/copilot-cli/internal/pkg/docker/orchestrator" "github.com/aws/copilot-cli/internal/pkg/docker/orchestrator/orchestratortest" "github.com/aws/copilot-cli/internal/pkg/manifest" "github.com/aws/copilot-cli/internal/pkg/term/selector" + "github.com/fsnotify/fsnotify" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -206,6 +209,7 @@ type runLocalExecuteMocks struct { secretsManager *mocks.MocksecretGetter prog *mocks.Mockprogress orchestrator *orchestratortest.Double + watcher *filetest.Double } type mockProvider struct { @@ -298,6 +302,58 @@ func TestRunLocalOpts_Execute(t *testing.T) { }, }, } + alteredTaskDef := &ecs.TaskDefinition{ + ContainerDefinitions: []*sdkecs.ContainerDefinition{ + { + Name: aws.String("foo"), + Environment: []*sdkecs.KeyValuePair{ + { + Name: aws.String("FOO_VAR"), + Value: aws.String("foo-value"), + }, + }, + Secrets: []*sdkecs.Secret{ + { + Name: aws.String("SHARED_SECRET"), + ValueFrom: aws.String("mysecret"), + }, + }, + PortMappings: []*sdkecs.PortMapping{ + { + HostPort: aws.Int64(80), + ContainerPort: aws.Int64(8081), + }, + { + HostPort: aws.Int64(9999), + }, + }, + }, + { + Name: aws.String("bar"), + Environment: []*sdkecs.KeyValuePair{ + { + Name: aws.String("BAR_VAR"), + Value: aws.String("bar-value"), + }, + }, + Secrets: []*sdkecs.Secret{ + { + Name: aws.String("SHARED_SECRET"), + ValueFrom: aws.String("mysecret"), + }, + }, + PortMappings: []*sdkecs.PortMapping{ + { + HostPort: aws.Int64(10000), + }, + { + HostPort: aws.Int64(77), + ContainerPort: aws.Int64(7777), + }, + }, + }, + }, + } expectedTask := orchestrator.Task{ Containers: map[string]orchestrator.ContainerDefinition{ "foo": { @@ -341,6 +397,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { inputWkldName string inputEnvOverrides map[string]string inputPortOverrides []string + inputWatch bool buildImagesError error setupMocks func(t *testing.T, m *runLocalExecuteMocks) @@ -465,6 +522,136 @@ func TestRunLocalOpts_Execute(t *testing.T) { } }, }, + "watch flag restarts, error for pause container definition update": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + inputWatch: true, + setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { + m.ecsLocalClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) + m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil).Times(2) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil).Times(2) + m.interpolator.EXPECT().Interpolate("").Return("", nil).Times(2) + m.ws.EXPECT().Path().Return("") + m.ecsLocalClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(alteredTaskDef, nil) + + eventCh := make(chan fsnotify.Event, 1) + m.watcher.EventsFn = func() <-chan fsnotify.Event { + eventCh <- fsnotify.Event{ + Name: "mockFilename", + Op: fsnotify.Write, + } + return eventCh + } + + watcherErrCh := make(chan error, 1) + m.watcher.ErrorsFn = func() <-chan error { + return watcherErrCh + } + + errCh := make(chan error, 1) + m.orchestrator.StartFn = func() <-chan error { + return errCh + } + + count := 1 + m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + switch count { + case 1: + require.Equal(t, expectedTask, task) + case 2: + require.NotEqual(t, expectedTask, task) + errCh <- errors.New("new task requires recreating pause container") + } + count++ + } + + m.orchestrator.StopFn = func() { + close(errCh) + } + }, + }, + "watcher error succesfully stops all goroutines": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + inputWatch: true, + setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { + m.ecsLocalClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) + m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) + m.ws.EXPECT().Path().Return("") + + eventCh := make(chan fsnotify.Event, 1) + m.watcher.EventsFn = func() <-chan fsnotify.Event { + return eventCh + } + + watcherErrCh := make(chan error, 1) + m.watcher.ErrorsFn = func() <-chan error { + watcherErrCh <- errors.New("some error") + return watcherErrCh + } + + errCh := make(chan error, 1) + m.orchestrator.StartFn = func() <-chan error { + return errCh + } + + m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + require.Equal(t, expectedTask, task) + } + + m.orchestrator.StopFn = func() { + close(errCh) + } + }, + }, + "watch flag restarts and finishes successfully": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + inputWatch: true, + setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { + m.ecsLocalClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil).Times(2) + m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil).Times(2) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil).Times(2) + m.interpolator.EXPECT().Interpolate("").Return("", nil).Times(2) + m.ws.EXPECT().Path().Return("") + + eventCh := make(chan fsnotify.Event, 1) + m.watcher.EventsFn = func() <-chan fsnotify.Event { + eventCh <- fsnotify.Event{ + Name: "mockFilename", + Op: fsnotify.Write, + } + return eventCh + } + + watcherErrCh := make(chan error, 1) + m.watcher.ErrorsFn = func() <-chan error { + return watcherErrCh + } + + errCh := make(chan error, 1) + m.orchestrator.StartFn = func() <-chan error { + return errCh + } + runCount := 1 + m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + require.Equal(t, expectedTask, task) + if runCount > 1 { + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + } + runCount++ + } + + m.orchestrator.StopFn = func() { + close(errCh) + } + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { @@ -483,6 +670,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { repository: mocks.NewMockrepositoryService(ctrl), prog: mocks.NewMockprogress(ctrl), orchestrator: &orchestratortest.Double{}, + watcher: &filetest.Double{}, } tc.setupMocks(t, m) opts := runLocalOpts{ @@ -491,6 +679,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { wkldName: tc.inputWkldName, envName: tc.inputEnvName, envOverrides: tc.inputEnvOverrides, + watch: tc.inputWatch, portOverrides: portOverrides{ { host: "777", @@ -533,6 +722,12 @@ func TestRunLocalOpts_Execute(t *testing.T) { newOrchestrator: func() containerOrchestrator { return m.orchestrator }, + newRecursiveWatcher: func(string) (recursiveWatcher, error) { + return m.watcher, nil + }, + newDebounceTimer: func() <-chan time.Time { + return time.After(10 * time.Millisecond) + }, } // WHEN err := opts.Execute() diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 53ede4f26c5..1fe4fca5714 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -162,6 +162,10 @@ func (a *runTaskAction) Do(o *Orchestrator) error { if err := o.stopTask(ctx, o.curTask); err != nil { return fmt.Errorf("stop existing task: %w", err) } + + // ensure that containers are fully stopped after o.stopTask finishes blocking + // TODO(Aiden): Implement a container ID system or use `docker ps` to ensure containers are stopped + time.Sleep(1 * time.Second) } for name, ctr := range a.task.Containers { @@ -311,7 +315,6 @@ func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) { o.wg.Add(1) go func() { defer o.wg.Done() - if err := o.docker.Run(context.Background(), &opts); err != nil { curTaskID := o.curTaskID.Load() if curTaskID == orchestratorStoppedTaskID { From e265dd54ce9ae3f5910e0bccfd3fc02f88445f11 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Thu, 26 Oct 2023 14:32:21 -0700 Subject: [PATCH 03/22] reverse accidental line spacing --- internal/pkg/docker/orchestrator/orchestrator.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 1fe4fca5714..bbf69d1b052 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -315,6 +315,7 @@ func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) { o.wg.Add(1) go func() { defer o.wg.Done() + if err := o.docker.Run(context.Background(), &opts); err != nil { curTaskID := o.curTaskID.Load() if curTaskID == orchestratorStoppedTaskID { From 423fafdda9031478ce1aba9fbbffa38c76774862 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Thu, 26 Oct 2023 14:54:45 -0700 Subject: [PATCH 04/22] handle returned errors --- internal/pkg/cli/file/watch.go | 10 ++++++++-- internal/pkg/cli/run_local.go | 4 +++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 7e52ab23d0b..c5e03acebd6 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -106,9 +106,15 @@ func (rw *RecursiveWatcher) start() { if info.IsDir() { switch event.Op { case fsnotify.Create: - rw.Add(event.Name) + err := rw.Add(event.Name) + if err != nil { + rw.errors <- err + } case fsnotify.Remove: - rw.Remove(event.Name) + err := rw.Remove(event.Name) + if err != nil { + rw.errors <- err + } } } else { rw.events <- event diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 3cafdb1fb67..8dcac0b8a31 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -293,7 +293,9 @@ func (o *runLocalOpts) Execute() error { watchCh := make(chan interface{}) watchErrCh := make(chan error) if o.watch { - o.watchLocalFiles(watchCh, watchErrCh, stopCh) + if err := o.watchLocalFiles(watchCh, watchErrCh, stopCh); err != nil { + return fmt.Errorf("setup watch: %s", err) + } } for { From 8a7d8a2bbbdcfb4c36146b59dc93dea2aea41f62 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Thu, 26 Oct 2023 16:10:18 -0700 Subject: [PATCH 05/22] reorder test calls --- internal/pkg/cli/run_local_test.go | 66 ++++++++++++++++-------------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 9cc4c0efc76..512e911687f 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -442,80 +442,86 @@ func TestRunLocalOpts_Execute(t *testing.T) { }, wantedError: errors.New(`get task: get env vars: parse env overrides: "bad:OVERRIDE" targets invalid container`), }, - "error getting env version": { + "error reading workload manifest": { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) - m.envChecker.EXPECT().Version().Return("", fmt.Errorf("some error")) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return(nil, errors.New("some error")) }, - wantedError: errors.New(`retrieve version of environment stack "testEnv" in application "testApp": some error`), + wantedError: errors.New(`read manifest file for testWkld: some error`), }, - "error due to old env version": { + "error interpolating workload manifest": { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) - m.envChecker.EXPECT().Version().Return("v1.31.0", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", errors.New("some error")) }, - wantedError: errors.New(`environment "testEnv" is on version "v1.31.0" which does not support the "run local --proxy" feature`), + wantedError: errors.New(`interpolate environment variables for testWkld manifest: some error`), }, - "error getting hosts to proxy to": { - inputAppName: testAppName, - inputWkldName: testWkldName, - inputEnvName: testEnvName, - inProxy: true, + "error building container images": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + buildImagesError: errors.New("some error"), setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) - m.envChecker.EXPECT().Version().Return("v1.32.0", nil) - m.hostFinder.HostsFn = func(ctx context.Context) ([]host, error) { - return nil, fmt.Errorf("some error") - } + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) }, - wantedError: errors.New(`find hosts to connect to: some error`), + wantedError: errors.New(`build images: some error`), }, - "error reading workload manifest": { + "error getting env version": { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, + inProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) - m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return(nil, errors.New("some error")) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) + m.envChecker.EXPECT().Version().Return("", fmt.Errorf("some error")) }, - wantedError: errors.New(`read manifest file for testWkld: some error`), + wantedError: errors.New(`retrieve version of environment stack "testEnv" in application "testApp": some error`), }, - "error interpolating workload manifest": { + "error due to old env version": { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, + inProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) - m.interpolator.EXPECT().Interpolate("").Return("", errors.New("some error")) + m.interpolator.EXPECT().Interpolate("").Return("", nil) + m.envChecker.EXPECT().Version().Return("v1.31.0", nil) }, - wantedError: errors.New(`interpolate environment variables for testWkld manifest: some error`), + wantedError: errors.New(`environment "testEnv" is on version "v1.31.0" which does not support the "run local --proxy" feature`), }, - "error building container images": { - inputAppName: testAppName, - inputWkldName: testWkldName, - inputEnvName: testEnvName, - buildImagesError: errors.New("some error"), + "error getting hosts to proxy to": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + inProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) m.interpolator.EXPECT().Interpolate("").Return("", nil) + m.envChecker.EXPECT().Version().Return("v1.32.0", nil) + m.hostFinder.HostsFn = func(ctx context.Context) ([]host, error) { + return nil, fmt.Errorf("some error") + } }, - wantedError: errors.New(`build images: some error`), + wantedError: errors.New(`find hosts to connect to: some error`), }, "pulls errors from orchestrator": { inputAppName: testAppName, From 21d6605cdaf15106d0bb0b06b20470eab164244b Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 27 Oct 2023 13:50:17 -0700 Subject: [PATCH 06/22] simplify debounce --- internal/pkg/cli/run_local.go | 13 +++++-------- internal/pkg/cli/run_local_test.go | 4 +++- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index b8eecda2367..5e4a8d3118a 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -465,8 +465,9 @@ func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh ch watcherEvents := watcher.Events() watcherErrors := watcher.Errors() + var debounceCh <-chan time.Time + go func() { - debounceTimerActive := false for { select { case <-stopCh: @@ -491,14 +492,10 @@ func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh ch // TODO(Aiden): implement dockerignore blacklist for update // Only update on Write operations to non-hidden files if event.Has(fsnotify.Write) && !isHidden { - debounceTimerActive = true - } - case <-o.newDebounceTimer(): - // If debounce timer is active latest update was 5 seconds ago so send rebuild signal - if debounceTimerActive { - debounceTimerActive = false - watchCh <- nil + debounceCh = o.newDebounceTimer() } + case <-debounceCh: + watchCh <- nil } } }() diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 512e911687f..811dd0eca8b 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -789,7 +789,9 @@ func TestRunLocalOpts_Execute(t *testing.T) { return m.watcher, nil }, newDebounceTimer: func() <-chan time.Time { - return time.After(10 * time.Millisecond) + ch := make(chan time.Time, 1) + ch <- time.Now() + return ch }, } // WHEN From 8ea2c388c9b7d03ff98099efac0cda1a0f976d0f Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 27 Oct 2023 14:02:35 -0700 Subject: [PATCH 07/22] removed unused parameter --- internal/pkg/cli/file/watch.go | 2 +- internal/pkg/cli/run_local.go | 8 ++++---- internal/pkg/cli/run_local_test.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index c5e03acebd6..b8f788430f3 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -21,7 +21,7 @@ type RecursiveWatcher struct { } // NewRecursiveWatcher returns a RecursiveWatcher which notifies when changes are made to files inside a recursive directory tree. -func NewRecursiveWatcher(dir string) (*RecursiveWatcher, error) { +func NewRecursiveWatcher() (*RecursiveWatcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, err diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 5e4a8d3118a..e63b214111b 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -108,7 +108,7 @@ type runLocalOpts struct { orchestrator containerOrchestrator hostFinder hostFinder envChecker versionCompatibilityChecker - newRecursiveWatcher func(path string) (recursiveWatcher, error) + newRecursiveWatcher func() (recursiveWatcher, error) newDebounceTimer func() <-chan time.Time buildContainerImages func(mft manifest.DynamicWorkload) (map[string]string, error) @@ -233,8 +233,8 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } return containerURIs, nil } - o.newRecursiveWatcher = func(path string) (recursiveWatcher, error) { - return file.NewRecursiveWatcher(path) + o.newRecursiveWatcher = func() (recursiveWatcher, error) { + return file.NewRecursiveWatcher() } o.newDebounceTimer = func() <-chan time.Time { return time.After(5 * time.Second) @@ -453,7 +453,7 @@ func (o *runLocalOpts) prepareTask(ctx context.Context) (orchestrator.Task, erro func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh chan<- error, stopCh <-chan struct{}) error { copilotDir := o.ws.Path() - watcher, err := o.newRecursiveWatcher(copilotDir) + watcher, err := o.newRecursiveWatcher() if err != nil { return err } diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 811dd0eca8b..13fb271b1ff 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -785,7 +785,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { orchestrator: m.orchestrator, hostFinder: m.hostFinder, envChecker: m.envChecker, - newRecursiveWatcher: func(string) (recursiveWatcher, error) { + newRecursiveWatcher: func() (recursiveWatcher, error) { return m.watcher, nil }, newDebounceTimer: func() <-chan time.Time { From 142bc120463678cb8fe196f72ede10b43108a9c5 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Mon, 30 Oct 2023 15:23:33 -0700 Subject: [PATCH 08/22] address feedback --- internal/pkg/cli/file/watch.go | 13 ++++++++++ internal/pkg/cli/run_local.go | 40 ++++++++++++++++++------------ internal/pkg/cli/run_local_test.go | 7 +----- 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index b8f788430f3..08c881345ae 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" + "github.com/aws/copilot-cli/internal/pkg/term/log" "github.com/fsnotify/fsnotify" ) @@ -99,17 +100,28 @@ func (rw *RecursiveWatcher) start() { for { select { case event := <-rw.fsnotifyWatcher.Events: + if event.Has(fsnotify.Write) { + // cannot be a directory, skip call to os.Stat + rw.events <- event + return + } + info, err := os.Stat(event.Name) if err != nil { + log.Error(err) break } + if info.IsDir() { + // handle recursive watch switch event.Op { case fsnotify.Create: err := rw.Add(event.Name) if err != nil { rw.errors <- err } + case fsnotify.Rename: + fallthrough case fsnotify.Remove: err := rw.Remove(event.Name) if err != nil { @@ -117,6 +129,7 @@ func (rw *RecursiveWatcher) start() { } } } else { + // not a directory, pass event rw.events <- event } case err := <-rw.fsnotifyWatcher.Errors: diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index e63b214111b..433ffeb05e4 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -108,8 +108,8 @@ type runLocalOpts struct { orchestrator containerOrchestrator hostFinder hostFinder envChecker versionCompatibilityChecker + debounceTime time.Duration newRecursiveWatcher func() (recursiveWatcher, error) - newDebounceTimer func() <-chan time.Time buildContainerImages func(mft manifest.DynamicWorkload) (map[string]string, error) configureClients func() error @@ -233,12 +233,10 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } return containerURIs, nil } + o.debounceTime = 5 * time.Second o.newRecursiveWatcher = func() (recursiveWatcher, error) { return file.NewRecursiveWatcher() } - o.newDebounceTimer = func() <-chan time.Time { - return time.After(5 * time.Second) - } return o, nil } @@ -326,11 +324,12 @@ func (o *runLocalOpts) Execute() error { errCh := o.orchestrator.Start() o.orchestrator.RunTask(task) + var watchCh <-chan interface{} + var watchErrCh <-chan error stopCh := make(chan struct{}) - watchCh := make(chan interface{}) - watchErrCh := make(chan error) if o.watch { - if err := o.watchLocalFiles(watchCh, watchErrCh, stopCh); err != nil { + watchCh, watchErrCh, err = o.watchLocalFiles(stopCh) + if err != nil { return fmt.Errorf("setup watch: %s", err) } } @@ -450,22 +449,26 @@ func (o *runLocalOpts) prepareTask(ctx context.Context) (orchestrator.Task, erro return task, nil } -func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh chan<- error, stopCh <-chan struct{}) error { +func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface{}, <-chan error, error) { copilotDir := o.ws.Path() + watchCh := make(chan interface{}) + watchErrCh := make(chan error) + watcher, err := o.newRecursiveWatcher() if err != nil { - return err + return nil, nil, err } if err = watcher.Add(copilotDir); err != nil { - return err + return nil, nil, err } watcherEvents := watcher.Events() watcherErrors := watcher.Errors() - var debounceCh <-chan time.Time + debounceTimer := time.NewTimer(0) + <-debounceTimer.C // flush channel to create timer for later use with Reset() go func() { for { @@ -485,22 +488,27 @@ func (o *runLocalOpts) watchLocalFiles(watchCh chan<- interface{}, watchErrCh ch return } + // skip chmod events + if event.Has(fsnotify.Chmod) { + break + } + isHidden, err := file.IsHiddenFile(event.Name) if err != nil { break } + // TODO(Aiden): implement dockerignore blacklist for update - // Only update on Write operations to non-hidden files - if event.Has(fsnotify.Write) && !isHidden { - debounceCh = o.newDebounceTimer() + if !isHidden { + debounceTimer.Reset(o.debounceTime) } - case <-debounceCh: + case <-debounceTimer.C: watchCh <- nil } } }() - return nil + return watchCh, watchErrCh, nil } type containerEnv map[string]envVarValue diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 13fb271b1ff..63955c13cf9 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -9,7 +9,6 @@ import ( "fmt" "syscall" "testing" - "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -785,14 +784,10 @@ func TestRunLocalOpts_Execute(t *testing.T) { orchestrator: m.orchestrator, hostFinder: m.hostFinder, envChecker: m.envChecker, + debounceTime: 0, // disable debounce during testing newRecursiveWatcher: func() (recursiveWatcher, error) { return m.watcher, nil }, - newDebounceTimer: func() <-chan time.Time { - ch := make(chan time.Time, 1) - ch <- time.Now() - return ch - }, } // WHEN err := opts.Execute() From 80aa093bcd9e9acb218d18a80720b39fbcff67ea Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 09:42:45 -0700 Subject: [PATCH 09/22] dont update for hidden files and folders --- internal/pkg/cli/run_local.go | 17 ++++++++++--- internal/pkg/cli/run_local_test.go | 40 ++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 433ffeb05e4..9ff556b935d 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -8,6 +8,7 @@ import ( "fmt" "os" "os/signal" + "path/filepath" "slices" "strconv" "strings" @@ -493,9 +494,19 @@ func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface break } - isHidden, err := file.IsHiddenFile(event.Name) - if err != nil { - break + // check if any subdirectories within copilotDir are hidden + isHidden := false + parent := copilotDir + suffix, _ := strings.CutPrefix(event.Name, parent+"/") + for _, child := range strings.Split(suffix, "/") { + parent = filepath.Join(parent, child) + subdirHidden, err := file.IsHiddenFile(child) + if err != nil { + break + } + if subdirHidden { + isHidden = true + } } // TODO(Aiden): implement dockerignore blacklist for update diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 63955c13cf9..9727fa28c35 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -581,6 +581,46 @@ func TestRunLocalOpts_Execute(t *testing.T) { } }, }, + "watch flag receives hidden file update, doesn't restart": { + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + inputWatch: true, + setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { + m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) + m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) + m.ws.EXPECT().Path().Return("") + + eventCh := make(chan fsnotify.Event, 1) + m.watcher.EventsFn = func() <-chan fsnotify.Event { + eventCh <- fsnotify.Event{ + Name: ".hiddensubdir/mockFilename", + Op: fsnotify.Write, + } + return eventCh + } + + watcherErrCh := make(chan error, 1) + m.watcher.ErrorsFn = func() <-chan error { + return watcherErrCh + } + + errCh := make(chan error, 1) + m.orchestrator.StartFn = func() <-chan error { + return errCh + } + + m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + } + + m.orchestrator.StopFn = func() { + close(errCh) + } + }, + }, "watch flag restarts, error for pause container definition update": { inputAppName: testAppName, inputWkldName: testWkldName, From 3ccfae279ab3253b3c7040c78b80b4925440e2e9 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 10:25:18 -0700 Subject: [PATCH 10/22] watcher integration test --- internal/pkg/cli/file/watch.go | 76 ++-------- .../pkg/cli/file/watch_integration_test.go | 143 ++++++++++++++++++ internal/pkg/cli/run_local.go | 4 +- 3 files changed, 160 insertions(+), 63 deletions(-) create mode 100644 internal/pkg/cli/file/watch_integration_test.go diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 08c881345ae..198ae121907 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -5,10 +5,8 @@ package file import ( "io/fs" - "os" "path/filepath" - "github.com/aws/copilot-cli/internal/pkg/term/log" "github.com/fsnotify/fsnotify" ) @@ -22,14 +20,14 @@ type RecursiveWatcher struct { } // NewRecursiveWatcher returns a RecursiveWatcher which notifies when changes are made to files inside a recursive directory tree. -func NewRecursiveWatcher() (*RecursiveWatcher, error) { - watcher, err := fsnotify.NewWatcher() +func NewRecursiveWatcher(buffer uint) (*RecursiveWatcher, error) { + watcher, err := fsnotify.NewBufferedWatcher(buffer) if err != nil { return nil, err } rw := &RecursiveWatcher{ - events: make(chan fsnotify.Event), + events: make(chan fsnotify.Event, buffer), errors: make(chan error), fsnotifyWatcher: watcher, done: make(chan struct{}), @@ -46,37 +44,15 @@ func (rw *RecursiveWatcher) Add(path string) error { if rw.closed { return fsnotify.ErrClosed } - if err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { + return filepath.WalkDir(path, func(p string, d fs.DirEntry, err error) error { if err != nil { - return err + return nil } if d.IsDir() { - return rw.fsnotifyWatcher.Add(path) + return rw.fsnotifyWatcher.Add(p) } return nil - }); err != nil { - return err - } - return nil -} - -// Remove recursively removes a directory tree from the list of watched files. -func (rw *RecursiveWatcher) Remove(path string) error { - if rw.closed { - return nil - } - if err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return rw.fsnotifyWatcher.Remove(path) - } - return nil - }); err != nil { - return err - } - return nil + }) } // Events returns the events channel. @@ -100,37 +76,15 @@ func (rw *RecursiveWatcher) start() { for { select { case event := <-rw.fsnotifyWatcher.Events: - if event.Has(fsnotify.Write) { - // cannot be a directory, skip call to os.Stat - rw.events <- event - return - } - - info, err := os.Stat(event.Name) - if err != nil { - log.Error(err) - break - } - - if info.IsDir() { - // handle recursive watch - switch event.Op { - case fsnotify.Create: - err := rw.Add(event.Name) - if err != nil { - rw.errors <- err - } - case fsnotify.Rename: - fallthrough - case fsnotify.Remove: - err := rw.Remove(event.Name) - if err != nil { - rw.errors <- err - } + rw.events <- event + + // handle recursive watch + switch event.Op { + case fsnotify.Create: + err := rw.Add(event.Name) + if err != nil { + rw.errors <- err } - } else { - // not a directory, pass event - rw.events <- event } case err := <-rw.fsnotifyWatcher.Errors: rw.errors <- err diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go new file mode 100644 index 00000000000..f60ec5e9f98 --- /dev/null +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -0,0 +1,143 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package file_test + +import ( + "fmt" + "io/fs" + "os" + "testing" + "time" + + "github.com/aws/copilot-cli/internal/pkg/cli/file" + "github.com/fsnotify/fsnotify" + "github.com/stretchr/testify/require" +) + +func TestRecursiveWatcher(t *testing.T) { + var ( + watcher *file.RecursiveWatcher + tmp string + eventsExpected []fsnotify.Event + eventsActual []fsnotify.Event + ) + + tmp = os.TempDir() + eventsActual = make([]fsnotify.Event, 0) + eventsExpected = []fsnotify.Event{ + { + Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), + Op: fsnotify.Create, + }, + { + Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), + Op: fsnotify.Chmod, + }, + { + Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), + Op: fsnotify.Write, + }, + { + Name: fmt.Sprintf("%s/watch/subdir", tmp), + Op: fsnotify.Rename, + }, + { + Name: fmt.Sprintf("%s/watch/subdir2", tmp), + Op: fsnotify.Create, + }, + { + Name: fmt.Sprintf("%s/watch/subdir", tmp), + Op: fsnotify.Rename, + }, + { + Name: fmt.Sprintf("%s/watch/subdir2/testfile", tmp), + Op: fsnotify.Rename, + }, + { + Name: fmt.Sprintf("%s/watch/subdir2/testfile2", tmp), + Op: fsnotify.Create, + }, + { + Name: fmt.Sprintf("%s/watch/subdir2/testfile2", tmp), + Op: fsnotify.Remove, + }, + } + + t.Run("Setup Watcher", func(t *testing.T) { + err := os.MkdirAll(fmt.Sprintf("%s/watch/subdir", tmp), 0755) + require.NoError(t, err) + + watcher, err = file.NewRecursiveWatcher(uint(len(eventsExpected))) + require.NoError(t, err) + }) + + t.Run("Handle Events", func(t *testing.T) { + eventsCh := watcher.Events() + errorsCh := watcher.Errors() + go func() { + for { + select { + case e, ok := <-eventsCh: + if !ok { + require.Empty(t, errorsCh) + return + } + eventsActual = append(eventsActual, e) + case e, ok := <-errorsCh: + require.NoError(t, e) + if !ok { + require.Empty(t, errorsCh) + return + } + } + + } + }() + }) + + t.Run("Watch", func(t *testing.T) { + // SETUP + err := watcher.Add(fmt.Sprintf("%s/watch", tmp)) + require.NoError(t, err) + + // WATCH + f, err := os.Create(fmt.Sprintf("%s/watch/subdir/testfile", tmp)) + require.NoError(t, err) + + err = os.Chmod(fmt.Sprintf("%s/watch/subdir/testfile", tmp), 0755) + require.NoError(t, err) + + err = os.WriteFile(fmt.Sprintf("%s/watch/subdir/testfile", tmp), []byte("write to file"), fs.ModeAppend) + require.NoError(t, err) + + err = f.Close() + require.NoError(t, err) + + err = os.Rename(fmt.Sprintf("%s/watch/subdir", tmp), fmt.Sprintf("%s/watch/subdir2", tmp)) + require.NoError(t, err) + + // filepath.WalkDir is slow, wait to prevent race condition + time.Sleep(100 * time.Millisecond) + + err = os.Rename(fmt.Sprintf("%s/watch/subdir2/testfile", tmp), fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) + require.NoError(t, err) + + // filepath.WalkDir is slow, wait to prevent race condition + time.Sleep(100 * time.Millisecond) + + err = os.Remove(fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) + require.NoError(t, err) + + // CLOSE + err = watcher.Close() + require.NoError(t, err) + }) + + t.Run("Clean", func(t *testing.T) { + err := os.RemoveAll(fmt.Sprintf("%s/watch", tmp)) + require.NoError(t, err) + + require.ElementsMatch(t, eventsExpected, eventsActual) + }) +} diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 9ff556b935d..6db192e6813 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -236,7 +236,7 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } o.debounceTime = 5 * time.Second o.newRecursiveWatcher = func() (recursiveWatcher, error) { - return file.NewRecursiveWatcher() + return file.NewRecursiveWatcher(1) } return o, nil } @@ -494,7 +494,7 @@ func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface break } - // check if any subdirectories within copilotDir are hidden + // check if any subdirectories within copilot directory are hidden isHidden := false parent := copilotDir suffix, _ := strings.CutPrefix(event.Name, parent+"/") From 036405215a3ed204ef75682230168ddd6a53c75f Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 10:26:29 -0700 Subject: [PATCH 11/22] localintegration build tag --- internal/pkg/cli/file/watch_integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go index f60ec5e9f98..428fb8b8f5c 100644 --- a/internal/pkg/cli/file/watch_integration_test.go +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -1,3 +1,5 @@ +//go:build integration || localintegration + // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 From 52ca8e70c812414bf81e383bab94d1122c92861d Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 11:02:53 -0700 Subject: [PATCH 12/22] fix merge errors --- internal/pkg/cli/run_local_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index a977cf60079..58a7ecf651c 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -474,11 +474,10 @@ func TestRunLocalOpts_Execute(t *testing.T) { }, wantedError: errors.New(`interpolate environment variables for testWkld manifest: some error`), }, - "error getting hosts to proxy to": { + "error building container images": { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inputProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) @@ -491,7 +490,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inProxy: true, + inputProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) @@ -505,7 +504,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inProxy: true, + inputProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) @@ -519,7 +518,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { inputAppName: testAppName, inputWkldName: testWkldName, inputEnvName: testEnvName, - inProxy: true, + inputProxy: true, setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) From 497df054361424ef64d139cfb5f097a5799d16a0 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 12:31:43 -0700 Subject: [PATCH 13/22] remove race conditions --- internal/pkg/cli/file/watch.go | 25 ++++++--- .../pkg/cli/file/watch_integration_test.go | 56 ++++++------------- internal/pkg/cli/run_local.go | 4 +- 3 files changed, 37 insertions(+), 48 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 198ae121907..81ad7bc7fb0 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -20,14 +20,14 @@ type RecursiveWatcher struct { } // NewRecursiveWatcher returns a RecursiveWatcher which notifies when changes are made to files inside a recursive directory tree. -func NewRecursiveWatcher(buffer uint) (*RecursiveWatcher, error) { - watcher, err := fsnotify.NewBufferedWatcher(buffer) +func NewRecursiveWatcher() (*RecursiveWatcher, error) { + watcher, err := fsnotify.NewBufferedWatcher(1) if err != nil { return nil, err } rw := &RecursiveWatcher{ - events: make(chan fsnotify.Event, buffer), + events: make(chan fsnotify.Event, 1), errors: make(chan error), fsnotifyWatcher: watcher, done: make(chan struct{}), @@ -67,17 +67,24 @@ func (rw *RecursiveWatcher) Errors() <-chan error { // Close closes the RecursiveWatcher. func (rw *RecursiveWatcher) Close() error { - rw.closed = true - close(rw.done) - return rw.fsnotifyWatcher.Close() + for { + select { + case event := <-rw.fsnotifyWatcher.Events: + rw.events <- event + case err := <-rw.fsnotifyWatcher.Errors: + rw.errors <- err + default: + rw.closed = true + close(rw.done) + return rw.fsnotifyWatcher.Close() + } + } } func (rw *RecursiveWatcher) start() { for { select { case event := <-rw.fsnotifyWatcher.Events: - rw.events <- event - // handle recursive watch switch event.Op { case fsnotify.Create: @@ -86,6 +93,8 @@ func (rw *RecursiveWatcher) start() { rw.errors <- err } } + + rw.events <- event case err := <-rw.fsnotifyWatcher.Errors: rw.errors <- err case <-rw.done: diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go index 428fb8b8f5c..b5d732baf22 100644 --- a/internal/pkg/cli/file/watch_integration_test.go +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -1,5 +1,3 @@ -//go:build integration || localintegration - // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 @@ -10,7 +8,6 @@ import ( "io/fs" "os" "testing" - "time" "github.com/aws/copilot-cli/internal/pkg/cli/file" "github.com/fsnotify/fsnotify" @@ -70,76 +67,59 @@ func TestRecursiveWatcher(t *testing.T) { err := os.MkdirAll(fmt.Sprintf("%s/watch/subdir", tmp), 0755) require.NoError(t, err) - watcher, err = file.NewRecursiveWatcher(uint(len(eventsExpected))) + watcher, err = file.NewRecursiveWatcher() require.NoError(t, err) }) - t.Run("Handle Events", func(t *testing.T) { - eventsCh := watcher.Events() - errorsCh := watcher.Errors() - go func() { - for { - select { - case e, ok := <-eventsCh: - if !ok { - require.Empty(t, errorsCh) - return - } - eventsActual = append(eventsActual, e) - case e, ok := <-errorsCh: - require.NoError(t, e) - if !ok { - require.Empty(t, errorsCh) - return - } - } - - } - }() - }) - t.Run("Watch", func(t *testing.T) { // SETUP err := watcher.Add(fmt.Sprintf("%s/watch", tmp)) require.NoError(t, err) + eventsCh := watcher.Events() + errorsCh := watcher.Errors() + // WATCH - f, err := os.Create(fmt.Sprintf("%s/watch/subdir/testfile", tmp)) + file, err := os.Create(fmt.Sprintf("%s/watch/subdir/testfile", tmp)) require.NoError(t, err) + eventsActual = append(eventsActual, <-eventsCh) err = os.Chmod(fmt.Sprintf("%s/watch/subdir/testfile", tmp), 0755) require.NoError(t, err) + eventsActual = append(eventsActual, <-eventsCh) err = os.WriteFile(fmt.Sprintf("%s/watch/subdir/testfile", tmp), []byte("write to file"), fs.ModeAppend) require.NoError(t, err) + eventsActual = append(eventsActual, <-eventsCh) - err = f.Close() + err = file.Close() require.NoError(t, err) err = os.Rename(fmt.Sprintf("%s/watch/subdir", tmp), fmt.Sprintf("%s/watch/subdir2", tmp)) require.NoError(t, err) - - // filepath.WalkDir is slow, wait to prevent race condition - time.Sleep(100 * time.Millisecond) + eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, <-eventsCh) err = os.Rename(fmt.Sprintf("%s/watch/subdir2/testfile", tmp), fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) require.NoError(t, err) - - // filepath.WalkDir is slow, wait to prevent race condition - time.Sleep(100 * time.Millisecond) + eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, <-eventsCh) err = os.Remove(fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) require.NoError(t, err) + eventsActual = append(eventsActual, <-eventsCh) // CLOSE err = watcher.Close() require.NoError(t, err) + require.Empty(t, errorsCh) + + require.Equal(t, eventsExpected, eventsActual) }) t.Run("Clean", func(t *testing.T) { err := os.RemoveAll(fmt.Sprintf("%s/watch", tmp)) require.NoError(t, err) - - require.ElementsMatch(t, eventsExpected, eventsActual) }) } diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index c26ff7bc899..3afe55c286b 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -236,7 +236,7 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } o.debounceTime = 5 * time.Second o.newRecursiveWatcher = func() (recursiveWatcher, error) { - return file.NewRecursiveWatcher(1) + return file.NewRecursiveWatcher() } return o, nil } @@ -529,7 +529,7 @@ func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface return watchCh, watchErrCh, nil } - + func sessionEnvVars(ctx context.Context, sess *session.Session) (map[string]string, error) { creds, err := sess.Config.Credentials.GetWithContext(ctx) if err != nil { From b09f63a828ad954015faecf65baf4788019a1b9b Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 13:42:21 -0700 Subject: [PATCH 14/22] fix merge change causing tests to time out --- internal/pkg/cli/run_local_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 58a7ecf651c..55295121eb4 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -475,9 +475,10 @@ func TestRunLocalOpts_Execute(t *testing.T) { wantedError: errors.New(`interpolate environment variables for testWkld manifest: some error`), }, "error building container images": { - inputAppName: testAppName, - inputWkldName: testWkldName, - inputEnvName: testEnvName, + inputAppName: testAppName, + inputWkldName: testWkldName, + inputEnvName: testEnvName, + buildImagesError: errors.New("some error"), setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) From e0c82a8545f00ae734bdcbe5fb51de0b7e41a321 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 13:56:33 -0700 Subject: [PATCH 15/22] buffer watcher for tests --- internal/pkg/cli/file/watch.go | 6 +++--- internal/pkg/cli/file/watch_integration_test.go | 9 ++++++++- internal/pkg/cli/run_local.go | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 81ad7bc7fb0..5b1a312194e 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -20,14 +20,14 @@ type RecursiveWatcher struct { } // NewRecursiveWatcher returns a RecursiveWatcher which notifies when changes are made to files inside a recursive directory tree. -func NewRecursiveWatcher() (*RecursiveWatcher, error) { - watcher, err := fsnotify.NewBufferedWatcher(1) +func NewRecursiveWatcher(buffer uint) (*RecursiveWatcher, error) { + watcher, err := fsnotify.NewBufferedWatcher(buffer) if err != nil { return nil, err } rw := &RecursiveWatcher{ - events: make(chan fsnotify.Event, 1), + events: make(chan fsnotify.Event, buffer), errors: make(chan error), fsnotifyWatcher: watcher, done: make(chan struct{}), diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go index b5d732baf22..126d1b73780 100644 --- a/internal/pkg/cli/file/watch_integration_test.go +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -1,3 +1,5 @@ +//go:build integration || localintegration + // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 @@ -37,6 +39,10 @@ func TestRecursiveWatcher(t *testing.T) { Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), Op: fsnotify.Write, }, + { + Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), + Op: fsnotify.Write, + }, { Name: fmt.Sprintf("%s/watch/subdir", tmp), Op: fsnotify.Rename, @@ -67,7 +73,7 @@ func TestRecursiveWatcher(t *testing.T) { err := os.MkdirAll(fmt.Sprintf("%s/watch/subdir", tmp), 0755) require.NoError(t, err) - watcher, err = file.NewRecursiveWatcher() + watcher, err = file.NewRecursiveWatcher(10) require.NoError(t, err) }) @@ -91,6 +97,7 @@ func TestRecursiveWatcher(t *testing.T) { err = os.WriteFile(fmt.Sprintf("%s/watch/subdir/testfile", tmp), []byte("write to file"), fs.ModeAppend) require.NoError(t, err) eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, <-eventsCh) err = file.Close() require.NoError(t, err) diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 3afe55c286b..2a4105cefb5 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -236,7 +236,7 @@ func newRunLocalOpts(vars runLocalVars) (*runLocalOpts, error) { } o.debounceTime = 5 * time.Second o.newRecursiveWatcher = func() (recursiveWatcher, error) { - return file.NewRecursiveWatcher() + return file.NewRecursiveWatcher(0) } return o, nil } From 28c34b77e6c65ee941136a7047fbe9598feed0e2 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 14:31:20 -0700 Subject: [PATCH 16/22] make integration test non-blocking on failure --- .../pkg/cli/file/watch_integration_test.go | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go index 126d1b73780..c2d37e74962 100644 --- a/internal/pkg/cli/file/watch_integration_test.go +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -10,6 +10,7 @@ import ( "io/fs" "os" "testing" + "time" "github.com/aws/copilot-cli/internal/pkg/cli/file" "github.com/fsnotify/fsnotify" @@ -39,10 +40,6 @@ func TestRecursiveWatcher(t *testing.T) { Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), Op: fsnotify.Write, }, - { - Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), - Op: fsnotify.Write, - }, { Name: fmt.Sprintf("%s/watch/subdir", tmp), Op: fsnotify.Rename, @@ -73,7 +70,7 @@ func TestRecursiveWatcher(t *testing.T) { err := os.MkdirAll(fmt.Sprintf("%s/watch/subdir", tmp), 0755) require.NoError(t, err) - watcher, err = file.NewRecursiveWatcher(10) + watcher, err = file.NewRecursiveWatcher(uint(len(eventsExpected))) require.NoError(t, err) }) @@ -85,37 +82,45 @@ func TestRecursiveWatcher(t *testing.T) { eventsCh := watcher.Events() errorsCh := watcher.Errors() + expectEvents := func(t *testing.T, n int) []fsnotify.Event { + receivedEvents := []fsnotify.Event{} + for i := 0; i < n; i++ { + select { + case e := <-eventsCh: + receivedEvents = append(receivedEvents, e) + case <-time.After(time.Second): + } + } + return receivedEvents + } + // WATCH file, err := os.Create(fmt.Sprintf("%s/watch/subdir/testfile", tmp)) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 1)...) err = os.Chmod(fmt.Sprintf("%s/watch/subdir/testfile", tmp), 0755) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 1)...) err = os.WriteFile(fmt.Sprintf("%s/watch/subdir/testfile", tmp), []byte("write to file"), fs.ModeAppend) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 2)...) err = file.Close() require.NoError(t, err) err = os.Rename(fmt.Sprintf("%s/watch/subdir", tmp), fmt.Sprintf("%s/watch/subdir2", tmp)) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) - eventsActual = append(eventsActual, <-eventsCh) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 3)...) err = os.Rename(fmt.Sprintf("%s/watch/subdir2/testfile", tmp), fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 2)...) err = os.Remove(fmt.Sprintf("%s/watch/subdir2/testfile2", tmp)) require.NoError(t, err) - eventsActual = append(eventsActual, <-eventsCh) + eventsActual = append(eventsActual, expectEvents(t, 1)...) // CLOSE err = watcher.Close() From edcc10fa8536851369b0bd854d566cafcde65c89 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Fri, 3 Nov 2023 14:56:33 -0700 Subject: [PATCH 17/22] two events --- internal/pkg/cli/file/watch_integration_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/pkg/cli/file/watch_integration_test.go b/internal/pkg/cli/file/watch_integration_test.go index c2d37e74962..4d55e468d5a 100644 --- a/internal/pkg/cli/file/watch_integration_test.go +++ b/internal/pkg/cli/file/watch_integration_test.go @@ -40,6 +40,10 @@ func TestRecursiveWatcher(t *testing.T) { Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), Op: fsnotify.Write, }, + { + Name: fmt.Sprintf("%s/watch/subdir/testfile", tmp), + Op: fsnotify.Write, + }, { Name: fmt.Sprintf("%s/watch/subdir", tmp), Op: fsnotify.Rename, From 563bdc305899daf2b93be1668a2ed035b23994d3 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Mon, 6 Nov 2023 09:22:03 -0800 Subject: [PATCH 18/22] address adi feedback --- internal/pkg/cli/file/watch.go | 9 +++++---- internal/pkg/cli/run_local.go | 7 +++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 5b1a312194e..e7e4db6eafa 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -46,6 +46,7 @@ func (rw *RecursiveWatcher) Add(path string) error { } return filepath.WalkDir(path, func(p string, d fs.DirEntry, err error) error { if err != nil { + // swallow error from WalkDir, don't attempt to add to watcher. return nil } if d.IsDir() { @@ -84,6 +85,10 @@ func (rw *RecursiveWatcher) Close() error { func (rw *RecursiveWatcher) start() { for { select { + case <-rw.done: + close(rw.events) + close(rw.errors) + return case event := <-rw.fsnotifyWatcher.Events: // handle recursive watch switch event.Op { @@ -97,10 +102,6 @@ func (rw *RecursiveWatcher) start() { rw.events <- event case err := <-rw.fsnotifyWatcher.Errors: rw.errors <- err - case <-rw.done: - close(rw.events) - close(rw.errors) - return } } } diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 2a4105cefb5..e592e3ce5c8 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -476,8 +476,11 @@ func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface watcherEvents := watcher.Events() watcherErrors := watcher.Errors() - debounceTimer := time.NewTimer(0) - <-debounceTimer.C // flush channel to create timer for later use with Reset() + debounceTimer := time.NewTimer(o.debounceTime) + if !debounceTimer.Stop() { + // flush the timer in case stop is called after the timer finishes + <-debounceTimer.C + } go func() { for { From c361a7df87f0a2d03aaac672f1555fcf5709c1cc Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Mon, 6 Nov 2023 14:43:47 -0800 Subject: [PATCH 19/22] make watcher close not handle outstanding events/errors --- internal/pkg/cli/file/watch.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index e7e4db6eafa..ee58728ecf0 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -68,18 +68,12 @@ func (rw *RecursiveWatcher) Errors() <-chan error { // Close closes the RecursiveWatcher. func (rw *RecursiveWatcher) Close() error { - for { - select { - case event := <-rw.fsnotifyWatcher.Events: - rw.events <- event - case err := <-rw.fsnotifyWatcher.Errors: - rw.errors <- err - default: - rw.closed = true - close(rw.done) - return rw.fsnotifyWatcher.Close() - } + if !rw.closed { + rw.closed = true + close(rw.done) + return rw.fsnotifyWatcher.Close() } + return nil } func (rw *RecursiveWatcher) start() { From 64392297c24c5628306ccb4332ec1358c6dfeebd Mon Sep 17 00:00:00 2001 From: Aiden Carpenter <72675057+CaptainCarpensir@users.noreply.github.com> Date: Mon, 6 Nov 2023 16:06:25 -0800 Subject: [PATCH 20/22] Update internal/pkg/cli/file/watch.go Co-authored-by: Adithya Kolla <71282729+KollaAdithya@users.noreply.github.com> --- internal/pkg/cli/file/watch.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index ee58728ecf0..553ea34fb31 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -87,8 +87,7 @@ func (rw *RecursiveWatcher) start() { // handle recursive watch switch event.Op { case fsnotify.Create: - err := rw.Add(event.Name) - if err != nil { + if err := rw.Add(event.Name); err != nil { rw.errors <- err } } From 193b84ec15dec83a7c5e27493ddd59c2c2d85900 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Tue, 7 Nov 2023 10:21:04 -0800 Subject: [PATCH 21/22] fix merge changes --- internal/pkg/cli/run_local_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 559580de76f..9f8a5fc48f2 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -315,7 +315,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { }, }, } - alteredTaskDef := &ecs.TaskDefinition{ + alteredTaskDef := &awsecs.TaskDefinition{ ContainerDefinitions: []*sdkecs.ContainerDefinition{ { Name: aws.String("foo"), @@ -526,7 +526,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) m.interpolator.EXPECT().Interpolate("").Return("", nil) m.envChecker.EXPECT().Version().Return("v1.32.0", nil) - m.hostFinder.HostsFn = func(ctx context.Context) ([]host, error) { + m.hostFinder.HostsFn = func(ctx context.Context) ([]orchestrator.Host, error) { return nil, fmt.Errorf("some error") } }, @@ -783,7 +783,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { return errCh } - m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + m.orchestrator.RunTaskFn = func(task orchestrator.Task, opts ...orchestrator.RunTaskOption) { syscall.Kill(syscall.Getpid(), syscall.SIGINT) } @@ -825,7 +825,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { } count := 1 - m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + m.orchestrator.RunTaskFn = func(task orchestrator.Task, opts ...orchestrator.RunTaskOption) { switch count { case 1: require.Equal(t, expectedTask, task) @@ -869,7 +869,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { return errCh } - m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + m.orchestrator.RunTaskFn = func(task orchestrator.Task, opts ...orchestrator.RunTaskOption) { require.Equal(t, expectedTask, task) } @@ -909,7 +909,7 @@ func TestRunLocalOpts_Execute(t *testing.T) { return errCh } runCount := 1 - m.orchestrator.RunTaskFn = func(task orchestrator.Task) { + m.orchestrator.RunTaskFn = func(task orchestrator.Task, opts ...orchestrator.RunTaskOption) { require.Equal(t, expectedTask, task) if runCount > 1 { syscall.Kill(syscall.Getpid(), syscall.SIGINT) From 6349e0ca1e56797bce4688cdf367e793e54f2fa0 Mon Sep 17 00:00:00 2001 From: Aiden Carpenter Date: Tue, 7 Nov 2023 10:56:24 -0800 Subject: [PATCH 22/22] address wanxian feedback, fix tests --- internal/pkg/cli/file/hidden.go | 2 +- internal/pkg/cli/file/watch.go | 10 +++++----- internal/pkg/cli/run_local.go | 15 ++++++++------- internal/pkg/cli/run_local_test.go | 8 ++++++++ 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/internal/pkg/cli/file/hidden.go b/internal/pkg/cli/file/hidden.go index d43aca28e5b..181a4ae012a 100644 --- a/internal/pkg/cli/file/hidden.go +++ b/internal/pkg/cli/file/hidden.go @@ -7,7 +7,7 @@ package file import "path/filepath" -// IsHiddenFile returns true if the file is hidden on non-windows. +// IsHiddenFile returns true if the file is hidden on non-windows. The filename must be non-empty. func IsHiddenFile(filename string) (bool, error) { return filepath.Base(filename)[0] == '.', nil } diff --git a/internal/pkg/cli/file/watch.go b/internal/pkg/cli/file/watch.go index 553ea34fb31..e4cb9782fb5 100644 --- a/internal/pkg/cli/file/watch.go +++ b/internal/pkg/cli/file/watch.go @@ -68,12 +68,12 @@ func (rw *RecursiveWatcher) Errors() <-chan error { // Close closes the RecursiveWatcher. func (rw *RecursiveWatcher) Close() error { - if !rw.closed { - rw.closed = true - close(rw.done) - return rw.fsnotifyWatcher.Close() + if rw.closed { + return nil } - return nil + rw.closed = true + close(rw.done) + return rw.fsnotifyWatcher.Close() } func (rw *RecursiveWatcher) start() { diff --git a/internal/pkg/cli/run_local.go b/internal/pkg/cli/run_local.go index 21529f20a47..eddbc9746ab 100644 --- a/internal/pkg/cli/run_local.go +++ b/internal/pkg/cli/run_local.go @@ -356,18 +356,18 @@ func (o *runLocalOpts) Execute() error { return nil } - fmt.Printf("error: %s\n", err) + log.Errorf("error: %s\n", err) o.orchestrator.Stop() case <-sigCh: signal.Stop(sigCh) o.orchestrator.Stop() case <-watchErrCh: - fmt.Printf("watch: %s\n", err) + log.Errorf("watch: %s\n", err) o.orchestrator.Stop() case <-watchCh: task, err = o.prepareTask(ctx) if err != nil { - fmt.Printf("rerun task: %s\n", err) + log.Errorf("rerun task: %s\n", err) o.orchestrator.Stop() break } @@ -505,17 +505,17 @@ func (o *runLocalOpts) prepareTask(ctx context.Context) (orchestrator.Task, erro } func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface{}, <-chan error, error) { - copilotDir := o.ws.Path() + workspacePath := o.ws.Path() watchCh := make(chan interface{}) watchErrCh := make(chan error) watcher, err := o.newRecursiveWatcher() if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("file: %w", err) } - if err = watcher.Add(copilotDir); err != nil { + if err = watcher.Add(workspacePath); err != nil { return nil, nil, err } @@ -553,8 +553,9 @@ func (o *runLocalOpts) watchLocalFiles(stopCh <-chan struct{}) (<-chan interface // check if any subdirectories within copilot directory are hidden isHidden := false - parent := copilotDir + parent := workspacePath suffix, _ := strings.CutPrefix(event.Name, parent+"/") + // fsnotify events are always of form /a/b/c, don't use filepath.Split as that's OS dependent for _, child := range strings.Split(suffix, "/") { parent = filepath.Join(parent, child) subdirHidden, err := file.IsHiddenFile(child) diff --git a/internal/pkg/cli/run_local_test.go b/internal/pkg/cli/run_local_test.go index 9f8a5fc48f2..edc73a2e8f1 100644 --- a/internal/pkg/cli/run_local_test.go +++ b/internal/pkg/cli/run_local_test.go @@ -540,6 +540,8 @@ func TestRunLocalOpts_Execute(t *testing.T) { setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) m.envChecker.EXPECT().Version().Return("v1.32.0", nil) m.hostFinder.HostsFn = func(ctx context.Context) ([]orchestrator.Host, error) { return []orchestrator.Host{ @@ -561,6 +563,8 @@ func TestRunLocalOpts_Execute(t *testing.T) { setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) m.envChecker.EXPECT().Version().Return("v1.32.0", nil) m.hostFinder.HostsFn = func(ctx context.Context) ([]orchestrator.Host, error) { return []orchestrator.Host{ @@ -588,6 +592,8 @@ func TestRunLocalOpts_Execute(t *testing.T) { setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) m.envChecker.EXPECT().Version().Return("v1.32.0", nil) m.hostFinder.HostsFn = func(ctx context.Context) ([]orchestrator.Host, error) { return []orchestrator.Host{ @@ -615,6 +621,8 @@ func TestRunLocalOpts_Execute(t *testing.T) { setupMocks: func(t *testing.T, m *runLocalExecuteMocks) { m.ecsClient.EXPECT().TaskDefinition(testAppName, testEnvName, testWkldName).Return(taskDef, nil) m.ssm.EXPECT().GetSecretValue(gomock.Any(), "mysecret").Return("secretvalue", nil) + m.ws.EXPECT().ReadWorkloadManifest(testWkldName).Return([]byte(""), nil) + m.interpolator.EXPECT().Interpolate("").Return("", nil) m.envChecker.EXPECT().Version().Return("v1.32.0", nil) m.hostFinder.HostsFn = func(ctx context.Context) ([]orchestrator.Host, error) { return []orchestrator.Host{