2 changes: 2 additions & 0 deletions bin/interpreter.sh
@@ -205,6 +205,8 @@ elif [[ "${INTERPRETER_ID}" == "pig" ]]; then
echo "TEZ_CONF_DIR is not set, configuration might not be loaded"
fi
elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
addJarInDirForIntp "${FLINK_HOME}/lib"
addJarInDirForIntp "${FLINK_HOME}/opt"
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
170 changes: 147 additions & 23 deletions docs/interpreter/flink.md
@@ -39,38 +39,162 @@ At the "Interpreters" menu, you have to create a new Flink interpreter and provi
<th>Description</th>
</tr>
<tr>
<td>flink.execution.mode</td>
<td>local|remote|yarn</td>
<td>Execution mode of Flink: local, remote, or yarn</td>
</tr>
<tr>
<td>flink.execution.remote.host</td>
<td></td>
<td>Host name of the JobManager in remote mode</td>
</tr>
<tr>
<td>flink.execution.remote.port</td>
<td></td>
<td>Port of the JobManager's REST service in remote mode</td>
</tr>
<tr>
<td>flink.yarn.appName</td>
<td></td>
<td>Yarn app name of the Flink session</td>
</tr>
<tr>
<td>flink.yarn.jm.memory</td>
<td>1g</td>
<td>Memory of the JobManager</td>
</tr>
<tr>
<td>flink.yarn.tm.memory</td>
<td>1g</td>
<td>Memory of each TaskManager</td>
</tr>
<tr>
<td>flink.yarn.tm.num</td>
<td>2</td>
<td>Number of TaskManagers</td>
</tr>
<tr>
<td>flink.yarn.tm.slot</td>
<td>1</td>
<td>Number of slots per TaskManager</td>
</tr>
<tr>
<td>flink.yarn.queue</td>
<td>default</td>
<td>Yarn queue name for the Flink app</td>
</tr>
<tr>
<td>zeppelin.flink.printREPLOutput</td>
<td>true</td>
<td>Whether to print REPL output</td>
</tr>
<tr>
<td>zeppelin.flink.maxResult</td>
<td>1000</td>
<td>Max number of rows displayed for SQL output</td>
</tr>
<tr>
<td>zeppelin.flink.concurrentBatchSql</td>
<td>10</td>
<td>Max number of batch SQL statements executed concurrently</td>
</tr>
<tr>
<td>zeppelin.flink.concurrentStreamSql</td>
<td>10</td>
<td>Max number of stream SQL statements executed concurrently</td>
</tr>

</table>

More information about Flink configuration can be found [here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
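
For example, to run Flink as a yarn session you might set properties like the following (a hypothetical sizing that only uses keys from the table above):

```
flink.execution.mode   yarn
flink.yarn.appName     zeppelin-flink
flink.yarn.jm.memory   1g
flink.yarn.tm.memory   2g
flink.yarn.tm.num      2
flink.yarn.tm.slot     1
flink.yarn.queue       default
```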

## What can Flink Interpreter do

Zeppelin's Flink interpreter supports 3 kinds of interpreters:
* %flink.scala (FlinkScalaInterpreter)
* %flink.bsql (FlinkBatchSqlInterpreter)
* %flink.ssql (FlinkStreamSqlInterpreter)

### FlinkScalaInterpreter
FlinkScalaInterpreter allows users to run Scala code in Zeppelin. 4 variables are created for users:
* senv (StreamExecutionEnvironment)
* benv (ExecutionEnvironment)
* stenv (StreamTableEnvironment)
* btenv (BatchTableEnvironment)

Users can use these variables to run DataSet/DataStream/BatchTable/StreamTable related jobs.
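
For example, here is a minimal sketch of using `stenv` to turn a DataStream into a Table (the `Click` case class and its fields are hypothetical, and the imports may already be provided by the interpreter):

```
{% highlight scala %}
%flink

// These imports may already be in scope in the %flink REPL;
// they are repeated here to keep the sketch self-contained.
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

// Hypothetical example data type
case class Click(user: String, clicks: Int)

// Build a DataStream with the pre-defined senv, then
// convert it into a Table with the pre-defined stenv
val clickStream = senv.fromElements(Click("alice", 3), Click("bob", 5))
val clickTable = stenv.fromDataStream(clickStream, 'user, 'clicks)
{% endhighlight %}
```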

## How to test that it's working
You can find an example of Flink usage in the Zeppelin Tutorial folder, or try the following word count example by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for the Apache Flink Meetup.
E.g., the following uses benv to run a batch-style WordCount:

```
{% highlight scala %}
%flink

val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .groupBy(0)
  .sum(1)
  .print()
{% endhighlight %}
```

The following uses senv to run a stream-style WordCount:

```
{% highlight scala %}
%flink

val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print

senv.execute()
{% endhighlight %}
```
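
Note that unlike the batch example, where `print()` on a DataSet eagerly triggers execution, the streaming `print` only registers a sink; the job does not start until `senv.execute()` is called.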

### FlinkBatchSqlInterpreter

FlinkBatchSqlInterpreter supports running SQL queries against tables registered in the BatchTableEnvironment.

E.g., we can query the `wc` table, which was registered in FlinkScalaInterpreter:

```
{% highlight scala %}
%flink

val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
val table = data.flatMap(line => line.split("\\s")).
  map(w => (w, 1)).
  toTable(btenv, 'word, 'number)
btenv.registerOrReplaceTable("wc", table)

{% endhighlight %}
```

```
{% highlight sql %}
%flink.bsql

select word, count(1) as c from wc group by word
{% endhighlight %}
```

### FlinkStreamSqlInterpreter (not mature yet)

FlinkStreamSqlInterpreter (`%flink.ssql`) is the streaming counterpart of FlinkBatchSqlInterpreter; it is not mature yet, so no example is given here.

### Other Features

* Job canceling
- Users can cancel a running job via the job cancel button
* Flink job URL association
- Users can jump to the Flink job URL in the JobManager dashboard
* Code completion
- As with other interpreters, users can press `Tab` for code completion

119 changes: 98 additions & 21 deletions flink/pom.xml
@@ -36,11 +36,11 @@
<properties>
<!--library versions-->
<interpreter.name>flink</interpreter.name>
<flink.version>1.8.1-SNAPSHOT</flink.version>
<flink.akka.version>2.4.20</flink.akka.version>
<scala.macros.version>2.0.1</scala.macros.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>

<!--plugin versions-->
<plugin.scalamaven.version>3.2.2</plugin.scalamaven.version>
@@ -101,7 +101,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>

<dependency>
@@ -122,52 +122,87 @@
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.14.3</version>
</dependency>

</dependencies>

<build>
<plugins>

<!-- Scala Compiler -->
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>eclipse-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>

<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.compile.version}</scalaVersion>
<!--<recompileMode>incremental</recompileMode>-->
<!--<useZincServer>true</useZincServer>-->
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
<jvmArg>-XX:PermSize=${PermGen}</jvmArg>
<jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
</jvmArgs>
<compilerPlugins combine.children="append">
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-Xlint:all,-serial,-path,-options</javacArg>
</javacArgs>
</configuration>
</plugin>


<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -275,9 +310,51 @@
<plugin>
<artifactId>maven-resources-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${plugin.shade.version}</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>org/datanucleus/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>

<artifactSet>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
<exclude>org.scala-lang:scala-reflect</exclude>
<exclude>org.apache.flink:*</exclude>
</excludes>
</artifactSet>

<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<outputFile>${project.basedir}/../interpreter/${interpreter.name}/${project.artifactId}-${project.version}.jar</outputFile>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>