From 3c4f08ffd9e6d0d4e129545242d0743c323d84e4 Mon Sep 17 00:00:00 2001 From: Philippe Martin Date: Mon, 19 Jun 2023 15:01:55 +0200 Subject: [PATCH 1/4] Watch for new pods when logs --follow --- pkg/exec/exec_test.go | 5 +++ pkg/logs/logs.go | 64 ++++++++++++++++++++++++++++++--------- pkg/odo/cli/logs/logs.go | 26 ++++++++++++++-- pkg/platform/interface.go | 3 ++ pkg/platform/mock.go | 16 ++++++++++ pkg/podman/mock.go | 16 ++++++++++ pkg/podman/pods.go | 12 ++++++++ 7 files changed, 126 insertions(+), 16 deletions(-) diff --git a/pkg/exec/exec_test.go b/pkg/exec/exec_test.go index 4b5119f6deb..b95fbc97196 100644 --- a/pkg/exec/exec_test.go +++ b/pkg/exec/exec_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" ) const ( @@ -48,6 +49,10 @@ func (o fakePlatform) GetPodUsingComponentName(componentName string) (*corev1.Po panic("not implemented yet") } +func (o fakePlatform) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) { + return nil, nil +} + func TestExecuteCommand(t *testing.T) { for _, tt := range []struct { name string diff --git a/pkg/logs/logs.go b/pkg/logs/logs.go index 92d3eae43ca..cedbb5d452b 100644 --- a/pkg/logs/logs.go +++ b/pkg/logs/logs.go @@ -6,6 +6,7 @@ import ( "io" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" odolabels "github.com/redhat-developer/odo/pkg/labels" odocontext "github.com/redhat-developer/odo/pkg/odo/context" @@ -17,8 +18,9 @@ type LogsClient struct { } type ContainerLogs struct { - Name string - Logs io.ReadCloser + PodName string + ContainerName string + Logs io.ReadCloser } type Events struct { @@ -80,7 +82,11 @@ func (o *LogsClient) getLogsForMode( if err != nil { events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err) } - events.Logs <- ContainerLogs{container.Name, containerLogs} + events.Logs <- ContainerLogs{ + PodName: pod.GetName(), + ContainerName: container.Name, + Logs: containerLogs, + } } case err := <-errChan: events.Err <- err @@ -92,19 +98,43 @@ func (o *LogsClient) getLogsForMode( appname := odocontext.GetApplication(ctx) - if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode { - selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false) - err := o.getPodsForSelector(selector, namespace, podChan) - if err != nil { - errChan <- err + getPods := func() error { + if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode { + selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false) + err := o.getPodsForSelector(selector, namespace, podChan) + if err != nil { + return err + } } + if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode { + selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false) + err := o.getPodsForSelector(selector, namespace, podChan) + if err != nil { + return err + } + } + return nil + } + + err := getPods() + if err != nil { + errChan <- err } - if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode { - selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false) - err := o.getPodsForSelector(selector, namespace, podChan) + + if follow { + podWatcher, err := o.platformClient.PodWatcher(ctx, "") if err != nil { errChan <- err } + for ev := range podWatcher.ResultChan() { + switch ev.Type { + case watch.Added, watch.Modified: + err = getPods() + if err != nil { + errChan <- err + } + } + } } doneChan <- struct{}{} @@ -125,7 +155,9 @@ func (o *LogsClient) getPodsForSelector( return err } for _, pod := range podList.Items { - pods[pod.GetName()] = struct{}{} + if pod.Status.Phase == "Running" { + pods[pod.GetName()] = struct{}{} + } } // get all pods in the namespace @@ -139,11 +171,15 @@ func (o *LogsClient) getPodsForSelector( // Pod's logs have already been displayed to user continue } - podList.Items = append(podList.Items, pod) + if pod.Status.Phase == "Running" { + podList.Items = append(podList.Items, pod) + } } for _, pod := range podList.Items { - podChan <- pod + if pod.Status.Phase == "Running" { + podChan <- pod + } } return nil diff --git a/pkg/odo/cli/logs/logs.go b/pkg/odo/cli/logs/logs.go index dfc70e80fe9..5ca800cb24d 100644 --- a/pkg/odo/cli/logs/logs.go +++ b/pkg/odo/cli/logs/logs.go @@ -152,14 +152,36 @@ func (o *LogsOptions) Run(ctx context.Context) error { errChan := make(chan error) // errors are put on this channel var mu sync.Mutex + displayedLogs := map[string]struct{}{} for { select { case containerLogs := <-events.Logs: - uniqueName := getUniqueContainerName(containerLogs.Name, uniqueContainerNames) + podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName) + if _, ok := displayedLogs[podContainerName]; ok { + continue + } + displayedLogs[podContainerName] = struct{}{} + + uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames) uniqueContainerNames[uniqueName] = struct{}{} colour := log.ColorPicker() logs := containerLogs.Logs + func() { + mu.Lock() + defer mu.Unlock() + color.Set(colour) + defer color.Unset() + help := "" + if uniqueName != containerLogs.ContainerName { + help = fmt.Sprintf(" (%s)", uniqueName) + } + _, err = fmt.Fprintf(o.out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help) + if err != nil { + errChan <- err + } + }() + if o.follow { atomic.AddInt64(&goroutines.count, 1) go func(out io.Writer) { @@ -183,7 +205,7 @@ func (o *LogsOptions) Run(ctx context.Context) error { case err = <-events.Err: return err case <-events.Done: - if goroutines.count == 0 { + if !o.follow && goroutines.count == 0 { if len(uniqueContainerNames) == 0 { // This will be the case when: // 1. user specifies --dev flag, but the component's running in Deploy mode diff --git a/pkg/platform/interface.go b/pkg/platform/interface.go index baf5f6489eb..19d1236794a 100644 --- a/pkg/platform/interface.go +++ b/pkg/platform/interface.go @@ -6,6 +6,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" ) // Client is the interface that wraps operations that can be performed on any supported platform. @@ -33,4 +34,6 @@ type Client interface { GetRunningPodFromSelector(selector string) (*corev1.Pod, error) GetPodUsingComponentName(componentName string) (*corev1.Pod, error) + + PodWatcher(ctx context.Context, selector string) (watch.Interface, error) } diff --git a/pkg/platform/mock.go b/pkg/platform/mock.go index 82cc718b363..84e0a793148 100644 --- a/pkg/platform/mock.go +++ b/pkg/platform/mock.go @@ -12,6 +12,7 @@ import ( gomock "github.com/golang/mock/gomock" v1 "k8s.io/api/core/v1" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + watch "k8s.io/apimachinery/pkg/watch" ) // MockClient is a mock of Client interface. @@ -140,3 +141,18 @@ func (mr *MockClientMockRecorder) GetRunningPodFromSelector(selector interface{} mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRunningPodFromSelector", reflect.TypeOf((*MockClient)(nil).GetRunningPodFromSelector), selector) } + +// PodWatcher mocks base method. +func (m *MockClient) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PodWatcher", ctx, selector) + ret0, _ := ret[0].(watch.Interface) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PodWatcher indicates an expected call of PodWatcher. +func (mr *MockClientMockRecorder) PodWatcher(ctx, selector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodWatcher", reflect.TypeOf((*MockClient)(nil).PodWatcher), ctx, selector) +} diff --git a/pkg/podman/mock.go b/pkg/podman/mock.go index 0eba45104db..55bd798b2c2 100644 --- a/pkg/podman/mock.go +++ b/pkg/podman/mock.go @@ -13,6 +13,7 @@ import ( api "github.com/redhat-developer/odo/pkg/api" v1 "k8s.io/api/core/v1" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + watch "k8s.io/apimachinery/pkg/watch" ) // MockClient is a mock of Client interface. @@ -243,6 +244,21 @@ func (mr *MockClientMockRecorder) PodStop(podname interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodStop", reflect.TypeOf((*MockClient)(nil).PodStop), podname) } +// PodWatcher mocks base method. +func (m *MockClient) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PodWatcher", ctx, selector) + ret0, _ := ret[0].(watch.Interface) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PodWatcher indicates an expected call of PodWatcher. +func (mr *MockClientMockRecorder) PodWatcher(ctx, selector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodWatcher", reflect.TypeOf((*MockClient)(nil).PodWatcher), ctx, selector) +} + // Version mocks base method. func (m *MockClient) Version(ctx context.Context) (SystemVersionReport, error) { m.ctrl.T.Helper() diff --git a/pkg/podman/pods.go b/pkg/podman/pods.go index e4bae249f4e..0d3e99e67fa 100644 --- a/pkg/podman/pods.go +++ b/pkg/podman/pods.go @@ -1,6 +1,7 @@ package podman import ( + "context" "encoding/json" "fmt" "os/exec" @@ -8,6 +9,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" "k8s.io/klog" "github.com/redhat-developer/odo/pkg/platform" @@ -33,6 +35,12 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e prefix := pod.GetName() + "-" container.Name = strings.TrimPrefix(container.Name, prefix) } + inspect, err := o.PodInspect(podReport.Name) + if err != nil { + return nil, err + } + pod.Status.Phase = corev1.PodPhase(inspect.State) + result.Items = append(result.Items, *pod) } return &result, nil @@ -129,3 +137,7 @@ func (o *PodmanCli) getPodsFromSelector(selector string) ([]ListPodsReport, erro } return list, nil } + +func (o *PodmanCli) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) { + return nil, nil +} From f7e778cf98abb9a98b5700891fbc537c7ae3948c Mon Sep 17 00:00:00 2001 From: Philippe Martin Date: Tue, 20 Jun 2023 18:27:55 +0200 Subject: [PATCH 2/4] Fix integration tests --- tests/integration/cmd_logs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/cmd_logs_test.go b/tests/integration/cmd_logs_test.go index 63abf488753..7c2f74a3241 100644 --- a/tests/integration/cmd_logs_test.go +++ b/tests/integration/cmd_logs_test.go @@ -101,7 +101,7 @@ var _ = Describe("odo logs command tests", func() { cmd := getLogCommand(podman) out := cmd.ShouldPass().Out() Expect(out).To(ContainSubstring(noContainersRunning)) - cmd = getLogCommand(podman, "--follow") + cmd = getLogCommand(podman) out = cmd.ShouldPass().Out() Expect(out).To(ContainSubstring(noContainersRunning)) }) From 1fa3d6f6207c0179b041219d8127dea2c0276a71 Mon Sep 17 00:00:00 2001 From: Philippe Martin Date: Wed, 21 Jun 2023 14:37:34 +0200 Subject: [PATCH 3/4] Implement --follow for podman platform --- pkg/odo/cli/logs/logs.go | 1 + pkg/podman/pods.go | 71 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/pkg/odo/cli/logs/logs.go b/pkg/odo/cli/logs/logs.go index 5ca800cb24d..b11fd24883b 100644 --- a/pkg/odo/cli/logs/logs.go +++ b/pkg/odo/cli/logs/logs.go @@ -192,6 +192,7 @@ func (o *LogsOptions) Run(ctx context.Context) error { if err != nil { errChan <- err } + delete(displayedLogs, podContainerName) events.Done <- struct{}{} }(o.out) } else { diff --git a/pkg/podman/pods.go b/pkg/podman/pods.go index 0d3e99e67fa..5081747c648 100644 --- a/pkg/podman/pods.go +++ b/pkg/podman/pods.go @@ -1,11 +1,14 @@ package podman import ( + "bufio" + "bytes" "context" "encoding/json" "fmt" "os/exec" "strings" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -26,7 +29,8 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e for _, podReport := range podsReport { pod, err := o.KubeGenerate(podReport.Name) if err != nil { - return nil, err + // The pod has disappeared in the meantime, forget it + continue } // We remove the podname- prefix from the container names as Podman adds this prefix // (to avoid colliding container names?) @@ -37,7 +41,8 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e } inspect, err := o.PodInspect(podReport.Name) if err != nil { - return nil, err + // The pod has disappeared in the meantime, forget it + continue } pod.Status.Phase = corev1.PodPhase(inspect.State) @@ -138,6 +143,66 @@ func (o *PodmanCli) getPodsFromSelector(selector string) ([]ListPodsReport, erro return list, nil } +type podWatcher struct { + stop chan struct{} + pods map[string]struct{} + events chan watch.Event +} + func (o *PodmanCli) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) { - return nil, nil + + watcher := podWatcher{ + stop: make(chan struct{}), + pods: make(map[string]struct{}), + events: make(chan watch.Event), + } + go watcher.watch(o.podmanCmd, o.containerRunGlobalExtraArgs) + return watcher, nil +} + +func (o podWatcher) watch(podmanCmd string, containerRunGlobalExtraArgs []string) { + args := []string{"ps", "--quiet"} + args = append(containerRunGlobalExtraArgs, args...) + ticker := time.NewTicker(3 * time.Second) + for { + select { + case <-o.stop: + return + case <-ticker.C: + cmd := exec.Command(podmanCmd, args...) + out, err := cmd.Output() + if err != nil { + klog.V(4).Infof("error getting containers from podman: %s", err) + continue + } + scanner := bufio.NewScanner(bytes.NewReader(out)) + currentPods := make(map[string]struct{}) + for scanner.Scan() { + podName := scanner.Text() + currentPods[podName] = struct{}{} + if _, ok := o.pods[podName]; !ok { + o.events <- watch.Event{ + Type: watch.Added, + } + o.pods[podName] = struct{}{} + } + } + for p := range o.pods { + if _, ok := currentPods[p]; !ok { + o.events <- watch.Event{ + Type: watch.Deleted, + } + delete(o.pods, p) + } + } + } + } +} + +func (o podWatcher) Stop() { + o.stop <- struct{}{} +} + +func (o podWatcher) ResultChan() <-chan watch.Event { + return o.events } From 23c10682180b86f90568fdc2332a27495a1101d0 Mon Sep 17 00:00:00 2001 From: Philippe Martin Date: Tue, 27 Jun 2023 12:38:44 +0200 Subject: [PATCH 4/4] Add integration test --- .../devfile-deploy-functional-pods.yaml | 4 +- tests/integration/cmd_logs_test.go | 42 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/tests/examples/source/devfiles/nodejs/devfile-deploy-functional-pods.yaml b/tests/examples/source/devfiles/nodejs/devfile-deploy-functional-pods.yaml index 9cb85bb9d79..8d371e49f29 100644 --- a/tests/examples/source/devfiles/nodejs/devfile-deploy-functional-pods.yaml +++ b/tests/examples/source/devfiles/nodejs/devfile-deploy-functional-pods.yaml @@ -59,8 +59,8 @@ commands: components: - container: endpoints: - - name: http-3000 - targetPort: 3000 + - name: http-8080 + targetPort: 8080 image: registry.access.redhat.com/ubi8/nodejs-14:latest memoryLimit: 1024Mi mountSources: true diff --git a/tests/integration/cmd_logs_test.go b/tests/integration/cmd_logs_test.go index 7c2f74a3241..cbcb67c36a3 100644 --- a/tests/integration/cmd_logs_test.go +++ b/tests/integration/cmd_logs_test.go @@ -182,6 +182,48 @@ var _ = Describe("odo logs command tests", func() { }) }) })) + + When("logs --follow is started", func() { + var logsSession helper.LogsSession + var err error + + BeforeEach(func() { + logsSession, _, _, err = helper.StartLogsFollow(podman, "--dev") + Expect(err).ToNot(HaveOccurred()) + }) + AfterEach(func() { + logsSession.Kill() + }) + + When("running in Dev mode", helper.LabelPodmanIf(podman, func() { + var devSession helper.DevSession + + BeforeEach(func() { + var err error + devSession, err = helper.StartDevMode(helper.DevSessionOpts{ + RunOnPodman: podman, + }) + Expect(err).ToNot(HaveOccurred()) + if !podman { + // We need to wait for the pod deployed as a Kubernetes component + Eventually(func() bool { + return areAllPodsRunning() + }).Should(Equal(true)) + } + }) + AfterEach(func() { + devSession.Stop() + devSession.WaitEnd() + }) + + It("should successfully follow logs of running component", func() { + Eventually(func() bool { + logs := logsSession.OutContents() + return strings.Contains(string(logs), "Server running on") + }, 20*time.Second, 5).Should(BeTrue()) + }) + })) + }) } When("running in Deploy mode", func() {