79 changes: 43 additions & 36 deletions docs/interpreter/flink.md
@@ -62,6 +62,11 @@ Apache Flink is supported in Zeppelin with Flink interpreter group which consist
</tr>
</table>

## Prerequisites

* Download Flink 1.10 for Scala 2.11 (only Scala 2.11 is supported; Scala 2.12 is not yet supported in Zeppelin)
* Download [flink-hadoop-shaded](https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2/2.8.3-10.0/flink-shaded-hadoop-2-2.8.3-10.0.jar) and put it under the lib folder of Flink (the Flink interpreter needs it to support yarn mode); a sketch of these steps follows
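
These two steps might look like the following (a sketch; `$FLINK_HOME` is assumed to point at the unpacked Flink 1.10 distribution):

```
# place the shaded hadoop jar under Flink's lib folder (needed for yarn mode)
cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2/2.8.3-10.0/flink-shaded-hadoop-2-2.8.3-10.0.jar
```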

## Configuration
The Flink interpreter can be configured with properties provided by Zeppelin (see the table below).
You can also set other Flink properties which are not listed in the table. For a list of additional properties, refer to [Flink Available Properties](https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html).
@@ -79,12 +84,12 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>`HADOOP_CONF_DIR`</td>
<td></td>
<td>location of hadoop conf, this is must be set if running in yarn mode</td>
<td>Location of hadoop conf. This must be set if running in yarn mode</td>
</tr>
<tr>
<td>`HIVE_CONF_DIR`</td>
<td></td>
<td>location of hive conf, this is must be set if you want to connect to hive metastore</td>
<td>Location of hive conf. This must be set if you want to connect to the hive metastore</td>
</tr>
<tr>
<td>flink.execution.mode</td>
@@ -94,12 +99,12 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>flink.execution.remote.host</td>
<td></td>
<td>jobmanager hostname if it is remote mode</td>
<td>Host name of the running JobManager. Only used in remote mode</td>
</tr>
<tr>
<td>flink.execution.remote.port</td>
<td></td>
<td>jobmanager port if it is remote mode</td>
<td>Port of the running JobManager. Only used in remote mode</td>
</tr>
<tr>
<td>flink.jm.memory</td>
@@ -119,7 +124,7 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>local.number-taskmanager</td>
<td>4</td>
<td>Total number of TaskManagers in Local mode</td>
<td>Total number of TaskManagers in local mode</td>
</tr>
<tr>
<td>flink.yarn.appName</td>
@@ -139,17 +144,17 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>flink.udf.jars</td>
<td></td>
<td>udf jars (comma separated), zeppelin will register udf in this jar automatically for user. The udf name is the class name.</td>
<td>Flink udf jars (comma separated). Zeppelin will register the udfs in these jars automatically for the user; the udf name is the class name.</td>
</tr>
<tr>
<td>flink.execution.jars</td>
<td></td>
<td>additional user jars (comma separated)</td>
<td>Additional user jars (comma separated)</td>
</tr>
<tr>
<td>flink.execution.packages</td>
<td></td>
<td>additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10,org.apache.flink:flink-connector-kafka-base_2.11:1.10,org.apache.flink:flink-json:1.10</td>
<td>Additional user packages (comma separated), e.g. org.apache.flink:flink-connector-kafka_2.11:1.10.0,org.apache.flink:flink-connector-kafka-base_2.11:1.10.0,org.apache.flink:flink-json:1.10.0</td>
</tr>
<tr>
<td>zeppelin.flink.concurrentBatchSql.max</td>
@@ -164,7 +169,7 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>zeppelin.pyflink.python</td>
<td>python</td>
<td>python binary executable for PyFlink</td>
<td>Python binary executable for PyFlink</td>
</tr>
<tr>
<td>table.exec.resource.default-parallelism</td>
@@ -174,30 +179,35 @@ You can also set other flink properties which are not listed in the table. For a
<tr>
<td>zeppelin.flink.scala.color</td>
<td>true</td>
<td>whether display scala shell output in colorful format</td>
<td>Whether to display scala shell output in colorful format</td>
</tr>
<tr>
<td>zeppelin.flink.enableHive</td>
<td>false</td>
<td>whether enable hive</td>
<td>Whether to enable hive</td>
</tr>
<tr>
<td>zeppelin.flink.printREPLOutput</td>
<td>true</td>
<td>Print REPL output</td>
</tr>
<tr>
<td>zeppelin.flink.hive.version</td>
<td>2.3.4</td>
<td>Hive version that you would like to connect to</td>
</tr>
<tr>
<td>zeppelin.flink.maxResult</td>
<td>1000</td>
<td>Max number of rows returned by the sql interpreter</td>
</tr>
<tr>
<td>flink.interpreter.close.shutdown_cluster</td>
<td>`flink.interpreter.close.shutdown_cluster`</td>
<td>true</td>
<td>Whether to shut down the application when closing the interpreter</td>
</tr>
<tr>
<td>zeppelin.interpreter.close.cancel_job</td>
<td>`zeppelin.interpreter.close.cancel_job`</td>
<td>true</td>
<td>Whether to cancel the flink job when closing the interpreter</td>
</tr>
@@ -240,18 +250,16 @@ because by default it is only 4 TM with 1 Slots which may not be enough for some
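
If the default local cluster is too small for your note, you can raise these values in the interpreter setting (a sketch; `flink.tm.slot` is assumed to be the slots-per-TaskManager property in this interpreter version):

```
local.number-taskmanager  4   # default number of TaskManagers in local mode; increase as needed
flink.tm.slot             1   # assumed property name for slots per TaskManager
```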

### Run Flink in Remote Mode

Running Flink in remote mode will connect to a flink standalone cluster, besides specifying `flink.execution.mode` to be `remote`. You also need to specify
`flink.execution.remote.host` and `flink.execution.remote.port` to point to Flink Job Manager.
Running Flink in remote mode will connect to an existing Flink cluster, which could be a standalone cluster or a yarn session cluster. Besides setting `flink.execution.mode` to `remote`, you also need to specify
`flink.execution.remote.host` and `flink.execution.remote.port` to point to the Flink JobManager.
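
For example, pointing the interpreter at an existing cluster might look like this (host and port are hypothetical placeholders; use your JobManager's address):

```
flink.execution.mode         remote
flink.execution.remote.host  jm-host.example.com   # hypothetical JobManager host
flink.execution.remote.port  8081                  # hypothetical JobManager port
```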

### Run Flink in Yarn Mode

In order to run Flink in Yarn mode, you need to make the following settings:
In order to run Flink in Yarn mode, you need to make the following settings:

* Set `flink.execution.mode` to `yarn`
* Set `HADOOP_CONF_DIR` in `zeppelin-env.sh` or flink's interpreter setting.
* Copy necessary dependencies to flink's lib folder, check this [link](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/#depedencies) for more details
* `flink-hadoop-compatibility_{scala_version}-{flink.version}.jar`
* `flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar`
* Set `HADOOP_CONF_DIR` in the Flink interpreter setting.
* Make sure the `hadoop` command is on your PATH, because internally Flink will run `hadoop classpath` and load all the hadoop-related jars into the Flink interpreter process (see the sketch below).
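
A minimal sketch of a yarn-mode setup (the `HADOOP_CONF_DIR` value is a hypothetical path; point it at your own hadoop conf directory):

```
flink.execution.mode  yarn
HADOOP_CONF_DIR       /etc/hadoop/conf   # hypothetical path containing core-site.xml, hdfs-site.xml, etc.
```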


## Blink/Flink Planner
@@ -266,14 +274,13 @@ There're 2 planners supported by Flink's table api: `flink` & `blink`.

In order to use Hive in Flink, you have to make the following settings.

* Set `zeppelin.flink.enableHive` to `true`
* Copy necessary dependencies to flink's lib folder, check this [link](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/#depedencies) for more details
* flink-connector-hive_{scala_version}-{flink.version}.jar
* flink-hadoop-compatibility_{scala_version}-{flink.version}.jar
* flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar
* hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
* Specify `HIVE_CONF_DIR` either in flink interpreter setting or `zeppelin-env.sh`
* Specify `zeppelin.flink.hive.version`, by default it is 2.3.4. If you are using Hive 1.2.x, then you need to set it as `1.2.2`
* Set `zeppelin.flink.enableHive` to `true`
* Set `zeppelin.flink.hive.version` to the hive version you are using.
* Set `HIVE_CONF_DIR` to the location where `hive-site.xml` is located. Make sure the hive metastore is started and that you have configured `hive.metastore.uris` in `hive-site.xml` (a combined sketch follows this list).
* Copy the following dependencies to the lib folder of the Flink installation:
    * flink-connector-hive_2.11-1.10.0.jar
    * flink-hadoop-compatibility_2.11-1.10.0.jar
    * hive-exec-2.x.jar (for hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)
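
Putting the settings together, the relevant part of the interpreter configuration might look like this (a sketch; the `HIVE_CONF_DIR` path is a hypothetical placeholder):

```
zeppelin.flink.enableHive    true
zeppelin.flink.hive.version  2.3.4            # set to the hive version you are using
HIVE_CONF_DIR                /etc/hive/conf   # hypothetical dir containing hive-site.xml
```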

After these settings, you will be able to query hive tables via either the table api (`%flink`) or batch sql (`%flink.bsql`).
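
A hypothetical batch sql paragraph querying hive (the table name `my_hive_table` is a placeholder):

```
%flink.bsql

show tables;
select * from my_hive_table limit 10;
```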

@@ -322,8 +329,8 @@ bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
```

Besides defining udf in Zeppelin, you can also load udfs in jars via `flink.udf.jars`. For example, you can create
udfs in intellij and then build these udfs in one jar. After that you can specify `flink.udf.jars` to this jar, and Flink
interpreter will inspect this jar and register all the udfs in this jar to TableEnvironment, the udf name is the class name.
udfs in IntelliJ and then build these udfs into one jar. After that you can point `flink.udf.jars` to this jar, and the Flink
interpreter will detect all the udfs in this jar and register them with the TableEnvironment; the udf name is the class name.
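
As an illustration, such a jar-packaged udf might look like the following (a sketch in Scala; the class `ScalaUpper` is hypothetical, and its class name becomes the registered udf name):

```
import org.apache.flink.table.functions.ScalarFunction

// Package this class into a jar listed in flink.udf.jars;
// Zeppelin registers it as a udf named after the class.
class ScalaUpper extends ScalarFunction {
  def eval(str: String): String = if (str == null) null else str.toUpperCase
}
```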

## ZeppelinContext
Zeppelin automatically injects `ZeppelinContext` as variable `z` in your Scala/Python environment. `ZeppelinContext` provides some additional functions and utilities.
@@ -332,13 +339,13 @@ See [Zeppelin-Context](../usage/other_features/zeppelin_context.html) for more d
## IPython Support

By default, zeppelin would use IPython in `%flink.pyflink` when IPython is available, otherwise it would fall back to the original python implementation.
If you don't want to use IPython, then you can set `zeppelin.pyflink.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
[Python Interpreter](python.html)
For the IPython features, you can refer to the doc [Python Interpreter](python.html).

## Tutorial Notes

Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. Overall you can use Flink in Zeppelin for 4 main scenairos.
Zeppelin is shipped with several Flink tutorial notes which may be helpful for you. Except for the first one, the 4 notes below cover the 4 main scenarios of Flink.

* Flink Basic
* Batch ETL
* Exploratory Data Analytics
* Streaming ETL
16 changes: 15 additions & 1 deletion flink/src/main/resources/interpreter-setting.json
@@ -12,6 +12,20 @@
"description": "Location of flink distribution",
"type": "string"
},
"HADOOP_CONF_DIR": {
"envName": null,
"propertyName": null,
"defaultValue": "",
"description": "Location of hadoop conf (core-site.xml, hdfs-site.xml and etc.)",
"type": "string"
},
"HIVE_CONF_DIR": {
"envName": null,
"propertyName": null,
"defaultValue": "",
"description": "Location of hive conf (hive-site.xml)",
"type": "string"
},
"flink.execution.mode": {
"envName": null,
"propertyName": null,
@@ -71,7 +85,7 @@
"flink.yarn.queue": {
"envName": null,
"propertyName": null,
"defaultValue": "default",
"defaultValue": "",
"description": "Yarn queue name",
"type": "string"
},
1 change: 1 addition & 0 deletions zeppelin-web/src/app/interpreter/interpreter.controller.js
@@ -536,6 +536,7 @@ function InterpreterCtrl($rootScope, $scope, $http, baseUrlSrv, ngToast, $timeou
newProperties[p] = {
value: newSetting.properties[p].value,
type: newSetting.properties[p].type,
description: newSetting.properties[p].description,
name: p,
};
}