Skip to content
5 changes: 5 additions & 0 deletions pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func (pas *PodAutoscalerStatus) MarkScaleTargetInitialized() {
podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionScaleTargetInitialized)
}

// IsSKSReady returns whether the PA condition denoting that SKS is ready is true.
func (pas *PodAutoscalerStatus) IsSKSReady() bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't actually needed after latest refactor, but I guess it's a reasonable method to have anyway 🤷🏼

return pas.GetCondition(PodAutoscalerSKSReady).IsTrue()
}

// MarkSKSReady marks the PA condition denoting that SKS is ready.
func (pas *PodAutoscalerStatus) MarkSKSReady() {
podCondSet.Manage(pas).MarkTrue(PodAutoscalerSKSReady)
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,3 +1212,18 @@ func TestIsScaleTargetInitialized(t *testing.T) {
t.Errorf("after marking initially active: got: %v, want: %v", got, want)
}
}

// TestIsSKSReady verifies that IsSKSReady tracks the SKSReady condition
// through the mark-ready / mark-not-ready transitions.
func TestIsSKSReady(t *testing.T) {
	var pa PodAutoscaler

	// expect asserts the current IsSKSReady value, labelling any failure
	// with the transition that was just applied.
	expect := func(stage string, want bool) {
		if got := pa.Status.IsSKSReady(); got != want {
			t.Errorf("%s: got: %v, want: %v", stage, got, want)
		}
	}

	expect("before marking SKS ready", false)
	pa.Status.MarkSKSReady()
	expect("after marking SKS ready", true)
	pa.Status.MarkSKSNotReady("not ready")
	expect("after marking SKS not ready", false)
}
47 changes: 31 additions & 16 deletions pkg/reconciler/autoscaling/kpa/kpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
corev1listers "k8s.io/client-go/listers/core/v1"
)

const noPrivateServiceName = "No Private Service Name"

// podCounts keeps record of various numbers of pods
// for each revision.
type podCounts struct {
Expand Down Expand Up @@ -85,7 +87,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *pav1alpha1.PodAutosc
if _, err = c.ReconcileSKS(ctx, pa, nv1alpha1.SKSOperationModeServe, 0 /*numActivators == all*/); err != nil {
return fmt.Errorf("error reconciling SKS: %w", err)
}
pa.Status.MarkSKSNotReady("No Private Service Name") // In both cases this is true.
pa.Status.MarkSKSNotReady(noPrivateServiceName) // In both cases this is true.
return computeStatus(ctx, pa, podCounts{want: scaleUnknown}, logger)
}

Expand Down Expand Up @@ -219,24 +221,30 @@ func reportMetrics(pa *pav1alpha1.PodAutoscaler, pc podCounts) error {
}

// computeActiveCondition updates the status of a PA given the current scale (got), desired scale (want)
// and the current status, as per the following table:
// active threshold (min), and the current status, as per the following table:
//
// | Want | Got | Status | New status |
// | 0 | <any> | <any> | inactive |
// | >0 | < min | <any> | activating |
// | >0 | >= min | <any> | active |
// | -1 | < min | inactive | inactive |
// | -1 | < min | activating | activating |
// | -1 | < min | active | activating |
// | -1 | >= min | inactive | inactive |
// | -1 | >= min | activating | active |
// | -1 | >= min | active | active |
// | Want | Got | min | Status | New status |
// | 0 | <any> | <any> | <any> | inactive |
// | >0 | < min | <any> | <any> | activating |
// | >0 | >= min | <any> | <any> | active |
// | -1 | < min | <any> | inactive | inactive |
// | -1 | < min | <any> | activating | activating |
// | -1 | < min | <any> | active | activating |
// | -1 | >= min | <any> | inactive | inactive |
// | -1 | >= min | 0 | activating | inactive |
// | -1 | >= min | 0 | active | inactive | <-- this case technically is impossible.
// | -1 | >= min | >0 | activating | active |
// | -1 | >= min | >0 | active | active |
func computeActiveCondition(ctx context.Context, pa *pav1alpha1.PodAutoscaler, pc podCounts) {
minReady := activeThreshold(ctx, pa)
if pc.ready >= minReady {
pa.Status.MarkScaleTargetInitialized()
}

switch {
case pc.want == 0:
if pa.Status.IsActivating() {
// Need to check for minReady = 0 because in the initialScale 0 case, pc.want will be -1.
case pc.want == 0 || minReady == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do || minReady==0? It seems that pc.want=0 is a superset of that? E.g. if minReady=0, pc.want will be 0 (the opposite is not true, though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't overriding -1 with 0, so we are still hitting the pc.want = -1, minReady = 0 case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I had that state somewhere in my head, but I guess we got rid of that :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah originally we were overriding -1 with 0, but with Markus' simplification we are able to get rid of it.

if pa.Status.IsActivating() && minReady > 0 {
// We only ever scale to zero while activating if we fail to activate within the progress deadline.
pa.Status.MarkInactive("TimedOut", "The target could not be activated.")
} else {
Expand All @@ -247,12 +255,19 @@ func computeActiveCondition(ctx context.Context, pa *pav1alpha1.PodAutoscaler, p
if pc.want > 0 || !pa.Status.IsInactive() {
pa.Status.MarkActivating(
"Queued", "Requests to the target are being buffered as resources are provisioned.")
} else {
// This is for the initialScale 0 case. In the first iteration, minReady is 0,
// but for the following iterations, minReady is 1. pc.want will continue being
// -1 until we start receiving metrics, so we will end up here.
// Even though the PA has already been marked as inactive in the first iteration, we
// still need to set it again. Otherwise reconciliation will fail with NewObservedGenFailure
// because we cannot go through one iteration of reconciliation without setting
// some status.
pa.Status.MarkInactive("NoTraffic", "The target is not receiving traffic.")
Comment on lines +258 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I read it, this is redundant.
On first iteration we'll enter the first switch case and mark PA as Inactive.
On the next ones we'll enter here (0 < 1). But as you mentioned pc.want=-1 and the pa.Status==Inactive: thus the if above will always evaluate to false (unless we receive requests and positive metrics and pc.want becomes positive). Thus you're just marking Inactive with Inactive again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we are marking Inactive with Inactive again here. This is still needed because we will be getting the NewObservedGenFailure during reconciliation post processing, because we cannot go through one iteration of reconciliation without setting some sort of status.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, so just updating the status with the same value updates ObsGen?
@whaught, Weston, if change in inputs didn't yield any change in status, why would this be an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whaught i'm not sure if my understanding is correct. I think it's because PA has been updated during the reconciliation, therefore there's a difference in ObsGen and Gen, but the status is not updated, which causes the failure: https://github.com/knative/pkg/blob/deb6b33d2a6c114f596f52630e85c475bf43abce/reconciler/reconcile_common.go#L50-L53

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When Spec changes, we reset Ready to unknown with a dummy message that the reconciler didn't set anything for a new spec before calling ReconcileKind. The reconciler is expected to set something upon observation of a new generation (or is left with the default message as a warning)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what the spec change in PA is 🤔
But anyway, it's interesting, since this does not change ready (before and after would be unknown).

}

case pc.ready >= minReady:
if pc.want > 0 || !pa.Status.IsInactive() {
pa.Status.MarkScaleTargetInitialized()
// SKS should already be active.
pa.Status.MarkActive()
}
}
Expand Down
98 changes: 95 additions & 3 deletions pkg/reconciler/autoscaling/kpa/kpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ func defaultConfigMapData() map[string]string {
}
}

// initialScaleZeroConfigMap returns the default autoscaler config mutated so
// that a zero initial scale is both allowed and requested, with scale-to-zero
// enabled.
func initialScaleZeroConfigMap() *autoscalerconfig.Config {
	// Error deliberately ignored: defaultConfigMapData is a known-valid fixture.
	cfg, _ := autoscalerconfig.NewConfigFromMap(defaultConfigMapData())
	cfg.EnableScaleToZero = true
	cfg.AllowZeroInitialScale = true
	cfg.InitialScale = 0
	return cfg
}

func defaultConfig() *config.Config {
autoscalerConfig, _ := autoscalerconfig.NewConfigFromMap(defaultConfigMapData())
deploymentConfig, _ := deployment.NewConfigFromMap(map[string]string{
Expand Down Expand Up @@ -166,6 +174,15 @@ func sksNoConds(s *nv1a1.ServerlessService) {
s.Status.Status = duckv1.Status{}
}

// metricWithASConfig builds the Metric for the test PA identified by ns/n,
// using the supplied autoscaler config and the "-private" child service name,
// then applies the metricOption mutators in order.
func metricWithASConfig(ns, n string, asConfig *autoscalerconfig.Config, opts ...metricOption) *asv1a1.Metric {
	metric := aresources.MakeMetric(kpa(ns, n), kmeta.ChildName(n, "-private"), asConfig)
	for _, opt := range opts {
		opt(metric)
	}
	return metric
}

func sks(ns, n string, so ...SKSOption) *nv1a1.ServerlessService {
kpa := kpa(ns, n)
s := aresources.MakeSKS(kpa, nv1a1.SKSOperationModeServe, scaling.MinActivators)
Expand Down Expand Up @@ -211,6 +228,7 @@ func TestReconcile(t *testing.T) {
unknownScale = scaleUnknown
underscale = defaultScale - 1
overscale = defaultScale + 1
asConfigKey = ""
)

// Set up a default deployment with the appropriate scale so that we don't
Expand Down Expand Up @@ -716,7 +734,7 @@ func TestReconcile(t *testing.T) {
deploy(testNamespace, testRevision),
}, defaultReady...),
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, withScales(1, 0),
Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(1, 0),
WithPASKSReady, WithPAMetricsService(privateSvc),
WithNoTraffic("NoTraffic", "The target is not receiving traffic."),
WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
Expand Down Expand Up @@ -753,7 +771,7 @@ func TestReconcile(t *testing.T) {
deploy(testNamespace, testRevision),
}, defaultReady...),
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, WithPASKSReady, WithPAMetricsService(privateSvc),
Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithPASKSReady, WithPAMetricsService(privateSvc),
WithNoTraffic("TimedOut", "The target could not be activated."), withScales(1, 0),
WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
WithObservedGeneration(1)),
Expand Down Expand Up @@ -1039,6 +1057,76 @@ func TestReconcile(t *testing.T) {
Name: deployName,
Patch: []byte(fmt.Sprintf(`[{"op":"replace","path":"/spec/replicas","value":%d}]`, 20)),
}},
}, {
Name: "initial scale zero: scale to zero",
Key: key,
Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey,
decider(testNamespace, testRevision, -1, /* desiredScale */
0 /* ebc */, scaling.MinActivators)),
Objects: append([]runtime.Object{
kpa(testNamespace, testRevision, withScales(0, -1), WithReachabilityReachable,
WithPAMetricsService(privateSvc), WithPASKSNotReady(noPrivateServiceName),
),
// SKS won't be ready bc no ready endpoints, but private service name will be populated.
sks(testNamespace, testRevision, WithDeployRef(deployName), WithPrivateService),
metric(testNamespace, testRevision),
deploy(testNamespace, testRevision, func(d *appsv1.Deployment) {
d.Spec.Replicas = ptr.Int32(0)
}),
}, makeReadyPods(0, testNamespace, testRevision)...),
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, markScaleTargetInitialized,
WithNoTraffic("NoTraffic", "The target is not receiving traffic."),
withScales(0, -1), WithReachabilityReachable,
WithPAMetricsService(privateSvc), WithObservedGeneration(1),
WithPASKSNotReady(""),
),
}},
}, {
Name: "initial scale zero: stay at zero",
Key: key,
Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey,
decider(testNamespace, testRevision, -1, /* desiredScale */
0 /* ebc */, scaling.MinActivators)),
Objects: append([]runtime.Object{
kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(0, scaleUnknown),
WithReachabilityReachable, WithPAMetricsService(privateSvc), WithPASKSNotReady(""),
),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithPrivateService),
metric(testNamespace, testRevision),
deploy(testNamespace, testRevision, func(d *appsv1.Deployment) {
d.Spec.Replicas = ptr.Int32(0)
}),
}, makeReadyPods(0, testNamespace, testRevision)...),
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, WithPASKSNotReady(""), WithBufferedTraffic, markScaleTargetInitialized,
withScales(0, scaleUnknown), WithReachabilityReachable,
WithPAMetricsService(privateSvc), WithObservedGeneration(1),
),
}},
}, {
Name: "initial scale zero: scale to greater than zero",
Key: key,
Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey,
decider(testNamespace, testRevision, 2, /* desiredScale */
-42 /* ebc */, scaling.MinActivators)),
Objects: append([]runtime.Object{
kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(2, 2),
WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc),
WithPASKSReady,
),
sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady, WithPrivateService),
metricWithASConfig(testNamespace, testRevision, initialScaleZeroConfigMap()),
deploy(testNamespace, testRevision, func(d *appsv1.Deployment) {
d.Spec.Replicas = ptr.Int32(2)
}),
}, makeReadyPods(2, testNamespace, testRevision)...),
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized,
withScales(2, 2), WithReachabilityReachable, WithPAStatusService(testRevision),
WithPAMetricsService(privateSvc), WithObservedGeneration(1),
),
}},
}}

table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
Expand All @@ -1061,6 +1149,10 @@ func TestReconcile(t *testing.T) {
fakeDeciders.Create(ctx, d.(*scaling.Decider))
}

testConfigs := defaultConfig()
if asConfig := ctx.Value(asConfigKey); asConfig != nil {
testConfigs.Autoscaler = asConfig.(*autoscalerconfig.Config)
}
psf := podscalable.Get(ctx)
scaler := newScaler(ctx, psf, func(interface{}, time.Duration) {})
scaler.activatorProbe = func(*asv1a1.PodAutoscaler, http.RoundTripper) (bool, error) { return true, nil }
Expand All @@ -1079,7 +1171,7 @@ func TestReconcile(t *testing.T) {
servingclient.Get(ctx), listers.GetPodAutoscalerLister(),
controller.GetEventRecorder(ctx), r, autoscaling.KPA,
controller.Options{
ConfigStore: &testConfigStore{config: defaultConfig()},
ConfigStore: &testConfigStore{config: testConfigs},
})
}))
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/reconciler/autoscaling/kpa/scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,20 @@ func TestScaler(t *testing.T) {
paMarkActivating(k, time.Now())
k.ObjectMeta.Annotations[autoscaling.InitialScaleAnnotationKey] = "5"
},
}, {
label: "reaching initial scale zero",
startReplicas: 0,
scaleTo: 0,
wantReplicas: 0,
wantScaling: false,
wantCBCount: 1,
paMutation: func(k *pav1alpha1.PodAutoscaler) {
paMarkInactive(k, time.Now())
k.ObjectMeta.Annotations[autoscaling.InitialScaleAnnotationKey] = "0"
},
configMutator: func(c *config.Config) {
c.Autoscaler.AllowZeroInitialScale = true
},
}}

for _, test := range tests {
Expand Down
1 change: 0 additions & 1 deletion test/e2e/initial_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
// the revision level. This test runs after the cluster wide flag allow-zero-initial-scale
// is set to true.
func TestInitScaleZero(t *testing.T) {
t.Skip()
t.Parallel()
cancel := logstream.Start(t)
defer cancel()
Expand Down