Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flink/flink1.10-shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<name>Zeppelin: Flink1.10 Shims</name>

<properties>
<flink.version>1.10.0</flink.version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to define a constant in the flink-parent so it will be shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid not. I plan to add profile flink-1.10 and flink-1.11 for flink interpreter after flink 1.11 is released. But flink.version in flink1.10-shims will not be affected by profile, it should always be flink 1.10, flink.version in flink/interpreter module will be affected by this profile, and will run test under flink/interpreter for different flink version.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant, that we can define property, like, <flink.1.10.version> and refer to it... I did this for Hadoop:

    <hadoop2.7.version>2.7.7</hadoop2.7.version>
    <hadoop2.6.version>2.6.5</hadoop2.6.version>
    <hadoop3.0.version>3.0.3</hadoop3.0.version>
    <hadoop3.1.version>3.1.3</hadoop3.1.version>

and then use it as ${hadoop3.1.version}, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I have fixed it

<flink.version>${flink1.10.version}</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
Expand Down
2 changes: 1 addition & 1 deletion flink/flink1.11-shims/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<name>Zeppelin: Flink1.11 Shims</name>

<properties>
<flink.version>1.11-SNAPSHOT</flink.version>
<flink.version>${flink1.11.version}</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
</properties>
Expand Down
7 changes: 3 additions & 4 deletions flink/interpreter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
<properties>
<!--library versions-->
<interpreter.name>flink</interpreter.name>
<!-- <flink.version>1.11-SNAPSHOT</flink.version>-->
<flink.version>1.10.0</flink.version>
<flink.version>${flink1.10.version}</flink.version>
<flink.hadoop.version>2.6.5</flink.hadoop.version>
<hive.version>2.3.4</hive.version>
<hiverunner.version>4.0.0</hiverunner.version>
Expand Down Expand Up @@ -876,14 +875,14 @@
<profile>
<id>flink-1.10</id>
<properties>
<flink.version>1.10.0</flink.version>
<flink.version>${flink1.10.version}</flink.version>
</properties>
</profile>

<profile>
<id>flink-1.11</id>
<properties>
<flink.version>1.11-SNAPSHOT</flink.version>
<flink.version>${flink1.11.version}</flink.version>
</properties>
</profile>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
import org.apache.flink.table.module.ModuleManager
import org.apache.flink.table.module.hive.HiveModule
import org.apache.flink.yarn.cli.FlinkYarnSessionCli
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor
import org.apache.zeppelin.flink.util.DependencyUtils
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
Expand Down Expand Up @@ -223,6 +224,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
.copy(port = Some(Integer.parseInt(port)))
}

if (config.executionMode == ExecutionMode.YARN) {
// workaround for FLINK-17788, otherwise it won't work with flink 1.10.1 which has been released.
configuration.set(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME)
}
config
}

Expand Down
5 changes: 5 additions & 0 deletions flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<module>flink1.11-shims</module>
</modules>

<properties>
<flink1.10.version>1.10.1</flink1.10.version>
<flink1.11.version>1.11-SNAPSHOT</flink1.11.version>
</properties>

<dependencies>

<dependency>
Expand Down