diff --git a/.travis.yml b/.travis.yml
index 27a0e393cbf..e2a115cc89f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ matrix:
    # Test all modules with scala 2.10
    - jdk: "oraclejdk7"
-      env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
+      env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pbeam -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
    # Test all modules with scala 2.11
    - jdk: "oraclejdk7"
diff --git a/beam/README.md b/beam/README.md
new file mode 100644
index 00000000000..57150a0208a
--- /dev/null
+++ b/beam/README.md
@@ -0,0 +1,25 @@
+# Overview
+Beam interpreter for Apache Zeppelin
+
+# Architecture
+The current interpreter implementation supports a static REPL. It compiles the code in memory, executes it, and redirects the output to Zeppelin.
+
+## Building the Beam Interpreter
+You first have to build the Beam interpreter by enabling the **beam** profile as follows:
+
+```
+mvn clean package -Pbeam -DskipTests
+```
+
+### Notice
+- The Flink runner ships with binaries compiled for Scala 2.10, so only Scala 2.10 is currently supported.
+
+### Technical overview
+
+ * Upon starting the interpreter, an instance of `JavaCompiler` is created.
+
+ * When the user runs a Beam paragraph, the `JavaParser` goes through the code to find the class that contains the main method.
+
+ * The name of that class is then replaced with a random class name to avoid clashes during compilation. New out and err streams are created so that the output is captured in those streams instead of the console and can be redirected to Zeppelin.
+
+ * Any error that occurs during compilation is caught and redirected to Zeppelin.
diff --git a/beam/pom.xml b/beam/pom.xml
new file mode 100644
index 00000000000..b0f165647b6
--- /dev/null
+++ b/beam/pom.xml
@@ -0,0 +1,320 @@
+ + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.7.0-SNAPSHOT + ..
+ + + org.apache.zeppelin + zeppelin-beam + jar + 0.7.0-SNAPSHOT + Zeppelin: Beam interpreter + + + 2.3.0 + 1.6.2 + 0.2.0-incubating + + + + + io.netty + netty-all + 4.1.1.Final + + + + org.apache.spark + spark-core_2.10 + ${beam.spark.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + akka-actor_2.10 + org.spark-project.akka + + + akka-remote_2.10 + org.spark-project.akka + + + akka-slf4j_2.10 + org.spark-project.akka + + + + + + org.apache.spark + spark-streaming_2.10 + ${beam.spark.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-hdfs + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-client + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.hadoop + hadoop-annotations + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-yarn-common + ${beam.hadoop.version} + + + + org.apache.hadoop + hadoop-mapreduce-client-common + ${beam.hadoop.version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.thoughtworks.qdox + qdox + 2.0-M3 + + + + org.apache.beam + beam-runners-parent + ${beam.beam.version} + pom + + + + org.apache.beam + beam-runners-core-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-direct-java + ${beam.beam.version} + + + + org.apache.beam + beam-runners-flink_2.10 + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + netty-all + io.netty + + + + + + org.apache.beam + beam-runners-flink_2.10-examples + ${beam.beam.version} + + + slf4j-log4j12 + org.slf4j + + + + + + javax.servlet + javax.servlet-api + 3.1.0 + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.beam.version} + + + google-http-client-jackson2 + com.google.http-client + + + + + + org.apache.beam + beam-runners-spark + ${beam.beam.version} + jar + + + + ${project.groupId} + zeppelin-interpreter + ${project.version} + provided + + + + org.apache.commons + commons-exec + 1.3 + + + + junit + junit + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/beam + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java new file mode 100644 index 00000000000..caa91c3e4cb --- /dev/null +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.beam;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Beam interpreter
+ *
+ */
+public class BeamInterpreter extends Interpreter {
+
+  Logger logger = LoggerFactory.getLogger(BeamInterpreter.class);
+
+  public BeamInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+
+  }
+
+  @Override
+  public void close() {
+    File dir = new File(".");
+    // delete all .class files created during the compilation process
+    for (int i = 0; i < dir.list().length; i++) {
+      File f = dir.listFiles()[i];
+      if (f.getAbsolutePath().endsWith(".class")) {
+        f.delete();
+      }
+    }
+  }
+
+  @Override
+  public InterpreterResult interpret(String code, InterpreterContext context) {
+
+    // choose a new name for the class containing the main method
+    String generatedClassName = "C" + UUID.randomUUID().toString().replace("-", "");
+
+    try {
+      String res = StaticRepl.execute(generatedClassName, code);
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS, res);
+    } catch (Exception e) {
+      logger.error("Exception in Interpreter while interpret", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+
+    }
+
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<InterpreterCompletion> completion(String buf, int cursor) {
+    return Collections.emptyList();
+  }
+
+}
diff --git a/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java
new file mode 100644
index 00000000000..ed81146bb1c
--- /dev/null
+++ b/beam/src/main/java/org/apache/zeppelin/beam/StaticRepl.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.beam;
+
+import javax.tools.Diagnostic;
+import javax.tools.DiagnosticCollector;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaCompiler.CompilationTask;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thoughtworks.qdox.JavaProjectBuilder;
+import com.thoughtworks.qdox.model.JavaClass;
+import com.thoughtworks.qdox.model.JavaSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.StringReader;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ *
+ * StaticRepl for compiling the Java code in memory
+ *
+ */
+public class StaticRepl {
+  static Logger logger = LoggerFactory.getLogger(StaticRepl.class);
+
+  public static String execute(String generatedClassName, String code) throws Exception {
+
+    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+    DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
+
+    // Java parsing
+    JavaProjectBuilder builder = new JavaProjectBuilder();
+    JavaSource src = builder.addSource(new StringReader(code));
+
+    // get all classes in the code (paragraph)
+    List<JavaClass> classes = src.getClasses();
+    String mainClassName = null;
+
+    // search for the class containing the main method
+    for (int i = 0; i < classes.size(); i++) {
+      boolean hasMain = false;
+
+      for (int j = 0; j < classes.get(i).getMethods().size(); j++) {
+        if (classes.get(i).getMethods().get(j).getName().equals("main") && classes.get(i)
+            .getMethods().get(j).isStatic()) {
+          mainClassName = classes.get(i).getName();
+          hasMain = true;
+          break;
+        }
+      }
+      if (hasMain == true) {
+        break;
+      }
+
+    }
+
+    // if there is no main method, return an error
+    if (mainClassName == null) {
+      logger.error("Exception for Main method", "There isn't any class "
+          + "containing static main method.");
+      throw new Exception("There isn't any class containing static main method.");
+    }
+
+    // replace the name of the class containing the main method with the generated name
+    code = code.replace(mainClassName, generatedClassName);
+
+    JavaFileObject file = new JavaSourceFromString(generatedClassName, code.toString());
+    Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
+
+    ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+    ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
+
+    // create new streams to capture the output data
+    PrintStream newOut = new PrintStream(baosOut);
+    PrintStream newErr = new PrintStream(baosErr);
+    // Save the old System.out!
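+    // Output redirection: System.out and System.err are swapped for PrintStreams backed
+    // by the ByteArrayOutputStreams above, so everything the compiled paragraph prints is
+    // captured and handed back to Zeppelin as the paragraph result. The original streams
+    // are saved next and restored before execute() returns or throws (see the finally
+    // block below).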
+    PrintStream oldOut = System.out;
+    PrintStream oldErr = System.err;
+    // tell Java to use the capturing streams
+    System.setOut(newOut);
+    System.setErr(newErr);
+
+    CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits);
+
+    // execute the compilation process
+    boolean success = task.call();
+
+    // if compilation failed, report the collected diagnostics as an error
+    if (!success) {
+      for (Diagnostic<? extends JavaFileObject> diagnostic : diagnostics.getDiagnostics()) {
+        if (diagnostic.getLineNumber() == -1) {
+          continue;
+        }
+        System.err.println("line " + diagnostic.getLineNumber() + " : "
+            + diagnostic.getMessage(null));
+      }
+      System.out.flush();
+      System.err.flush();
+
+      System.setOut(oldOut);
+      System.setErr(oldErr);
+      logger.error("Exception in Interpreter while compilation", baosErr.toString());
+      throw new Exception(baosErr.toString());
+    } else {
+      try {
+
+        // create a new class loader
+        URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI()
+            .toURL() });
+        // execute the main method
+        Class.forName(generatedClassName, true, classLoader)
+            .getDeclaredMethod("main", new Class[] { String[].class })
+            .invoke(null, new Object[] { null });
+
+        System.out.flush();
+        System.err.flush();
+
+        // restore the old streams
+        System.setOut(oldOut);
+        System.setErr(oldErr);
+
+        return baosOut.toString();
+
+      } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
+          | InvocationTargetException e) {
+        logger.error("Exception in Interpreter while execution", e);
+        System.err.println(e);
+        e.printStackTrace(newErr);
+        throw new Exception(baosErr.toString(), e);
+
+      } finally {
+
+        System.out.flush();
+        System.err.flush();
+
+        System.setOut(oldOut);
+        System.setErr(oldErr);
+      }
+    }
+
+  }
+
+}
+
+class JavaSourceFromString extends SimpleJavaFileObject {
+  final String code;
+
+  JavaSourceFromString(String name, String code) {
+    super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
+    this.code = code;
+  }
+
+  @Override
+  public CharSequence getCharContent(boolean ignoreEncodingErrors) {
+    return code;
+  }
+}
diff --git a/beam/src/main/resources/interpreter-setting.json b/beam/src/main/resources/interpreter-setting.json
new file mode 100644
index 00000000000..7cf57d24835
--- /dev/null
+++ b/beam/src/main/resources/interpreter-setting.json
@@ -0,0 +1,11 @@
+[
+  {
+    "group": "beam",
+    "name": "beam",
+    "className": "org.apache.zeppelin.beam.BeamInterpreter",
+    "defaultInterpreter": true,
+    "properties": {
+
+    }
+  }
+]
diff --git a/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java
new file mode 100644
index 00000000000..c24ed41ddb9
--- /dev/null
+++ b/beam/src/main/test/org/apache/zeppelin/beam/BeamInterpreterTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.beam; + +import static org.junit.Assert.assertEquals; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Properties; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * + * BeamInterpreterTest + * + */ +public class BeamInterpreterTest { + + private static BeamInterpreter beam; + private static InterpreterContext context; + + @BeforeClass + public static void setUp() { + Properties p = new Properties(); + beam = new BeamInterpreter(p); + beam.open(); + context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, + null); + } + + @AfterClass + public static void tearDown() { + beam.close(); + } + + @Test + public void testStaticRepl() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.println(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.SUCCESS, res.code()); + } + + @Test + public void testStaticReplWithoutMain() { + + StringBuffer sourceCode = new StringBuffer(); + sourceCode.append("package org.mdkt;\n"); + sourceCode.append("public class HelloClass {\n"); + sourceCode.append(" public String hello() { return \"hello\"; }"); + sourceCode.append("}"); + InterpreterResult res = beam.interpret(sourceCode.toString(), context); + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + + @Test + public void testStaticReplWithSyntaxError() { + + StringWriter writer = new StringWriter(); + PrintWriter out = new PrintWriter(writer); + out.println("public class HelloWorld {"); + out.println(" public static void main(String args[]) {"); + out.println(" System.out.prin(\"This is in another java file\");"); + out.println(" }"); + out.println("}"); + out.close(); + InterpreterResult res = beam.interpret(writer.toString(), context); + + assertEquals(InterpreterResult.Code.ERROR, res.code()); + } + +} diff --git a/conf/interpreter-list b/conf/interpreter-list index 17a6f1e4a36..098b3c6c188 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -19,6 +19,7 @@ alluxio org.apache.zeppelin:zeppelin-alluxio:0.6.1 Alluxio interpreter angular org.apache.zeppelin:zeppelin-angular:0.6.1 HTML and AngularJS view rendering +beam org.apache.zeppelin:zeppelin-beam:0.6.1 Beam interpreter bigquery org.apache.zeppelin:zeppelin-bigquery:0.6.1 BigQuery interpreter cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandra interpreter built with Scala 2.11 elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 77e0b1f3bcd..9386fd78eba 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -184,13 +184,13 @@ zeppelin.interpreters - 
org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter Comma separated interpreter configurations. First interpreter become a default zeppelin.interpreter.group.order - spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery + spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery,beam diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 9bd9967244a..e86ffb789bb 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -47,6 +47,7 @@
  • Available Interpreters
  • Alluxio
+  • Beam
  • BigQuery
  • Cassandra
  • Elasticsearch
diff --git a/docs/interpreter/beam.md b/docs/interpreter/beam.md
new file mode 100644
index 00000000000..cbcd5e37d51
--- /dev/null
+++ b/docs/interpreter/beam.md
@@ -0,0 +1,124 @@
+---
+layout: page
+title: Beam interpreter in Apache Zeppelin
+description: Apache Beam is an open source, unified programming model that you can use to create a data processing pipeline.
+group: interpreter
+---
+
+{% include JB/setup %}
+
+# Beam interpreter for Apache Zeppelin
+
+## Overview
+[Apache Beam](http://beam.incubator.apache.org) is an open source, unified platform for data processing pipelines. A pipeline can be built using one of the Beam SDKs.
+The pipeline is then executed by one of Beam's runners. Currently, Beam supports the Apache Flink Runner, the Apache Spark Runner, and the Google Dataflow Runner.
+
+## How to use
+Basically, you write normal Beam Java code in which you determine the runner. You should write the main method inside a class, because the interpreter invokes this main method to execute the pipeline. Unlike Zeppelin's normal pattern, each paragraph is treated as a separate job; it has no relation to any other paragraph.
+
+The following is a word count example with the data represented as an array of strings. It can also read data from files by replacing `Create.of(SENTENCES).withCoder(StringUtf8Coder.of())` with `TextIO.Read.from("path/to/filename.txt")`.
+
+```java
+%beam
+
+// most used imports
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.*;
+import org.apache.spark.SparkContext;
+import org.apache.beam.runners.direct.*;
+import org.apache.beam.sdk.runners.*;
+import org.apache.beam.sdk.options.*;
+import org.apache.beam.runners.spark.*;
+import org.apache.beam.runners.spark.io.ConsoleIO;
+import org.apache.beam.runners.flink.*;
+import org.apache.beam.runners.flink.examples.WordCount.Options;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+public class MinimalWordCount {
+  static List<String> s = new ArrayList<>();
+
+  static final String[] SENTENCES_ARRAY = new String[] {
+    "Hadoop is the Elephant King!",
+    "A yellow and elegant thing.",
+    "He never forgets",
+    "Useful data, or lets",
+    "An extraneous element cling!",
+    "A wonderful king is Hadoop.",
+    "The elephant plays well with Sqoop.",
+    "But what helps him to thrive",
+    "Are Impala, and Hive,",
+    "And HDFS in the group.",
+    "Hadoop is an elegant fellow.",
+    "An elephant gentle and mellow.",
+    "He never gets mad,",
+    "Or does anything bad,",
+    "Because, at his core, he is yellow",
+  };
+  static final List<String> SENTENCES = Arrays.asList(SENTENCES_ARRAY);
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.create().as(Options.class);
+    options.setRunner(FlinkRunner.class);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Create.of(SENTENCES).withCoder(StringUtf8Coder.of()))
+      .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          for (String word : c.element().split("[^a-zA-Z']+")) {
+            if (!word.isEmpty()) {
+              c.output(word);
+            }
+          }
+        }
+      }))
+      .apply(Count.<String> perElement())
+      .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
+        @Override
+        public void processElement(DoFn<KV<String, Long>, String>.ProcessContext arg0)
+            throws Exception {
+          s.add("\n" + arg0.element().getKey() + "\t" + arg0.element().getValue());
+        }
+      }));
+    p.run();
+    System.out.println("%table word\tcount");
+    for (int i = 0; i < s.size(); i++) {
+      System.out.print(s.get(i));
+    }
+
+  }
+}
+
+```
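+
+The runner is selected purely through the pipeline options, so the same transforms can be retargeted without touching the rest of the paragraph. The following minimal sketch is not part of the example above; the class names (e.g. `DirectRunner`) are assumed to come from the `beam-runners-direct-java` artifact that the interpreter's pom pulls in, so adjust them to whatever runner is actually on your classpath. It shows the pipeline options switched from the Flink runner to Beam's local direct runner:
+
+```java
+%beam
+
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+public class DirectRunnerExample {
+  public static void main(String[] args) {
+    // generic pipeline options; no Flink- or Spark-specific settings are needed
+    PipelineOptions options = PipelineOptionsFactory.create();
+    // run the pipeline in-process, which is convenient for quickly testing a paragraph
+    options.setRunner(DirectRunner.class);
+    Pipeline p = Pipeline.create(options);
+    // ... apply the same transforms as in the word count example above ...
+    p.run();
+  }
+}
+```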
diff --git a/pom.xml b/pom.xml index d0f43885514..c93f4b8d782 100644 --- a/pom.xml +++ b/pom.xml @@ -579,6 +579,13 @@ + + beam + + beam + + + examples diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index e599084904b..2ee668a130e 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -42,10 +42,12 @@ The following components are provided under Apache License. (Apache 2.0) Apache Kylin (http://kylin.apache.org/) (Apache 2.0) Apache Lens (http://lens.apache.org/) (Apache 2.0) Apache Flink (http://flink.apache.org/) + (Apache 2.0) Apache Beam (http://beam.apache.org/) (Apache 2.0) Apache Thrift (http://thrift.apache.org/) (Apache 2.0) Apache Lucene (https://lucene.apache.org/) (Apache 2.0) Apache Zookeeper (org.apache.zookeeper:zookeeper:jar:3.4.5 - http://zookeeper.apache.org/) (Apache 2.0) Chill (com.twitter:chill-java:jar:0.8.0 - https://github.com/twitter/chill/) + (Apache 2.0) QDox (com.thoughtworks.qdox:qdox:jar:2.0-M3 - https://github.com/paul-hammant/qdox/) (Apache 2.0) Codehaus Plexus (org.codehaus.plexus:plexus:jar:1.5.6 - https://codehaus-plexus.github.io/) (Apache 2.0) findbugs jsr305 (com.google.code.findbugs:jsr305:jar:1.3.9 - http://findbugs.sourceforge.net/) (Apache 2.0) Google Guava (com.google.guava:guava:15.0 - https://code.google.com/p/guava-libraries/) @@ -118,7 +120,43 @@ The following components are provided under Apache License.
(Apache 2.0) pegdown (org.pegdown:pegdown:1.6.0 - https://github.com/sirthias/pegdown) (Apache 2.0) parboiled-java (org.parboiled:parboiled-java:1.1.7 - https://github.com/sirthias/parboiled) (Apache 2.0) parboiled-core (org.parboiled:parboiled-core:1.1.7 - https://github.com/sirthias/parboiled) - + (Apache 2.0) ZkClient (com.101tec:zkclient:0.7 - https://github.com/sgroschupf/zkclient) + (Apache 2.0) jackson-module-scala (com.fasterxml.jackson.module:jackson-module-scala_2.10:2.4.4 - http://wiki.fasterxml.com/JacksonModuleScala) + (Apache 2.0) BigQuery API v2-rev295-1.22.0 (com.google.apis:google-api-services-bigquery:v2-rev295-1.22.0 - http://nexus.sonatype.org/oss-repository-hosting.html/google-api-services-bigquery) + (Apache 2.0) Google Cloud Debugger API v2-rev8-1.22.0 (com.google.apis:google-api-services-clouddebugger:v2-rev8-1.22.0 - http://nexus.sonatype.org/oss-repository-hosting.html/google-api-services-clouddebugger) + (Apache 2.0) Google Dataflow API v1b3-rev30-1.22.0 (com.google.apis:google-api-services-dataflow:v1b3-rev30-1.22.0 - http://nexus.sonatype.org/oss-repository-hosting.html/google-api-services-dataflow) + (Apache 2.0) Google Cloud Pub/Sub API v1-rev10-1.22.0 (com.google.apis:google-api-services-pubsub:v1-rev10-1.22.0 - http://nexus.sonatype.org/oss-repository-hosting.html/google-api-services-pubsub) + (Apache 2.0) Cloud Storage JSON API v1-rev71-1.22.0 (com.google.apis:google-api-services-storage:v1-rev71-1.22.0 - http://nexus.sonatype.org/oss-repository-hosting.html/google-api-services-storage) + (Apache 2.0) gcsio.jar (com.google.cloud.bigdataoss:gcsio:1.4.5 - https://github.com/GoogleCloudPlatform/BigData-interop/gcsio/) + (Apache 2.0) util (com.google.cloud.bigdataoss:util:1.4.5 - https://github.com/GoogleCloudPlatform/BigData-interop/util/) + (Apache 2.0) Google Guice - Core Library (com.google.inject:guice:3.0 - http://code.google.com/p/google-guice/guice/) + (Apache 2.0) OkHttp (com.squareup.okhttp:okhttp:2.5.0 - https://github.com/square/okhttp/okhttp) + (Apache 2.0) Okio (com.squareup.okio:okio:1.6.0 - https://github.com/square/okio/okio) + (Apache 2.0) config (com.typesafe:config:1.2.1 - https://github.com/typesafehub/config) + (Apache 2.0) akka-actor (com.typesafe.akka:akka-actor_2.10:2.3.7 - http://akka.io/) + (Apache 2.0) akka-remote (com.typesafe.akka:akka-remote_2.10:2.3.7 - http://akka.io/) + (Apache 2.0) akka-slf4j (com.typesafe.akka:akka-slf4j_2.10:2.3.7 - http://akka.io/) + (Apache 2.0) Metrics Core Library (com.yammer.metrics:metrics-core:2.2.0 - http://metrics.codahale.com/metrics-core/) + (Apache 2.0) Commons BeanUtils Bean Collections (commons-beanutils:commons-beanutils-bean-collections:1.8.3 - http://commons.apache.org/beanutils/) + (Apache 2.0) Metrics Core (io.dropwizard.metrics:metrics-core:3.1.0 - http://metrics.codahale.com/metrics-core/) + (Apache 2.0) Graphite Integration for Metrics (io.dropwizard.metrics:metrics-graphite:3.1.0 - http://metrics.codahale.com/metrics-graphite/) + (Apache 2.0) Jackson Integration for Metrics (io.dropwizard.metrics:metrics-json:3.1.0 - http://metrics.codahale.com/metrics-json/) + (Apache 2.0) JVM Integration for Metrics (io.dropwizard.metrics:metrics-jvm:3.1.0 - http://metrics.codahale.com/metrics-jvm/) + (Apache 2.0) Apache Log4j (log4j:log4j:1.2.17 - http://logging.apache.org/log4j/1.2/) + (Apache 2.0) Apache Avro IPC (org.apache.avro:avro-ipc:1.8.1 - http://avro.apache.org) + (Apache 2.0) Apache Avro Mapred API (org.apache.avro:avro-mapred:1.8.1 - http://avro.apache.org/avro-mapred) + (Apache 2.0) 
Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/) + (Apache 2.0) Apache Kafka (org.apache.kafka:kafka-clients:0.8.2.2 - http://kafka.apache.org) + (Apache 2.0) Apache Kafka (org.apache.kafka:kafka_2.10:0.8.2.2 - http://kafka.apache.org) + (Apache 2.0) mesos (org.apache.mesos:mesos:0.21.1 - http://mesos.apache.org) + (Apache 2.0) Apache Sling JSON Library (org.apache.sling:org.apache.sling.commons.json:2.0.6 - http://sling.apache.org/org.apache.sling.commons.json) + (Apache 2.0) Apache Velocity (org.apache.velocity:velocity:1.7 - http://velocity.apache.org/engine/devel/) + (Apache 2.0) jasper-compiler (tomcat:jasper-compiler:5.5.23 - http://tomcat.apache.org/jasper-compiler) + (Apache 2.0) jasper-runtime (tomcat:jasper-runtime:5.5.23 - http://tomcat.apache.org/jasper-runtime) + (Apache 2.0) Tachyon Project Core (org.tachyonproject:tachyon:0.6.4 - http://tachyonproject.org/tachyon/) + (Apache 2.0) Tachyon Project Client (org.tachyonproject:tachyon-client:0.6.4 - http://tachyonproject.org/tachyon-client/) + (Apache 2.0) javax.inject (javax.inject:javax.inject:1 - http://code.google.com/p/atinject/) + ======================================================================== MIT licenses ======================================================================== @@ -155,7 +193,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version (The MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs) - https://github.com/bryanbraun/anchorjs/blob/master/README.md#license (The MIT License) moment-duration-format v1.3.0 (https://github.com/jsmreese/moment-duration-format) - https://github.com/jsmreese/moment-duration-format/blob/master/LICENSE (The MIT License) github-markdown-css 2.4.0 (https://github.com/sindresorhus/github-markdown-css) - https://github.com/sindresorhus/github-markdown-css/blob/gh-pages/license - + (The MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt) The following components are provided under the MIT License. 
(The MIT License) Objenesis (org.objenesis:objenesis:2.1 - https://github.com/easymock/objenesis) - Copyright (c) 2006-2015 the original author and authors @@ -180,9 +218,22 @@ The text of each license is also included at licenses/LICENSE-[project]-[version (BSD 3 Clause) highlightjs v9.4.0 (https://highlightjs.org/) - https://github.com/isagalaev/highlight.js/blob/9.4.0/LICENSE (BSD 3 Clause) hamcrest v1.3 (http://hamcrest.org/JavaHamcrest/) - http://opensource.org/licenses/BSD-3-Clause (BSD Style) JLine v2.12.1 (https://github.com/jline/jline2) - https://github.com/jline/jline2/blob/master/LICENSE.txt - - - + (BSD New license) Google Auth Library for Java - Credentials (com.google.auth:google-auth-library-credentials:0.4.0 - https://github.com/google/google-auth-library-java/google-auth-library-credentials) + (BSD New license) Google Auth Library for Java - OAuth2 HTTP (com.google.auth:google-auth-library-oauth2-http:0.4.0 - https://github.com/google/google-auth-library-java/google-auth-library-oauth2-http) + (New BSD license) Protocol Buffer Java API (com.google.protobuf:protobuf-java-util:3.0.0-beta-2 - https://developers.google.com/protocol-buffers/) + (New BSD license) Protocol Buffer JavaNano API (com.google.protobuf.nano:protobuf-javanano:3.0.0-alpha-5 - https://developers.google.com/protocol-buffers/) + (BSD) JSch (com.jcraft:jsch:0.1.42 - http://www.jcraft.com/jsch/) + (BSD 3-Clause) io.grpc:grpc-all (io.grpc:grpc-all:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-auth (io.grpc:grpc-auth:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-core (io.grpc:grpc-core:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-netty (io.grpc:grpc-netty:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-okhttp (io.grpc:grpc-okhttp:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-protobuf (io.grpc:grpc-protobuf:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-protobuf-lite (io.grpc:grpc-protobuf-lite:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-protobuf-nano (io.grpc:grpc-protobuf-nano:0.14.1 - https://github.com/grpc/grpc-java) + (BSD 3-Clause) io.grpc:grpc-stub (io.grpc:grpc-stub:0.14.1 - https://github.com/grpc/grpc-java) + + The following components are provided under the BSD-style License. (New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.1.1.201511131810-r - https://eclipse.org/jgit/) @@ -202,6 +253,7 @@ The following components are provided under the BSD-style License. (New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/) (New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/) (New BSD License) Py4J (net.sf.py4j:py4j:0.10.1 - http://py4j.sourceforge.net/) - https://github.com/bartdag/py4j/blob/0.10.1/LICENSE.txt + (New BSD License) Markdown4j (org.commonjava.googlecode.markdown4j:markdown4j:jar:2.2-cj-1.0 - https://code.google.com/p/markdown4j/) (BSD 3 Clause) Paranamer (com.thoughtworks.paranamer:paranamer:jar:2.6) - https://github.com/paul-hammant/paranamer/blob/paranamer-parent-2.6/LICENSE.txt (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core) (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model) @@ -225,8 +277,15 @@ The following components are provided under the CDDL License. 
(CDDL 1.1) Jersey (com.sun.jersey:jersey:jar:1.9 - https://jersey.java.net/) (CDDL 1.1) jersey-core (org.glassfish.jersey.core:jersey-core:2.22.2 - https://jersey.java.net/) (CDDL 1.1) hk2 (org.glassfish.hk2 - https://hk2.java.net/2.5.0-b03/) - - + (CDDL 1.1) jersey-core (com.sun.jersey:jersey-core:1.9 - https://jersey.java.net/jersey-core/) + (CDDL 1.1) jersey-json (com.sun.jersey:jersey-json:1.9 - https://jersey.java.net/jersey-json/) + (CDDL 1.1) jersey-server (com.sun.jersey:jersey-server:1.9 - https://jersey.java.net/jersey-server/) + (CDDL 1.1) jersey-guice (com.sun.jersey.contribs:jersey-guice:1.9 - https://jersey.java.net/jersey-contribs/jersey-guice/) + (CDDL 1.1) JAXB RI (com.sun.xml.bind:jaxb-impl:2.2.3-1 - http://jaxb.java.net/) + (CDDL 1.0) Java Servlet API (javax.servlet:javax.servlet-api:3.1.0 - http://servlet-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JAXB API bundle for GlassFish V3 (javax.xml.bind:jaxb-api:2.2.2 - https://jaxb.dev.java.net/) + (CDDL 1.0) (GNU General Public Library) Streaming API for XML (javax.xml.stream:stax-api:1.0-2 - no url defined) + ======================================================================== EPL license @@ -271,3 +330,5 @@ Creative Commons CC0 (http://creativecommons.org/publicdomain/zero/1.0/) (CC0 1.0 Universal) JSR166e (com.twitter:jsr166e:1.1.0 - http://github.com/twitter/jsr166e) (Public Domain, per Creative Commons CC0) HdrHistogram (org.hdrhistogram:HdrHistogram:2.1.6 - http://hdrhistogram.github.io/HdrHistogram/) + (Public Domain) XZ for Java (org.tukaani:xz:1.0 - http://tukaani.org/xz/java.html) + (Public Domain) AOP alliance (aopalliance:aopalliance:1.0 - http://aopalliance.sourceforge.net) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 75efe3956f8..4a80c7efaf5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -529,7 +529,8 @@ public static enum ConfVars { + "org.apache.zeppelin.scalding.ScaldingInterpreter," + "org.apache.zeppelin.jdbc.JDBCInterpreter," + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.bigquery.BigQueryInterpreter"), + + "org.apache.zeppelin.bigquery.BigQueryInterpreter," + + "org.apache.zeppelin.beam.BeamInterpreter"), ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), @@ -537,7 +538,7 @@ public static enum ConfVars { ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery"), + + "scalding,jdbc,hbase,bigquery,beam"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen