From 7f30f5a3c443770f759f2fc15e46192c2e6d0487 Mon Sep 17 00:00:00 2001 From: benjobs Date: Sat, 27 Jan 2024 17:40:01 +0800 Subject: [PATCH] [Improve] ingress class improvement --- .../kubernetes/ingress/IngressController.scala | 2 +- .../flink/kubernetes/ingress/IngressStrategy.scala | 4 +--- .../kubernetes/ingress/IngressStrategyV1.scala | 1 - .../kubernetes/ingress/IngressStrategyV1beta1.scala | 13 ++++++++++++- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala index 8bf576329b..d6597ef197 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala @@ -29,7 +29,7 @@ object IngressController extends Logger { private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r - private lazy val clusterVersion = using(new DefaultKubernetesClient()) { + lazy val clusterVersion = using(new DefaultKubernetesClient()) { client => VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala index b44c216f8f..ade4c8caef 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -47,13 +47,11 @@ trait IngressStrategy { } def buildIngressAnnotations(clusterId: String, namespace: String): Map[String, String] = { - val annotations = Map( - "kubernetes.io/ingress.class" -> ingressClass, + Map( "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", "nginx.ingress.kubernetes.io/configuration-snippet" -> s"""rewrite ^(/$clusterId)$$ $$1/ permanent; sub_filter '' ''; sub_filter_once off;""" ) - annotations } def buildIngressLabels(clusterId: String): Map[String, String] = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala index d7fd5d37e2..9e08338e96 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala @@ -47,7 +47,6 @@ class IngressStrategyV1 extends IngressStrategy { throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e") }.get } - } override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala index 878f13b3e2..1cfeceb68c 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala @@ -22,6 +22,7 @@ import org.apache.streampark.common.util.Utils import io.fabric8.kubernetes.api.model.IntOrString import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.apache.commons.lang3.StringUtils import org.apache.flink.client.program.ClusterClient import scala.collection.JavaConverters._ @@ -50,6 +51,17 @@ class IngressStrategyV1beta1 extends IngressStrategy { } } + override def buildIngressAnnotations( + clusterId: String, + namespace: String): Map[String, String] = { + val map = super.buildIngressAnnotations(clusterId, namespace) + if (StringUtils.isNotBlank(ingressClass)) { + Map("kubernetes.io/ingress.class" -> ingressClass) ++ map + } else { + map + } + } + override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = { Utils.using(new DefaultKubernetesClient) { client => @@ -62,7 +74,6 @@ class IngressStrategyV1beta1 extends IngressStrategy { .addToOwnerReferences(ownerReference) .endMetadata() .withNewSpec() - .withIngressClassName(ingressClass) .addNewRule() .withHost(domainName) .withNewHttp()