Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch for new pods when logs --follow #6914

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

const (
Expand Down Expand Up @@ -48,6 +49,10 @@ func (o fakePlatform) GetPodUsingComponentName(componentName string) (*corev1.Po
panic("not implemented yet")
}

func (o fakePlatform) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
return nil, nil
}

func TestExecuteCommand(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down
64 changes: 50 additions & 14 deletions pkg/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"

odolabels "github.com/redhat-developer/odo/pkg/labels"
odocontext "github.com/redhat-developer/odo/pkg/odo/context"
Expand All @@ -17,8 +18,9 @@ type LogsClient struct {
}

type ContainerLogs struct {
Name string
Logs io.ReadCloser
PodName string
ContainerName string
Logs io.ReadCloser
}

type Events struct {
Expand Down Expand Up @@ -80,7 +82,11 @@ func (o *LogsClient) getLogsForMode(
if err != nil {
events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err)
}
events.Logs <- ContainerLogs{container.Name, containerLogs}
events.Logs <- ContainerLogs{
PodName: pod.GetName(),
ContainerName: container.Name,
Logs: containerLogs,
}
}
case err := <-errChan:
events.Err <- err
Expand All @@ -92,19 +98,43 @@ func (o *LogsClient) getLogsForMode(

appname := odocontext.GetApplication(ctx)

if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
errChan <- err
getPods := func() error {
if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
return nil
}

err := getPods()
if err != nil {
errChan <- err
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)

if follow {
podWatcher, err := o.platformClient.PodWatcher(ctx, "")
if err != nil {
errChan <- err
}
for ev := range podWatcher.ResultChan() {
switch ev.Type {
case watch.Added, watch.Modified:
err = getPods()
if err != nil {
errChan <- err
}
}
}
}

doneChan <- struct{}{}
Expand All @@ -125,7 +155,9 @@ func (o *LogsClient) getPodsForSelector(
return err
}
for _, pod := range podList.Items {
pods[pod.GetName()] = struct{}{}
if pod.Status.Phase == "Running" {
pods[pod.GetName()] = struct{}{}
}
}

// get all pods in the namespace
Expand All @@ -139,11 +171,15 @@ func (o *LogsClient) getPodsForSelector(
// Pod's logs have already been displayed to user
continue
}
podList.Items = append(podList.Items, pod)
if pod.Status.Phase == "Running" {
podList.Items = append(podList.Items, pod)
}
}

for _, pod := range podList.Items {
podChan <- pod
if pod.Status.Phase == "Running" {
podChan <- pod
}
}

return nil
Expand Down
27 changes: 25 additions & 2 deletions pkg/odo/cli/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,36 @@ func (o *LogsOptions) Run(ctx context.Context) error {
errChan := make(chan error) // errors are put on this channel
var mu sync.Mutex

displayedLogs := map[string]struct{}{}
for {
select {
case containerLogs := <-events.Logs:
uniqueName := getUniqueContainerName(containerLogs.Name, uniqueContainerNames)
podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName)
if _, ok := displayedLogs[podContainerName]; ok {
continue
}
displayedLogs[podContainerName] = struct{}{}

uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames)
uniqueContainerNames[uniqueName] = struct{}{}
colour := log.ColorPicker()
logs := containerLogs.Logs

func() {
mu.Lock()
defer mu.Unlock()
color.Set(colour)
defer color.Unset()
help := ""
if uniqueName != containerLogs.ContainerName {
help = fmt.Sprintf(" (%s)", uniqueName)
}
_, err = fmt.Fprintf(o.out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help)
if err != nil {
errChan <- err
}
}()

if o.follow {
atomic.AddInt64(&goroutines.count, 1)
go func(out io.Writer) {
Expand All @@ -170,6 +192,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
if err != nil {
errChan <- err
}
delete(displayedLogs, podContainerName)
events.Done <- struct{}{}
}(o.out)
} else {
Expand All @@ -183,7 +206,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
case err = <-events.Err:
return err
case <-events.Done:
if goroutines.count == 0 {
if !o.follow && goroutines.count == 0 {
if len(uniqueContainerNames) == 0 {
// This will be the case when:
// 1. user specifies --dev flag, but the component's running in Deploy mode
Expand Down
3 changes: 3 additions & 0 deletions pkg/platform/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

// Client is the interface that wraps operations that can be performed on any supported platform.
Expand Down Expand Up @@ -33,4 +34,6 @@ type Client interface {
GetRunningPodFromSelector(selector string) (*corev1.Pod, error)

GetPodUsingComponentName(componentName string) (*corev1.Pod, error)

PodWatcher(ctx context.Context, selector string) (watch.Interface, error)
}
16 changes: 16 additions & 0 deletions pkg/platform/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/podman/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 78 additions & 1 deletion pkg/podman/pods.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package podman

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog"

"github.com/redhat-developer/odo/pkg/platform"
Expand All @@ -24,7 +29,8 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
for _, podReport := range podsReport {
pod, err := o.KubeGenerate(podReport.Name)
if err != nil {
return nil, err
// The pod has disappeared in the meantime, forget it
continue
}
// We remove the podname- prefix from the container names as Podman adds this prefix
// (to avoid colliding container names?)
Expand All @@ -33,6 +39,13 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
prefix := pod.GetName() + "-"
container.Name = strings.TrimPrefix(container.Name, prefix)
}
inspect, err := o.PodInspect(podReport.Name)
if err != nil {
// The pod has disappeared in the meantime, forget it
continue
}
pod.Status.Phase = corev1.PodPhase(inspect.State)

result.Items = append(result.Items, *pod)
}
return &result, nil
Expand Down Expand Up @@ -129,3 +142,67 @@ func (o *PodmanCli) getPodsFromSelector(selector string) ([]ListPodsReport, erro
}
return list, nil
}

type podWatcher struct {
stop chan struct{}
pods map[string]struct{}
events chan watch.Event
}

func (o *PodmanCli) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {

watcher := podWatcher{
stop: make(chan struct{}),
pods: make(map[string]struct{}),
events: make(chan watch.Event),
}
go watcher.watch(o.podmanCmd, o.containerRunGlobalExtraArgs)
return watcher, nil
}

func (o podWatcher) watch(podmanCmd string, containerRunGlobalExtraArgs []string) {
args := []string{"ps", "--quiet"}
args = append(containerRunGlobalExtraArgs, args...)
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-o.stop:
return
case <-ticker.C:
cmd := exec.Command(podmanCmd, args...)
out, err := cmd.Output()
if err != nil {
klog.V(4).Infof("error getting containers from podman: %s", err)
continue
}
scanner := bufio.NewScanner(bytes.NewReader(out))
currentPods := make(map[string]struct{})
for scanner.Scan() {
podName := scanner.Text()
currentPods[podName] = struct{}{}
if _, ok := o.pods[podName]; !ok {
o.events <- watch.Event{
Type: watch.Added,
}
o.pods[podName] = struct{}{}
}
}
for p := range o.pods {
if _, ok := currentPods[p]; !ok {
o.events <- watch.Event{
Type: watch.Deleted,
}
delete(o.pods, p)
}
}
}
}
}

func (o podWatcher) Stop() {
o.stop <- struct{}{}
}

func (o podWatcher) ResultChan() <-chan watch.Event {
return o.events
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ commands:
components:
- container:
endpoints:
- name: http-3000
targetPort: 3000
- name: http-8080
targetPort: 8080
image: registry.access.redhat.com/ubi8/nodejs-14:latest
memoryLimit: 1024Mi
mountSources: true
Expand Down
Loading