Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch for new pods when logs --follow #6914

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/exec/exec_test.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

const (
@@ -48,6 +49,10 @@ func (o fakePlatform) GetPodUsingComponentName(componentName string) (*corev1.Po
panic("not implemented yet")
}

func (o fakePlatform) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
return nil, nil
}

func TestExecuteCommand(t *testing.T) {
for _, tt := range []struct {
name string
64 changes: 50 additions & 14 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"io"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"

odolabels "github.com/redhat-developer/odo/pkg/labels"
odocontext "github.com/redhat-developer/odo/pkg/odo/context"
@@ -17,8 +18,9 @@ type LogsClient struct {
}

type ContainerLogs struct {
Name string
Logs io.ReadCloser
PodName string
ContainerName string
Logs io.ReadCloser
}

type Events struct {
@@ -80,7 +82,11 @@ func (o *LogsClient) getLogsForMode(
if err != nil {
events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err)
}
events.Logs <- ContainerLogs{container.Name, containerLogs}
events.Logs <- ContainerLogs{
PodName: pod.GetName(),
ContainerName: container.Name,
Logs: containerLogs,
}
}
case err := <-errChan:
events.Err <- err
@@ -92,19 +98,43 @@ func (o *LogsClient) getLogsForMode(

appname := odocontext.GetApplication(ctx)

if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
errChan <- err
getPods := func() error {
if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
return nil
}

err := getPods()
if err != nil {
errChan <- err
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)

if follow {
podWatcher, err := o.platformClient.PodWatcher(ctx, "")
if err != nil {
errChan <- err
}
for ev := range podWatcher.ResultChan() {
switch ev.Type {
case watch.Added, watch.Modified:
err = getPods()
if err != nil {
errChan <- err
}
}
}
}

doneChan <- struct{}{}
@@ -125,7 +155,9 @@ func (o *LogsClient) getPodsForSelector(
return err
}
for _, pod := range podList.Items {
pods[pod.GetName()] = struct{}{}
if pod.Status.Phase == "Running" {
pods[pod.GetName()] = struct{}{}
}
}

// get all pods in the namespace
@@ -139,11 +171,15 @@ func (o *LogsClient) getPodsForSelector(
// Pod's logs have already been displayed to user
continue
}
podList.Items = append(podList.Items, pod)
if pod.Status.Phase == "Running" {
podList.Items = append(podList.Items, pod)
}
}

for _, pod := range podList.Items {
podChan <- pod
if pod.Status.Phase == "Running" {
podChan <- pod
}
}

return nil
27 changes: 25 additions & 2 deletions pkg/odo/cli/logs/logs.go
Original file line number Diff line number Diff line change
@@ -152,14 +152,36 @@ func (o *LogsOptions) Run(ctx context.Context) error {
errChan := make(chan error) // errors are put on this channel
var mu sync.Mutex

displayedLogs := map[string]struct{}{}
for {
select {
case containerLogs := <-events.Logs:
uniqueName := getUniqueContainerName(containerLogs.Name, uniqueContainerNames)
podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName)
if _, ok := displayedLogs[podContainerName]; ok {
continue
}
displayedLogs[podContainerName] = struct{}{}

uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames)
uniqueContainerNames[uniqueName] = struct{}{}
colour := log.ColorPicker()
logs := containerLogs.Logs

func() {
mu.Lock()
defer mu.Unlock()
color.Set(colour)
defer color.Unset()
help := ""
if uniqueName != containerLogs.ContainerName {
help = fmt.Sprintf(" (%s)", uniqueName)
}
_, err = fmt.Fprintf(o.out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help)
if err != nil {
errChan <- err
}
}()

if o.follow {
atomic.AddInt64(&goroutines.count, 1)
go func(out io.Writer) {
@@ -170,6 +192,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
if err != nil {
errChan <- err
}
delete(displayedLogs, podContainerName)
events.Done <- struct{}{}
}(o.out)
} else {
@@ -183,7 +206,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
case err = <-events.Err:
return err
case <-events.Done:
if goroutines.count == 0 {
if !o.follow && goroutines.count == 0 {
if len(uniqueContainerNames) == 0 {
// This will be the case when:
// 1. user specifies --dev flag, but the component's running in Deploy mode
3 changes: 3 additions & 0 deletions pkg/platform/interface.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

// Client is the interface that wraps operations that can be performed on any supported platform.
@@ -33,4 +34,6 @@ type Client interface {
GetRunningPodFromSelector(selector string) (*corev1.Pod, error)

GetPodUsingComponentName(componentName string) (*corev1.Pod, error)

PodWatcher(ctx context.Context, selector string) (watch.Interface, error)
}
16 changes: 16 additions & 0 deletions pkg/platform/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/podman/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 78 additions & 1 deletion pkg/podman/pods.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package podman

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog"

"github.com/redhat-developer/odo/pkg/platform"
@@ -24,7 +29,8 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
for _, podReport := range podsReport {
pod, err := o.KubeGenerate(podReport.Name)
if err != nil {
return nil, err
// The pod has disappeared in the meantime, forget it
continue
}
// We remove the podname- prefix from the container names as Podman adds this prefix
// (to avoid colliding container names?)
@@ -33,6 +39,13 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
prefix := pod.GetName() + "-"
container.Name = strings.TrimPrefix(container.Name, prefix)
}
inspect, err := o.PodInspect(podReport.Name)
if err != nil {
// The pod has disappeared in the meantime, forget it
continue
}
pod.Status.Phase = corev1.PodPhase(inspect.State)

result.Items = append(result.Items, *pod)
}
return &result, nil
@@ -129,3 +142,67 @@ func (o *PodmanCli) getPodsFromSelector(selector string) ([]ListPodsReport, erro
}
return list, nil
}

type podWatcher struct {
stop chan struct{}
pods map[string]struct{}
events chan watch.Event
}

func (o *PodmanCli) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {

watcher := podWatcher{
stop: make(chan struct{}),
pods: make(map[string]struct{}),
events: make(chan watch.Event),
}
go watcher.watch(o.podmanCmd, o.containerRunGlobalExtraArgs)
return watcher, nil
}

func (o podWatcher) watch(podmanCmd string, containerRunGlobalExtraArgs []string) {
args := []string{"ps", "--quiet"}
args = append(containerRunGlobalExtraArgs, args...)
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-o.stop:
return
case <-ticker.C:
cmd := exec.Command(podmanCmd, args...)
out, err := cmd.Output()
if err != nil {
klog.V(4).Infof("error getting containers from podman: %s", err)
continue
}
scanner := bufio.NewScanner(bytes.NewReader(out))
currentPods := make(map[string]struct{})
for scanner.Scan() {
podName := scanner.Text()
currentPods[podName] = struct{}{}
if _, ok := o.pods[podName]; !ok {
o.events <- watch.Event{
Type: watch.Added,
}
o.pods[podName] = struct{}{}
}
}
for p := range o.pods {
if _, ok := currentPods[p]; !ok {
o.events <- watch.Event{
Type: watch.Deleted,
}
delete(o.pods, p)
}
}
}
}
}

func (o podWatcher) Stop() {
o.stop <- struct{}{}
}

func (o podWatcher) ResultChan() <-chan watch.Event {
return o.events
}
Original file line number Diff line number Diff line change
@@ -59,8 +59,8 @@ commands:
components:
- container:
endpoints:
- name: http-3000
targetPort: 3000
- name: http-8080
targetPort: 8080
image: registry.access.redhat.com/ubi8/nodejs-14:latest
memoryLimit: 1024Mi
mountSources: true
Loading