diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index ffef1e777e8..6d628cf6d10 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -62,6 +62,11 @@ Apache Flink is supported in Zeppelin with Flink interpreter group which consist
+## Prerequisites
+
+* Download Flink 1.10 for Scala 2.11 (only Scala 2.11 is supported; Scala 2.12 is not yet supported in Zeppelin)
+* Download [flink-hadoop-shaded](https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2/2.8.3-10.0/flink-shaded-hadoop-2-2.8.3-10.0.jar) and put it under the lib folder of Flink, as shown in the example below (the Flink interpreter needs it to support yarn mode)
+
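+For example, assuming `FLINK_HOME` points to your Flink 1.10 distribution, the shaded hadoop jar can be downloaded like this:
+
+```bash
+cd $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2/2.8.3-10.0/flink-shaded-hadoop-2-2.8.3-10.0.jar
+```
+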
## Configuration
The Flink interpreter can be configured with properties provided by Zeppelin (as following table).
You can also set other flink properties which are not listed in the table. For a list of additional properties, refer to [Flink Available Properties](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html).
@@ -79,12 +84,12 @@ You can also set other flink properties which are not listed in the table. For a
| `HADOOP_CONF_DIR` |
|
- location of hadoop conf, this is must be set if running in yarn mode |
+      Location of hadoop conf, this must be set if running in yarn mode |
| `HIVE_CONF_DIR` |
|
- location of hive conf, this is must be set if you want to connect to hive metastore |
+      Location of hive conf, this must be set if you want to connect to hive metastore |
| flink.execution.mode |
@@ -94,12 +99,12 @@ You can also set other flink properties which are not listed in the table. For a
| flink.execution.remote.host |
|
- jobmanager hostname if it is remote mode |
+      Host name of the running JobManager. Only used in remote mode |
| flink.execution.remote.port |
|
- jobmanager port if it is remote mode |
+      Port of the running JobManager. Only used in remote mode |
| flink.jm.memory |
@@ -119,7 +124,7 @@ You can also set other flink properties which are not listed in the table. For a
| local.number-taskmanager |
4 |
- Total number of TaskManagers in Local mode |
+ Total number of TaskManagers in local mode |
| flink.yarn.appName |
@@ -139,17 +144,17 @@ You can also set other flink properties which are not listed in the table. For a
| flink.udf.jars |
|
- udf jars (comma separated), zeppelin will register udf in this jar automatically for user. The udf name is the class name. |
+      Flink udf jars (comma separated). Zeppelin will register the udfs in these jars automatically for the user; the udf name is the class name. |
| flink.execution.jars |
|
- additional user jars (comma separated) |
+ Additional user jars (comma separated) |
| flink.execution.packages |
|
- additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10 |
+      Additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0 |
| zeppelin.flink.concurrentBatchSql.max |
@@ -164,7 +169,7 @@ You can also set other flink properties which are not listed in the table. For a
| zeppelin.pyflink.python |
python |
- python binary executable for PyFlink |
+ Python binary executable for PyFlink |
| table.exec.resource.default-parallelism |
@@ -174,17 +179,22 @@ You can also set other flink properties which are not listed in the table. For a
| zeppelin.flink.scala.color |
true |
- whether display scala shell output in colorful format |
+      Whether to display scala shell output in colorful format |
| zeppelin.flink.enableHive |
false |
- whether enable hive |
+      Whether to enable hive |
- | zeppelin.flink.printREPLOutput |
- true |
- Print REPL output |
+ | zeppelin.flink.hive.version |
+ 2.3.4 |
+    Hive version that you would like to connect to |
| zeppelin.flink.maxResult |
@@ -192,12 +202,12 @@ You can also set other flink properties which are not listed in the table. For a
max number of row returned by sql interpreter |
- | flink.interpreter.close.shutdown_cluster |
+ `flink.interpreter.close.shutdown_cluster` |
true |
Whether shutdown application when closing interpreter |
- | zeppelin.interpreter.close.cancel_job |
+ `zeppelin.interpreter.close.cancel_job` |
true |
Whether cancel flink job when closing interpreter |
@@ -240,18 +250,16 @@ because by default it is only 4 TM with 1 Slots which may not be enough for some
### Run Flink in Remote Mode
-Running Flink in remote mode will connect to a flink standalone cluster, besides specifying `flink.execution.mode` to be `remote`. You also need to specify
-`flink.execution.remote.host` and `flink.execution.remote.port` to point to Flink Job Manager.
+Running Flink in remote mode will connect to an existing Flink cluster, which could be either a standalone cluster or a yarn session cluster. Besides setting `flink.execution.mode` to `remote`, you also need to specify
+`flink.execution.remote.host` and `flink.execution.remote.port` to point to the Flink JobManager.
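+
+For example, assuming the JobManager of that cluster is reachable on host `localhost` and port `8081` (replace both with the values of your own cluster), the interpreter properties would look like:
+
+```
+flink.execution.mode          remote
+flink.execution.remote.host   localhost
+flink.execution.remote.port   8081
+```
+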
### Run Flink in Yarn Mode
-In order to run Flink in Yarn mode, you need to make the following settings:
+In order to run flink in Yarn mode, you need to make the following settings:
* Set `flink.execution.mode` to `yarn`
-* Set `HADOOP_CONF_DIR` in `zeppelin-env.sh` or flink's interpreter setting.
-* Copy necessary dependencies to flink's lib folder, check this [link](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/#depedencies) for more details
- * `flink-hadoop-compatibility_{scala_version}-{flink.version}.jar`
- * `flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar`
+* Set `HADOOP_CONF_DIR` in flink's interpreter setting (see the sketch after this list).
+* Make sure the `hadoop` command is on your PATH, because internally flink will run the `hadoop classpath` command and load all the hadoop related jars into the flink interpreter process.
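+
+A minimal sketch of these interpreter settings (the hadoop conf path is a placeholder; point it at the directory containing your cluster's configuration files):
+
+```
+flink.execution.mode   yarn
+HADOOP_CONF_DIR        /etc/hadoop/conf
+```
+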
## Blink/Flink Planner
@@ -266,14 +274,13 @@ There're 2 planners supported by Flink's table api: `flink` & `blink`.
In order to use Hive in Flink, you have to make the following setting.
-* Set `zeppelin.flink.enableHive` to `true`
-* Copy necessary dependencies to flink's lib folder, check this [link](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/#depedencies) for more details
- * flink-connector-hive_{scala_version}-{flink.version}.jar
- * flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
- * flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
- * hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
-* Specify `HIVE_CONF_DIR` either in flink interpreter setting or `zeppelin-env.sh`
-* Specify `zeppelin.flink.hive.version`, by default it is 2.3.4. If you are using Hive 1.2.x, then you need to set it as `1.2.2`
+* Set `zeppelin.flink.enableHive` to `true`
+* Set `zeppelin.flink.hive.version` to the hive version you are using.
+* Set `HIVE_CONF_DIR` to the directory where `hive-site.xml` is located. Make sure the hive metastore is started and that you have configured `hive.metastore.uris` in `hive-site.xml`
+* Copy the following dependencies to the lib folder of the flink installation.
+  * flink-connector-hive_2.11-1.10.0.jar
+  * flink-hadoop-compatibility_2.11-1.10.0.jar
+  * hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
+After these settings, you will be able to query hive tables via either the table api (`%flink`) or batch sql (`%flink.bsql`), for example:
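+
+A minimal sketch of such a query (the table name `employee` is hypothetical; use a table that actually exists in your hive metastore):
+
+```sql
+%flink.bsql
+
+select * from employee
+```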
@@ -322,8 +329,8 @@ bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(),
```
Besides defining udf in Zeppelin, you can also load udfs in jars via `flink.udf.jars`. For example, you can create
-udfs in intellij and then build these udfs in one jar. After that you can specify `flink.udf.jars` to this jar, and Flink
-interpreter will inspect this jar and register all the udfs in this jar to TableEnvironment, the udf name is the class name.
+udfs in intellij and then build these udfs in one jar. After that you can specify `flink.udf.jars` to point to this jar, and the flink
+interpreter will detect all the udfs in this jar and register them in the TableEnvironment; the udf name is the class name.
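+
+Below is a minimal sketch of such a udf written in scala (the package and class names are hypothetical). Packaged into a jar that is listed in `flink.udf.jars`, it would be registered under the name `ScalaUpper`:
+
+```scala
+package com.example.udf
+
+import org.apache.flink.table.functions.ScalarFunction
+
+// The flink interpreter registers this scalar udf under its class name: ScalaUpper
+class ScalaUpper extends ScalarFunction {
+  def eval(str: String): String = if (str == null) null else str.toUpperCase
+}
+```
+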
## ZeppelinContext
Zeppelin automatically injects `ZeppelinContext` as variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities.
@@ -332,13 +339,13 @@ See [Zeppelin-Context](../usage/other_features/zeppelin_context.html) for more d
## IPython Support
By default, zeppelin would use IPython in `%flink.pyflink` when IPython is available, Otherwise it would fall back to the original python implementation.
-If you don't want to use IPython, then you can set `zeppelin.pyflink.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
-[Python Interpreter](python.html)
+For the IPython features, you can refer to the doc [Python Interpreter](python.html)
## Tutorial Notes
-Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. Overall you can use Flink in Zeppelin for 4 main scenairos.
+Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. Apart from the first one (Flink Basic), the notes below cover the 4 main scenarios of flink.
+* Flink Basic
* Batch ETL
* Exploratory Data Analytics
* Streaming ETL
diff --git a/flink/src/main/resources/interpreter-setting.json b/flink/src/main/resources/interpreter-setting.json
index 5ea252d7bb1..acf1bf387ef 100644
--- a/flink/src/main/resources/interpreter-setting.json
+++ b/flink/src/main/resources/interpreter-setting.json
@@ -12,6 +12,20 @@
"description": "Location of flink distribution",
"type": "string"
},
+ "HADOOP_CONF_DIR": {
+ "envName": null,
+ "propertyName": null,
+ "defaultValue": "",
+      "description": "Location of hadoop conf (core-site.xml, hdfs-site.xml, etc.)",
+ "type": "string"
+ },
+ "HIVE_CONF_DIR": {
+ "envName": null,
+ "propertyName": null,
+ "defaultValue": "",
+ "description": "Location of hive conf (hive-site.xml)",
+ "type": "string"
+ },
"flink.execution.mode": {
"envName": null,
"propertyName": null,
@@ -71,7 +85,7 @@
"flink.yarn.queue": {
"envName": null,
"propertyName": null,
- "defaultValue": "default",
+ "defaultValue": "",
"description": "Yarn queue name",
"type": "string"
},
diff --git a/zeppelin-web/src/app/interpreter/interpreter.controller.js b/zeppelin-web/src/app/interpreter/interpreter.controller.js
index 1bbd1ee1d83..dddae0022cf 100644
--- a/zeppelin-web/src/app/interpreter/interpreter.controller.js
+++ b/zeppelin-web/src/app/interpreter/interpreter.controller.js
@@ -536,6 +536,7 @@ function InterpreterCtrl($rootScope, $scope, $http, baseUrlSrv, ngToast, $timeou
newProperties[p] = {
value: newSetting.properties[p].value,
type: newSetting.properties[p].type,
+ description: newSetting.properties[p].description,
name: p,
};
}