Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/apis/flows/v1alpha1/flow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ const (

// FlowConditionSubscriptionReady specifies that the Subscription has been configured successfully.
FlowConditionSubscriptionReady FlowConditionType = "SubscriptionReady"

// FlowConditionActionTargetResolved specifies that the Action Target has been resolved
FlowConditionActionTargetResolved FlowConditionType = "ActionTargetResolved"
)

// FlowCondition defines a readiness condition for a Flow.
Expand Down Expand Up @@ -265,6 +268,16 @@ func (fs *FlowStatus) removeCondition(t FlowConditionType) {
fs.Conditions = conditions
}

func (fs *FlowStatus) PropagateActionTargetResolved(status corev1.ConditionStatus, reason string, message string) {
fs.setCondition(&FlowCondition{
Type: FlowConditionActionTargetResolved,
Status: status,
Reason: reason,
Message: message,
})
fs.checkAndMarkReady()
}

func (fs *FlowStatus) PropagateChannelStatus(cs channelsv1alpha1.ChannelStatus) {
cc := cs.GetCondition(channelsv1alpha1.ChannelReady)

Expand Down Expand Up @@ -345,6 +358,7 @@ func (fs *FlowStatus) checkAndMarkReady() {
FlowConditionFeedReady,
FlowConditionChannelReady,
FlowConditionSubscriptionReady,
FlowConditionActionTargetResolved,
} {
c := fs.GetCondition(cond)
if c == nil || c.Status != corev1.ConditionTrue {
Expand Down
54 changes: 49 additions & 5 deletions pkg/apis/flows/v1alpha1/flow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
corev1 "k8s.io/api/core/v1"
)

const (
actionTargetResolveFailMessage = "action target failed to resolve"
actionTargetResolveSuccessMessage = "action target resolved"

actionTargetResolveFailReason = "ActionTargetResolveFailed"
actionTargetResolveSuccessReason = "ActionTargetResolveSucceeded"
)

func TestFlowCondition_GetConditionNotFound(t *testing.T) {
flow := Flow{}
flow.Status.setCondition(&FlowCondition{Type: FlowConditionReady})
Expand Down Expand Up @@ -170,6 +178,20 @@ func TestFlowCondition_IsReady(t *testing.T) {
}, {
Type: FlowConditionSubscriptionReady,
Status: corev1.ConditionTrue,
}},
false},
{"FlowConditionFeedReadyChannelReadySubscriptionDispatchingActionTargetResolved", []FlowCondition{{
Type: FlowConditionFeedReady,
Status: corev1.ConditionTrue,
}, {
Type: FlowConditionChannelReady,
Status: corev1.ConditionTrue,
}, {
Type: FlowConditionSubscriptionReady,
Status: corev1.ConditionTrue,
}, {
Type: FlowConditionActionTargetResolved,
Status: corev1.ConditionTrue,
}},
true},
}
Expand All @@ -190,17 +212,21 @@ func TestFlowCondition_IsReady(t *testing.T) {
}

func TestFlowCondition_PropagateStatus(t *testing.T) {
// These just get set by the

testcases := []struct {
name string
feedStatuses []feedsv1alpha1.FeedStatus
channelStatuses []channelsv1alpha1.ChannelStatus
subscriptionStatuses []channelsv1alpha1.SubscriptionStatus
want bool
name string
feedStatuses []feedsv1alpha1.FeedStatus
channelStatuses []channelsv1alpha1.ChannelStatus
subscriptionStatuses []channelsv1alpha1.SubscriptionStatus
actionTargetResolveStatus corev1.ConditionStatus
want bool
}{
{"NothingReady",
[]feedsv1alpha1.FeedStatus{feedsv1alpha1.FeedStatus{}},
[]channelsv1alpha1.ChannelStatus{channelsv1alpha1.ChannelStatus{}},
[]channelsv1alpha1.SubscriptionStatus{channelsv1alpha1.SubscriptionStatus{}},
corev1.ConditionFalse,
false},
{"FeedReady",
[]feedsv1alpha1.FeedStatus{{
Expand All @@ -211,6 +237,7 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
}},
[]channelsv1alpha1.ChannelStatus{},
[]channelsv1alpha1.SubscriptionStatus{},
corev1.ConditionFalse,
false},
{"ChannelReady",
[]feedsv1alpha1.FeedStatus{},
Expand All @@ -222,6 +249,7 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
DomainInternal: "foobar-channel.default.svc.cluster.local",
}},
[]channelsv1alpha1.SubscriptionStatus{},
corev1.ConditionFalse,
false},
{"SubscriptionReady",
[]feedsv1alpha1.FeedStatus{},
Expand All @@ -232,6 +260,13 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
Status: corev1.ConditionTrue,
}},
}},
corev1.ConditionFalse,
false},
{"ActionTargetResolved",
[]feedsv1alpha1.FeedStatus{},
[]channelsv1alpha1.ChannelStatus{},
[]channelsv1alpha1.SubscriptionStatus{},
corev1.ConditionTrue,
false},
{"AllNotReady",
[]feedsv1alpha1.FeedStatus{{
Expand All @@ -253,6 +288,7 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
Status: corev1.ConditionFalse,
}},
}},
corev1.ConditionFalse,
false},
{"AllReady",
[]feedsv1alpha1.FeedStatus{{
Expand All @@ -274,6 +310,7 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
Status: corev1.ConditionTrue,
}},
}},
corev1.ConditionTrue,
true},
}
for _, tc := range testcases {
Expand All @@ -289,6 +326,13 @@ func TestFlowCondition_PropagateStatus(t *testing.T) {
for _, ss := range tc.subscriptionStatuses {
flow.Status.PropagateSubscriptionStatus(ss)
}
if tc.actionTargetResolveStatus == corev1.ConditionTrue {
flow.Status.PropagateActionTargetResolved(corev1.ConditionTrue, actionTargetResolveSuccessReason, actionTargetResolveSuccessMessage)
} else if tc.actionTargetResolveStatus == corev1.ConditionFalse {
flow.Status.PropagateActionTargetResolved(corev1.ConditionFalse, actionTargetResolveFailReason, actionTargetResolveFailMessage)
} else {
flow.Status.PropagateActionTargetResolved(corev1.ConditionUnknown, "Unknown", "Unknown")
}
if want, got := tc.want, flow.Status.IsReady(); want != got {
t.Fatalf("Failed IsReady check : \nwant:\t%#v\ngot:\t%#v", want, got)
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,12 @@ func (c *Controller) reconcile(flow *v1alpha1.Flow) error {
target, err := c.resolveActionTarget(flow.Namespace, flow.Spec.Action)
if err != nil {
glog.Warningf("Failed to resolve target %v : %v", flow.Spec.Action, err)
flow.Status.PropagateActionTargetResolved(corev1.ConditionFalse, "ActionTargetNotResolved", err.Error())
return err
}

flow.Status.PropagateActionTargetResolved(corev1.ConditionTrue, "ActionTargetResolved", fmt.Sprintf("Resolved to: %q", target))

// Ok, so target is the underlying k8s service (or URI if so specified) that we want to target
glog.Infof("Resolved Target to: %q", target)

Expand Down
4 changes: 2 additions & 2 deletions test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ function wait_until_flow_ready() {
fi
done

if [ $typeCount -eq 4 ]; then
if [ $readyCount -eq 4 ]; then
if [ $typeCount -eq 5 ]; then
if [ $readyCount -eq 5 ]; then
return 0
fi
fi
Expand Down