diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index c4b369c301c..a0e061f35c3 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -190,7 +190,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter, org.apache.zeppelin.pig.PigUDFInterpreter Comma separated interpreter configurations. 
First interpreter become a default diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index 227656bba78..8b876e66e0f 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -23,6 +23,10 @@ group: manual Almost the same as `%pig.script`. The only difference is that you don't need to add alias in the last statement. And the display type is table. + - `%pig.udf` + + Java editor for writing Pig UDFs. These Java UDFs will be compiled and built into jars, which will be registered to Pig automatically. + ## Supported runtime mode - Local - MapReduce @@ -95,3 +99,22 @@ foreach c generate group as category, COUNT($1) as count; ``` Data is shared between `%pig` and `%pig.query`, so that you can do some common work in `%pig`, and do different kinds of query based on the data of `%pig`. + +##### pig.udf + +``` +%pig.udf + +import org.apache.pig.data.Tuple; +import org.apache.pig.EvalFunc; +import java.io.IOException; + +public class UDF1 extends EvalFunc<String> { + public String exec(Tuple input) throws IOException { + return "1"; + } +} +``` + +If your UDF depends on other third-party libraries, you need to specify these libraries as Pig's dependencies on the interpreter setting page. These dependencies will be +registered to Pig automatically. \ No newline at end of file
diff --git a/pig/pom.xml b/pig/pom.xml index a4e5cbb38e1..7baf8c65362 100644 --- a/pig/pom.xml +++ b/pig/pom.xml @@ -114,6 +114,12 @@ ${tez.version} + + com.thoughtworks.qdox + qdox + 2.0-M3 + + junit junit @@ -135,6 +141,16 @@ + + maven-surefire-plugin + 2.17 + + + ${basedir}/src/test/resources + + + + maven-dependency-plugin 2.8
diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigUDFInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigUDFInterpreter.java new file mode 100644 index 00000000000..905c08b1108 --- /dev/null +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigUDFInterpreter.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.zeppelin.pig; + +import com.thoughtworks.qdox.JavaProjectBuilder; +import com.thoughtworks.qdox.model.JavaClass; +import com.thoughtworks.qdox.model.JavaSource; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.FileFileFilter; +import org.apache.commons.lang3.StringUtils; +import org.apache.pig.PigServer; +import org.apache.zeppelin.interpreter.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.tools.*; +import java.io.*; +import java.net.URI; +import java.util.*; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; + +/** + * Interpreter for Pig UDF + */ +public class PigUDFInterpreter extends Interpreter { + + private static final Logger LOGGER = LoggerFactory.getLogger(PigUDFInterpreter.class); + + private PigServer pigServer; + private String udfBuildClasspath; + + public PigUDFInterpreter(Properties property) { + super(property); + }
+ + @Override + public void open() { + pigServer = getPigInterpreter().getPigServer(); + // register dependency jars + String localRepo = getProperty("zeppelin.interpreter.localRepo"); + if (localRepo != null && new File(localRepo).exists()) { + File[] jars = new File(localRepo).listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + return pathname.isFile() && pathname.getName().endsWith(".jar"); + } + }); + StringBuilder classPathBuilder = new StringBuilder(System.getProperty("java.class.path")); + for (File jar : jars) { + try { + pigServer.registerJar(jar.getAbsolutePath()); + classPathBuilder.append(":" + jar.getAbsolutePath()); + LOGGER.debug("Register dependency jar:" + jar.getAbsolutePath()); + } catch (IOException e) { + LOGGER.error("Failed to register dependency jar", e); + } + } + this.udfBuildClasspath = classPathBuilder.toString(); + LOGGER.debug("udfBuildClasspath:" + udfBuildClasspath); + } else { + LOGGER.error("localRepo is missing or doesn't exist, " + + "zeppelin.interpreter.localRepo=" + localRepo); + } + } + + @Override + public void close() { + + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + try { + CompiledClass compiledClass = compile(st); + File jarFile = buildJar(compiledClass); + pigServer.registerJar(jarFile.getAbsolutePath()); + + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Build successfully"); + } catch (Exception e) { + LOGGER.error("Failed to compile/build UDF", e); + return new InterpreterResult(InterpreterResult.Code.ERROR, + InterpreterUtils.getMostRelevantMessage(e)); + } + } + + @Override + public void cancel(InterpreterContext context) { + + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + + private PigInterpreter getPigInterpreter() { + LazyOpenInterpreter lazy = null; + PigInterpreter pig = null; + Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName()); + + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + pig = (PigInterpreter) p; + + if (lazy != null) { + lazy.open(); + } + return pig; + }
+ + private CompiledClass compile(String code) throws Exception { + + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>(); + + // Java parsing + JavaProjectBuilder builder = new JavaProjectBuilder(); + JavaSource src = builder.addSource(new StringReader(code)); + + // get all classes in code (paragraph) + List<JavaClass> classes = src.getClasses(); + if (classes.size() != 1) { + throw new Exception("Either no class is defined or multiple classes are defined " + + "in one paragraph."); + } + String className = classes.get(0).getName(); + String packageName = classes.get(0).getPackageName(); + JavaFileObject file = new JavaSourceFromString(className, code); + Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file); + + ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); + ByteArrayOutputStream baosErr = new ByteArrayOutputStream(); + + // Creating new streams to capture the compiler output + PrintStream newOut = new PrintStream(baosOut); + PrintStream newErr = new PrintStream(baosErr); + // Save the old System.out / System.err + PrintStream oldOut = System.out; + PrintStream oldErr = System.err; + // Redirect stdout/stderr so compiler output can be captured + System.setOut(newOut); + System.setErr(newErr); + + List<String> options = new ArrayList<>(); + options.addAll(Arrays.asList("-classpath", udfBuildClasspath)); + JavaCompiler.CompilationTask task = compiler.getTask(null, null, diagnostics, options, null, + compilationUnits); + + // executing the compilation process + boolean success = task.call(); + + // if success is false we will get errors + if (!success) { + for (Diagnostic<? extends JavaFileObject> diagnostic : diagnostics.getDiagnostics()) { + if (diagnostic.getLineNumber() == -1) { + continue; + } + System.err.println("line " + diagnostic.getLineNumber() + " : " + + diagnostic.getMessage(null)); + } + System.out.flush(); + System.err.flush(); + + System.setOut(oldOut); + System.setErr(oldErr); + LOGGER.error("Exception in PigUDFInterpreter while compilation: " + baosErr.toString()); + throw new Exception(baosErr.toString()); + } else { + System.out.flush(); + System.err.flush(); + + // set the streams back to the old streams + System.setOut(oldOut); + System.setErr(oldErr); + return new CompiledClass(packageName, new File(className + ".class")); + } + }
+ + private File buildJar(CompiledClass clazz) throws IOException { + File tmpJarFile = File.createTempFile("zeppelin_pig", ".jar"); + FileOutputStream fOut = null; + JarOutputStream jarOut = null; + try { + fOut = new FileOutputStream(tmpJarFile); + jarOut = new JarOutputStream(fOut); + String entryPath = null; + if (clazz.packageName.isEmpty()) { + entryPath = clazz.classFile.getName(); + } else { + entryPath = clazz.packageName.replace(".", "/") + "/" + clazz.classFile.getName(); + } + jarOut.putNextEntry(new JarEntry(entryPath)); + jarOut.write(FileUtils.readFileToByteArray(clazz.classFile)); + jarOut.closeEntry(); + LOGGER.debug("pig udf jar is created under " + tmpJarFile.getAbsolutePath()); + return tmpJarFile; + } catch (IOException e) { + throw e; + } finally { + if (jarOut != null) { + jarOut.close(); + } + if (fOut != null) { + fOut.close(); + } + } + } + + /** + * In-memory Java source file that is fed to the JDK compiler. + */ + public static class JavaSourceFromString extends SimpleJavaFileObject { + final String code; + + JavaSourceFromString(String name, String code) { + super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE); + this.code = code; + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) { + return code; + } + } + + /** + * Holder for the package name and class file of a compiled UDF. + */ + public static class CompiledClass { + public final String packageName; + public final File classFile; + + public CompiledClass(String packageName, File classFile)
{ + this.packageName = packageName; + this.classFile = classFile; + } + } +} + + diff --git a/pig/src/main/resources/interpreter-setting.json b/pig/src/main/resources/interpreter-setting.json index 27918ede1bf..340dae2845b 100644 --- a/pig/src/main/resources/interpreter-setting.json +++ b/pig/src/main/resources/interpreter-setting.json @@ -16,9 +16,6 @@ "defaultValue": "false", "description": "flag to include job stats in output" } - }, - "editor": { - "language": "pig" } }, { @@ -38,9 +35,16 @@ "defaultValue": "1000", "description": "max row number for %pig.query" } + } + }, + { + "group": "pig", + "name": "udf", + "className": "org.apache.zeppelin.pig.PigUDFInterpreter", + "properties": { }, "editor": { - "language": "pig" + "language": "java" } } ] diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigUDFInterpreterTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigUDFInterpreterTest.java new file mode 100644 index 00000000000..b7016f7e88f --- /dev/null +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigUDFInterpreterTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.pig; + +import org.apache.commons.io.IOUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterResult; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class PigUDFInterpreterTest { + + private PigInterpreter pigInterpreter; + private PigUDFInterpreter udfInterpreter; + private InterpreterContext context; + + @Before + public void setUp() { + Properties properties = new Properties(); + properties.put("zeppelin.pig.execType", "local"); + properties.put("zeppelin.pig.maxResult", "20"); + properties.put("zeppelin.interpreter.localRepo", System.getProperty("zeppelin.pig.localRepo")); + pigInterpreter = new PigInterpreter(properties); + udfInterpreter = new PigUDFInterpreter(properties); + context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, + null, null); + + List<Interpreter> interpreters = new ArrayList<Interpreter>(); + interpreters.add(pigInterpreter); + interpreters.add(udfInterpreter); + InterpreterGroup group = new InterpreterGroup(); + group.put("note_id", interpreters); + pigInterpreter.setInterpreterGroup(group); + udfInterpreter.setInterpreterGroup(group); + pigInterpreter.open(); + udfInterpreter.open(); + }
+ + @Test + public void testSimpleUDFWithoutPackage() throws IOException { + InterpreterResult result = udfInterpreter.interpret( + "import org.apache.pig.data.Tuple;\n" + + "import org.apache.pig.EvalFunc;\n" + + "import java.io.IOException;\n" + + "public class UDF1 extends EvalFunc<String> {\n" + + "public String exec(Tuple input) throws IOException {\n" + + "return \"1\";}\n" + + "}", context); + Assert.assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + Assert.assertEquals("Build successfully", result.message()); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using this udf + String pigscript = + "DEFINE udf1 UDF1();" + + "a = load '" + tmpFile.getAbsolutePath() + "';" + + "b = foreach a generate udf1($0), $1;" + + "dump b;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(1,andy)\n(1,peter)")); + }
+ + @Test + public void testSimpleUDFWithPackage() throws IOException { + InterpreterResult result = udfInterpreter.interpret( + "package org.apache.zeppelin.pig;\n" + + "import org.apache.pig.data.Tuple;\n" + + "import org.apache.pig.EvalFunc;\n" + + "import java.io.IOException;\n" + + "public class UDF2 extends EvalFunc<String> {\n" + + "public String exec(Tuple input) throws IOException {\n" + + "return \"2\";}\n" + + "}", context); + Assert.assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + Assert.assertEquals("Build successfully", result.message()); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new
FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using this udf + String pigscript = + "DEFINE udf2 org.apache.zeppelin.pig.UDF2();" + + "a = load '" + tmpFile.getAbsolutePath() + "';" + + "b = foreach a generate udf2($0), $1;" + + "dump b;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(2,andy)\n(2,peter)")); + }
+ + @Test + public void testUDFWithDependency() throws IOException { + InterpreterResult result = udfInterpreter.interpret( + "package org.apache.zeppelin.pig;\n" + + "import org.apache.pig.data.Tuple;\n" + + "import org.apache.pig.EvalFunc;\n" + + "import java.io.IOException;\n" + + "import org.apache.zeppelin.pig.test.Dummy;\n" + + "public class UDF3 extends EvalFunc<String> {\n" + + "public String exec(Tuple input) throws IOException {\n" + + "return Dummy.VALUE_1;}\n" + + "}", context); + + Assert.assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + Assert.assertEquals("Build successfully", result.message()); + + String content = "1\tandy\n" + + "2\tpeter\n"; + File tmpFile = File.createTempFile("zeppelin", "test"); + FileWriter writer = new FileWriter(tmpFile); + IOUtils.write(content, writer); + writer.close(); + + // simple pig script using this udf + String pigscript = + "DEFINE udf3 org.apache.zeppelin.pig.UDF3();" + + "a = load '" + tmpFile.getAbsolutePath() + "';" + + "b = foreach a generate udf3($0), $1;" + + "dump b;"; + result = pigInterpreter.interpret(pigscript, context); + assertEquals(InterpreterResult.Type.TEXT, result.type()); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + assertTrue(result.message().contains("(1,andy)\n(1,peter)")); + }
+ + @Test + public void testInvalidUDF() { + InterpreterResult result = udfInterpreter.interpret( + "import org.apache.pig.data.Tuple;\n" + + "import org.apache.pig.EvalFunc;\n" + + "import java.io.IOException;\n" + + "public class UDF1 extends EvalFunc<String> {" + + "public String exe(Tuple input) throws IOException {" + + "return \"1\";}" + + "}", context); + + Assert.assertEquals(InterpreterResult.Code.ERROR, result.code()); + Assert.assertTrue(result.message() + .contains("UDF1 is not abstract and does not override abstract method exec")); + } +}
diff --git a/pig/src/test/resources/log4j.properties b/pig/src/test/resources/log4j.properties index 8daee59d60d..a4f76e61945 100644 --- a/pig/src/test/resources/log4j.properties +++ b/pig/src/test/resources/log4j.properties @@ -20,3 +20,5 @@ log4j.rootLogger = INFO, stdout log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n + +log4j.logger.org.apache.zeppelin.pig=DEBUG \ No newline at end of file
diff --git a/pig/src/test/resources/pig_udf_test.jar b/pig/src/test/resources/pig_udf_test.jar new file mode 100644 index 00000000000..4b7978eec12 Binary files /dev/null and b/pig/src/test/resources/pig_udf_test.jar differ
diff --git a/zeppelin-web/bower.json b/zeppelin-web/bower.json index 6a101b8e1f2..6acc3b9fe3d 100644 --- a/zeppelin-web/bower.json +++ b/zeppelin-web/bower.json @@ -49,6 +49,7 @@ "src-noconflict/mode-markdown.js", "src-noconflict/mode-sh.js", "src-noconflict/mode-r.js", + "src-noconflict/mode-java.js", "src-noconflict/keybinding-emacs.js", "src-noconflict/ext-language_tools.js",
"src-noconflict/theme-chrome.js" diff --git a/zeppelin-web/src/index.html b/zeppelin-web/src/index.html index 8180e687054..758ea09030e 100644 --- a/zeppelin-web/src/index.html +++ b/zeppelin-web/src/index.html @@ -121,6 +121,7 @@ +