Skip to content

Commit

Permalink
Merge branch 'master' into fix_replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
freeznet authored Mar 14, 2023
2 parents ffe1591 + 908906a commit 0f72c37
Show file tree
Hide file tree
Showing 7 changed files with 614 additions and 91 deletions.
37 changes: 19 additions & 18 deletions api/compute/v1alpha1/connectorcatalog_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,25 @@ type ConfigFieldDefinition struct {
}

type ConnectorDefinition struct {
ID string `json:"id"`
Version string `json:"version,omitempty"`
ImageRegistry string `json:"imageRegistry,omitempty"`
ImageRepository string `json:"imageRepository,omitempty"`
ImageTag string `json:"imageTag,omitempty"`
TypeClassName string `json:"typeClassName,omitempty"`
SourceTypeClassName string `json:"sourceTypeClassName,omitempty"`
SinkTypeClassName string `json:"sinkTypeClassName,omitempty"`
JarFullName string `json:"jarFullName,omitempty"`
DefaultSchemaType string `json:"defaultSchemaType,omitempty"`
DefaultSerdeClassName string `json:"defaultSerdeClassName,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
SourceClass string `json:"sourceClass,omitempty"`
SinkClass string `json:"sinkClass,omitempty"`
SourceConfigClass string `json:"sourceConfigClass,omitempty"`
SinkConfigClass string `json:"sinkConfigClass,omitempty"`
ConfigFieldDefinitions []ConfigFieldDefinition `json:"configFieldDefinitions,omitempty"`
ID string `json:"id"`
Version string `json:"version,omitempty"`
ImageRegistry string `json:"imageRegistry,omitempty"`
ImageRepository string `json:"imageRepository,omitempty"`
ImageTag string `json:"imageTag,omitempty"`
TypeClassName string `json:"typeClassName,omitempty"`
SourceTypeClassName string `json:"sourceTypeClassName,omitempty"`
SinkTypeClassName string `json:"sinkTypeClassName,omitempty"`
JarFullName string `json:"jarFullName,omitempty"`
DefaultSchemaType string `json:"defaultSchemaType,omitempty"`
DefaultSerdeClassName string `json:"defaultSerdeClassName,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
SourceClass string `json:"sourceClass,omitempty"`
SinkClass string `json:"sinkClass,omitempty"`
SourceConfigClass string `json:"sourceConfigClass,omitempty"`
SinkConfigClass string `json:"sinkConfigClass,omitempty"`
SourceConfigFieldDefinitions []ConfigFieldDefinition `json:"sourceConfigFieldDefinitions,omitempty"`
SinkConfigFieldDefinitions []ConfigFieldDefinition `json:"sinkConfigFieldDefinitions,omitempty"`
}

func init() {
Expand Down
11 changes: 9 additions & 2 deletions api/compute/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,6 @@ spec:
connectorDefinitions:
items:
properties:
configFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
defaultSchemaType:
type: string
defaultSerdeClassName:
Expand All @@ -89,12 +73,44 @@ spec:
type: string
sinkConfigClass:
type: string
sinkConfigFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
sinkTypeClassName:
type: string
sourceClass:
type: string
sourceConfigClass:
type: string
sourceConfigFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
sourceTypeClassName:
type: string
typeClassName:
Expand Down
48 changes: 32 additions & 16 deletions config/crd/bases/compute.functionmesh.io_connectorcatalogs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,6 @@ spec:
connectorDefinitions:
items:
properties:
configFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
defaultSchemaType:
type: string
defaultSerdeClassName:
Expand All @@ -68,12 +52,44 @@ spec:
type: string
sinkConfigClass:
type: string
sinkConfigFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
sinkTypeClassName:
type: string
sourceClass:
type: string
sourceConfigClass:
type: string
sourceConfigFieldDefinitions:
items:
properties:
attributes:
additionalProperties:
type: string
type: object
fieldName:
type: string
typeName:
type: string
required:
- fieldName
- typeName
type: object
type: array
sourceTypeClassName:
type: string
typeClassName:
Expand Down
2 changes: 1 addition & 1 deletion config/samples/compute_v1alpha1_connectorcatalog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ spec:
version: 2.9.2.17
imageTag: 2.9.2.17
typeClassName: org.apache.pulsar.io.datagenerator.Person
configFieldDefinitions:
sourceConfigFieldDefinitions:
- fieldName: sleepBetweenMessages
typeName: long
attributes:
Expand Down
18 changes: 15 additions & 3 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
} else if function.Spec.Python != nil {
runtime = proto.FunctionDetails_PYTHON
}
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
fd := &proto.FunctionDetails{
Tenant: function.Spec.Tenant,
Namespace: function.Spec.Namespace,
Expand All @@ -56,7 +57,7 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
Sink: generateFunctionOutputSpec(function),
Resources: generateResource(function.Spec.Resources.Requests),
PackageUrl: "",
RetryDetails: generateRetryDetails(function.Spec.MaxMessageRetry, function.Spec.DeadLetterTopic),
RetryDetails: generateRetryDetails(function.Spec.MaxMessageRetry, deadLetterTopic),
RuntimeFlags: function.Spec.RuntimeFlags,
ComponentType: proto.FunctionDetails_FUNCTION,
CustomRuntimeOptions: "",
Expand Down Expand Up @@ -93,6 +94,7 @@ func fetchClassName(function *v1alpha1.Function) string {
}

func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf {
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
return &GoFunctionConf{
FuncID: fmt.Sprintf("${%s}-%s", EnvShardID, string(function.UID)),
PulsarServiceURL: "${brokerServiceURL}",
Expand Down Expand Up @@ -121,7 +123,7 @@ func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf {
RAM: function.Spec.Resources.Requests.Memory().Value(),
Disk: function.Spec.Resources.Requests.Storage().Value(),
MaxMessageRetries: function.Spec.MaxMessageRetry,
DeadLetterTopic: function.Spec.DeadLetterTopic,
DeadLetterTopic: deadLetterTopic,
UserConfig: getUserConfig(function.Spec.FuncConfig),
MetricsPort: int(MetricsPort.ContainerPort),
ExpectedHealthCheckInterval: -1, // TurnOff BuiltIn HealthCheck to avoid instance exit
Expand Down Expand Up @@ -307,6 +309,7 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec {
}

func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails {
deadLetterTopic := getDeadLetterTopicOrDefault(sink.Spec.DeadLetterTopic, sink.Spec.SubscriptionName, sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, sink.Spec.MaxMessageRetry)
fd := &proto.FunctionDetails{
Tenant: sink.Spec.Tenant,
Namespace: sink.Spec.Namespace,
Expand All @@ -319,7 +322,7 @@ func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails {
Source: generateSinkInputSpec(sink),
Sink: generateSinkOutputSpec(sink),
Resources: generateResource(sink.Spec.Resources.Requests),
RetryDetails: generateRetryDetails(sink.Spec.MaxMessageRetry, sink.Spec.DeadLetterTopic),
RetryDetails: generateRetryDetails(sink.Spec.MaxMessageRetry, deadLetterTopic),
RuntimeFlags: sink.Spec.RuntimeFlags,
ComponentType: proto.FunctionDetails_SINK,
RetainOrdering: sink.Spec.RetainOrdering,
Expand Down Expand Up @@ -488,3 +491,12 @@ func getParallelism(replicas *int32, showPreciseParallelism bool) int32 {
}
return 1
}

func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string, maxMessageRetry int32) string {
if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName, "\\")) {
// otherwise the auto generated DeadLetterTopic($TOPIC-$SUBNAME-DLQ) will be invalid
// like: persistent://public/default/input-public/default/test-function-DLQ
return fmt.Sprintf("%s-%s-%s-DLQ", tenant, namespace, name)
}
return deadLetterTopic
}
Loading

0 comments on commit 0f72c37

Please sign in to comment.