From 2a59b50aabdbb850eb1a4ab17af3d8f84136a6bb Mon Sep 17 00:00:00 2001 From: Cybwan Date: Tue, 7 Jan 2025 15:45:25 +0800 Subject: [PATCH] feat: slow starting warm-up feature (#622) add slow starting warm-up feature Signed-off-by: Cybwan Signed-off-by: Lin Yang --- charts/fsm/README.md | 4 + charts/fsm/templates/fsm-rbac.yaml | 4 +- charts/fsm/templates/preset-mesh-config.yaml | 7 + charts/fsm/values.schema.json | 51 +++++++ charts/fsm/values.yaml | 13 ++ .../crds/config.flomesh.io_meshconfigs.yaml | 42 +++++ .../policy.flomesh.io_trafficwarmups.yaml | 97 ++++++++++++ pkg/announcements/types.go | 9 ++ pkg/apis/config/v1alpha3/mesh_config.go | 67 ++++++++ .../config/v1alpha3/zz_generated.deepcopy.go | 33 ++++ pkg/apis/policy/v1alpha1/traffic_warmup.go | 50 ++++++ .../policy/v1alpha1/zz_generated.deepcopy.go | 77 ++++++++++ .../policy/v1alpha1/zz_generated.register.go | 2 + pkg/catalog/mock_catalog_generated.go | 15 ++ pkg/catalog/trafficwarmup.go | 81 ++++++++++ pkg/catalog/types.go | 3 + pkg/constants/constants.go | 6 + .../v1alpha1/fake/fake_policy_client.go | 4 + .../v1alpha1/fake/fake_trafficwarmup.go | 144 ++++++++++++++++++ .../policy/v1alpha1/generated_expansion.go | 2 + .../typed/policy/v1alpha1/policy_client.go | 5 + .../typed/policy/v1alpha1/trafficwarmup.go | 66 ++++++++ .../informers/externalversions/generic.go | 2 + .../policy/v1alpha1/interface.go | 7 + .../policy/v1alpha1/trafficwarmup.go | 87 +++++++++++ .../policy/v1alpha1/expansion_generated.go | 8 + .../listers/policy/v1alpha1/trafficwarmup.go | 67 ++++++++ pkg/k8s/informers/informers.go | 1 + pkg/k8s/informers/types.go | 2 + pkg/messaging/broker.go | 2 + pkg/policy/client.go | 67 +++++--- pkg/policy/mock_client_generated.go | 15 ++ pkg/policy/types.go | 4 + pkg/sidecar/v1/providers/pipy/repo/jobs.go | 14 +- pkg/sidecar/v1/providers/pipy/repo/policy.go | 65 +++++++- 35 files changed, 1088 insertions(+), 35 deletions(-) create mode 100644 cmd/fsm-bootstrap/crds/policy.flomesh.io_trafficwarmups.yaml create mode 100644 pkg/apis/policy/v1alpha1/traffic_warmup.go create mode 100644 pkg/catalog/trafficwarmup.go create mode 100644 pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_trafficwarmup.go create mode 100644 pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/trafficwarmup.go create mode 100644 pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/trafficwarmup.go create mode 100644 pkg/gen/client/policy/listers/policy/v1alpha1/trafficwarmup.go diff --git a/charts/fsm/README.md b/charts/fsm/README.md index 247cdc1b2..b26fa16e7 100644 --- a/charts/fsm/README.md +++ b/charts/fsm/README.md @@ -118,6 +118,7 @@ The following table lists the configurable parameters of the fsm chart and their | fsm.featureFlags.enableSidecarActiveHealthChecks | bool | `false` | Enable Sidecar active health checks | | fsm.featureFlags.enableSidecarPrettyConfig | bool | `true` | Enable Sidecar Pretty Config | | fsm.featureFlags.enableSnapshotCacheMode | bool | `false` | Enables SnapshotCache feature for Sidecar xDS server. | +| fsm.featureFlags.enableTrafficWarmupPolicy | bool | `false` | Enables traffic warmup feature | | fsm.featureFlags.enableValidateGRPCRouteHostnames | bool | `true` | Enable validate GRPC route hostnames, enforce the hostname is DNS name not IP address | | fsm.featureFlags.enableValidateGatewayListenerHostname | bool | `true` | Enable validate Gateway listener hostname, enforce the hostname is DNS name not IP address | | fsm.featureFlags.enableValidateHTTPRouteHostnames | bool | `true` | Enable validate HTTP route hostnames, enforce the hostname is DNS name not IP address | @@ -487,6 +488,9 @@ The following table lists the configurable parameters of the fsm chart and their | fsm.vault.secret.key | string | `""` | The Kubernetes secret key with the value bring the Vault token | | fsm.vault.secret.name | string | `""` | The Kubernetes secret name storing the Vault token used in FSM | | fsm.vault.token | string | `""` | token that should be used to connect to Vault | +| fsm.warmup | object | `{"duration":"90s","enable":false,"maxWeight":100,"minWeight":10}` | Global Traffic Warmup policy | +| fsm.warmup.maxWeight | int | `100` | MaxWeight configures the maximum percentage of origin weight -- If unspecified, defaults to 100 | +| fsm.warmup.minWeight | int | `10` | MinWeight configures the minimum percentage of origin weight -- If unspecified, defaults to 10 | | fsm.webhookConfigNamePrefix | string | `"fsm-webhook"` | Prefix used in name of the webhook configuration resources | | smi.validateTrafficTarget | bool | `true` | Enables validation of SMI Traffic Target | diff --git a/charts/fsm/templates/fsm-rbac.yaml b/charts/fsm/templates/fsm-rbac.yaml index 74d18eed1..5bfd3de88 100644 --- a/charts/fsm/templates/fsm-rbac.yaml +++ b/charts/fsm/templates/fsm-rbac.yaml @@ -89,10 +89,10 @@ rules: # FSM's custom policy API - apiGroups: ["policy.flomesh.io"] - resources: ["egresses", "egressgateways", "ingressbackends", "accesscontrols", "accesscerts", "isolations", "retries", "upstreamtrafficsettings"] + resources: ["egresses", "egressgateways", "ingressbackends", "accesscontrols", "accesscerts", "isolations", "retries", "upstreamtrafficsettings", "trafficwarmups"] verbs: ["list", "get", "watch"] - apiGroups: ["policy.flomesh.io"] - resources: ["ingressbackends/status", "accesscontrols/status", "accesscerts/status", "upstreamtrafficsettings/status"] + resources: ["ingressbackends/status", "accesscontrols/status", "accesscerts/status", "upstreamtrafficsettings/status", "trafficwarmup/status"] verbs: ["update"] # FSM's MultiCluster resource API diff --git a/charts/fsm/templates/preset-mesh-config.yaml b/charts/fsm/templates/preset-mesh-config.yaml index 17cbe286a..cf1ba20a5 100644 --- a/charts/fsm/templates/preset-mesh-config.yaml +++ b/charts/fsm/templates/preset-mesh-config.yaml @@ -46,6 +46,12 @@ data: "outboundIPRangeInclusionList": {{.Values.fsm.outboundIPRangeInclusionList | mustToJson}}, "networkInterfaceExclusionList": {{.Values.fsm.networkInterfaceExclusionList | mustToJson}} }, + "warmup": { + "enable": {{.Values.fsm.warmup.enable | mustToJson}}, + "duration": {{.Values.fsm.warmup.duration | mustToJson}}, + "minWeight": {{.Values.fsm.warmup.minWeight | mustToJson}}, + "maxWeight": {{.Values.fsm.warmup.maxWeight | mustToJson}} + }, "observability": { "fsmLogLevel": {{.Values.fsm.controllerLogLevel | mustToJson}}, "tracing": { @@ -86,6 +92,7 @@ data: "enableIngressBackendPolicy": {{.Values.fsm.featureFlags.enableIngressBackendPolicy | mustToJson}}, "enableAccessControlPolicy": {{.Values.fsm.featureFlags.enableAccessControlPolicy | mustToJson}}, "enableAccessCertPolicy": {{.Values.fsm.featureFlags.enableAccessCertPolicy | mustToJson}}, + "enableTrafficWarmupPolicy": {{.Values.fsm.featureFlags.enableTrafficWarmupPolicy | mustToJson}}, "enableSidecarPrettyConfig": {{.Values.fsm.featureFlags.enableSidecarPrettyConfig | mustToJson}}, "enableSidecarActiveHealthChecks": {{.Values.fsm.featureFlags.enableSidecarActiveHealthChecks | mustToJson}}, "enableRetryPolicy": {{.Values.fsm.featureFlags.enableRetryPolicy | mustToJson}}, diff --git a/charts/fsm/values.schema.json b/charts/fsm/values.schema.json index 9fd14adc7..4462f7b81 100644 --- a/charts/fsm/values.schema.json +++ b/charts/fsm/values.schema.json @@ -1140,6 +1140,47 @@ } } }, + "warmup": { + "$id": "#/properties/fsm/properties/warmup", + "type": "object", + "title": "The traffic warmup schema", + "required": [ + "enable", + "duration", + "minWeight", + "maxWeight" + ], + "properties": { + "enable": { + "$id": "#/properties/fsm/properties/warmup/properties/enable", + "type": "boolean", + "title": "The enable schema for traffic warmup", + "examples": [ + true + ] + }, + "duration": { + "$id": "#/properties/fsm/properties/warmup/properties/duration", + "type": "string", + "title": "The duration schema", + "description": "The traffic warmup duration.", + "examples": [ + "90s" + ] + }, + "minWeight": { + "$id": "#/properties/fsm/properties/warmup/properties/minWeight", + "type": "integer", + "title": "minWeight configures the minimum percentage" + }, + "maxWeight": { + "$id": "#/properties/fsm/properties/warmup/properties/maxWeight", + "type": "integer", + "title": "maxWeight configures the maximum percentage" + } + }, + "additionalProperties": false + }, "trustDomain": { "$id": "#/properties/fsm/properties/trustDomain", "type": "string", @@ -2417,6 +2458,7 @@ "enableIngressBackendPolicy", "enableAccessControlPolicy", "enableAccessCertPolicy", + "enableTrafficWarmupPolicy", "enableSidecarPrettyConfig", "enableSidecarActiveHealthChecks", "enableSnapshotCacheMode", @@ -2477,6 +2519,15 @@ true ] }, + "enableTrafficWarmupPolicy": { + "$id": "#/properties/fsm/properties/featureFlags/properties/enableTrafficWarmupPolicy", + "type": "boolean", + "title": "Enable traffic warmup feature", + "description": "Enable traffic warmup feature", + "examples": [ + true + ] + }, "enableSidecarPrettyConfig": { "$id": "#/properties/fsm/properties/featureFlags/properties/enableSidecarPrettyConfig", "type": "boolean", diff --git a/charts/fsm/values.yaml b/charts/fsm/values.yaml index b838dcb2a..d58a8f2ca 100644 --- a/charts/fsm/values.yaml +++ b/charts/fsm/values.yaml @@ -112,6 +112,17 @@ fsm: # -- codebase is the folder used by fsmController. codebase: "" + # -- Global Traffic Warmup policy + warmup: + enable: false + duration: 90s + # -- MinWeight configures the minimum percentage of origin weight + # -- If unspecified, defaults to 10 + minWeight: 10 + # -- MaxWeight configures the maximum percentage of origin weight + # -- If unspecified, defaults to 100 + maxWeight: 100 + pluginChains: inbound-tcp: - plugin: modules/inbound-tls-termination @@ -780,6 +791,8 @@ fsm: enableAccessControlPolicy: true # When enabled, FSM can issue certificates for external services. enableAccessCertPolicy: false + # -- Enables traffic warmup feature + enableTrafficWarmupPolicy: false # -- Enable Sidecar Pretty Config enableSidecarPrettyConfig: true # -- Enable Sidecar active health checks diff --git a/cmd/fsm-bootstrap/crds/config.flomesh.io_meshconfigs.yaml b/cmd/fsm-bootstrap/crds/config.flomesh.io_meshconfigs.yaml index f8ec649ec..f45a9a937 100644 --- a/cmd/fsm-bootstrap/crds/config.flomesh.io_meshconfigs.yaml +++ b/cmd/fsm-bootstrap/crds/config.flomesh.io_meshconfigs.yaml @@ -1473,6 +1473,10 @@ spec: description: EnableSnapshotCacheMode defines if XDS server starts with snapshot cache. type: boolean + enableTrafficWarmupPolicy: + description: EnableTrafficWarmupPolicy defines if FSM will use + the TrafficWarmup API to allow traffic warmup + type: boolean enableValidateGRPCRouteHostnames: description: EnableValidateGRPCRouteHostnames defines if validate grpc route hostnames is enabled. @@ -1506,6 +1510,7 @@ spec: - enableSidecarActiveHealthChecks - enableSidecarPrettyConfig - enableSnapshotCacheMode + - enableTrafficWarmupPolicy - enableValidateGRPCRouteHostnames - enableValidateGatewayListenerHostname - enableValidateHTTPRouteHostnames @@ -2451,6 +2456,43 @@ spec: - outboundPortExclusionList - serviceAccessMode type: object + warmup: + description: Warmup defines the traffic warm up policy + properties: + aggression: + default: 1 + description: |- + Aggression controls the speed of traffic increase over the warmup duration. Defaults to 1.0, so that endpoints would + get linearly increasing amount of traffic. When increasing the value for this parameter, + the speed of traffic ramp-up increases non-linearly. + minimum: 1 + type: number + duration: + default: 90s + format: duration + type: string + enable: + default: true + type: boolean + maxWeight: + default: 100 + description: |- + MaxWeight configures the maximum percentage of origin weight + If unspecified, defaults to 100 + format: int64 + maximum: 100 + minimum: 0 + type: integer + minWeight: + default: 10 + description: |- + MinWeight configures the minimum percentage of origin weight + If unspecified, defaults to 10 + format: int64 + maximum: 100 + minimum: 0 + type: integer + type: object required: - connector - image diff --git a/cmd/fsm-bootstrap/crds/policy.flomesh.io_trafficwarmups.yaml b/cmd/fsm-bootstrap/crds/policy.flomesh.io_trafficwarmups.yaml new file mode 100644 index 000000000..9f6433ba3 --- /dev/null +++ b/cmd/fsm-bootstrap/crds/policy.flomesh.io_trafficwarmups.yaml @@ -0,0 +1,97 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.16.5 + labels: + app.kubernetes.io/name: flomesh.io + name: trafficwarmups.policy.flomesh.io +spec: + group: policy.flomesh.io + names: + kind: TrafficWarmup + listKind: TrafficWarmupList + plural: trafficwarmups + shortNames: + - trafficwarmup + singular: trafficwarmup + scope: Namespaced + versions: + - name: v1alpha1 + schema: + openAPIV3Schema: + description: TrafficWarmup is the type used to represent a traffic warmup + policy. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: |- + Specification of the desired behavior of the traffic raffic warmup. + More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status + properties: + aggression: + default: 1 + description: |- + Aggression controls the speed of traffic increase over the warmup duration. Defaults to 1.0, so that endpoints would + get linearly increasing amount of traffic. When increasing the value for this parameter, + the speed of traffic ramp-up increases non-linearly. + minimum: 1 + type: number + duration: + default: 90s + format: duration + type: string + enable: + default: true + type: boolean + maxWeight: + default: 100 + description: |- + MaxWeight configures the maximum percentage of origin weight + If unspecified, defaults to 100 + format: int64 + maximum: 100 + minimum: 0 + type: integer + minWeight: + default: 10 + description: |- + MinWeight configures the minimum percentage of origin weight + If unspecified, defaults to 10 + format: int64 + maximum: 100 + minimum: 0 + type: integer + type: object + status: + description: Status defines the current state of TrafficWarmup. + properties: + currentStatus: + description: CurrentStatus defines the current status of a traffic + warmup resource. + type: string + reason: + description: Reason defines the reason for the current status of a + raffic warmup resource. + type: string + type: object + type: object + served: true + storage: true diff --git a/pkg/announcements/types.go b/pkg/announcements/types.go index 5c0140a62..2dd678f37 100644 --- a/pkg/announcements/types.go +++ b/pkg/announcements/types.go @@ -333,6 +333,15 @@ const ( // RetryPolicyUpdated is the type of announcement emitted when we observe an update to retries.policy.flomesh.io RetryPolicyUpdated Kind = "retry-updated" + // TrafficWarmupAdded is the type of announcement emitted when we observe an addition of trafficwarmups.policy.flomesh.io + TrafficWarmupAdded Kind = "trafficwarmup-added" + + // TrafficWarmupDeleted the type of announcement emitted when we observe a deletion of trafficwarmups.policy.flomesh.io + TrafficWarmupDeleted Kind = "trafficwarmup-deleted" + + // TrafficWarmupUpdated is the type of announcement emitted when we observe an update to trafficwarmups.policy.flomesh.io + TrafficWarmupUpdated Kind = "trafficwarmup-updated" + // UpstreamTrafficSettingAdded is the type of announcement emitted when we observe an addition of upstreamtrafficsettings.policy.flomesh.io UpstreamTrafficSettingAdded Kind = "upstreamtrafficsetting-added" diff --git a/pkg/apis/config/v1alpha3/mesh_config.go b/pkg/apis/config/v1alpha3/mesh_config.go index d9eba1e05..aa8c7bf70 100644 --- a/pkg/apis/config/v1alpha3/mesh_config.go +++ b/pkg/apis/config/v1alpha3/mesh_config.go @@ -1,6 +1,7 @@ package v1alpha3 import ( + "math" "strings" corev1 "k8s.io/api/core/v1" @@ -41,6 +42,9 @@ type MeshConfigSpec struct { // Traffic defines the traffic management configurations for a mesh instance. Traffic TrafficSpec `json:"traffic,omitempty"` + // Warmup defines the traffic warm up policy + Warmup TrafficWarmupSpec `json:"warmup,omitempty"` + // Observalility defines the observability configurations for a mesh instance. Observability ObservabilitySpec `json:"observability,omitempty"` @@ -211,6 +215,66 @@ type SidecarSpec struct { LocalDNSProxy LocalDNSProxy `json:"localDNSProxy,omitempty"` } +// TrafficWarmupSpec is the specification for a TrafficWarmup +type TrafficWarmupSpec struct { + // +kubebuilder:default=true + // +optional + Enable bool `json:"enable"` + + // +kubebuilder:validation:Format="duration" + // +kubebuilder:default="90s" + // +optional + Duration metav1.Duration `json:"duration,omitempty"` + + // MinWeight configures the minimum percentage of origin weight + // If unspecified, defaults to 10 + // +kubebuilder:default=10 + // +kubebuilder:validation:Maximum=100 + // +kubebuilder:validation:Minimum=0 + MinWeight *uint64 `json:"minWeight,omitempty"` + + // MaxWeight configures the maximum percentage of origin weight + // If unspecified, defaults to 100 + // +kubebuilder:default=100 + // +kubebuilder:validation:Maximum=100 + // +kubebuilder:validation:Minimum=0 + MaxWeight *uint64 `json:"maxWeight,omitempty"` + + // Aggression controls the speed of traffic increase over the warmup duration. Defaults to 1.0, so that endpoints would + // get linearly increasing amount of traffic. When increasing the value for this parameter, + // the speed of traffic ramp-up increases non-linearly. + // +kubebuilder:default=1.0 + // +kubebuilder:validation:Minimum=1 + Aggression *float64 `json:"aggression,omitempty"` +} + +func (tw *TrafficWarmupSpec) Weight(startTimestampSeconds, currTimestampSeconds int64) float64 { + maxWeight := uint64(100) + minWeight := uint64(10) + aggression := float64(1) + slowStartWindowSeconds := tw.Duration.Seconds() + + if tw.MaxWeight != nil { + maxWeight = *tw.MaxWeight + } + + if tw.MinWeight != nil { + minWeight = *tw.MinWeight + } + + if tw.Aggression != nil { + aggression = *tw.Aggression + } + + if currTimestampSeconds <= startTimestampSeconds { + return float64(minWeight) + } + + timeFactor := float64(currTimestampSeconds-startTimestampSeconds) / slowStartWindowSeconds + newWeight := float64(maxWeight) * math.Max(float64(minWeight)/float64(maxWeight), math.Pow(timeFactor, 1/float64(aggression))) + return math.Min(newWeight, float64(maxWeight)) +} + // TrafficSpec is the type used to represent FSM's traffic management configuration. type TrafficSpec struct { // InterceptionMode defines a string indicating which traffic interception mode is used. @@ -436,6 +500,9 @@ type FeatureFlags struct { // EnableAccessCertPolicy defines if FSM can issue certificates for external services.. EnableAccessCertPolicy bool `json:"enableAccessCertPolicy"` + // EnableTrafficWarmupPolicy defines if FSM will use the TrafficWarmup API to allow traffic warmup + EnableTrafficWarmupPolicy bool `json:"enableTrafficWarmupPolicy"` + // EnableSidecarPrettyConfig defines if pretty sidecar config is enabled. EnableSidecarPrettyConfig bool `json:"enableSidecarPrettyConfig"` diff --git a/pkg/apis/config/v1alpha3/zz_generated.deepcopy.go b/pkg/apis/config/v1alpha3/zz_generated.deepcopy.go index 8f6fcaa89..aa07b8274 100644 --- a/pkg/apis/config/v1alpha3/zz_generated.deepcopy.go +++ b/pkg/apis/config/v1alpha3/zz_generated.deepcopy.go @@ -433,6 +433,7 @@ func (in *MeshConfigSpec) DeepCopyInto(out *MeshConfigSpec) { in.Sidecar.DeepCopyInto(&out.Sidecar) out.RepoServer = in.RepoServer in.Traffic.DeepCopyInto(&out.Traffic) + in.Warmup.DeepCopyInto(&out.Warmup) in.Observability.DeepCopyInto(&out.Observability) in.Certificate.DeepCopyInto(&out.Certificate) out.FeatureFlags = in.FeatureFlags @@ -961,6 +962,38 @@ func (in *TrafficSpec) DeepCopy() *TrafficSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficWarmupSpec) DeepCopyInto(out *TrafficWarmupSpec) { + *out = *in + out.Duration = in.Duration + if in.MinWeight != nil { + in, out := &in.MinWeight, &out.MinWeight + *out = new(uint64) + **out = **in + } + if in.MaxWeight != nil { + in, out := &in.MaxWeight, &out.MaxWeight + *out = new(uint64) + **out = **in + } + if in.Aggression != nil { + in, out := &in.Aggression, &out.Aggression + *out = new(float64) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficWarmupSpec. +func (in *TrafficWarmupSpec) DeepCopy() *TrafficWarmupSpec { + if in == nil { + return nil + } + out := new(TrafficWarmupSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TresorCASpec) DeepCopyInto(out *TresorCASpec) { *out = *in diff --git a/pkg/apis/policy/v1alpha1/traffic_warmup.go b/pkg/apis/policy/v1alpha1/traffic_warmup.go new file mode 100644 index 000000000..f89e22b19 --- /dev/null +++ b/pkg/apis/policy/v1alpha1/traffic_warmup.go @@ -0,0 +1,50 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" +) + +// TrafficWarmup is the type used to represent a traffic warmup policy. +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +kubebuilder:metadata:labels=app.kubernetes.io/name=flomesh.io +// +kubebuilder:resource:shortName=trafficwarmup,scope=Namespaced +type TrafficWarmup struct { + metav1.TypeMeta `json:",inline"` + // Standard object's metadata. + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Specification of the desired behavior of the traffic raffic warmup. + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status + // +optional + Spec configv1alpha3.TrafficWarmupSpec `json:"spec,omitempty"` + + // Status defines the current state of TrafficWarmup. + Status TrafficWarmupStatus `json:"status,omitempty"` +} + +// TrafficWarmupStatus defines the common attributes that all filters should include within +// their status. +type TrafficWarmupStatus struct { + // CurrentStatus defines the current status of a traffic warmup resource. + // +optional + CurrentStatus string `json:"currentStatus,omitempty"` + + // Reason defines the reason for the current status of a raffic warmup resource. + // +optional + Reason string `json:"reason,omitempty"` +} + +// +kubebuilder:object:root=true +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type TrafficWarmupList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []TrafficWarmup `json:"items"` +} diff --git a/pkg/apis/policy/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/policy/v1alpha1/zz_generated.deepcopy.go index a0e6804c6..3bd7ee07b 100644 --- a/pkg/apis/policy/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/policy/v1alpha1/zz_generated.deepcopy.go @@ -1301,6 +1301,83 @@ func (in *TLSSpec) DeepCopy() *TLSSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficWarmup) DeepCopyInto(out *TrafficWarmup) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficWarmup. +func (in *TrafficWarmup) DeepCopy() *TrafficWarmup { + if in == nil { + return nil + } + out := new(TrafficWarmup) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficWarmup) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficWarmupList) DeepCopyInto(out *TrafficWarmupList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TrafficWarmup, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficWarmupList. +func (in *TrafficWarmupList) DeepCopy() *TrafficWarmupList { + if in == nil { + return nil + } + out := new(TrafficWarmupList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TrafficWarmupList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TrafficWarmupStatus) DeepCopyInto(out *TrafficWarmupStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficWarmupStatus. +func (in *TrafficWarmupStatus) DeepCopy() *TrafficWarmupStatus { + if in == nil { + return nil + } + out := new(TrafficWarmupStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UpstreamTrafficSetting) DeepCopyInto(out *UpstreamTrafficSetting) { *out = *in diff --git a/pkg/apis/policy/v1alpha1/zz_generated.register.go b/pkg/apis/policy/v1alpha1/zz_generated.register.go index a9f1ebb08..30b59fb10 100644 --- a/pkg/apis/policy/v1alpha1/zz_generated.register.go +++ b/pkg/apis/policy/v1alpha1/zz_generated.register.go @@ -72,6 +72,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &IsolationList{}, &Retry{}, &RetryList{}, + &TrafficWarmup{}, + &TrafficWarmupList{}, &UpstreamTrafficSetting{}, &UpstreamTrafficSettingList{}, ) diff --git a/pkg/catalog/mock_catalog_generated.go b/pkg/catalog/mock_catalog_generated.go index 45133b4ee..48c156f2f 100644 --- a/pkg/catalog/mock_catalog_generated.go +++ b/pkg/catalog/mock_catalog_generated.go @@ -7,6 +7,7 @@ package catalog import ( reflect "reflect" + v1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" v1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" endpoint "github.com/flomesh-io/fsm/pkg/endpoint" identity "github.com/flomesh-io/fsm/pkg/identity" @@ -243,6 +244,20 @@ func (mr *MockMeshCatalogerMockRecorder) GetRetryPolicy(arg0, arg1 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRetryPolicy", reflect.TypeOf((*MockMeshCataloger)(nil).GetRetryPolicy), arg0, arg1) } +// GetTrafficWarmupPolicy mocks base method. +func (m *MockMeshCataloger) GetTrafficWarmupPolicy(arg0 service.MeshService) *v1alpha3.TrafficWarmupSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTrafficWarmupPolicy", arg0) + ret0, _ := ret[0].(*v1alpha3.TrafficWarmupSpec) + return ret0 +} + +// GetTrafficWarmupPolicy indicates an expected call of GetTrafficWarmupPolicy. +func (mr *MockMeshCatalogerMockRecorder) GetTrafficWarmupPolicy(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTrafficWarmupPolicy", reflect.TypeOf((*MockMeshCataloger)(nil).GetTrafficWarmupPolicy), arg0) +} + // ListAllowedUpstreamEndpointsForService mocks base method. func (m *MockMeshCataloger) ListAllowedUpstreamEndpointsForService(arg0 identity.ServiceIdentity, arg1 service.MeshService) []endpoint.Endpoint { m.ctrl.T.Helper() diff --git a/pkg/catalog/trafficwarmup.go b/pkg/catalog/trafficwarmup.go new file mode 100644 index 000000000..3032a2da6 --- /dev/null +++ b/pkg/catalog/trafficwarmup.go @@ -0,0 +1,81 @@ +package catalog + +import ( + "strconv" + "strings" + "time" + + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" + "github.com/flomesh-io/fsm/pkg/constants" + + "github.com/flomesh-io/fsm/pkg/service" +) + +func (mc *MeshCatalog) GetTrafficWarmupPolicy(svc service.MeshService) *configv1alpha3.TrafficWarmupSpec { + if !mc.configurator.GetFeatureFlags().EnableTrafficWarmupPolicy { + return nil + } + + if svcWarmupPolicy := mc.policyController.GetTrafficWarmupPolicy(svc); svcWarmupPolicy != nil && svcWarmupPolicy.Enable { + return svcWarmupPolicy + } + + if ns := mc.kubeController.GetNamespace(svc.Namespace); ns != nil { + if enableAnno, exists := ns.Annotations[constants.TrafficWarmupEnableAnnotation]; exists { + enabled := false + switch strings.ToLower(enableAnno) { + case "enabled", "yes", "true": + enabled = true + case "disabled", "no", "false": + enabled = false + default: + log.Error().Msgf("invalid traffic warmup enable annotation, namespace:%s value:%s", svc.Namespace, enableAnno) + } + + if enabled { + nsWarmupPolicy := new(configv1alpha3.TrafficWarmupSpec) + if durationAnno, exists := ns.Annotations[constants.TrafficWarmupDurationAnnotation]; exists { + if duration, err := time.ParseDuration(durationAnno); err == nil { + nsWarmupPolicy.Duration.Duration = duration + } else { + log.Error().Err(err).Msgf("invalid traffic warmup duration annotation, namespace:%s value:%s", svc.Namespace, durationAnno) + return nil + } + } else { + nsWarmupPolicy.Duration.Duration = time.Second * 90 + } + if minWeightAnno, exists := ns.Annotations[constants.TrafficWarmupMinWeightAnnotation]; exists { + if minWeight, err := strconv.ParseUint(minWeightAnno, 10, 32); err == nil { + nsWarmupPolicy.MinWeight = &minWeight + } else { + log.Error().Err(err).Msgf("invalid traffic warmup minweight annotation, namespace:%s value:%s", svc.Namespace, minWeightAnno) + return nil + } + } + if maxWeightAnno, exists := ns.Annotations[constants.TrafficWarmupMaxWeightAnnotation]; exists { + if maxWeight, err := strconv.ParseUint(maxWeightAnno, 10, 32); err == nil { + nsWarmupPolicy.MaxWeight = &maxWeight + } else { + log.Error().Err(err).Msgf("invalid traffic warmup maxweight annotation, namespace:%s value:%s", svc.Namespace, maxWeightAnno) + return nil + } + } + if aggressionAnno, exists := ns.Annotations[constants.TrafficWarmupAggressionAnnotation]; exists { + if aggression, err := strconv.ParseFloat(aggressionAnno, 64); err == nil { + nsWarmupPolicy.Aggression = &aggression + } else { + log.Error().Err(err).Msgf("invalid traffic warmup aggression annotation, namespace:%s value:%s", svc.Namespace, aggressionAnno) + return nil + } + } + return nsWarmupPolicy + } + } + } + + if globalWarmupPolicy := mc.configurator.GetMeshConfig().Spec.Warmup; globalWarmupPolicy.Enable { + return &globalWarmupPolicy + } + + return nil +} diff --git a/pkg/catalog/types.go b/pkg/catalog/types.go index c484416cc..731692116 100644 --- a/pkg/catalog/types.go +++ b/pkg/catalog/types.go @@ -7,6 +7,7 @@ package catalog import ( corev1 "k8s.io/api/core/v1" + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" "github.com/flomesh-io/fsm/pkg/certificate" "github.com/flomesh-io/fsm/pkg/configurator" @@ -101,6 +102,8 @@ type MeshCataloger interface { // GetRetryPolicy returns the RetryPolicySpec for the given downstream identity and upstream service GetRetryPolicy(downstreamIdentity identity.ServiceIdentity, upstreamSvc service.MeshService) *v1alpha1.RetryPolicySpec + GetTrafficWarmupPolicy(svc service.MeshService) *configv1alpha3.TrafficWarmupSpec + // GetExportTrafficPolicy returns the export policy for the given mesh service GetExportTrafficPolicy(svc service.MeshService) (*trafficpolicy.ServiceExportTrafficPolicy, error) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index e9c5adb6b..4ba5bcc54 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -234,6 +234,12 @@ const ( // ServiceExclusionAnnotation is the annotation used for service exclusion ServiceExclusionAnnotation = "flomesh.io/service-exclusion" + + TrafficWarmupEnableAnnotation = "flomesh.io/traffic-warmup-enable" + TrafficWarmupDurationAnnotation = "flomesh.io/traffic-warmup-duration" + TrafficWarmupMinWeightAnnotation = "flomesh.io/traffic-warmup-minweight" + TrafficWarmupMaxWeightAnnotation = "flomesh.io/traffic-warmup-maxweight" + TrafficWarmupAggressionAnnotation = "flomesh.io/traffic-warmup-aggression" ) // Annotations and labels used by the MeshRootCertificate diff --git a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_policy_client.go b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_policy_client.go index 78d2e3f2a..9c5f273c6 100644 --- a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_policy_client.go +++ b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_policy_client.go @@ -53,6 +53,10 @@ func (c *FakePolicyV1alpha1) Retries(namespace string) v1alpha1.RetryInterface { return &FakeRetries{c, namespace} } +func (c *FakePolicyV1alpha1) TrafficWarmups(namespace string) v1alpha1.TrafficWarmupInterface { + return &FakeTrafficWarmups{c, namespace} +} + func (c *FakePolicyV1alpha1) UpstreamTrafficSettings(namespace string) v1alpha1.UpstreamTrafficSettingInterface { return &FakeUpstreamTrafficSettings{c, namespace} } diff --git a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_trafficwarmup.go b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_trafficwarmup.go new file mode 100644 index 000000000..4f1fc73ff --- /dev/null +++ b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/fake/fake_trafficwarmup.go @@ -0,0 +1,144 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeTrafficWarmups implements TrafficWarmupInterface +type FakeTrafficWarmups struct { + Fake *FakePolicyV1alpha1 + ns string +} + +var trafficwarmupsResource = v1alpha1.SchemeGroupVersion.WithResource("trafficwarmups") + +var trafficwarmupsKind = v1alpha1.SchemeGroupVersion.WithKind("TrafficWarmup") + +// Get takes name of the trafficWarmup, and returns the corresponding trafficWarmup object, and an error if there is any. +func (c *FakeTrafficWarmups) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.TrafficWarmup, err error) { + emptyResult := &v1alpha1.TrafficWarmup{} + obj, err := c.Fake. + Invokes(testing.NewGetActionWithOptions(trafficwarmupsResource, c.ns, name, options), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1alpha1.TrafficWarmup), err +} + +// List takes label and field selectors, and returns the list of TrafficWarmups that match those selectors. +func (c *FakeTrafficWarmups) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.TrafficWarmupList, err error) { + emptyResult := &v1alpha1.TrafficWarmupList{} + obj, err := c.Fake. + Invokes(testing.NewListActionWithOptions(trafficwarmupsResource, trafficwarmupsKind, c.ns, opts), emptyResult) + + if obj == nil { + return emptyResult, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.TrafficWarmupList{ListMeta: obj.(*v1alpha1.TrafficWarmupList).ListMeta} + for _, item := range obj.(*v1alpha1.TrafficWarmupList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested trafficWarmups. +func (c *FakeTrafficWarmups) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchActionWithOptions(trafficwarmupsResource, c.ns, opts)) + +} + +// Create takes the representation of a trafficWarmup and creates it. Returns the server's representation of the trafficWarmup, and an error, if there is any. +func (c *FakeTrafficWarmups) Create(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.CreateOptions) (result *v1alpha1.TrafficWarmup, err error) { + emptyResult := &v1alpha1.TrafficWarmup{} + obj, err := c.Fake. + Invokes(testing.NewCreateActionWithOptions(trafficwarmupsResource, c.ns, trafficWarmup, opts), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1alpha1.TrafficWarmup), err +} + +// Update takes the representation of a trafficWarmup and updates it. Returns the server's representation of the trafficWarmup, and an error, if there is any. +func (c *FakeTrafficWarmups) Update(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.UpdateOptions) (result *v1alpha1.TrafficWarmup, err error) { + emptyResult := &v1alpha1.TrafficWarmup{} + obj, err := c.Fake. + Invokes(testing.NewUpdateActionWithOptions(trafficwarmupsResource, c.ns, trafficWarmup, opts), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1alpha1.TrafficWarmup), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeTrafficWarmups) UpdateStatus(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.UpdateOptions) (result *v1alpha1.TrafficWarmup, err error) { + emptyResult := &v1alpha1.TrafficWarmup{} + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceActionWithOptions(trafficwarmupsResource, "status", c.ns, trafficWarmup, opts), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1alpha1.TrafficWarmup), err +} + +// Delete takes name of the trafficWarmup and deletes it. Returns an error if one occurs. +func (c *FakeTrafficWarmups) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(trafficwarmupsResource, c.ns, name, opts), &v1alpha1.TrafficWarmup{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeTrafficWarmups) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionActionWithOptions(trafficwarmupsResource, c.ns, opts, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.TrafficWarmupList{}) + return err +} + +// Patch applies the patch and returns the patched trafficWarmup. +func (c *FakeTrafficWarmups) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.TrafficWarmup, err error) { + emptyResult := &v1alpha1.TrafficWarmup{} + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceActionWithOptions(trafficwarmupsResource, c.ns, name, pt, data, opts, subresources...), emptyResult) + + if obj == nil { + return emptyResult, err + } + return obj.(*v1alpha1.TrafficWarmup), err +} diff --git a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/generated_expansion.go b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/generated_expansion.go index 18877650b..db006b865 100644 --- a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/generated_expansion.go +++ b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/generated_expansion.go @@ -29,4 +29,6 @@ type IsolationExpansion interface{} type RetryExpansion interface{} +type TrafficWarmupExpansion interface{} + type UpstreamTrafficSettingExpansion interface{} diff --git a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/policy_client.go b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/policy_client.go index b5de4d0f0..7f9341002 100644 --- a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/policy_client.go +++ b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/policy_client.go @@ -32,6 +32,7 @@ type PolicyV1alpha1Interface interface { IngressBackendsGetter IsolationsGetter RetriesGetter + TrafficWarmupsGetter UpstreamTrafficSettingsGetter } @@ -68,6 +69,10 @@ func (c *PolicyV1alpha1Client) Retries(namespace string) RetryInterface { return newRetries(c, namespace) } +func (c *PolicyV1alpha1Client) TrafficWarmups(namespace string) TrafficWarmupInterface { + return newTrafficWarmups(c, namespace) +} + func (c *PolicyV1alpha1Client) UpstreamTrafficSettings(namespace string) UpstreamTrafficSettingInterface { return newUpstreamTrafficSettings(c, namespace) } diff --git a/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/trafficwarmup.go b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/trafficwarmup.go new file mode 100644 index 000000000..da5bbab6f --- /dev/null +++ b/pkg/gen/client/policy/clientset/versioned/typed/policy/v1alpha1/trafficwarmup.go @@ -0,0 +1,66 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + + v1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" + scheme "github.com/flomesh-io/fsm/pkg/gen/client/policy/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + gentype "k8s.io/client-go/gentype" +) + +// TrafficWarmupsGetter has a method to return a TrafficWarmupInterface. +// A group's client should implement this interface. +type TrafficWarmupsGetter interface { + TrafficWarmups(namespace string) TrafficWarmupInterface +} + +// TrafficWarmupInterface has methods to work with TrafficWarmup resources. +type TrafficWarmupInterface interface { + Create(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.CreateOptions) (*v1alpha1.TrafficWarmup, error) + Update(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.UpdateOptions) (*v1alpha1.TrafficWarmup, error) + // Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + UpdateStatus(ctx context.Context, trafficWarmup *v1alpha1.TrafficWarmup, opts v1.UpdateOptions) (*v1alpha1.TrafficWarmup, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.TrafficWarmup, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.TrafficWarmupList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.TrafficWarmup, err error) + TrafficWarmupExpansion +} + +// trafficWarmups implements TrafficWarmupInterface +type trafficWarmups struct { + *gentype.ClientWithList[*v1alpha1.TrafficWarmup, *v1alpha1.TrafficWarmupList] +} + +// newTrafficWarmups returns a TrafficWarmups +func newTrafficWarmups(c *PolicyV1alpha1Client, namespace string) *trafficWarmups { + return &trafficWarmups{ + gentype.NewClientWithList[*v1alpha1.TrafficWarmup, *v1alpha1.TrafficWarmupList]( + "trafficwarmups", + c.RESTClient(), + scheme.ParameterCodec, + namespace, + func() *v1alpha1.TrafficWarmup { return &v1alpha1.TrafficWarmup{} }, + func() *v1alpha1.TrafficWarmupList { return &v1alpha1.TrafficWarmupList{} }), + } +} diff --git a/pkg/gen/client/policy/informers/externalversions/generic.go b/pkg/gen/client/policy/informers/externalversions/generic.go index cd1c40564..0613e90ca 100644 --- a/pkg/gen/client/policy/informers/externalversions/generic.go +++ b/pkg/gen/client/policy/informers/externalversions/generic.go @@ -64,6 +64,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Policy().V1alpha1().Isolations().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("retries"): return &genericInformer{resource: resource.GroupResource(), informer: f.Policy().V1alpha1().Retries().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("trafficwarmups"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Policy().V1alpha1().TrafficWarmups().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("upstreamtrafficsettings"): return &genericInformer{resource: resource.GroupResource(), informer: f.Policy().V1alpha1().UpstreamTrafficSettings().Informer()}, nil diff --git a/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/interface.go b/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/interface.go index d2855c4a9..dc7fcc7fc 100644 --- a/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/interface.go +++ b/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/interface.go @@ -35,6 +35,8 @@ type Interface interface { Isolations() IsolationInformer // Retries returns a RetryInformer. Retries() RetryInformer + // TrafficWarmups returns a TrafficWarmupInformer. + TrafficWarmups() TrafficWarmupInformer // UpstreamTrafficSettings returns a UpstreamTrafficSettingInformer. UpstreamTrafficSettings() UpstreamTrafficSettingInformer } @@ -85,6 +87,11 @@ func (v *version) Retries() RetryInformer { return &retryInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// TrafficWarmups returns a TrafficWarmupInformer. +func (v *version) TrafficWarmups() TrafficWarmupInformer { + return &trafficWarmupInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // UpstreamTrafficSettings returns a UpstreamTrafficSettingInformer. func (v *version) UpstreamTrafficSettings() UpstreamTrafficSettingInformer { return &upstreamTrafficSettingInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/trafficwarmup.go b/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/trafficwarmup.go new file mode 100644 index 000000000..645ffd3a7 --- /dev/null +++ b/pkg/gen/client/policy/informers/externalversions/policy/v1alpha1/trafficwarmup.go @@ -0,0 +1,87 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + policyv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" + versioned "github.com/flomesh-io/fsm/pkg/gen/client/policy/clientset/versioned" + internalinterfaces "github.com/flomesh-io/fsm/pkg/gen/client/policy/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/flomesh-io/fsm/pkg/gen/client/policy/listers/policy/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// TrafficWarmupInformer provides access to a shared informer and lister for +// TrafficWarmups. +type TrafficWarmupInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.TrafficWarmupLister +} + +type trafficWarmupInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewTrafficWarmupInformer constructs a new informer for TrafficWarmup type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewTrafficWarmupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredTrafficWarmupInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredTrafficWarmupInformer constructs a new informer for TrafficWarmup type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredTrafficWarmupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.PolicyV1alpha1().TrafficWarmups(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.PolicyV1alpha1().TrafficWarmups(namespace).Watch(context.TODO(), options) + }, + }, + &policyv1alpha1.TrafficWarmup{}, + resyncPeriod, + indexers, + ) +} + +func (f *trafficWarmupInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredTrafficWarmupInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *trafficWarmupInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&policyv1alpha1.TrafficWarmup{}, f.defaultInformer) +} + +func (f *trafficWarmupInformer) Lister() v1alpha1.TrafficWarmupLister { + return v1alpha1.NewTrafficWarmupLister(f.Informer().GetIndexer()) +} diff --git a/pkg/gen/client/policy/listers/policy/v1alpha1/expansion_generated.go b/pkg/gen/client/policy/listers/policy/v1alpha1/expansion_generated.go index cbd9ccf1a..d61e620f9 100644 --- a/pkg/gen/client/policy/listers/policy/v1alpha1/expansion_generated.go +++ b/pkg/gen/client/policy/listers/policy/v1alpha1/expansion_generated.go @@ -71,6 +71,14 @@ type RetryListerExpansion interface{} // RetryNamespaceLister. type RetryNamespaceListerExpansion interface{} +// TrafficWarmupListerExpansion allows custom methods to be added to +// TrafficWarmupLister. +type TrafficWarmupListerExpansion interface{} + +// TrafficWarmupNamespaceListerExpansion allows custom methods to be added to +// TrafficWarmupNamespaceLister. +type TrafficWarmupNamespaceListerExpansion interface{} + // UpstreamTrafficSettingListerExpansion allows custom methods to be added to // UpstreamTrafficSettingLister. type UpstreamTrafficSettingListerExpansion interface{} diff --git a/pkg/gen/client/policy/listers/policy/v1alpha1/trafficwarmup.go b/pkg/gen/client/policy/listers/policy/v1alpha1/trafficwarmup.go new file mode 100644 index 000000000..5103dd3b8 --- /dev/null +++ b/pkg/gen/client/policy/listers/policy/v1alpha1/trafficwarmup.go @@ -0,0 +1,67 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/listers" + "k8s.io/client-go/tools/cache" +) + +// TrafficWarmupLister helps list TrafficWarmups. +// All objects returned here must be treated as read-only. +type TrafficWarmupLister interface { + // List lists all TrafficWarmups in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.TrafficWarmup, err error) + // TrafficWarmups returns an object that can list and get TrafficWarmups. + TrafficWarmups(namespace string) TrafficWarmupNamespaceLister + TrafficWarmupListerExpansion +} + +// trafficWarmupLister implements the TrafficWarmupLister interface. +type trafficWarmupLister struct { + listers.ResourceIndexer[*v1alpha1.TrafficWarmup] +} + +// NewTrafficWarmupLister returns a new TrafficWarmupLister. +func NewTrafficWarmupLister(indexer cache.Indexer) TrafficWarmupLister { + return &trafficWarmupLister{listers.New[*v1alpha1.TrafficWarmup](indexer, v1alpha1.Resource("trafficwarmup"))} +} + +// TrafficWarmups returns an object that can list and get TrafficWarmups. +func (s *trafficWarmupLister) TrafficWarmups(namespace string) TrafficWarmupNamespaceLister { + return trafficWarmupNamespaceLister{listers.NewNamespaced[*v1alpha1.TrafficWarmup](s.ResourceIndexer, namespace)} +} + +// TrafficWarmupNamespaceLister helps list and get TrafficWarmups. +// All objects returned here must be treated as read-only. +type TrafficWarmupNamespaceLister interface { + // List lists all TrafficWarmups in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.TrafficWarmup, err error) + // Get retrieves the TrafficWarmup from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.TrafficWarmup, error) + TrafficWarmupNamespaceListerExpansion +} + +// trafficWarmupNamespaceLister implements the TrafficWarmupNamespaceLister +// interface. +type trafficWarmupNamespaceLister struct { + listers.ResourceIndexer[*v1alpha1.TrafficWarmup] +} diff --git a/pkg/k8s/informers/informers.go b/pkg/k8s/informers/informers.go index 0cd08aac5..4e324ea06 100644 --- a/pkg/k8s/informers/informers.go +++ b/pkg/k8s/informers/informers.go @@ -168,6 +168,7 @@ func WithPolicyClient(policyClient policyClientset.Interface) InformerCollection ic.informers[InformerKeyRetry] = informerFactory.Policy().V1alpha1().Retries().Informer() ic.informers[InformerKeyAccessControl] = informerFactory.Policy().V1alpha1().AccessControls().Informer() ic.informers[InformerKeyAccessCert] = informerFactory.Policy().V1alpha1().AccessCerts().Informer() + ic.informers[InformerKeyTrafficWarmup] = informerFactory.Policy().V1alpha1().TrafficWarmups().Informer() } } diff --git a/pkg/k8s/informers/types.go b/pkg/k8s/informers/types.go index 23733aa4d..e8e1034c2 100644 --- a/pkg/k8s/informers/types.go +++ b/pkg/k8s/informers/types.go @@ -60,6 +60,8 @@ const ( InformerKeyAccessControl InformerKey = "AccessControl" // InformerKeyAccessCert is the InformerKey for a AccessCert informer InformerKeyAccessCert InformerKey = "AccessCert" + // InformerKeyTrafficWarmup is the InformerKey for a TrafficWarmup informer + InformerKeyTrafficWarmup InformerKey = "TrafficWarmup" // InformerKeyServiceImport is the InformerKey for a ServiceImport informer InformerKeyServiceImport InformerKey = "ServiceImport" // InformerKeyServiceExport is the InformerKey for a ServiceExport informer diff --git a/pkg/messaging/broker.go b/pkg/messaging/broker.go index e0642c4c2..45f7a4802 100644 --- a/pkg/messaging/broker.go +++ b/pkg/messaging/broker.go @@ -1083,6 +1083,8 @@ func getProxyUpdateEvent(msg events.PubSubMessage) *proxyUpdateEvent { announcements.IsolationPolicyAdded, announcements.IsolationPolicyDeleted, announcements.IsolationPolicyUpdated, // Retry event announcements.RetryPolicyAdded, announcements.RetryPolicyDeleted, announcements.RetryPolicyUpdated, + // TrafficWarmup event + announcements.TrafficWarmupAdded, announcements.TrafficWarmupDeleted, announcements.TrafficWarmupUpdated, // UpstreamTrafficSetting event announcements.UpstreamTrafficSettingAdded, announcements.UpstreamTrafficSettingDeleted, announcements.UpstreamTrafficSettingUpdated, // diff --git a/pkg/policy/client.go b/pkg/policy/client.go index a1f434b01..d3e2fcbd2 100644 --- a/pkg/policy/client.go +++ b/pkg/policy/client.go @@ -7,12 +7,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - policyV1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" - "github.com/flomesh-io/fsm/pkg/k8s/informers" - "github.com/flomesh-io/fsm/pkg/announcements" + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" + policyv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" "github.com/flomesh-io/fsm/pkg/identity" "github.com/flomesh-io/fsm/pkg/k8s" + "github.com/flomesh-io/fsm/pkg/k8s/informers" "github.com/flomesh-io/fsm/pkg/messaging" "github.com/flomesh-io/fsm/pkg/service" ) @@ -87,6 +87,13 @@ func NewPolicyController(informerCollection *informers.InformerCollection, kubeC } client.informers.AddEventHandler(informers.InformerKeyRetry, k8s.GetEventHandlerFuncs(shouldObserve, retryEventTypes, msgBroker)) + warmupEventTypes := k8s.EventTypes{ + Add: announcements.TrafficWarmupAdded, + Update: announcements.TrafficWarmupUpdated, + Delete: announcements.TrafficWarmupDeleted, + } + client.informers.AddEventHandler(informers.InformerKeyTrafficWarmup, k8s.GetEventHandlerFuncs(shouldObserve, warmupEventTypes, msgBroker)) + upstreamTrafficSettingEventTypes := k8s.EventTypes{ Add: announcements.UpstreamTrafficSettingAdded, Update: announcements.UpstreamTrafficSettingUpdated, @@ -98,10 +105,10 @@ func NewPolicyController(informerCollection *informers.InformerCollection, kubeC } // ListIsolationPolicies returns the Isolation policies -func (c *Client) ListIsolationPolicies() []*policyV1alpha1.Isolation { - var isolations []*policyV1alpha1.Isolation +func (c *Client) ListIsolationPolicies() []*policyv1alpha1.Isolation { + var isolations []*policyv1alpha1.Isolation for _, isolationIface := range c.informers.List(informers.InformerKeyIsolation) { - isolation := isolationIface.(*policyV1alpha1.Isolation) + isolation := isolationIface.(*policyv1alpha1.Isolation) isolations = append(isolations, isolation) } @@ -109,10 +116,10 @@ func (c *Client) ListIsolationPolicies() []*policyV1alpha1.Isolation { } // ListEgressGateways lists egress gateways -func (c *Client) ListEgressGateways() []*policyV1alpha1.EgressGateway { - var egressGateways []*policyV1alpha1.EgressGateway +func (c *Client) ListEgressGateways() []*policyv1alpha1.EgressGateway { + var egressGateways []*policyv1alpha1.EgressGateway for _, egressGatewayIface := range c.informers.List(informers.InformerKeyEgressGateway) { - egressGateway := egressGatewayIface.(*policyV1alpha1.EgressGateway) + egressGateway := egressGatewayIface.(*policyv1alpha1.EgressGateway) egressGateways = append(egressGateways, egressGateway) } @@ -120,11 +127,11 @@ func (c *Client) ListEgressGateways() []*policyV1alpha1.EgressGateway { } // ListEgressPoliciesForSourceIdentity lists the Egress policies for the given source identity based on service accounts -func (c *Client) ListEgressPoliciesForSourceIdentity(source identity.K8sServiceAccount) []*policyV1alpha1.Egress { - var policies []*policyV1alpha1.Egress +func (c *Client) ListEgressPoliciesForSourceIdentity(source identity.K8sServiceAccount) []*policyv1alpha1.Egress { + var policies []*policyv1alpha1.Egress for _, egressIface := range c.informers.List(informers.InformerKeyEgress) { - egressPolicy := egressIface.(*policyV1alpha1.Egress) + egressPolicy := egressIface.(*policyv1alpha1.Egress) if !c.kubeController.IsMonitoredNamespace(egressPolicy.Namespace) { continue @@ -147,9 +154,9 @@ func (c *Client) GetEgressSourceSecret(secretReference corev1.SecretReference) ( } // GetIngressBackendPolicy returns the IngressBackend policy for the given backend MeshService -func (c *Client) GetIngressBackendPolicy(svc service.MeshService) *policyV1alpha1.IngressBackend { +func (c *Client) GetIngressBackendPolicy(svc service.MeshService) *policyv1alpha1.IngressBackend { for _, ingressBackendIface := range c.informers.List(informers.InformerKeyIngressBackend) { - ingressBackend := ingressBackendIface.(*policyV1alpha1.IngressBackend) + ingressBackend := ingressBackendIface.(*policyv1alpha1.IngressBackend) if ingressBackend.Namespace != svc.Namespace { continue @@ -170,11 +177,11 @@ func (c *Client) GetIngressBackendPolicy(svc service.MeshService) *policyV1alpha } // ListRetryPolicies returns the retry policies for the given source identity based on service accounts. -func (c *Client) ListRetryPolicies(source identity.K8sServiceAccount) []*policyV1alpha1.Retry { - var retries []*policyV1alpha1.Retry +func (c *Client) ListRetryPolicies(source identity.K8sServiceAccount) []*policyv1alpha1.Retry { + var retries []*policyv1alpha1.Retry for _, retryInterface := range c.informers.List(informers.InformerKeyRetry) { - retry := retryInterface.(*policyV1alpha1.Retry) + retry := retryInterface.(*policyv1alpha1.Retry) if retry.Spec.Source.Kind == kindSvcAccount && retry.Spec.Source.Name == source.Name && retry.Spec.Source.Namespace == source.Namespace { retries = append(retries, retry) } @@ -184,10 +191,10 @@ func (c *Client) ListRetryPolicies(source identity.K8sServiceAccount) []*policyV } // GetAccessControlPolicy returns the AccessControl policy for the given backend MeshService -func (c *Client) GetAccessControlPolicy(svc service.MeshService) *policyV1alpha1.AccessControl { +func (c *Client) GetAccessControlPolicy(svc service.MeshService) *policyv1alpha1.AccessControl { aclIfaces := c.informers.List(informers.InformerKeyAccessControl) for _, aclIface := range aclIfaces { - acl := aclIface.(*policyV1alpha1.AccessControl) + acl := aclIface.(*policyv1alpha1.AccessControl) if acl.Namespace != svc.Namespace { continue @@ -204,7 +211,7 @@ func (c *Client) GetAccessControlPolicy(svc service.MeshService) *policyV1alpha1 } } for _, aclIface := range aclIfaces { - acl := aclIface.(*policyV1alpha1.AccessControl) + acl := aclIface.(*policyv1alpha1.AccessControl) if len(acl.Spec.Backends) == 0 { return acl } @@ -212,8 +219,22 @@ func (c *Client) GetAccessControlPolicy(svc service.MeshService) *policyV1alpha1 return nil } +// GetTrafficWarmupPolicy returns the TrafficWarmup policy for the given backend MeshService +func (c *Client) GetTrafficWarmupPolicy(svc service.MeshService) *configv1alpha3.TrafficWarmupSpec { + warmupIf, exists, err := c.informers.GetByKey(informers.InformerKeyTrafficWarmup, svc.NamespacedKey()) + if exists && err == nil { + warmup := warmupIf.(*policyv1alpha1.TrafficWarmup) + if !c.kubeController.IsMonitoredNamespace(warmup.Namespace) { + log.Warn().Msgf("TrafficWarmup %s found, but belongs to a namespace that is not monitored, ignoring it", svc.NamespacedKey()) + return nil + } + return &warmup.Spec + } + return nil +} + // GetUpstreamTrafficSetting returns the UpstreamTrafficSetting resource that matches the given options -func (c *Client) GetUpstreamTrafficSetting(options UpstreamTrafficSettingGetOpt) *policyV1alpha1.UpstreamTrafficSetting { +func (c *Client) GetUpstreamTrafficSetting(options UpstreamTrafficSettingGetOpt) *policyv1alpha1.UpstreamTrafficSetting { if options.MeshService == nil && options.NamespacedName == nil && options.Host == "" { log.Error().Msgf("No option specified to get UpstreamTrafficSetting resource") return nil @@ -223,14 +244,14 @@ func (c *Client) GetUpstreamTrafficSetting(options UpstreamTrafficSettingGetOpt) // Filter by namespaced name resource, exists, err := c.informers.GetByKey(informers.InformerKeyUpstreamTrafficSetting, options.NamespacedName.String()) if exists && err == nil { - return resource.(*policyV1alpha1.UpstreamTrafficSetting) + return resource.(*policyv1alpha1.UpstreamTrafficSetting) } return nil } // Filter by MeshService for _, resource := range c.informers.List(informers.InformerKeyUpstreamTrafficSetting) { - upstreamTrafficSetting := resource.(*policyV1alpha1.UpstreamTrafficSetting) + upstreamTrafficSetting := resource.(*policyv1alpha1.UpstreamTrafficSetting) if upstreamTrafficSetting.Spec.Host == options.Host { return upstreamTrafficSetting diff --git a/pkg/policy/mock_client_generated.go b/pkg/policy/mock_client_generated.go index 705c9b78f..fe29ff45d 100644 --- a/pkg/policy/mock_client_generated.go +++ b/pkg/policy/mock_client_generated.go @@ -7,6 +7,7 @@ package policy import ( reflect "reflect" + v1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" v1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" identity "github.com/flomesh-io/fsm/pkg/identity" service "github.com/flomesh-io/fsm/pkg/service" @@ -80,6 +81,20 @@ func (mr *MockControllerMockRecorder) GetIngressBackendPolicy(arg0 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIngressBackendPolicy", reflect.TypeOf((*MockController)(nil).GetIngressBackendPolicy), arg0) } +// GetTrafficWarmupPolicy mocks base method. +func (m *MockController) GetTrafficWarmupPolicy(arg0 service.MeshService) *v1alpha3.TrafficWarmupSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTrafficWarmupPolicy", arg0) + ret0, _ := ret[0].(*v1alpha3.TrafficWarmupSpec) + return ret0 +} + +// GetTrafficWarmupPolicy indicates an expected call of GetTrafficWarmupPolicy. +func (mr *MockControllerMockRecorder) GetTrafficWarmupPolicy(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTrafficWarmupPolicy", reflect.TypeOf((*MockController)(nil).GetTrafficWarmupPolicy), arg0) +} + // GetUpstreamTrafficSetting mocks base method. func (m *MockController) GetUpstreamTrafficSetting(arg0 UpstreamTrafficSettingGetOpt) *v1alpha1.UpstreamTrafficSetting { m.ctrl.T.Helper() diff --git a/pkg/policy/types.go b/pkg/policy/types.go index 06769eab5..c205a255b 100644 --- a/pkg/policy/types.go +++ b/pkg/policy/types.go @@ -6,6 +6,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" policyv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" "github.com/flomesh-io/fsm/pkg/k8s" "github.com/flomesh-io/fsm/pkg/k8s/informers" @@ -49,6 +50,9 @@ type Controller interface { // GetAccessControlPolicy returns the AccessControl policy for the given backend MeshService GetAccessControlPolicy(service.MeshService) *policyv1alpha1.AccessControl + // GetTrafficWarmupPolicy returns the TrafficWarmup policy for the given backend MeshService + GetTrafficWarmupPolicy(svc service.MeshService) *configv1alpha3.TrafficWarmupSpec + // GetUpstreamTrafficSetting returns the UpstreamTrafficSetting resource that matches the given options GetUpstreamTrafficSetting(UpstreamTrafficSettingGetOpt) *policyv1alpha1.UpstreamTrafficSetting } diff --git a/pkg/sidecar/v1/providers/pipy/repo/jobs.go b/pkg/sidecar/v1/providers/pipy/repo/jobs.go index 67c1c469e..400f8b9bf 100644 --- a/pkg/sidecar/v1/providers/pipy/repo/jobs.go +++ b/pkg/sidecar/v1/providers/pipy/repo/jobs.go @@ -121,9 +121,9 @@ func (job *PipyConfGeneratorJob) Run() { egress(cataloger, s, pipyConf, proxy, desiredSuffix) forward(cataloger, s, pipyConf, proxy) cloudConnector(cataloger, pipyConf, s.cfg, proxy) - balance(pipyConf) + warmUpping := balance(cataloger, pipyConf) reorder(pipyConf) - allowedEndpoints(pipyConf, s) + allowedEndpoints(pipyConf, s, warmUpping) dnsResolveDB(pipyConf, s.cfg) job.publishSidecarConf(s.repoClient, proxy, pipyConf) end := time.Now() @@ -134,19 +134,19 @@ func (job *PipyConfGeneratorJob) Run() { Msg("Codebase Recalculated") } -func allowedEndpoints(pipyConf *PipyConf, s *Server) { - ready := pipyConf.copyAllowedEndpoints(s.kubeController, s.proxyRegistry) - if !ready { +func allowedEndpoints(pipyConf *PipyConf, s *Server, warmUpping bool) { + if ready := pipyConf.copyAllowedEndpoints(s.kubeController, s.proxyRegistry); !ready || warmUpping { if s.retryProxiesJob != nil { s.retryProxiesJob() } } } -func balance(pipyConf *PipyConf) { +func balance(catalog catalog.MeshCataloger, pipyConf *PipyConf) (warmUpping bool) { pipyConf.rebalancedTargetClusters() - pipyConf.rebalancedOutboundClusters() + warmUpping = pipyConf.rebalancedOutboundClusters(catalog) pipyConf.rebalancedForwardClusters() + return } func reorder(pipyConf *PipyConf) { diff --git a/pkg/sidecar/v1/providers/pipy/repo/policy.go b/pkg/sidecar/v1/providers/pipy/repo/policy.go index d9d0e7345..b271a7ae3 100644 --- a/pkg/sidecar/v1/providers/pipy/repo/policy.go +++ b/pkg/sidecar/v1/providers/pipy/repo/policy.go @@ -2,20 +2,25 @@ package repo import ( "fmt" + "math" "reflect" "regexp" "sort" "strings" + "time" "github.com/mitchellh/hashstructure/v2" "k8s.io/apimachinery/pkg/runtime" + configv1alpha3 "github.com/flomesh-io/fsm/pkg/apis/config/v1alpha3" multiclusterv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/multicluster/v1alpha1" policyv1alpha1 "github.com/flomesh-io/fsm/pkg/apis/policy/v1alpha1" + "github.com/flomesh-io/fsm/pkg/catalog" "github.com/flomesh-io/fsm/pkg/configurator" "github.com/flomesh-io/fsm/pkg/constants" "github.com/flomesh-io/fsm/pkg/identity" "github.com/flomesh-io/fsm/pkg/k8s" + "github.com/flomesh-io/fsm/pkg/service" "github.com/flomesh-io/fsm/pkg/sidecar/v1/providers/pipy/registry" "github.com/flomesh-io/fsm/pkg/utils/cidr" ) @@ -190,14 +195,17 @@ func (p *PipyConf) rebalancedTargetClusters() { } } -func (p *PipyConf) rebalancedOutboundClusters() { +func (p *PipyConf) rebalancedOutboundClusters(catalog catalog.MeshCataloger) (warmUpping bool) { if p.Outbound == nil { return } if p.Outbound.ClustersConfigs == nil || len(p.Outbound.ClustersConfigs) == 0 { return } - for _, clusterConfigs := range p.Outbound.ClustersConfigs { + + var uptimes map[string]int64 + + for clusterName, clusterConfigs := range p.Outbound.ClustersConfigs { weightedEndpoints := clusterConfigs.Endpoints if weightedEndpoints == nil || len(*weightedEndpoints) == 0 { continue @@ -209,7 +217,23 @@ func (p *PipyConf) rebalancedOutboundClusters() { break } } - for _, wze := range *weightedEndpoints { + + var warmupPolicy *configv1alpha3.TrafficWarmupSpec + if meshSvc, ok := clusterName.MeshService(); ok { + if warmupPolicy = catalog.GetTrafficWarmupPolicy(*meshSvc); warmupPolicy != nil { + if uptimes == nil { + uptimes = make(map[string]int64) + pods := catalog.GetKubeController().ListPods() + for _, pod := range pods { + for _, podIP := range pod.Status.PodIPs { + uptimes[podIP.IP] = pod.CreationTimestamp.Unix() + } + } + } + } + } + + for hostport, wze := range *weightedEndpoints { if len(wze.Cluster) > 0 { if multiclusterv1alpha1.FailOverLbType == multiclusterv1alpha1.LoadBalancerType(wze.LBType) { if hasLocalEndpoints { @@ -227,8 +251,20 @@ func (p *PipyConf) rebalancedOutboundClusters() { wze.Weight = constants.ClusterWeightAcceptAll } } + if warmupPolicy != nil && len(uptimes) > 0 { + if host, ok := hostport.Host(); ok { + if startTimestampSeconds, exists := uptimes[host]; exists { + weight := warmupPolicy.Weight(startTimestampSeconds, time.Now().Unix()) + wze.Weight = Weight(math.Min(weight, float64(wze.Weight))) + if !warmUpping { + warmUpping = uint64(weight) < *warmupPolicy.MaxWeight + } + } + } + } } } + return } func (p *PipyConf) rebalancedForwardClusters() { @@ -1145,3 +1181,26 @@ func (ps *PluginSlice) Less(i, j int) bool { a, b := (*ps)[i], (*ps)[j] return a.Priority > b.Priority } + +func (c ClusterName) MeshService() (*service.MeshService, bool) { + clusterName := string(c) + clusterNameSegs := strings.Split(clusterName, `/`) + if len(clusterNameSegs) < 2 { + return nil, false + } + namespace := clusterNameSegs[0] + nameSegs := strings.Split(clusterNameSegs[1], `|`) + if len(nameSegs) < 2 { + return nil, false + } + return &service.MeshService{Namespace: namespace, Name: nameSegs[0]}, true +} + +func (c HTTPHostPort) Host() (string, bool) { + hostPort := string(c) + segs := strings.Split(hostPort, `:`) + if len(segs) < 2 { + return "", false + } + return segs[0], true +}