Skip to content

Commit 145f769

Browse files
added handler functions, todo: write test
1 parent 23847ee commit 145f769

File tree

4 files changed

+74
-41
lines changed

4 files changed

+74
-41
lines changed

master/internal/rm/kubernetesrm/informer.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,34 @@ import (
1717

1818
const defaultBackoff = 5 * time.Second
1919

20+
type podCallbackFunc func(*k8sV1.Pod)
21+
2022
type informer struct {
2123
syslog *logrus.Entry
2224
podInterface typedV1.PodInterface
25+
podHandler podCallbackFunc
2326
}
2427

2528
func newInformer(
2629
namespace string,
2730
podInterface typedV1.PodInterface,
31+
podHandler func(*k8sV1.Pod),
2832
) *informer {
2933
return &informer{
3034
syslog: logrus.WithField("Informer", namespace),
3135
podInterface: podInterface,
36+
podHandler: podHandler,
3237
}
3338
}
3439

3540
// startInformer returns the updated pod, if any.
36-
func (i *informer) startInformer() (*k8sV1.Pod, bool) {
41+
func (i *informer) startInformer() bool {
42+
ok := true
3743
pods, err := i.podInterface.List(
3844
context.TODO(), metaV1.ListOptions{LabelSelector: determinedLabel})
3945
if err != nil {
4046
i.syslog.WithError(err).Warnf("error retrieving internal resource version")
47+
ok = false
4148
}
4249

4350
rw, err := watchtools.NewRetryWatcher(pods.ResourceVersion, &cache.ListWatch{
@@ -48,6 +55,7 @@ func (i *informer) startInformer() (*k8sV1.Pod, bool) {
4855
})
4956
if err != nil {
5057
i.syslog.WithError(err).Warnf("error initializing pod retry watcher")
58+
ok = false
5159
}
5260

5361
i.syslog.Info("pod informer is starting")
@@ -57,16 +65,16 @@ func (i *informer) startInformer() (*k8sV1.Pod, bool) {
5765
continue
5866
}
5967

60-
pod, ok := event.Object.(*k8sV1.Pod)
61-
if !ok {
68+
pod, OK := event.Object.(*k8sV1.Pod)
69+
if !OK { // TODO CAROLINA: capitalizing OK to differentiate from ok var, better name?
6270
i.syslog.Warnf("error converting event of type %T to *k8sV1.Pod: %+v", event, event)
6371
continue
6472
}
6573

6674
i.syslog.Debugf("informer got new pod event for pod: %s %s", pod.Name, pod.Status.Phase)
67-
return pod, true
75+
go i.podHandler(pod)
6876
}
6977

7078
i.syslog.Warn("pod informer stopped unexpectedly")
71-
return nil, false
79+
return ok
7280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package kubernetesrm
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
k8sV1 "k8s.io/api/core/v1"
8+
)
9+
10+
// Testing informer.go.
11+
func mockPodHandler(pod *k8sV1.Pod) {}
12+
13+
func TestStartInformer(t *testing.T) {
14+
podInterface := &mockPodInterface{}
15+
mockLogMessage := "mock log message"
16+
podInterface = &mockPodInterface{logMessage: &mockLogMessage}
17+
i := newInformer("testing", podInterface, mockPodHandler)
18+
assert.NotNil(t, i)
19+
}
20+
21+
// Testing nodes.go.
22+
23+
// TODO Testing events.go.
24+
25+
// TODO Testing preemptions.go.

master/internal/rm/kubernetesrm/nodes.go

+14-17
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,34 @@ import (
99
"k8s.io/client-go/tools/cache"
1010
)
1111

12-
// Messages sent by the nodeInformer to the podsHandler.
13-
type nodeStatusUpdate struct {
14-
updatedNode *k8sV1.Node
15-
deletedNode *k8sV1.Node
16-
}
12+
type nodeCallbackFunc func(*k8sV1.Node, bool)
1713

1814
type nodeInformer struct {
19-
informer k8Informers.SharedInformerFactory
20-
syslog *logrus.Entry
21-
stop chan struct{}
15+
informer k8Informers.SharedInformerFactory
16+
nodeHandler nodeCallbackFunc
17+
syslog *logrus.Entry
18+
stop chan struct{}
2219
}
2320

24-
func newNodeInformer(clientSet k8sClient.Interface) *nodeInformer {
21+
func newNodeInformer(clientSet k8sClient.Interface, nodeHandler nodeCallbackFunc) *nodeInformer {
2522
return &nodeInformer{
2623
informer: k8Informers.NewSharedInformerFactoryWithOptions(
2724
clientSet, 0, []k8Informers.SharedInformerOption{}...),
28-
syslog: logrus.WithField("component", "nodeInformer"),
29-
stop: make(chan struct{}),
25+
nodeHandler: nodeHandler,
26+
syslog: logrus.WithField("component", "nodeInformer"),
27+
stop: make(chan struct{}),
3028
}
3129
}
3230

33-
func (n *nodeInformer) startNodeInformer() (nodeStatusUpdate, bool) {
34-
nodeStatus := nodeStatusUpdate{}
31+
func (n *nodeInformer) startNodeInformer() bool {
3532
err := true
3633
nodeInformer := n.informer.Core().V1().Nodes().Informer()
3734
nodeInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
3835
AddFunc: func(obj interface{}) {
3936
node, ok := obj.(*k8sV1.Node)
4037
if ok {
4138
n.syslog.Debugf("node added %s", node.Name)
42-
nodeStatus.updatedNode = node
39+
go n.nodeHandler(node, true)
4340
} else {
4441
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj)
4542
err = false
@@ -49,7 +46,7 @@ func (n *nodeInformer) startNodeInformer() (nodeStatusUpdate, bool) {
4946
node, ok := newObj.(*k8sV1.Node)
5047
if ok {
5148
n.syslog.Debugf("node updated %s", node.Name)
52-
nodeStatus.updatedNode = node
49+
go n.nodeHandler(node, true)
5350
} else {
5451
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", newObj, newObj)
5552
err = false
@@ -59,7 +56,7 @@ func (n *nodeInformer) startNodeInformer() (nodeStatusUpdate, bool) {
5956
node, ok := obj.(*k8sV1.Node)
6057
if ok {
6158
n.syslog.Debugf("node stopped %s", node.Name)
62-
nodeStatus.deletedNode = node
59+
go n.nodeHandler(node, false)
6360
} else {
6461
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj)
6562
err = false
@@ -73,7 +70,7 @@ func (n *nodeInformer) startNodeInformer() (nodeStatusUpdate, bool) {
7370
}
7471
n.syslog.Info("node informer has started")
7572

76-
return nodeStatus, err
73+
return err
7774
}
7875

7976
func (n *nodeInformer) stopNodeInformer() {

master/internal/rm/kubernetesrm/pods.go

+22-19
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (p *pods) Receive(ctx *actor.Context) error {
243243
}
244244

245245
case podStatusUpdate:
246-
p.receivePodStatusUpdate(ctx, msg)
246+
p.receivePodStatusUpdate(ctx, msg.updatedPod)
247247

248248
case podEventUpdate:
249249
p.receivePodEventUpdate(ctx, msg)
@@ -686,34 +686,27 @@ func (p *pods) deleteDoomedKubernetesResources(ctx *actor.Context) error {
686686

687687
func (p *pods) startPodInformer(ctx *actor.Context) error {
688688
for namespace := range p.namespaceToPoolName {
689-
i := newInformer(namespace, p.podInterfaces[namespace])
690-
pod, ok := i.startInformer()
689+
i := newInformer(namespace, p.podInterfaces[namespace],
690+
func(pod *k8sV1.Pod) { p.receivePodStatusUpdate(ctx, pod) })
691+
ok := i.startInformer()
691692

692693
if !ok {
693694
return errors.Errorf("pod informer for %s failed", namespace)
694695
}
695696

696697
p.informers = append(p.informers, i)
697-
ctx.Tell(ctx.Self(), podStatusUpdate{updatedPod: pod})
698698
}
699699
return nil
700700
}
701701

702702
func (p *pods) startNodeInformer(ctx *actor.Context) error {
703-
p.nodeInformer = newNodeInformer(p.clientSet)
704-
nodeStatus, ok := p.nodeInformer.startNodeInformer()
703+
p.nodeInformer = newNodeInformer(p.clientSet, p.receiveNodeStatusUpdate)
704+
ok := p.nodeInformer.startNodeInformer()
705705

706706
if !ok {
707707
return errors.Errorf("node informer failed")
708708
}
709709

710-
if nodeStatus.updatedNode != nil {
711-
p.currentNodes[nodeStatus.updatedNode.Name] = nodeStatus.updatedNode
712-
}
713-
714-
if nodeStatus.deletedNode != nil {
715-
delete(p.currentNodes, nodeStatus.deletedNode.Name)
716-
}
717710
return nil
718711
}
719712

@@ -783,20 +776,30 @@ func (p *pods) receiveStartTaskPod(ctx *actor.Context, msg StartTaskPod) error {
783776
return nil
784777
}
785778

786-
func (p *pods) receivePodStatusUpdate(ctx *actor.Context, msg podStatusUpdate) {
787-
ref, ok := p.podNameToPodHandler[msg.updatedPod.Name]
779+
func (p *pods) receiveNodeStatusUpdate(node *k8sV1.Node, toUpdate bool) {
780+
if node != nil {
781+
if toUpdate {
782+
p.currentNodes[node.Name] = node
783+
} else {
784+
delete(p.currentNodes, node.Name)
785+
}
786+
}
787+
}
788+
789+
func (p *pods) receivePodStatusUpdate(ctx *actor.Context, pod *k8sV1.Pod) {
790+
ref, ok := p.podNameToPodHandler[pod.Name]
788791
if !ok {
789-
ctx.Log().WithField("pod-name", msg.updatedPod.Name).Warn(
792+
ctx.Log().WithField("pod-name", pod.Name).Warn(
790793
"received pod status update for un-registered pod")
791794
return
792795
}
793796

794-
ctx.Tell(ref, msg)
797+
ctx.Tell(ref, podStatusUpdate{pod})
795798

796-
if containerID, ok := p.podNameToContainerID[msg.updatedPod.Name]; ok {
799+
if containerID, ok := p.podNameToContainerID[pod.Name]; ok {
797800
if state, ok := p.containerIDToSchedulingState[containerID]; ok {
798801
currState := sproto.SchedulingStateQueued
799-
if msg.updatedPod.Status.Phase == "Running" {
802+
if pod.Status.Phase == "Running" {
800803
currState = sproto.SchedulingStateScheduled
801804
}
802805
if currState != state {

0 commit comments

Comments
 (0)