diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go index cc63731e8697..23e6c3e7a09b 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle.go @@ -186,6 +186,11 @@ func (pas *PodAutoscalerStatus) MarkScaleTargetInitialized() { podCondSet.Manage(pas).MarkTrue(PodAutoscalerConditionScaleTargetInitialized) } +// IsSKSReady returns true if the PA condition denoting that SKS is ready. +func (pas *PodAutoscalerStatus) IsSKSReady() bool { + return pas.GetCondition(PodAutoscalerSKSReady).IsTrue() +} + // MarkSKSReady marks the PA condition denoting that SKS is ready. func (pas *PodAutoscalerStatus) MarkSKSReady() { podCondSet.Manage(pas).MarkTrue(PodAutoscalerSKSReady) diff --git a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go index 1fcca085bfe5..799ed8d5e871 100644 --- a/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go +++ b/pkg/apis/autoscaling/v1alpha1/pa_lifecycle_test.go @@ -1212,3 +1212,18 @@ func TestIsScaleTargetInitialized(t *testing.T) { t.Errorf("after marking initially active: got: %v, want: %v", got, want) } } + +func TestIsSKSReady(t *testing.T) { + p := PodAutoscaler{} + if got, want := p.Status.IsSKSReady(), false; got != want { + t.Errorf("before marking SKS ready: got: %v, want: %v", got, want) + } + p.Status.MarkSKSReady() + if got, want := p.Status.IsSKSReady(), true; got != want { + t.Errorf("after marking SKS ready: got: %v, want: %v", got, want) + } + p.Status.MarkSKSNotReady("not ready") + if got, want := p.Status.IsSKSReady(), false; got != want { + t.Errorf("after marking SKS not ready: got: %v, want: %v", got, want) + } +} diff --git a/pkg/reconciler/autoscaling/kpa/kpa.go b/pkg/reconciler/autoscaling/kpa/kpa.go index 3a487578d9b3..b6acec22413b 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa.go +++ b/pkg/reconciler/autoscaling/kpa/kpa.go @@ -44,6 +44,8 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" ) +const noPrivateServiceName = "No Private Service Name" + // podCounts keeps record of various numbers of pods // for each revision. type podCounts struct { @@ -85,7 +87,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pa *pav1alpha1.PodAutosc if _, err = c.ReconcileSKS(ctx, pa, nv1alpha1.SKSOperationModeServe, 0 /*numActivators == all*/); err != nil { return fmt.Errorf("error reconciling SKS: %w", err) } - pa.Status.MarkSKSNotReady("No Private Service Name") // In both cases this is true. + pa.Status.MarkSKSNotReady(noPrivateServiceName) // In both cases this is true. return computeStatus(ctx, pa, podCounts{want: scaleUnknown}, logger) } @@ -219,24 +221,30 @@ func reportMetrics(pa *pav1alpha1.PodAutoscaler, pc podCounts) error { } // computeActiveCondition updates the status of a PA given the current scale (got), desired scale (want) -// and the current status, as per the following table: +// active threshold (min), and the current status, as per the following table: // -// | Want | Got | Status | New status | -// | 0 | | | inactive | -// | >0 | < min | | activating | -// | >0 | >= min | | active | -// | -1 | < min | inactive | inactive | -// | -1 | < min | activating | activating | -// | -1 | < min | active | activating | -// | -1 | >= min | inactive | inactive | -// | -1 | >= min | activating | active | -// | -1 | >= min | active | active | +// | Want | Got | min | Status | New status | +// | 0 | | | | inactive | +// | >0 | < min | | | activating | +// | >0 | >= min | | | active | +// | -1 | < min | | inactive | inactive | +// | -1 | < min | | activating | activating | +// | -1 | < min | | active | activating | +// | -1 | >= min | | inactive | inactive | +// | -1 | >= min | 0 | activating | inactive | +// | -1 | >= min | 0 | active | inactive | <-- this case technically is impossible. +// | -1 | >= min | >0 | activating | active | +// | -1 | >= min | >0 | active | active | func computeActiveCondition(ctx context.Context, pa *pav1alpha1.PodAutoscaler, pc podCounts) { minReady := activeThreshold(ctx, pa) + if pc.ready >= minReady { + pa.Status.MarkScaleTargetInitialized() + } switch { - case pc.want == 0: - if pa.Status.IsActivating() { + // Need to check for minReady = 0 because in the initialScale 0 case, pc.want will be -1. + case pc.want == 0 || minReady == 0: + if pa.Status.IsActivating() && minReady > 0 { // We only ever scale to zero while activating if we fail to activate within the progress deadline. pa.Status.MarkInactive("TimedOut", "The target could not be activated.") } else { @@ -247,12 +255,19 @@ func computeActiveCondition(ctx context.Context, pa *pav1alpha1.PodAutoscaler, p if pc.want > 0 || !pa.Status.IsInactive() { pa.Status.MarkActivating( "Queued", "Requests to the target are being buffered as resources are provisioned.") + } else { + // This is for the initialScale 0 case. In the first iteration, minReady is 0, + // but for the following iterations, minReady is 1. pc.want will continue being + // -1 until we start receiving metrics, so we will end up here. + // Even though PA is already been marked as inactive in the first iteration, we + // still need to set it again. Otherwise reconciliation will fail with NewObservedGenFailure + // because we cannot go through one iteration of reconciliation without setting + // some status. + pa.Status.MarkInactive("NoTraffic", "The target is not receiving traffic.") } case pc.ready >= minReady: if pc.want > 0 || !pa.Status.IsInactive() { - pa.Status.MarkScaleTargetInitialized() - // SKS should already be active. pa.Status.MarkActive() } } diff --git a/pkg/reconciler/autoscaling/kpa/kpa_test.go b/pkg/reconciler/autoscaling/kpa/kpa_test.go index f7f5e109db56..74906e59bf44 100644 --- a/pkg/reconciler/autoscaling/kpa/kpa_test.go +++ b/pkg/reconciler/autoscaling/kpa/kpa_test.go @@ -109,6 +109,14 @@ func defaultConfigMapData() map[string]string { } } +func initialScaleZeroConfigMap() *autoscalerconfig.Config { + autoscalerConfig, _ := autoscalerconfig.NewConfigFromMap(defaultConfigMapData()) + autoscalerConfig.AllowZeroInitialScale = true + autoscalerConfig.InitialScale = 0 + autoscalerConfig.EnableScaleToZero = true + return autoscalerConfig +} + func defaultConfig() *config.Config { autoscalerConfig, _ := autoscalerconfig.NewConfigFromMap(defaultConfigMapData()) deploymentConfig, _ := deployment.NewConfigFromMap(map[string]string{ @@ -166,6 +174,15 @@ func sksNoConds(s *nv1a1.ServerlessService) { s.Status.Status = duckv1.Status{} } +func metricWithASConfig(ns, n string, asConfig *autoscalerconfig.Config, opts ...metricOption) *asv1a1.Metric { + pa := kpa(ns, n) + m := aresources.MakeMetric(pa, kmeta.ChildName(n, "-private"), asConfig) + for _, o := range opts { + o(m) + } + return m +} + func sks(ns, n string, so ...SKSOption) *nv1a1.ServerlessService { kpa := kpa(ns, n) s := aresources.MakeSKS(kpa, nv1a1.SKSOperationModeServe, scaling.MinActivators) @@ -211,6 +228,7 @@ func TestReconcile(t *testing.T) { unknownScale = scaleUnknown underscale = defaultScale - 1 overscale = defaultScale + 1 + asConfigKey = "" ) // Set up a default deployment with the appropriate scale so that we don't @@ -716,7 +734,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), }, defaultReady...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, withScales(1, 0), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(1, 0), WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic("NoTraffic", "The target is not receiving traffic."), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), @@ -753,7 +771,7 @@ func TestReconcile(t *testing.T) { deploy(testNamespace, testRevision), }, defaultReady...), WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: kpa(testNamespace, testRevision, WithPASKSReady, WithPAMetricsService(privateSvc), + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, WithPASKSReady, WithPAMetricsService(privateSvc), WithNoTraffic("TimedOut", "The target could not be activated."), withScales(1, 0), WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), WithObservedGeneration(1)), @@ -1039,6 +1057,76 @@ func TestReconcile(t *testing.T) { Name: deployName, Patch: []byte(fmt.Sprintf(`[{"op":"replace","path":"/spec/replicas","value":%d}]`, 20)), }}, + }, { + Name: "initial scale zero: scale to zero", + Key: key, + Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey, + decider(testNamespace, testRevision, -1, /* desiredScale */ + 0 /* ebc */, scaling.MinActivators)), + Objects: append([]runtime.Object{ + kpa(testNamespace, testRevision, withScales(0, -1), WithReachabilityReachable, + WithPAMetricsService(privateSvc), WithPASKSNotReady(noPrivateServiceName), + ), + // SKS won't be ready bc no ready endpoints, but private service name will be populated. + sks(testNamespace, testRevision, WithDeployRef(deployName), WithPrivateService), + metric(testNamespace, testRevision), + deploy(testNamespace, testRevision, func(d *appsv1.Deployment) { + d.Spec.Replicas = ptr.Int32(0) + }), + }, makeReadyPods(0, testNamespace, testRevision)...), + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: kpa(testNamespace, testRevision, markScaleTargetInitialized, + WithNoTraffic("NoTraffic", "The target is not receiving traffic."), + withScales(0, -1), WithReachabilityReachable, + WithPAMetricsService(privateSvc), WithObservedGeneration(1), + WithPASKSNotReady(""), + ), + }}, + }, { + Name: "initial scale zero: stay at zero", + Key: key, + Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey, + decider(testNamespace, testRevision, -1, /* desiredScale */ + 0 /* ebc */, scaling.MinActivators)), + Objects: append([]runtime.Object{ + kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(0, scaleUnknown), + WithReachabilityReachable, WithPAMetricsService(privateSvc), WithPASKSNotReady(""), + ), + sks(testNamespace, testRevision, WithDeployRef(deployName), WithPrivateService), + metric(testNamespace, testRevision), + deploy(testNamespace, testRevision, func(d *appsv1.Deployment) { + d.Spec.Replicas = ptr.Int32(0) + }), + }, makeReadyPods(0, testNamespace, testRevision)...), + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: kpa(testNamespace, testRevision, WithPASKSNotReady(""), WithBufferedTraffic, markScaleTargetInitialized, + withScales(0, scaleUnknown), WithReachabilityReachable, + WithPAMetricsService(privateSvc), WithObservedGeneration(1), + ), + }}, + }, { + Name: "initial scale zero: scale to greater than zero", + Key: key, + Ctx: context.WithValue(context.WithValue(context.Background(), asConfigKey, initialScaleZeroConfigMap()), deciderKey, + decider(testNamespace, testRevision, 2, /* desiredScale */ + -42 /* ebc */, scaling.MinActivators)), + Objects: append([]runtime.Object{ + kpa(testNamespace, testRevision, markScaleTargetInitialized, withScales(2, 2), + WithReachabilityReachable, WithPAStatusService(testRevision), WithPAMetricsService(privateSvc), + WithPASKSReady, + ), + sks(testNamespace, testRevision, WithDeployRef(deployName), WithProxyMode, WithSKSReady, WithPrivateService), + metricWithASConfig(testNamespace, testRevision, initialScaleZeroConfigMap()), + deploy(testNamespace, testRevision, func(d *appsv1.Deployment) { + d.Spec.Replicas = ptr.Int32(2) + }), + }, makeReadyPods(2, testNamespace, testRevision)...), + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: kpa(testNamespace, testRevision, WithTraffic, WithPASKSReady, markScaleTargetInitialized, + withScales(2, 2), WithReachabilityReachable, WithPAStatusService(testRevision), + WithPAMetricsService(privateSvc), WithObservedGeneration(1), + ), + }}, }} table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { @@ -1061,6 +1149,10 @@ func TestReconcile(t *testing.T) { fakeDeciders.Create(ctx, d.(*scaling.Decider)) } + testConfigs := defaultConfig() + if asConfig := ctx.Value(asConfigKey); asConfig != nil { + testConfigs.Autoscaler = asConfig.(*autoscalerconfig.Config) + } psf := podscalable.Get(ctx) scaler := newScaler(ctx, psf, func(interface{}, time.Duration) {}) scaler.activatorProbe = func(*asv1a1.PodAutoscaler, http.RoundTripper) (bool, error) { return true, nil } @@ -1079,7 +1171,7 @@ func TestReconcile(t *testing.T) { servingclient.Get(ctx), listers.GetPodAutoscalerLister(), controller.GetEventRecorder(ctx), r, autoscaling.KPA, controller.Options{ - ConfigStore: &testConfigStore{config: defaultConfig()}, + ConfigStore: &testConfigStore{config: testConfigs}, }) })) } diff --git a/pkg/reconciler/autoscaling/kpa/scaler_test.go b/pkg/reconciler/autoscaling/kpa/scaler_test.go index a4d7e482a30f..4fc64091fbc2 100644 --- a/pkg/reconciler/autoscaling/kpa/scaler_test.go +++ b/pkg/reconciler/autoscaling/kpa/scaler_test.go @@ -437,6 +437,20 @@ func TestScaler(t *testing.T) { paMarkActivating(k, time.Now()) k.ObjectMeta.Annotations[autoscaling.InitialScaleAnnotationKey] = "5" }, + }, { + label: "reaching initial scale zero", + startReplicas: 0, + scaleTo: 0, + wantReplicas: 0, + wantScaling: false, + wantCBCount: 1, + paMutation: func(k *pav1alpha1.PodAutoscaler) { + paMarkInactive(k, time.Now()) + k.ObjectMeta.Annotations[autoscaling.InitialScaleAnnotationKey] = "0" + }, + configMutator: func(c *config.Config) { + c.Autoscaler.AllowZeroInitialScale = true + }, }} for _, test := range tests { diff --git a/test/e2e/initial_scale_test.go b/test/e2e/initial_scale_test.go index 4f24ad858137..ac27f4c3245c 100644 --- a/test/e2e/initial_scale_test.go +++ b/test/e2e/initial_scale_test.go @@ -37,7 +37,6 @@ import ( // the revision level. This test runs after the cluster wide flag allow-zero-initial-scale // is set to true. func TestInitScaleZero(t *testing.T) { - t.Skip() t.Parallel() cancel := logstream.Start(t) defer cancel()