From b13a3f095e852600dc2a127095d2a3a179881d8e Mon Sep 17 00:00:00 2001
From: benjobs
Date: Sat, 20 Apr 2024 00:19:02 +0800
Subject: [PATCH] [Feature] apache flink 1.19 support (#3673)

* [Improve] apache flink 1.19 support

* [Improve] 2.1.4 upgrade sql bug fixed.

---------

Co-authored-by: benjobs
---
 .../streampark/common/conf/FlinkVersion.scala |   2 +-
 .../streampark-console-service/pom.xml        |   7 +
 .../assembly/script/upgrade/mysql/2.1.4.sql   |   2 +-
 .../assembly/script/upgrade/pgsql/2.1.4.sql   |   2 +-
 .../console/core/entity/FlinkEnv.java         |  26 ++-
 .../console/core/runner/EnvInitializer.java   |   2 +-
 .../service/impl/FlinkEnvServiceImpl.java     |   2 +-
 .../streampark-flink-shims/pom.xml            |   1 +
 .../streampark-flink-shims_flink-1.19/pom.xml | 154 +++++++++++++++++
 .../flink/core/FlinkClusterClient.scala       |  49 ++++++
 .../flink/core/FlinkKubernetesClient.scala    |  31 ++++
 .../flink/core/StreamTableContext.scala       | 161 ++++++++++++++++++
 .../streampark/flink/core/TableContext.scala  | 103 +++++++++++
 .../streampark/flink/core/TableExt.scala      |  42 +++++
 14 files changed, 577 insertions(+), 7 deletions(-)
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
 create mode 100644 streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index b6e5e6a908..6dec3b29b4 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(1, v, _) if v >= 12 && v <= 18 => true
+      case Array(1, v, _) if v >= 12 && v <= 19 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported flink version: $version")
diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml
index 2d7a7a520d..aa85f698fd 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -618,6 +618,13 @@
                                     <version>${project.version}</version>
                                     <outputDirectory>${project.build.directory}/shims</outputDirectory>
                                 </artifactItem>
+
+                                <artifactItem>
+                                    <groupId>org.apache.streampark</groupId>
+                                    <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+                                    <version>${project.version}</version>
+                                    <outputDirectory>${project.build.directory}/shims</outputDirectory>
+                                </artifactItem>
 
                                 <artifactItem>
                                     <groupId>org.apache.streampark</groupId>
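
Note: the FlinkVersion.checkVersion hunk above is the single version gate, and raising the upper bound from 18 to 19 is what lets a Flink 1.19 home pass validation. A minimal self-contained sketch of the same match (the object name is hypothetical, not part of the patch):

    object VersionGateSketch extends App {
      // Mirrors checkVersion: accepts "1.<minor>.<patch>" for minor 12..19.
      def supported(version: String): Boolean =
        version.split("\\.").map(_.trim.toInt) match {
          case Array(1, v, _) if v >= 12 && v <= 19 => true
          case _ => false
        }
      println(supported("1.18.1")) // true
      println(supported("1.19.0")) // true, newly accepted by this patch
      println(supported("1.20.0")) // false, would need the next shims bump
    }
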
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
index 04b7f6ea47..33607f396b 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.4.sql
@@ -27,7 +27,7 @@ SET a.`flink_cluster_id` = c.`id`;
 
 UPDATE `t_flink_app`
 SET `cluster_id` = `app_id`
-WHERE `execution_mode` IN (2,3,5);
+WHERE `execution_mode` IN (2,3,4);
 
 ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;
 
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
index 6b216e6eca..6e66682145 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.1.4.sql
@@ -23,6 +23,6 @@ WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id
 UPDATE t_flink_app
 SET cluster_id = app_id
-WHERE execution_mode IN (2,3,5);
+WHERE execution_mode IN (2,3,4);
 
 ALTER TABLE t_flink_app DROP COLUMN app_id;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 6decd5de54..b1ed53da85 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
@@ -33,6 +34,7 @@
 
 import java.io.File;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
@@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable {
   private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();
 
   public void doSetFlinkConf() throws ApiDetailException {
+    File yaml;
+    float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+    if (ver < 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
+      }
+    } else if (ver == 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      }
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
+      }
+    } else {
+      yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+      }
+    }
     try {
-      File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
-      String flinkConf = FileUtils.readFileToString(yaml);
+      String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
       this.flinkConf = DeflaterUtils.zipString(flinkConf);
     } catch (Exception e) {
       throw new ApiDetailException(e);
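
Note: the doSetFlinkConf change above tracks Flink 1.19's configuration migration. Flink 1.19 introduces conf/config.yaml (standard YAML) and deprecates flink-conf.yaml, so a pre-1.19 home must contain flink-conf.yaml, a 1.19 home may carry either file, and later versions are expected to carry only config.yaml. A standalone sketch of that resolution rule (hypothetical names; the patch itself compares float-parsed versions):

    import java.io.File

    object FlinkConfResolverSketch extends App {
      // minor is the middle version component, e.g. 19 for "1.19.0".
      def resolve(flinkHome: String, minor: Int): Option[File] = {
        val legacy = new File(s"$flinkHome/conf/flink-conf.yaml")
        val modern = new File(s"$flinkHome/conf/config.yaml")
        if (minor < 19) Some(legacy).filter(_.exists)            // pre-1.19: legacy name only
        else if (minor == 19) Seq(legacy, modern).find(_.exists) // 1.19: prefer legacy, fall back
        else Some(modern).filter(_.exists)                       // post-1.19: new name only
      }
      println(resolve("/opt/flink-1.19.0", 19)) // None unless the files exist locally
    }

This is also why FlinkEnvServiceImpl below now calls doSetVersion() before doSetFlinkConf(): the version fields that doSetFlinkConf() parses must be populated first.
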
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 841f954fa4..0e87a93612 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner {
 
   private static final Pattern PATTERN_FLINK_SHIMS_JAR =
       Pattern.compile(
-          "^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
+          "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
           Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
 
   @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
index e83bd65bbb..5a53e587ea 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java
@@ -84,8 +84,8 @@ public boolean create(FlinkEnv version) throws Exception {
     long count = this.baseMapper.selectCount(null);
     version.setIsDefault(count == 0);
     version.setCreateTime(new Date());
-    version.doSetFlinkConf();
     version.doSetVersion();
+    version.doSetFlinkConf();
     return save(version);
   }
 
diff --git a/streampark-flink/streampark-flink-shims/pom.xml b/streampark-flink/streampark-flink-shims/pom.xml
index 587c5bd4eb..1b53bef491 100644
--- a/streampark-flink/streampark-flink-shims/pom.xml
+++ b/streampark-flink/streampark-flink-shims/pom.xml
@@ -45,6 +45,7 @@
         <module>streampark-flink-shims_flink-1.16</module>
        <module>streampark-flink-shims_flink-1.17</module>
         <module>streampark-flink-shims_flink-1.18</module>
+        <module>streampark-flink-shims_flink-1.19</module>
     </modules>
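
Note: the EnvInitializer pattern above is what discovers shim jars on disk, so widening (1.1[2-8]) to (1.1[2-9]) is required for the new 1.19 jar to be picked up at all. A quick check of the widened pattern (sketch, not part of the patch):

    import java.util.regex.Pattern

    object ShimsJarPatternSketch extends App {
      val p = Pattern.compile(
        "^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
        Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
      println(p.matcher("streampark-flink-shims_flink-1.19_2.12-2.1.4.jar").matches()) // true
      println(p.matcher("streampark-flink-shims_flink-1.20_2.12-2.1.4.jar").matches()) // false: 1.20 is outside 1.1[2-9]
    }
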
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
new file mode 100644
index 0000000000..2e0751ecb7
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-shims</artifactId>
+        <version>2.1.4</version>
+    </parent>
+
+    <artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
+    <name>StreamPark : Flink Shims 1.19</name>
+
+    <properties>
+        <flink.version>1.19.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-uber</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createDependencyReducedPom>true</createDependencyReducedPom>
+                            <dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
new file mode 100644
index 0000000000..4f6336f5a1
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkClusterClient.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.core.execution.SavepointFormatType
+
+import java.util.concurrent.CompletableFuture
+
+class FlinkClusterClient[T](clusterClient: ClusterClient[T])
+  extends FlinkClientTrait[T](clusterClient) {
+
+  override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
+    clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
+  }
+
+  override def cancelWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
+  }
+
+  override def stopWithSavepoint(
+      jobID: JobID,
+      advanceToEndOfEventTime: Boolean,
+      savepointDirectory: String): CompletableFuture[String] = {
+    clusterClient.stopWithSavepoint(
+      jobID,
+      advanceToEndOfEventTime,
+      savepointDirectory,
+      SavepointFormatType.DEFAULT)
+  }
+
+}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
new file mode 100644
index 0000000000..f388c8e9f4
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/FlinkKubernetesClient.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService
+
+import java.util.Optional
+
+class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
+  extends FlinkKubernetesClientTrait(kubeClient) {
+
+  override def getService(serviceName: String): Optional[KubernetesService] = {
+    kubeClient.getService(serviceName)
+  }
+
+}
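
Note: all three savepoint operations in the FlinkClusterClient shim above pin SavepointFormatType.DEFAULT as the explicit format argument that the newer ClusterClient API expects. A tiny sketch of that enum as shipped in Flink 1.19 (object name hypothetical):

    import org.apache.flink.core.execution.SavepointFormatType

    object SavepointFormatSketch extends App {
      // DEFAULT aliases the canonical format, which stays portable across
      // state backends; NATIVE is backend-specific but cheaper to take/restore.
      println(SavepointFormatType.DEFAULT)
      println(SavepointFormatType.values().mkString(", "))
    }
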
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
new file mode 100644
index 0000000000..65f715c752
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Schema, Table, TableDescriptor, TableResult}
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
+
+import java.util.{List => JList}
+
+class StreamTableContext(
+    override val parameter: ParameterTool,
+    private val streamEnv: StreamExecutionEnvironment,
+    private val tableEnv: StreamTableEnvironment)
+  extends FlinkStreamTableTrait(parameter, streamEnv, tableEnv) {
+
+  def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) =
+    this(args._1, args._2, args._3)
+
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table =
+    tableEnv.fromDataStream[T](dataStream, schema)
+
+  override def fromChangelogStream(dataStream: DataStream[Row]): Table =
+    tableEnv.fromChangelogStream(dataStream)
+
+  override def fromChangelogStream(dataStream: DataStream[Row], schema: Schema): Table =
+    tableEnv.fromChangelogStream(dataStream, schema)
+
+  override def fromChangelogStream(
+      dataStream: DataStream[Row],
+      schema: Schema,
+      changelogMode: ChangelogMode): Table =
+    tableEnv.fromChangelogStream(dataStream, schema, changelogMode)
+
+  override def createTemporaryView[T](
+      path: String,
+      dataStream: DataStream[T],
+      schema: Schema): Unit = tableEnv.createTemporaryView[T](path, dataStream, schema)
+
+  override def toDataStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream(table)
+  }
+
+  override def toDataStream[T](table: Table, targetClass: Class[T]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetClass)
+  }
+
+  override def toDataStream[T](table: Table, targetDataType: AbstractDataType[_]): DataStream[T] = {
+    isConvertedToDataStream = true
+    tableEnv.toDataStream[T](table, targetDataType)
+  }
+
+  override def toChangelogStream(table: Table): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table)
+  }
+
+  override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema)
+  }
+
+  override def toChangelogStream(
+      table: Table,
+      targetSchema: Schema,
+      changelogMode: ChangelogMode): DataStream[Row] = {
+    isConvertedToDataStream = true
+    tableEnv.toChangelogStream(table, targetSchema, changelogMode)
+  }
+
+  override def createStatementSet(): StreamStatementSet = tableEnv.createStatementSet()
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTemporaryTable(path, descriptor)
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit =
+    tableEnv.createTable(path, descriptor)
+
+  override def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(s: String, s1: String): Array[String] = tableEnv.listTables(s, s1)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(s: String): CompiledPlan = tableEnv.compilePlanSql(s)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+}
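
Note: the StreamTableContext overrides above are thin delegations to the Flink 1.19 Scala bridge; their one extra duty is flipping isConvertedToDataStream, which evidently signals to the surrounding StreamPark runtime that the pipeline now ends in a DataStream and needs streamEnv.execute() (an inference from the flag's name and usage, not stated in the patch). The underlying bridge call, in plain Flink with no StreamPark involved:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._

    object ToDataStreamSketch extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val tableEnv = StreamTableEnvironment.create(env)
      val table = tableEnv.sqlQuery("SELECT 1 AS id, 'flink-1.19' AS tag")
      tableEnv.toDataStream(table).print() // Table -> DataStream[Row]
      env.execute("to-datastream-sketch")  // required once converted to a stream
    }
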
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
new file mode 100644
index 0000000000..e8f704f393
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.table.api.{CompiledPlan, ExplainDetail, ExplainFormat, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.catalog.CatalogDescriptor
+import org.apache.flink.table.module.ModuleEntry
+import org.apache.flink.table.resource.ResourceUri
+
+import java.util.{List => JList}
+
+class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
+  extends FlinkTableTrait(parameter, tableEnv) {
+
+  def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
+
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
+
+  override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
+
+  override def createTemporaryTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTemporaryTable(path, descriptor)
+  }
+
+  override def createTable(path: String, descriptor: TableDescriptor): Unit = {
+    tableEnv.createTable(path, descriptor)
+  }
+
+  override def from(tableDescriptor: TableDescriptor): Table = {
+    tableEnv.from(tableDescriptor)
+  }
+
+  override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
+
+  /** @since 1.15 */
+  override def listTables(catalogName: String, databaseName: String): Array[String] =
+    tableEnv.listTables(catalogName, databaseName)
+
+  /** @since 1.15 */
+  override def loadPlan(planReference: PlanReference): CompiledPlan =
+    tableEnv.loadPlan(planReference)
+
+  /** @since 1.15 */
+  override def compilePlanSql(stmt: String): CompiledPlan = tableEnv.compilePlanSql(stmt)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri],
+      ignoreIfExists: Boolean): Unit =
+    tableEnv.createFunction(path, className, resourceUris, ignoreIfExists)
+
+  /** @since 1.17 */
+  override def createTemporaryFunction(
+      path: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporaryFunction(path, className, resourceUris)
+
+  /** @since 1.17 */
+  override def createTemporarySystemFunction(
+      name: String,
+      className: String,
+      resourceUris: JList[ResourceUri]): Unit =
+    tableEnv.createTemporarySystemFunction(name, className, resourceUris)
+
+  /** @since 1.17 */
+  override def explainSql(
+      statement: String,
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String =
+    tableEnv.explainSql(statement, format, extraDetails: _*)
+
+  /** @since 1.18 */
+  override def createCatalog(catalog: String, catalogDescriptor: CatalogDescriptor): Unit = {
+    tableEnv.createCatalog(catalog, catalogDescriptor)
+  }
+
+}
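
Note: TableContext is the SQL-only counterpart, wrapping a pure TableEnvironment; the @since markers record which Flink release introduced each wrapped method, so the 1.19 shim mostly re-exposes the 1.18 surface. For orientation, a plain-Flink sketch of the same environment ('datagen' is Flink's built-in test connector; the object name is hypothetical):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object TableContextSketch extends App {
      val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
      tableEnv.executeSql(
        "CREATE TEMPORARY TABLE src (id INT) WITH " +
          "('connector' = 'datagen', 'number-of-rows' = '3')")
      println(tableEnv.explainSql("SELECT id FROM src")) // the explain call wrapped above
      tableEnv.executeSql("SELECT id FROM src").print()
    }
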
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
new file mode 100644
index 0000000000..cab368e361
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.19/src/main/scala/org/apache/streampark/flink/core/TableExt.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.core
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.{Table => FlinkTable}
+import org.apache.flink.table.api.bridge.scala.{TableConversions => FlinkTableConversions}
+import org.apache.flink.types.Row
+
+object TableExt {
+
+  class Table(val table: FlinkTable) {
+    def ->(field: String, fields: String*): FlinkTable = table.as(field, fields: _*)
+  }
+
+  class TableConversions(table: FlinkTable) extends FlinkTableConversions(table) {
+
+    def \\ : DataStream[Row] = toDataStream
+
+    def >>[T: TypeInformation](implicit context: StreamTableContext): DataStream[T] = {
+      context.isConvertedToDataStream = true
+      super.toAppendStream
+    }
+  }
+
+}
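
Note: for reference, the operators TableExt defines above in use: -> is sugar for Table.as, \\ converts a Table to DataStream[Row], and >> produces a typed append stream while flagging the context as converted. A hedged usage sketch; in a real StreamPark job the implicit wiring from the shims base applies these wrappers automatically, so constructing them explicitly as below is only for illustration:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.streampark.flink.core.TableExt

    object TableExtSketch extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val tableEnv = StreamTableEnvironment.create(env)
      val t = tableEnv.sqlQuery("SELECT 1 AS a, 'x' AS b")
      // '->' renames columns, equivalent to t.as("id", "tag")
      val renamed = new TableExt.Table(t) -> ("id", "tag")
      // '\\' is shorthand for the scala-bridge Table -> DataStream[Row] conversion
      val rows = new TableExt.TableConversions(renamed).\\
      rows.print()
      env.execute("table-ext-sketch")
    }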