diff --git a/master/internal/rm/kubernetesrm/events.go b/master/internal/rm/kubernetesrm/events.go index c900d1deace5..9ee97457d874 100644 --- a/master/internal/rm/kubernetesrm/events.go +++ b/master/internal/rm/kubernetesrm/events.go @@ -67,7 +67,7 @@ func (e *eventListener) startEventListener(ctx *actor.Context) { context.TODO(), metaV1.ListOptions{}) if err != nil { ctx.Log().WithError(err).Warnf("error retrieving internal resource version") - actors.NotifyAfter(ctx, defaultInformerBackoff, startEventListener{}) + actors.NotifyAfter(ctx, defaultBackoff, startEventListener{}) return } @@ -79,7 +79,7 @@ func (e *eventListener) startEventListener(ctx *actor.Context) { }) if err != nil { ctx.Log().WithError(err).Warnf("error initializing event retry watcher") - actors.NotifyAfter(ctx, defaultInformerBackoff, startEventListener{}) + actors.NotifyAfter(ctx, defaultBackoff, startEventListener{}) return } diff --git a/master/internal/rm/kubernetesrm/informer.go b/master/internal/rm/kubernetesrm/informer.go index d95d8e7a0dbb..b5c2c1511aba 100644 --- a/master/internal/rm/kubernetesrm/informer.go +++ b/master/internal/rm/kubernetesrm/informer.go @@ -15,65 +15,42 @@ import ( typedV1 "k8s.io/client-go/kubernetes/typed/core/v1" ) -const defaultInformerBackoff = 5 * time.Second - -// messages that are sent to the informer. -type ( - startInformer struct{} -) - -// messages that are sent by the informer. -type ( - podStatusUpdate struct { - updatedPod *k8sV1.Pod - } -) +const defaultBackoff = 5 * time.Second +// Informer struct TODO CAROLINA. type Informer struct { syslog *logrus.Entry podInterface typedV1.PodInterface - // podsHandler *actor.Ref } -// DefaultInformer is the global informer singleton. -var DefaultInformer *startInformer - -// TODO CAROLINA -- actor.Prestart & startInformer - -// TODO CAROLINA -- actor.PostStop - func newInformer( namespace string, podInterface typedV1.PodInterface, - // podsHandler *actor.Ref, // TODO CAROLINA ) *Informer { return &Informer{ - syslog: logrus.WithField("Informer namespace", namespace), // TODO CAROLINA + syslog: logrus.WithField("Informer", namespace), podInterface: podInterface, - // podsHandler: podsHandler, } } -// TODO CAROLINA -func (i *Informer) startInformer() { - // TODO +// startInformer returns the updated pod, if any. +func (i *Informer) startInformer() *k8sV1.Pod { pods, err := i.podInterface.List( context.TODO(), metaV1.ListOptions{LabelSelector: determinedLabel}) if err != nil { i.syslog.WithError(err).Warnf("error retrieving internal resource version") - return + return nil } rw, err := watchtools.NewRetryWatcher(pods.ResourceVersion, &cache.ListWatch{ WatchFunc: func(options metaV1.ListOptions) (watch.Interface, error) { - // TODO return i.podInterface.Watch( context.TODO(), metaV1.ListOptions{LabelSelector: determinedLabel}) }, }) if err != nil { i.syslog.WithError(err).Warnf("error initializing pod retry watcher") - return + return nil } i.syslog.Info("pod informer is starting") @@ -90,9 +67,9 @@ func (i *Informer) startInformer() { } i.syslog.Debugf("informer got new pod event for pod: %s %s", pod.Name, pod.Status.Phase) - // ctx.Tell(i.podsHandler, podStatusUpdate{updatedPod: pod}) - return + return pod } i.syslog.Warn("pod informer stopped unexpectedly") + return nil } diff --git a/master/internal/rm/kubernetesrm/nodes.go b/master/internal/rm/kubernetesrm/nodes.go index 7ba5154303ea..5796cd848dda 100644 --- a/master/internal/rm/kubernetesrm/nodes.go +++ b/master/internal/rm/kubernetesrm/nodes.go @@ -9,72 +9,36 @@ import ( "k8s.io/client-go/tools/cache" ) -// Messages sent by the nodeInformer to itself. -type ( - startNodeInformer struct{} -) - // Messages sent by the nodeInformer to the podsHandler. -type ( - nodeStatusUpdate struct { - updatedNode *k8sV1.Node - deletedNode *k8sV1.Node - } -) +type nodeStatusUpdate struct { + updatedNode *k8sV1.Node + deletedNode *k8sV1.Node +} type nodeInformer struct { informer k8Informers.SharedInformerFactory syslog *logrus.Entry - // podsHandler *actor.Ref - stop chan struct{} + stop chan struct{} } func newNodeInformer(clientSet k8sClient.Interface) *nodeInformer { return &nodeInformer{ informer: k8Informers.NewSharedInformerFactoryWithOptions( clientSet, 0, []k8Informers.SharedInformerOption{}...), - syslog: logrus.WithField("component", "nodeInformer"), // TODO CAROLINA - // podsHandler: podsHandler, - stop: make(chan struct{}), - } -} - -// TODO CAROLINA: actor.PreStart ctx.Tell(ctx.Self(), startNodeInformer{}), -// startNodeInformer if err := n.startNodeInformer(ctx); err != nil { return err } -// actor.PostStop ctx.Log().Infof("shutting down node informer") close(n.stop) -// default ctx.Log().Errorf("unexpected message %T", msg) return actor.ErrUnexpectedMessage(ctx) -/* -func (n *nodeInformer) Receive(ctx *actor.Context) error { - switch msg := ctx.Message().(type) { - case actor.PreStart: - ctx.Tell(ctx.Self(), startNodeInformer{}) - - case startNodeInformer: - if err := n.startNodeInformer(); err != nil { - return err - } - - case actor.PostStop: - ctx.Log().Infof("shutting down node informer") - close(n.stop) - - default: - ctx.Log().Errorf("unexpected message %T", msg) - return actor.ErrUnexpectedMessage(ctx) + syslog: logrus.WithField("component", "nodeInformer"), + stop: make(chan struct{}), } - - return nil } -*/ -func (n *nodeInformer) startNodeInformer() error { +func (n *nodeInformer) startNodeInformer() nodeStatusUpdate { + nodeStatus := nodeStatusUpdate{} nodeInformer := n.informer.Core().V1().Nodes().Informer() nodeInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node, ok := obj.(*k8sV1.Node) if ok { n.syslog.Debugf("node added %s", node.Name) - // ctx.Tell(n.podsHandler, nodeStatusUpdate{updatedNode: node}) // TODO CAROLINA + nodeStatus.updatedNode = node } else { n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj) } @@ -83,7 +47,7 @@ func (n *nodeInformer) startNodeInformer() error { node, ok := newObj.(*k8sV1.Node) if ok { n.syslog.Debugf("node updated %s", node.Name) - // ctx.Tell(n.podsHandler, nodeStatusUpdate{updatedNode: node}) // TODO CAROLINA + nodeStatus.updatedNode = node } else { n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", newObj, newObj) } @@ -92,7 +56,7 @@ func (n *nodeInformer) startNodeInformer() error { node, ok := obj.(*k8sV1.Node) if ok { n.syslog.Debugf("node stopped %s", node.Name) - // ctx.Tell(n.podsHandler, nodeStatusUpdate{deletedNode: node}) // TODO CAROLINA + nodeStatus.deletedNode = node } else { n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj) } @@ -105,5 +69,10 @@ func (n *nodeInformer) startNodeInformer() error { } n.syslog.Info("node informer has started") - return nil + return nodeStatus +} + +func (n *nodeInformer) stopNodeInformer() { + n.syslog.Infof("shutting down node informer") + close(n.stop) } diff --git a/master/internal/rm/kubernetesrm/pod.go b/master/internal/rm/kubernetesrm/pod.go index 942f08c03d70..136dc075f250 100644 --- a/master/internal/rm/kubernetesrm/pod.go +++ b/master/internal/rm/kubernetesrm/pod.go @@ -175,8 +175,8 @@ func (p *pod) Receive(ctx *actor.Context) error { case resourceCreationFailed: p.receiveResourceCreationFailed(ctx, msg) - case podStatusUpdate: - if err := p.receivePodStatusUpdate(ctx, msg); err != nil { + case k8sV1.Pod: + if err := p.receivePodStatusUpdate(ctx, &msg); err != nil { return err } @@ -262,8 +262,8 @@ func (p *pod) receiveResourceCreationFailed(ctx *actor.Context, msg resourceCrea ctx.Self().Stop() } -func (p *pod) receivePodStatusUpdate(ctx *actor.Context, msg podStatusUpdate) error { - p.pod = msg.updatedPod +func (p *pod) receivePodStatusUpdate(ctx *actor.Context, pod *k8sV1.Pod) error { + p.pod = pod containerState, err := getPodState(ctx, p.pod, p.containerNames) if err != nil { diff --git a/master/internal/rm/kubernetesrm/pod_test.go b/master/internal/rm/kubernetesrm/pod_test.go index 065b02ca9294..87189e178e57 100644 --- a/master/internal/rm/kubernetesrm/pod_test.go +++ b/master/internal/rm/kubernetesrm/pod_test.go @@ -208,13 +208,13 @@ func cleanup(t *testing.T) { func checkReceiveTermination( t *testing.T, - update podStatusUpdate, + update *k8sV1.Pod, system *actor.System, ref *actor.Ref, newPod *pod, podMap map[string]*mockReceiver, ) { - system.Ask(ref, update) + // system.Ask(ref, update) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 1) @@ -285,7 +285,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) { Status: k8sV1.PodStatus{Phase: k8sV1.PodPending}, } - statusUpdate := podStatusUpdate{updatedPod: &pod} + statusUpdate := &pod checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap) @@ -299,7 +299,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) { ObjectMeta: objectMeta, Status: k8sV1.PodStatus{Phase: k8sV1.PodFailed}, } - statusUpdate = podStatusUpdate{updatedPod: &pod} + statusUpdate = &pod checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap) @@ -312,7 +312,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) { ObjectMeta: objectMeta, Status: k8sV1.PodStatus{Phase: k8sV1.PodSucceeded}, } - statusUpdate = podStatusUpdate{updatedPod: &pod} + statusUpdate = &pod checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap) @@ -358,9 +358,7 @@ func TestMultipleContainerTerminate(t *testing.T) { ContainerStatuses: containerStatuses, }, } - - statusUpdate := podStatusUpdate{updatedPod: &pod} - checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap) + checkReceiveTermination(t, &pod, system, ref, newPod, podMap) // Multiple pods, 1 termination, no deletion timestamp. // This results in an error, which causes pod termination and the same outcome. @@ -379,9 +377,7 @@ func TestMultipleContainerTerminate(t *testing.T) { ContainerStatuses: containerStatuses, }, } - - statusUpdate = podStatusUpdate{updatedPod: &pod} - checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap) + checkReceiveTermination(t, &pod, system, ref, newPod, podMap) } func TestReceivePodStatusUpdateAssigned(t *testing.T) { @@ -402,16 +398,14 @@ func TestReceivePodStatusUpdateAssigned(t *testing.T) { Status: k8sV1.PodStatus{Phase: k8sV1.PodPending}, } - statusUpdate := podStatusUpdate{updatedPod: &pod} - assert.Equal(t, newPod.container.State, cproto.Assigned) - system.Ask(ref, statusUpdate) + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 0) newPod.container.State = cproto.Starting - system.Ask(ref, statusUpdate) + // system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 0) assert.Equal(t, newPod.container.State, cproto.Starting) @@ -445,15 +439,14 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) { ObjectMeta: objectMeta, Status: status, } - statusUpdate := podStatusUpdate{updatedPod: &pod} - system.Ask(ref, statusUpdate) + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 2) assert.Equal(t, newPod.container.State, cproto.Starting) podMap["task"].Purge() - system.Ask(ref, statusUpdate) + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 0) assert.Equal(t, newPod.container.State, cproto.Starting) @@ -483,9 +476,8 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) { ObjectMeta: objectMeta, Status: status, } - statusUpdate = podStatusUpdate{updatedPod: &pod} - system.Ask(ref, statusUpdate) + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 2) @@ -507,8 +499,8 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) { ObjectMeta: objectMeta, Status: status, } - statusUpdate = podStatusUpdate{updatedPod: &pod} - system.Ask(ref, statusUpdate) + + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 2) assert.Equal(t, newPod.container.State, cproto.Starting) @@ -558,9 +550,8 @@ func TestMultipleContainersRunning(t *testing.T) { "determined-fluent-container", "test-pod", }) - statusUpdate := podStatusUpdate{updatedPod: &pod} - system.Ask(ref, statusUpdate) + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 0) assert.Equal(t, newPod.container.State, cproto.Starting) @@ -586,8 +577,8 @@ func TestMultipleContainersRunning(t *testing.T) { ObjectMeta: objectMeta, Status: status, } - statusUpdate = podStatusUpdate{updatedPod: &pod} - system.Ask(ref, statusUpdate) + + system.Ask(ref, &pod) // TODO CAROLINA time.Sleep(time.Second) assert.Equal(t, podMap["task"].GetLength(), 1) @@ -704,7 +695,7 @@ func TestReceiveContainerLog(t *testing.T) { newPod.containerNames = set.FromSlice([]string{ "sample-container", }) - statusUpdate := podStatusUpdate{updatedPod: &pod} + statusUpdate := &pod system.Ask(ref, statusUpdate) time.Sleep(time.Second) diff --git a/master/internal/rm/kubernetesrm/pods.go b/master/internal/rm/kubernetesrm/pods.go index c2b4d340cdd3..a99aae5179c9 100644 --- a/master/internal/rm/kubernetesrm/pods.go +++ b/master/internal/rm/kubernetesrm/pods.go @@ -210,24 +210,19 @@ func (p *pods) Receive(ctx *actor.Context) error { } } - p.startPodInformer(ctx) - p.startNodeInformer(ctx) + p.startPodInformer() + p.startNodeInformer() p.startEventListeners(ctx) p.startPreemptionListeners(ctx) case actor.PostStop: + p.nodeInformer.stopNodeInformer() case StartTaskPod: if err := p.receiveStartTaskPod(ctx, msg); err != nil { return err } - case podStatusUpdate: - p.receivePodStatusUpdate(ctx, msg) - - case nodeStatusUpdate: - p.receiveNodeStatusUpdate(ctx, msg) - case podEventUpdate: p.receivePodEventUpdate(ctx, msg) @@ -569,13 +564,13 @@ func (p *pods) reattachPod( containerID: containerID, } - // Send a podStatusUpdate for any missed updates between master going up + // Send a message for any missed updates between master going up // and the pod being reattached. updated, err := p.podInterfaces[pod.Namespace].Get(context.TODO(), pod.Name, metaV1.GetOptions{}) if err != nil { return reattachPodResponse{}, errors.Wrap(err, "error getting pod status update in restore") } - ctx.Tell(ctx.Self(), podStatusUpdate{updatedPod: updated}) + ctx.Tell(ctx.Self(), updated) return reattachPodResponse{containerID: containerID, started: started}, nil } @@ -676,19 +671,26 @@ func (p *pods) deleteDoomedKubernetesResources(ctx *actor.Context) error { return nil } -func (p *pods) startPodInformer(ctx *actor.Context) { +func (p *pods) startPodInformer() { for namespace := range p.namespaceToPoolName { - // TODO CAROLINA: this is where we init a new informer - // i, _ := ctx.ActorOf("pod-informer-"+namespace, - // newInformer(p.podInterfaces[namespace], ctx.Self()), - // ) - p.informers = append(p.informers, newInformer(namespace, p.podInterfaces[namespace])) + i := newInformer(namespace, p.podInterfaces[namespace]) + pod := i.startInformer() + p.receivePodStatusUpdate(i, pod) + p.informers = append(p.informers, i) } } -func (p *pods) startNodeInformer(ctx *actor.Context) { - // p.nodeInformer, _ = ctx.ActorOf("node-informer", newNodeInformer(p.clientSet, ctx.Self())) +func (p *pods) startNodeInformer() { p.nodeInformer = newNodeInformer(p.clientSet) + nodeStatus := p.nodeInformer.startNodeInformer() + + if nodeStatus.updatedNode != nil { + p.currentNodes[nodeStatus.updatedNode.Name] = nodeStatus.updatedNode + } + + if nodeStatus.deletedNode != nil { + delete(p.currentNodes, nodeStatus.deletedNode.Name) + } } func (p *pods) startEventListeners(ctx *actor.Context) { @@ -761,43 +763,35 @@ func (p *pods) receiveStartTaskPod(ctx *actor.Context, msg StartTaskPod) error { return nil } -func (p *pods) receivePodStatusUpdate(ctx *actor.Context, msg podStatusUpdate) { - ref, ok := p.podNameToPodHandler[msg.updatedPod.Name] +func (p *pods) receivePodStatusUpdate(i *Informer, pod *k8sV1.Pod) { + _, ok := p.podNameToPodHandler[pod.Name] if !ok { - ctx.Log().WithField("pod-name", msg.updatedPod.Name).Warn( + i.syslog.WithField("pod-name", pod.Name).Warn( "received pod status update for un-registered pod") return } - ctx.Tell(ref, msg) + // ctx.Tell(ref, pod) TODO CAROLINA - if containerID, ok := p.podNameToContainerID[msg.updatedPod.Name]; ok { + if containerID, ok := p.podNameToContainerID[pod.Name]; ok { if state, ok := p.containerIDToSchedulingState[containerID]; ok { currState := sproto.SchedulingStateQueued - if msg.updatedPod.Status.Phase == "Running" { + if pod.Status.Phase == "Running" { currState = sproto.SchedulingStateScheduled } if currState != state { p.containerIDToSchedulingState[containerID] = currState - ctx.Tell(p.cluster, sproto.UpdatePodStatus{ + // TODO CAROLINA -- what to do with this tell?? + /* ctx.Tell(p.cluster, sproto.UpdatePodStatus{ ContainerID: containerID, State: currState, }) + */ } } } } -func (p *pods) receiveNodeStatusUpdate(ctx *actor.Context, msg nodeStatusUpdate) { - if msg.updatedNode != nil { - p.currentNodes[msg.updatedNode.Name] = msg.updatedNode - } - - if msg.deletedNode != nil { - delete(p.currentNodes, msg.deletedNode.Name) - } -} - func (p *pods) receivePodEventUpdate(ctx *actor.Context, msg podEventUpdate) { ref, ok := p.podNameToPodHandler[msg.event.InvolvedObject.Name] if !ok { diff --git a/master/internal/rm/kubernetesrm/preemptions.go b/master/internal/rm/kubernetesrm/preemptions.go index f6dcbba4a5cb..5622d1722b46 100644 --- a/master/internal/rm/kubernetesrm/preemptions.go +++ b/master/internal/rm/kubernetesrm/preemptions.go @@ -62,7 +62,7 @@ func (p *preemptionListener) startPreemptionListener(ctx *actor.Context) { ctx.Log().WithError(err).Warnf( "error in initializing preemption listener: checking for pods to preempt", ) - actors.NotifyAfter(ctx, defaultInformerBackoff, startPreemptionListener{}) + actors.NotifyAfter(ctx, defaultBackoff, startPreemptionListener{}) return } @@ -78,7 +78,7 @@ func (p *preemptionListener) startPreemptionListener(ctx *actor.Context) { }) if err != nil { ctx.Log().WithError(err).Warnf("error initializing preemption watch") - actors.NotifyAfter(ctx, defaultInformerBackoff, startPreemptionListener{}) + actors.NotifyAfter(ctx, defaultBackoff, startPreemptionListener{}) return }