Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,27 @@ jobs:
- mvn test -DskipRat -pl $(echo .,zeppelin-interpreter,zeppelin-interpreter-shaded,${INTERPRETERS} | sed 's/!//g') -Pscala-2.10 -B


- name: "Test zeppelin-client integration test"
jdk: "openjdk8"
dist: xenial
before_install:
- export PYTHON=3
- export R=true
- echo "MAVEN_OPTS='-Xms1024M -Xmx2048M -XX:MaxMetaspaceSize=1024m -XX:-UseGCOverheadLimit -Dorg.slf4j.simpleLogger.defaultLogLevel=warn'" >> ~/.mavenrc
- bash -x ./testing/install_external_dependencies.sh
- source ~/.environ
install:
- mvn install -DskipTests -DskipRat -Pintegration -pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown,flink/interpreter,jdbc,shell -am
- mvn clean package -T 2C -pl zeppelin-plugins -amd -B
before_script:
- echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
- echo "export SPARK_PRINT_LAUNCH_COMMAND=true" >> conf/zeppelin-env.sh
- export SPARK_PRINT_LAUNCH_COMMAND=true
- tail conf/zeppelin-env.sh
script:
- mvn test -DskipRat -pl zeppelin-interpreter-integration -Pintegration -Dtest=ZeppelinClientIntegrationTest,ZeppelinClientWithAuthIntegrationTest,ZSessionIntegrationTest


- name: "Test flink 1.10 & flink integration test"
jdk: "openjdk8"
dist: xenial
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
<module>geode</module>
<module>ksql</module>
<module>sparql</module>
<module>zeppelin-client</module>
<module>zeppelin-client-examples</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-jupyter</module>
Expand Down
2 changes: 2 additions & 0 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

<!-- spark versions -->
<spark.version>2.4.5</spark.version>
<protobuf.version>2.5.0</protobuf.version>
<py4j.version>0.10.7</py4j.version>
<spark.scala.version>2.11.12</spark.scala.version>
<spark.scala.binary.version>2.11</spark.scala.binary.version>

Expand Down
83 changes: 83 additions & 0 deletions zeppelin-client-examples/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-client-examples</artifactId>
<packaging>jar</packaging>
<version>0.9.0-SNAPSHOT</version>
<name>Zeppelin: Client Examples</name>
<description>Zeppelin Client Examples</description>

<dependencies>

<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-client</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.client.examples;

import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.websocket.SimpleMessageHandler;
import org.apache.zeppelin.client.ZSession;

import java.util.HashMap;
import java.util.Map;

/**
* Advanced example of run flink streaming sql via session api.
* You can capture the streaming output via SimpleMessageHandler
*/
public class FlinkAdvancedExample {
public static void main(String[] args) {

ZSession session = null;
try {
ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
Map<String, String> intpProperties = new HashMap<>();

session = ZSession.builder()
.setClientConfig(clientConfig)
.setInterpreter("flink")
.setIntpProperties(intpProperties)
.build();

// if MessageHandler is specified, then websocket is enabled.
// you can get continuous output from Zeppelin via websocket.
session.start(new SimpleMessageHandler());
System.out.println("Flink Web UI: " + session.getWeburl());

String code = "benv.fromElements(1,2,3,4,5,6,7,8,9,10).map(e=> {Thread.sleep(1000); e}).print()";
System.out.println("Submit code: " + code);
// use submit to run flink code in non-blocking way.
ExecuteResult result = session.submit(code);
System.out.println("Job status: " + result.getStatus());
while(!result.getStatus().isCompleted()) {
result = session.queryStatement(result.getStatementId());
System.out.println("Job status: " + result.getStatus() + ", progress: " + result.getProgress());
Thread.sleep(1000);
}
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

System.out.println("-----------------------------------------------------------------------------");
System.out.println("Submit code: " + code);
result = session.submit("benv.fromElements(1,2,3,4,5,6,7,8,9,10).map(e=> {Thread.sleep(1000); e}).print()");
System.out.println("Job status: " + result.getStatus());
result = session.waitUntilFinished(result.getStatementId());
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

System.out.println("-----------------------------------------------------------------------------");
code = "for(i <- 1 to 10) {\n" +
" Thread.sleep(1000)\n" +
" println(i)\n" +
"}";
System.out.println("Submit code: " + code);
result = session.execute(code);
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

System.out.println("-----------------------------------------------------------------------------");
String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
result = session.execute(initCode);
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

// run flink ssql
Map<String, String> localProperties = new HashMap<>();
localProperties.put("type", "update");
result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
session.waitUntilFinished(result.getStatementId());

result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url");
session.waitUntilRunning(result.getStatementId());
Thread.sleep(10 * 1000);
System.out.println("Try to cancel statement: " + result.getStatementId());
session.cancel(result.getStatementId());
session.waitUntilFinished(result.getStatementId());
System.out.println("Job status: " + result.getStatus());

} catch (Exception e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.zeppelin.client.examples;

import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.websocket.CompositeMessageHandler;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.websocket.StatementMessageHandler;
import org.apache.zeppelin.client.ZSession;

import java.util.HashMap;
import java.util.Map;

/**
* Advanced example of run flink streaming sql via session api.
* You can capture the streaming output via CompositeMessageHandler.
* You can specify StatementMessageHandler(MyStatementMessageHandler1, MyStatementMessageHandler2)
* for each flink job.
*/
public class FlinkAdvancedExample2 {
public static void main(String[] args) {

ZSession session = null;
try {
ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
Map<String, String> intpProperties = new HashMap<>();

session = ZSession.builder()
.setClientConfig(clientConfig)
.setInterpreter("flink")
.setIntpProperties(intpProperties)
.build();

// CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
// otherwise you have to use a global MessageHandler.
session.start(new CompositeMessageHandler());
System.out.println("Flink Web UI: " + session.getWeburl());

System.out.println("-----------------------------------------------------------------------------");
String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
ExecuteResult result = session.execute(initCode);
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

// run flink ssql
Map<String, String> localProperties = new HashMap<>();
localProperties.put("type", "update");
result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
new MyStatementMessageHandler1());
session.waitUntilFinished(result.getStatementId());

result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
new MyStatementMessageHandler2());
session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

@Override
public void onStatementAppendOutput(String statementId, int index, String output) {
System.out.println("MyStatementMessageHandler1, append output: " + output);
}

@Override
public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
System.out.println("MyStatementMessageHandler1, update output: " + output);
}
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

@Override
public void onStatementAppendOutput(String statementId, int index, String output) {
System.out.println("MyStatementMessageHandler2, append output: " + output);
}

@Override
public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
System.out.println("MyStatementMessageHandler2, update output: " + output);
}
}
}
Loading