Skip to content

Commit

Permalink
more changes -- possibly too many
Browse files Browse the repository at this point in the history
  • Loading branch information
carolinaecalderon committed Jun 21, 2023
1 parent b0b20ed commit f3c7905
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 151 deletions.
4 changes: 2 additions & 2 deletions master/internal/rm/kubernetesrm/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (e *eventListener) startEventListener(ctx *actor.Context) {
context.TODO(), metaV1.ListOptions{})
if err != nil {
ctx.Log().WithError(err).Warnf("error retrieving internal resource version")
actors.NotifyAfter(ctx, defaultInformerBackoff, startEventListener{})
actors.NotifyAfter(ctx, defaultBackoff, startEventListener{})
return
}

Expand All @@ -79,7 +79,7 @@ func (e *eventListener) startEventListener(ctx *actor.Context) {
})
if err != nil {
ctx.Log().WithError(err).Warnf("error initializing event retry watcher")
actors.NotifyAfter(ctx, defaultInformerBackoff, startEventListener{})
actors.NotifyAfter(ctx, defaultBackoff, startEventListener{})
return
}

Expand Down
41 changes: 9 additions & 32 deletions master/internal/rm/kubernetesrm/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,65 +15,42 @@ import (
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const defaultInformerBackoff = 5 * time.Second

// messages that are sent to the informer.
type (
startInformer struct{}
)

// messages that are sent by the informer.
type (
podStatusUpdate struct {
updatedPod *k8sV1.Pod
}
)
// defaultBackoff is the delay passed to actors.NotifyAfter before the event
// listener re-sends startEventListener to itself after failing to list
// resources or to create its retry watcher.
const defaultBackoff = 5 * time.Second

// Informer struct TODO CAROLINA.
type Informer struct {
syslog *logrus.Entry
podInterface typedV1.PodInterface
// podsHandler *actor.Ref
}

// DefaultInformer is the global informer singleton.
var DefaultInformer *startInformer

// TODO CAROLINA -- actor.Prestart & startInformer

// TODO CAROLINA -- actor.PostStop

func newInformer(
namespace string,
podInterface typedV1.PodInterface,
// podsHandler *actor.Ref, // TODO CAROLINA
) *Informer {
return &Informer{
syslog: logrus.WithField("Informer namespace", namespace), // TODO CAROLINA
syslog: logrus.WithField("Informer", namespace),
podInterface: podInterface,
// podsHandler: podsHandler,
}
}

// TODO CAROLINA
func (i *Informer) startInformer() {
// TODO
// startInformer returns the updated pod, if any.
func (i *Informer) startInformer() *k8sV1.Pod {
pods, err := i.podInterface.List(
context.TODO(), metaV1.ListOptions{LabelSelector: determinedLabel})
if err != nil {
i.syslog.WithError(err).Warnf("error retrieving internal resource version")
return
return nil
}

rw, err := watchtools.NewRetryWatcher(pods.ResourceVersion, &cache.ListWatch{
WatchFunc: func(options metaV1.ListOptions) (watch.Interface, error) {
// TODO
return i.podInterface.Watch(
context.TODO(), metaV1.ListOptions{LabelSelector: determinedLabel})
},
})
if err != nil {
i.syslog.WithError(err).Warnf("error initializing pod retry watcher")
return
return nil
}

i.syslog.Info("pod informer is starting")
Expand All @@ -90,9 +67,9 @@ func (i *Informer) startInformer() {
}

i.syslog.Debugf("informer got new pod event for pod: %s %s", pod.Name, pod.Status.Phase)
// ctx.Tell(i.podsHandler, podStatusUpdate{updatedPod: pod})
return
return pod
}

i.syslog.Warn("pod informer stopped unexpectedly")
return nil
}
67 changes: 18 additions & 49 deletions master/internal/rm/kubernetesrm/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,72 +9,36 @@ import (
"k8s.io/client-go/tools/cache"
)

// Messages sent by the nodeInformer to itself.
type (
startNodeInformer struct{}
)

// Messages sent by the nodeInformer to the podsHandler.
type (
nodeStatusUpdate struct {
updatedNode *k8sV1.Node
deletedNode *k8sV1.Node
}
)
// nodeStatusUpdate carries the node reported by a node informer event.
type nodeStatusUpdate struct {
// updatedNode is set by the informer's add and update handlers.
updatedNode *k8sV1.Node
// deletedNode is set by the informer's delete handler.
deletedNode *k8sV1.Node
}

type nodeInformer struct {
informer k8Informers.SharedInformerFactory
syslog *logrus.Entry
// podsHandler *actor.Ref
stop chan struct{}
stop chan struct{}
}

func newNodeInformer(clientSet k8sClient.Interface) *nodeInformer {
return &nodeInformer{
informer: k8Informers.NewSharedInformerFactoryWithOptions(
clientSet, 0, []k8Informers.SharedInformerOption{}...),
syslog: logrus.WithField("component", "nodeInformer"), // TODO CAROLINA
// podsHandler: podsHandler,
stop: make(chan struct{}),
}
}

// TODO CAROLINA: actor.PreStart ctx.Tell(ctx.Self(), startNodeInformer{}),
// startNodeInformer if err := n.startNodeInformer(ctx); err != nil { return err }
// actor.PostStop ctx.Log().Infof("shutting down node informer") close(n.stop)
// default ctx.Log().Errorf("unexpected message %T", msg) return actor.ErrUnexpectedMessage(ctx)
/*
func (n *nodeInformer) Receive(ctx *actor.Context) error {
switch msg := ctx.Message().(type) {
case actor.PreStart:
ctx.Tell(ctx.Self(), startNodeInformer{})
case startNodeInformer:
if err := n.startNodeInformer(); err != nil {
return err
}
case actor.PostStop:
ctx.Log().Infof("shutting down node informer")
close(n.stop)
default:
ctx.Log().Errorf("unexpected message %T", msg)
return actor.ErrUnexpectedMessage(ctx)
syslog: logrus.WithField("component", "nodeInformer"),
stop: make(chan struct{}),
}
return nil
}
*/

func (n *nodeInformer) startNodeInformer() error {
func (n *nodeInformer) startNodeInformer() nodeStatusUpdate {
nodeStatus := nodeStatusUpdate{}
nodeInformer := n.informer.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node, ok := obj.(*k8sV1.Node)
if ok {
n.syslog.Debugf("node added %s", node.Name)
// ctx.Tell(n.podsHandler, nodeStatusUpdate{updatedNode: node}) // TODO CAROLINA
nodeStatus.updatedNode = node
} else {
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj)
}
Expand All @@ -83,7 +47,7 @@ func (n *nodeInformer) startNodeInformer() error {
node, ok := newObj.(*k8sV1.Node)
if ok {
n.syslog.Debugf("node updated %s", node.Name)
// ctx.Tell(n.podsHandler, nodeStatusUpdate{updatedNode: node}) // TODO CAROLINA
nodeStatus.updatedNode = node
} else {
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", newObj, newObj)
}
Expand All @@ -92,7 +56,7 @@ func (n *nodeInformer) startNodeInformer() error {
node, ok := obj.(*k8sV1.Node)
if ok {
n.syslog.Debugf("node stopped %s", node.Name)
// ctx.Tell(n.podsHandler, nodeStatusUpdate{deletedNode: node}) // TODO CAROLINA
nodeStatus.deletedNode = node
} else {
n.syslog.Warnf("error converting event of type %T to *k8sV1.Node: %+v", obj, obj)
}
Expand All @@ -105,5 +69,10 @@ func (n *nodeInformer) startNodeInformer() error {
}
n.syslog.Info("node informer has started")

return nil
return nodeStatus
}

// stopNodeInformer logs that the node informer is shutting down and closes
// n.stop.
//
// NOTE(review): presumably n.stop is the stop channel the informer was started
// with; the start call is not visible here, so confirm. Closing an
// already-closed channel panics, so this must be called at most once per
// nodeInformer.
func (n *nodeInformer) stopNodeInformer() {
n.syslog.Infof("shutting down node informer")
close(n.stop)
}
8 changes: 4 additions & 4 deletions master/internal/rm/kubernetesrm/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (p *pod) Receive(ctx *actor.Context) error {
case resourceCreationFailed:
p.receiveResourceCreationFailed(ctx, msg)

case podStatusUpdate:
if err := p.receivePodStatusUpdate(ctx, msg); err != nil {
case k8sV1.Pod:
if err := p.receivePodStatusUpdate(ctx, &msg); err != nil {
return err
}

Expand Down Expand Up @@ -262,8 +262,8 @@ func (p *pod) receiveResourceCreationFailed(ctx *actor.Context, msg resourceCrea
ctx.Self().Stop()
}

func (p *pod) receivePodStatusUpdate(ctx *actor.Context, msg podStatusUpdate) error {
p.pod = msg.updatedPod
func (p *pod) receivePodStatusUpdate(ctx *actor.Context, pod *k8sV1.Pod) error {
p.pod = pod

containerState, err := getPodState(ctx, p.pod, p.containerNames)
if err != nil {
Expand Down
45 changes: 18 additions & 27 deletions master/internal/rm/kubernetesrm/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ func cleanup(t *testing.T) {

func checkReceiveTermination(
t *testing.T,
update podStatusUpdate,
update *k8sV1.Pod,
system *actor.System,
ref *actor.Ref,
newPod *pod,
podMap map[string]*mockReceiver,
) {
system.Ask(ref, update)
// system.Ask(ref, update) // TODO CAROLINA
time.Sleep(time.Second)

assert.Equal(t, podMap["task"].GetLength(), 1)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {
Status: k8sV1.PodStatus{Phase: k8sV1.PodPending},
}

statusUpdate := podStatusUpdate{updatedPod: &pod}
statusUpdate := &pod

checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap)

Expand All @@ -299,7 +299,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {
ObjectMeta: objectMeta,
Status: k8sV1.PodStatus{Phase: k8sV1.PodFailed},
}
statusUpdate = podStatusUpdate{updatedPod: &pod}
statusUpdate = &pod

checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap)

Expand All @@ -312,7 +312,7 @@ func TestReceivePodStatusUpdateTerminated(t *testing.T) {
ObjectMeta: objectMeta,
Status: k8sV1.PodStatus{Phase: k8sV1.PodSucceeded},
}
statusUpdate = podStatusUpdate{updatedPod: &pod}
statusUpdate = &pod

checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap)

Expand Down Expand Up @@ -358,9 +358,7 @@ func TestMultipleContainerTerminate(t *testing.T) {
ContainerStatuses: containerStatuses,
},
}

statusUpdate := podStatusUpdate{updatedPod: &pod}
checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap)
checkReceiveTermination(t, &pod, system, ref, newPod, podMap)

// Multiple pods, 1 termination, no deletion timestamp.
// This results in an error, which causes pod termination and the same outcome.
Expand All @@ -379,9 +377,7 @@ func TestMultipleContainerTerminate(t *testing.T) {
ContainerStatuses: containerStatuses,
},
}

statusUpdate = podStatusUpdate{updatedPod: &pod}
checkReceiveTermination(t, statusUpdate, system, ref, newPod, podMap)
checkReceiveTermination(t, &pod, system, ref, newPod, podMap)
}

func TestReceivePodStatusUpdateAssigned(t *testing.T) {
Expand All @@ -402,16 +398,14 @@ func TestReceivePodStatusUpdateAssigned(t *testing.T) {
Status: k8sV1.PodStatus{Phase: k8sV1.PodPending},
}

statusUpdate := podStatusUpdate{updatedPod: &pod}

assert.Equal(t, newPod.container.State, cproto.Assigned)
system.Ask(ref, statusUpdate)
system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)
assert.Equal(t, podMap["task"].GetLength(), 0)

newPod.container.State = cproto.Starting

system.Ask(ref, statusUpdate)
// system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)
assert.Equal(t, podMap["task"].GetLength(), 0)
assert.Equal(t, newPod.container.State, cproto.Starting)
Expand Down Expand Up @@ -445,15 +439,14 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) {
ObjectMeta: objectMeta,
Status: status,
}
statusUpdate := podStatusUpdate{updatedPod: &pod}

system.Ask(ref, statusUpdate)
system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)

assert.Equal(t, podMap["task"].GetLength(), 2)
assert.Equal(t, newPod.container.State, cproto.Starting)
podMap["task"].Purge()
system.Ask(ref, statusUpdate)
system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)
assert.Equal(t, podMap["task"].GetLength(), 0)
assert.Equal(t, newPod.container.State, cproto.Starting)
Expand Down Expand Up @@ -483,9 +476,8 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) {
ObjectMeta: objectMeta,
Status: status,
}
statusUpdate = podStatusUpdate{updatedPod: &pod}

system.Ask(ref, statusUpdate)
system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)

assert.Equal(t, podMap["task"].GetLength(), 2)
Expand All @@ -507,8 +499,8 @@ func TestReceivePodStatusUpdateStarting(t *testing.T) {
ObjectMeta: objectMeta,
Status: status,
}
statusUpdate = podStatusUpdate{updatedPod: &pod}
system.Ask(ref, statusUpdate)

system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)
assert.Equal(t, podMap["task"].GetLength(), 2)
assert.Equal(t, newPod.container.State, cproto.Starting)
Expand Down Expand Up @@ -558,9 +550,8 @@ func TestMultipleContainersRunning(t *testing.T) {
"determined-fluent-container",
"test-pod",
})
statusUpdate := podStatusUpdate{updatedPod: &pod}

system.Ask(ref, statusUpdate)
system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)
assert.Equal(t, podMap["task"].GetLength(), 0)
assert.Equal(t, newPod.container.State, cproto.Starting)
Expand All @@ -586,8 +577,8 @@ func TestMultipleContainersRunning(t *testing.T) {
ObjectMeta: objectMeta,
Status: status,
}
statusUpdate = podStatusUpdate{updatedPod: &pod}
system.Ask(ref, statusUpdate)

system.Ask(ref, &pod) // TODO CAROLINA
time.Sleep(time.Second)

assert.Equal(t, podMap["task"].GetLength(), 1)
Expand Down Expand Up @@ -704,7 +695,7 @@ func TestReceiveContainerLog(t *testing.T) {
newPod.containerNames = set.FromSlice([]string{
"sample-container",
})
statusUpdate := podStatusUpdate{updatedPod: &pod}
statusUpdate := &pod

system.Ask(ref, statusUpdate)
time.Sleep(time.Second)
Expand Down
Loading

0 comments on commit f3c7905

Please sign in to comment.