Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions pkg/router/controller/contention.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"

routev1 "github.com/openshift/api/route/v1"
)

Expand Down Expand Up @@ -258,15 +255,36 @@ func ingressEqual(a, b *routev1.RouteIngress) bool {
}

// ingressConditionsEqual determines if the route ingress conditions are equal,
// while ignoring LastTransitionTime
// while ignoring LastTransitionTime.
func ingressConditionsEqual(a, b []routev1.RouteIngressCondition) bool {
conditionCmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(routev1.RouteIngressCondition{}, "LastTransitionTime"),
cmpopts.SortSlices(func(a, b routev1.RouteIngressCondition) bool { return a.Type < b.Type }),
if len(a) != len(b) {
return false
}

return cmp.Equal(a, b, conditionCmpOpts...)
// Compare each condition in a with every condition in b.
// Given the current max of only two conditions, nested loops are more efficient than sorting.
for i := 0; i < len(a); i++ {
matchFound := false
for j := 0; j < len(b); j++ {
if conditionsEqual(&a[i], &b[j]) {
matchFound = true
break
}
}
if !matchFound {
return false
}
}

return true
}

// conditionsEqual compares two RouteIngressConditions, ignoring LastTransitionTime.
func conditionsEqual(a, b *routev1.RouteIngressCondition) bool {
return a.Type == b.Type &&
a.Status == b.Status &&
a.Reason == b.Reason &&
a.Message == b.Message
}

func ingressConditionTouched(ingress *routev1.RouteIngress) *metav1.Time {
Expand Down
1 change: 1 addition & 0 deletions pkg/router/controller/extended_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (p *ExtendedValidator) HandleEndpoints(eventType watch.EventType, endpoints

// HandleRoute processes watch events on the Route resource.
func (p *ExtendedValidator) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: ExtendedValidator")
// Check if previously seen route and its Spec is unchanged.
routeName := routeNameKey(route)
if err := routeapihelpers.ExtendedValidateRoute(route).ToAggregate(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/router/controller/host_admitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (p *HostAdmitter) HandleEndpoints(eventType watch.EventType, endpoints *kap

// HandleRoute processes watch events on the Route resource.
func (p *HostAdmitter) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: HostAdmitter")
if p.allowedNamespaces != nil && !p.allowedNamespaces.Has(route.Namespace) {
// Ignore routes we don't need to "service" due to namespace
// restrictions (ala for sharding).
Expand Down
3 changes: 3 additions & 0 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var nowFn = getRfc3339Timestamp

// HandleRoute attempts to admit the provided route on watch add / modifications.
func (a *StatusAdmitter) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: StatusAdmitter")
switch eventType {
case watch.Added, watch.Modified:
performIngressConditionUpdate("admit", a.lease, a.tracker, a.client, a.lister, route, a.routerName, a.routerCanonicalHostname, routev1.RouteIngressCondition{
Expand Down Expand Up @@ -232,6 +233,8 @@ func performIngressConditionRemoval(action string, lease writerlease.Lease, trac
// attempts to update the route status and, depending on the outcome, clears the tracker if necessary. It returns the
// writerlease's WorkResult and a boolean flag indicating whether the writerlease should retry.
func handleRouteStatusUpdate(ctx context.Context, action string, oc client.RoutesGetter, route *routev1.Route, latest *routev1.RouteIngress, tracker ContentionTracker) (workResult writerlease.WorkResult, retry bool) {
log.V(4).Info("attempting to update route status")

switch _, err := oc.Routes(route.Namespace).UpdateStatus(ctx, route, metav1.UpdateOptions{}); {
case err == nil:
log.V(4).Info("updated route status", "action", action, "namespace", route.Namespace, "name", route.Name)
Expand Down
82 changes: 82 additions & 0 deletions pkg/router/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,22 @@ func TestRouterContentionOnCondition(t *testing.T) {
}},
expectContend: true,
},
{
name: "changing condition status with empty reason and message causes contention",
conditions: []routev1.RouteIngressCondition{{
Type: routev1.RouteAdmitted,
Status: kapi.ConditionTrue,
LastTransitionTime: &notNow,
}},
updateConditions: []routev1.RouteIngressCondition{{
Type: routev1.RouteAdmitted,
Status: kapi.ConditionFalse,
Reason: "foo",
Message: "foo",
LastTransitionTime: &now,
}},
expectContend: true,
},
{
name: "changing condition reason causes contention",
conditions: []routev1.RouteIngressCondition{{
Expand Down Expand Up @@ -1074,6 +1090,72 @@ func TestRouterContentionOnCondition(t *testing.T) {
}
}

// Benchmark_ingressConditionsEqual benchmarks the ingressConditionEqual function. Efficiency is crucial for
// this function as it directly impacts the performance of the contention tracker, potentially delaying the
// detection of contentions.
func Benchmark_ingressConditionsEqual(b *testing.B) {
now := metav1.Now()
notNow := metav1.Time{Time: now.Add(3 * time.Minute)}
admittedCondition := routev1.RouteIngressCondition{
Type: routev1.RouteAdmitted,
Status: kapi.ConditionTrue,
Reason: "foo",
Message: "foo",
LastTransitionTime: &now,
}
unservableCondition := routev1.RouteIngressCondition{
Type: routev1.RouteUnservableInFutureVersions,
Status: kapi.ConditionFalse,
Reason: "bar",
Message: "bar",
LastTransitionTime: &notNow,
}
testCases := []struct {
name string
condA []routev1.RouteIngressCondition
condB []routev1.RouteIngressCondition
}{
{
name: "single",
condA: []routev1.RouteIngressCondition{
admittedCondition,
},
condB: []routev1.RouteIngressCondition{
unservableCondition,
},
},
{
name: "mismatch_length",
condA: []routev1.RouteIngressCondition{
admittedCondition,
},
condB: []routev1.RouteIngressCondition{
unservableCondition,
admittedCondition,
},
},
{
name: "double",
condA: []routev1.RouteIngressCondition{
admittedCondition,
unservableCondition,
},
condB: []routev1.RouteIngressCondition{
unservableCondition,
admittedCondition,
},
},
}
for _, tc := range testCases {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although there's very minimal setup here you could call b.ResetTimer() to take any current or future setup costs out of the numbers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

b.ResetTimer()
b.Run(tc.name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
ingressConditionsEqual(tc.condA, tc.condB)
}
})
}
}

// TestStatusUnservableInFutureVersions tests the StatusAdmitter's functionality for handling routes that are unservable
// in future version of OpenShift.
func TestStatusUnservableInFutureVersions(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/router/controller/unique_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (p *UniqueHost) HandleNode(eventType watch.EventType, node *kapi.Node) erro
// determines which component needs to be recalculated (which template) and then does so
// on demand.
func (p *UniqueHost) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: UniqueHost")
if p.allowedNamespaces != nil && !p.allowedNamespaces.Has(route.Namespace) {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/router/controller/upgrade_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (p *UpgradeValidation) HandleEndpoints(eventType watch.EventType, endpoints
// It checks if the route is upgradeable to a future version of OpenShift
// and sets UnservableInFutureVersions condition if needed.
func (p *UpgradeValidation) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: UpgradeValidation")
routeName := routeNameKey(route)

// Force add and force removal logic for debugging and testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func NewBlueprintPlugin(cm templaterouter.ConfigManager) *BlueprintPlugin {

// HandleRoute processes watch events on blueprint routes.
func (p *BlueprintPlugin) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: BlueprintPlugin")
switch eventType {
case watch.Added, watch.Modified:
return p.manager.AddBlueprint(route)
Expand Down
1 change: 1 addition & 0 deletions pkg/router/template/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (p *TemplatePlugin) HandleNode(eventType watch.EventType, node *kapi.Node)
// determines which component needs to be recalculated (which template) and then does so
// on demand.
func (p *TemplatePlugin) HandleRoute(eventType watch.EventType, route *routev1.Route) error {
log.V(10).Info("HandleRoute: TemplatePlugin")
switch eventType {
case watch.Added, watch.Modified:
p.Router.AddRoute(route)
Expand Down
4 changes: 3 additions & 1 deletion pkg/router/writerlease/writerlease.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (l *WriterLease) work() bool {
if leaseState == Follower {
// if we are following, continue to defer work until the lease expires
if remaining := leaseExpires.Sub(l.nowFn()); remaining > 0 {
log.V(4).Info("follower awaiting lease expiration", "worker", l.name, "leaseTimeRemaining", remaining)
log.V(4).Info("follower awaiting lease expiration", "worker", l.name, "key", key, "leaseTimeRemaining", remaining)
time.Sleep(remaining)
l.queue.Add(key)
l.queue.Done(key)
Expand Down Expand Up @@ -303,13 +303,15 @@ func (l *WriterLease) nextState(result WorkResult) {
case Election, Follower:
l.tick = 0
l.state = Leader
log.V(4).Info("state change: elected to leader", "worker", l.name)
}
l.expires = l.nowFn().Add(l.maxBackoff)
case Release:
switch l.state {
case Election, Leader:
l.tick = 0
l.state = Follower
log.V(4).Info("state change: demoted to follower", "worker", l.name)
case Follower:
l.tick++
}
Expand Down