Skip to content

Commit

Permalink
Add ShowPreciseParallelism filed (#607)
Browse files Browse the repository at this point in the history
Co-authored-by: Rui Fu <[email protected]>
  • Loading branch information
jiangpengcheng and freeznet authored Mar 15, 2023
1 parent 5921b50 commit 405525e
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 4 deletions.
5 changes: 5 additions & 0 deletions api/compute/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type FunctionSpec struct {
ClusterName string `json:"clusterName,omitempty"`
// +kubebuilder:validation:Minimum=0
Replicas *int32 `json:"replicas,omitempty"`
// Whether show the precise parallelism, if true, the `Parallelism` will be equal to the `Replicas`,
// in such case, update the `Replicas` will cause all pods being recreated since the command of pod is updated.
// else, the `Parallelism` will be 1, default to false.
// It just affects the result of context.getNumInstances, there will be only 1 process and 1 thread in each pod in any cases.
ShowPreciseParallelism bool `json:"showPreciseParallelism,omitempty"`
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/compute/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type SinkSpec struct {
SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector
// +kubebuilder:validation:Minimum=0
Replicas *int32 `json:"replicas,omitempty"`
// Whether show the precise parallelism, if true, the `Parallelism` will be equal to the `Replicas`,
// in such case, update the `Replicas` will cause all pods being recreated since the command of pod is updated.
// else, the `Parallelism` will be 1, default to false.
// It just affects the result of context.getNumInstances, there will be only 1 process and 1 thread in each pod in any cases.
ShowPreciseParallelism bool `json:"showPreciseParallelism,omitempty"`
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions api/compute/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type SourceSpec struct {
SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector
// +kubebuilder:validation:Minimum=0
Replicas *int32 `json:"replicas,omitempty"`
// Whether show the precise parallelism, if true, the `Parallelism` will be equal to the `Replicas`,
// in such case, update the `Replicas` will cause all pods being recreated since the command of pod is updated.
// else, the `Parallelism` will be 1, default to false.
// It just affects the result of context.getNumInstances, there will be only 1 process and 1 thread in each pod in any cases.
ShowPreciseParallelism bool `json:"showPreciseParallelism,omitempty"`
// +kubebuilder:validation:Minimum=0
// +kubebuilder:default=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3251,6 +3251,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
statefulConfig:
properties:
pulsar:
Expand Down Expand Up @@ -6651,6 +6653,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sinkConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down Expand Up @@ -9851,6 +9855,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sourceConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3270,6 +3270,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
statefulConfig:
properties:
pulsar:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3204,6 +3204,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sinkConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3187,6 +3187,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sourceConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functionmeshes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3252,6 +3252,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
statefulConfig:
properties:
pulsar:
Expand Down Expand Up @@ -6652,6 +6654,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sinkConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down Expand Up @@ -9852,6 +9856,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sourceConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3249,6 +3249,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
statefulConfig:
properties:
pulsar:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3183,6 +3183,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sinkConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3166,6 +3166,8 @@ spec:
type: string
type: object
type: object
showPreciseParallelism:
type: boolean
sourceConfig:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
15 changes: 11 additions & 4 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
UserConfig: getUserConfig(generateFunctionConfig(function)),
Runtime: runtime,
AutoAck: getBoolFromPtrOrDefault(function.Spec.AutoAck, true),
Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1),
Parallelism: getParallelism(function.Spec.Replicas, function.Spec.ShowPreciseParallelism),
Source: generateFunctionInputSpec(function),
Sink: generateFunctionOutputSpec(function),
Resources: generateResource(function.Spec.Resources.Requests),
Expand Down Expand Up @@ -110,7 +110,7 @@ func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf {
//SecretsMap: marshalSecretsMap(function.Spec.SecretsMap),
Runtime: int32(proto.FunctionDetails_GO),
AutoACK: getBoolFromPtrOrDefault(function.Spec.AutoAck, true),
Parallelism: getInt32FromPtrOrDefault(function.Spec.Replicas, 1),
Parallelism: getParallelism(function.Spec.Replicas, function.Spec.ShowPreciseParallelism),
TimeoutMs: uint64(function.Spec.Timeout),
SubscriptionName: function.Spec.SubscriptionName,
CleanupSubscription: function.Spec.CleanupSubscription,
Expand Down Expand Up @@ -247,7 +247,7 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails {
UserConfig: getUserConfig(source.Spec.SourceConfig),
Runtime: proto.FunctionDetails_JAVA,
AutoAck: true,
Parallelism: getInt32FromPtrOrDefault(source.Spec.Replicas, 1),
Parallelism: getParallelism(source.Spec.Replicas, source.Spec.ShowPreciseParallelism),
Source: generateSourceInputSpec(source),
Sink: generateSourceOutputSpec(source),
Resources: generateResource(source.Spec.Resources.Requests),
Expand Down Expand Up @@ -318,7 +318,7 @@ func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails {
ProcessingGuarantees: convertProcessingGuarantee(sink.Spec.ProcessingGuarantee),
Runtime: proto.FunctionDetails_JAVA,
AutoAck: getBoolFromPtrOrDefault(sink.Spec.AutoAck, true),
Parallelism: getInt32FromPtrOrDefault(sink.Spec.Replicas, 1),
Parallelism: getParallelism(sink.Spec.Replicas, sink.Spec.ShowPreciseParallelism),
Source: generateSinkInputSpec(sink),
Sink: generateSinkOutputSpec(sink),
Resources: generateResource(sink.Spec.Resources.Requests),
Expand Down Expand Up @@ -485,6 +485,13 @@ func getEnvOrDefault(key, fallback string) string {
return fallback
}

func getParallelism(replicas *int32, showPreciseParallelism bool) int32 {
if showPreciseParallelism {
return getInt32FromPtrOrDefault(replicas, 1)
}
return 1
}

func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string, maxMessageRetry int32) string {
if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName, "\\")) {
// otherwise the auto generated DeadLetterTopic($TOPIC-$SUBNAME-DLQ) will be invalid
Expand Down

0 comments on commit 405525e

Please sign in to comment.