Skip to content

Commit

Permalink
[Feature] apache flink 1.19 support (#3673)
Browse files Browse the repository at this point in the history
* [Improve] apache flink 1.19 support

* [Improve] 2.1.4 upgrade sql bug fixed.

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Apr 19, 2024
1 parent b4c4050 commit b13a3f0
Show file tree
Hide file tree
Showing 14 changed files with 577 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg

def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
case Array(1, v, _) if v >= 12 && v <= 18 => true
case Array(1, v, _) if v >= 12 && v <= 19 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported flink version: $version")
Expand Down
7 changes: 7 additions & 0 deletions streampark-console/streampark-console-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,13 @@
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink 1.19 support-->
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<outputDirectory>${project.build.directory}/shims</outputDirectory>
</dependency>
<!-- flink-submit-core -->
<dependency>
<groupId>org.apache.streampark</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ SET a.`flink_cluster_id` = c.`id`;

UPDATE `t_flink_app`
SET `cluster_id` = `app_id`
WHERE `execution_mode` IN (2,3,5);
WHERE `execution_mode` IN (2,3,4);

ALTER TABLE `t_flink_app` DROP COLUMN `app_id`;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ WHERE t_flink_app.cluster_id = t_flink_cluster.cluster_id

UPDATE t_flink_app
SET cluster_id = app_id
WHERE execution_mode IN (2,3,5);
WHERE execution_mode IN (2,3,4);

ALTER TABLE t_flink_app DROP COLUMN app_id;
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;

import org.apache.commons.io.FileUtils;
Expand All @@ -33,6 +34,7 @@

import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -67,9 +69,29 @@ public class FlinkEnv implements Serializable {
private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();

public void doSetFlinkConf() throws ApiDetailException {
File yaml;
float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
if (ver < 1.19f) {
yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
if (!yaml.exists()) {
throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
}
} else if (ver == 1.19f) {
yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
if (!yaml.exists()) {
yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
}
if (!yaml.exists()) {
throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
}
} else {
yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
if (!yaml.exists()) {
throw new ApiAlertException("cannot find config.yaml in flink/conf ");
}
}
try {
File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
String flinkConf = FileUtils.readFileToString(yaml);
String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
this.flinkConf = DeflaterUtils.zipString(flinkConf);
} catch (Exception e) {
throw new ApiDetailException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class EnvInitializer implements ApplicationRunner {

private static final Pattern PATTERN_FLINK_SHIMS_JAR =
Pattern.compile(
"^streampark-flink-shims_flink-(1.1[2-8])_(2.11|2.12)-(.*).jar$",
"^streampark-flink-shims_flink-(1.1[2-9])_(2.11|2.12)-(.*).jar$",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public boolean create(FlinkEnv version) throws Exception {
long count = this.baseMapper.selectCount(null);
version.setIsDefault(count == 0);
version.setCreateTime(new Date());
version.doSetFlinkConf();
version.doSetVersion();
version.doSetFlinkConf();
return save(version);
}

Expand Down
1 change: 1 addition & 0 deletions streampark-flink/streampark-flink-shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
<module>streampark-flink-shims_flink-1.16</module>
<module>streampark-flink-shims_flink-1.17</module>
<module>streampark-flink-shims_flink-1.18</module>
<module>streampark-flink-shims_flink-1.19</module>
</modules>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims</artifactId>
<version>2.1.4</version>
</parent>

<artifactId>streampark-flink-shims_flink-1.19_${scala.binary.version}</artifactId>
<name>StreamPark : Flink Shims 1.19</name>

<properties>
<flink.version>1.19.0</flink.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-shims-base_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-uber</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
<artifactSet>
<includes>
<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.core

import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.core.execution.SavepointFormatType

import java.util.concurrent.CompletableFuture

class FlinkClusterClient[T](clusterClient: ClusterClient[T])
extends FlinkClientTrait[T](clusterClient) {

override def triggerSavepoint(jobID: JobID, savepointDir: String): CompletableFuture[String] = {
clusterClient.triggerSavepoint(jobID, savepointDir, SavepointFormatType.DEFAULT)
}

override def cancelWithSavepoint(
jobID: JobID,
savepointDirectory: String): CompletableFuture[String] = {
clusterClient.cancelWithSavepoint(jobID, savepointDirectory, SavepointFormatType.DEFAULT)
}

override def stopWithSavepoint(
jobID: JobID,
advanceToEndOfEventTime: Boolean,
savepointDirectory: String): CompletableFuture[String] = {
clusterClient.stopWithSavepoint(
jobID,
advanceToEndOfEventTime,
savepointDirectory,
SavepointFormatType.DEFAULT)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.core

import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService

import java.util.Optional

class FlinkKubernetesClient(kubeClient: FlinkKubeClient)
extends FlinkKubernetesClientTrait(kubeClient) {

override def getService(serviceName: String): Optional[KubernetesService] = {
kubeClient.getService(serviceName)
}

}
Loading

0 comments on commit b13a3f0

Please sign in to comment.