diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md index 44303125bdc..ec5608bf3b3 100644 --- a/docs/interpreter/scalding.md +++ b/docs/interpreter/scalding.md @@ -28,10 +28,49 @@ In a notebook, to enable the **Scalding** interpreter, click on the **Gear** ico ### Configuring the Interpreter -Zeppelin comes with a pre-configured Scalding interpreter in local mode, so you do not need to install anything. + +Scalding interpreter runs in two modes: + +* local +* hdfs + +In the local mode, you can access files on the local server and scalding transformation are done locally. + +In hdfs mode you can access files in HDFS and scalding transformation are run as hadoop map-reduce jobs. + +Zeppelin comes with a pre-configured Scalding interpreter in local mode. + +To run the scalding interpreter in the hdfs mode you have to do the following: + +**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES** + +In conf/zeppelin_env.sh, you have to set +ZEPPELIN_CLASSPATH_OVERRIDES to the contents of 'hadoop classpath' +and directories with custom jar files you need for your scalding commands. + +**Set arguments to the scalding repl** + +The default arguments are: "--local --repl" + +For hdfs mode you need to add: "--hdfs --repl" + +If you want to add custom jars, you need to add: +"-libjars directory/*:directory/*" + +For reducer estimation, you need to add something like: +"-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator" + +**Set max.open.instances** + +If you want to control the maximum number of open interpreters, you have to select "scoped" interpreter for note +option and set max.open.instances argument. ### Testing the Interpreter -In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book. + +#### Local mode + +In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, +we will count words (of course!), and plot a graph of the top 10 words in the book. ``` %scalding @@ -71,7 +110,44 @@ print("%table " + table) If you click on the icon for the pie chart, you should be able to see a chart like this: ![Scalding - Pie - Chart](../assets/themes/zeppelin/img/docs-img/scalding-pie.png) -### Current Status & Future Work -The current implementation of the Scalding interpreter does not support canceling jobs, or fine-grained progress updates. -The pre-configured Scalding interpreter only supports Scalding in local mode. Hadoop mode for Scalding is currently unsupported, and will be future work (contributions welcome!). +#### HDFS mode + +**Test mode** + +``` +%scalding +mode +``` +This command should print: + +``` +res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) +``` + + +**Test HDFS read** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +testfile.dump +``` + +This command should print the contents of the hdfs file /user/x/testfile. + +**Test map-reduce job** + +``` +val testfile = TypedPipe.from(TextLine("/user/x/testfile")) +val a = testfile.groupAll.size.values +a.toList + +``` + +This command should create a map reduce job. + +### Future Work +* Better user feedback (hadoop url, progress updates) +* Ability to cancel jobs +* Ability to dynamically load jars without restarting the interpreter +* Multiuser scalability (run scalding interpreters on different servers) diff --git a/scalding/pom.xml b/scalding/pom.xml index 2b04f666623..a3b3b58fe63 100644 --- a/scalding/pom.xml +++ b/scalding/pom.xml @@ -34,9 +34,9 @@ http://zeppelin.apache.org - 2.10.4 - 2.3.0 - 0.15.1-RC13 + 2.11.8 + 2.6.0 + 0.16.1-RC1 @@ -45,6 +45,11 @@ Concurrent Maven Repo http://conjars.org/repo + + twitter + Twitter Maven Repo + http://maven.twttr.com + @@ -69,13 +74,43 @@ com.twitter - scalding-core_2.10 + scalding-core_2.11 + ${scalding.version} + + + + com.twitter + scalding-args_2.11 + ${scalding.version} + + + + com.twitter + scalding-date_2.11 + ${scalding.version} + + + + com.twitter + scalding-commons_2.11 + ${scalding.version} + + + + com.twitter + scalding-avro_2.11 ${scalding.version} com.twitter - scalding-repl_2.10 + scalding-parquet_2.11 + ${scalding.version} + + + + com.twitter + scalding-repl_2.11 ${scalding.version} @@ -97,12 +132,12 @@ ${scala.version} - org.apache.hadoop - hadoop-common + hadoop-client ${hadoop.version} + diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java index e808e702c74..4542297e296 100644 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java @@ -17,35 +17,29 @@ package org.apache.zeppelin.scalding; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.net.URL; -import java.net.URLClassLoader; - +import com.twitter.scalding.ScaldingILoop; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Console; -import scala.Some; -import scala.None; -import scala.tools.nsc.Settings; -import scala.tools.nsc.settings.MutableSettings.BooleanSetting; -import scala.tools.nsc.settings.MutableSettings.PathSetting; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; /** * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. @@ -54,16 +48,29 @@ public class ScaldingInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(ScaldingInterpreter.class); + static final String ARGS_STRING = "args.string"; + static final String ARGS_STRING_DEFAULT = "--local --repl"; + static final String MAX_OPEN_INSTANCES = "max.open.instances"; + static final String MAX_OPEN_INSTANCES_DEFAULT = "50"; + public static final List NO_COMPLETION = Collections.unmodifiableList(new ArrayList()); static { - Interpreter.register("scalding", ScaldingInterpreter.class.getName()); + Interpreter.register( + "scalding", + "scalding", + ScaldingInterpreter.class.getName(), + new InterpreterPropertyBuilder() + .add(ARGS_STRING, ARGS_STRING_DEFAULT, "Arguments for scalding REPL") + .add(MAX_OPEN_INSTANCES, MAX_OPEN_INSTANCES_DEFAULT, + "Maximum number of open interpreter instances") + .build()); } + static int numOpenInstances = 0; private ScaldingILoop interpreter; private ByteArrayOutputStream out; - private Map binder; public ScaldingInterpreter(Properties property) { super(property); @@ -72,104 +79,34 @@ public ScaldingInterpreter(Properties property) { @Override public void open() { - URL[] urls = getClassloaderUrls(); - - // Very nice discussion about how scala compiler handle classpath - // https://groups.google.com/forum/#!topic/scala-user/MlVwo2xCCI0 - - /* - * > val env = new nsc.Settings(errLogger) > env.usejavacp.value = true > val p = new - * Interpreter(env) > p.setContextClassLoader > Alternatively you can set the class path through - * nsc.Settings.classpath. - * - * >> val settings = new Settings() >> settings.usejavacp.value = true >> - * settings.classpath.value += File.pathSeparator + >> System.getProperty("java.class.path") >> - * val in = new Interpreter(settings) { >> override protected def parentClassLoader = - * getClass.getClassLoader >> } >> in.setContextClassLoader() - */ - Settings settings = new Settings(); - - // set classpath for scala compiler - PathSetting pathSettings = settings.classpath(); - String classpath = ""; - List paths = currentClassPath(); - for (File f : paths) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += f.getAbsolutePath(); + numOpenInstances = numOpenInstances + 1; + String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES, + MAX_OPEN_INSTANCES_DEFAULT); + int maxOpenInstances = 50; + try { + maxOpenInstances = Integer.valueOf(maxOpenInstancesStr); + } catch (Exception e) { + logger.error("Error reading max.open.instances", e); } - - if (urls != null) { - for (URL u : urls) { - if (classpath.length() > 0) { - classpath += File.pathSeparator; - } - classpath += u.getFile(); - } + logger.info("max.open.instances = {}", maxOpenInstances); + if (numOpenInstances > maxOpenInstances) { + logger.error("Reached maximum number of open instances"); + return; } - - pathSettings.v_$eq(classpath); - settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - - - // set classloader for scala compiler - settings.explicitParentLoader_$eq(new Some(Thread.currentThread() - .getContextClassLoader())); - BooleanSetting b = (BooleanSetting) settings.usejavacp(); - b.v_$eq(true); - settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b); - - /* Scalding interpreter */ - PrintStream printStream = new PrintStream(out); - interpreter = new ScaldingILoop(null, new PrintWriter(out)); - interpreter.settings_$eq(settings); - interpreter.createInterpreter(); - - interpreter.intp(). - interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map) getValue("_binder"); - binder.put("out", printStream); - } - - private Object getValue(String name) { - Object ret = interpreter.intp().valueOfTerm(name); - if (ret instanceof None) { - return null; - } else if (ret instanceof Some) { - return ((Some) ret).get(); + logger.info("Opening instance {}", numOpenInstances); + logger.info("property: {}", property); + String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); + String[] args; + if (argsString == null) { + args = new String[0]; } else { - return ret; - } - } - - private List currentClassPath() { - List paths = classPath(Thread.currentThread().getContextClassLoader()); - String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); - if (cps != null) { - for (String cp : cps) { - paths.add(new File(cp)); - } - } - return paths; - } - - private List classPath(ClassLoader cl) { - List paths = new LinkedList(); - if (cl == null) { - return paths; + args = argsString.split(" "); } + logger.info("{}", Arrays.toString(args)); - if (cl instanceof URLClassLoader) { - URLClassLoader ucl = (URLClassLoader) cl; - URL[] urls = ucl.getURLs(); - if (urls != null) { - for (URL url : urls) { - paths.add(new File(url.getFile())); - } - } - } - return paths; + PrintWriter printWriter = new PrintWriter(out, true); + interpreter = ZeppelinScaldingShell.getRepl(args, printWriter); + interpreter.createInterpreter(); } @Override @@ -180,12 +117,49 @@ public void close() { @Override public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { - logger.info("Running Scalding command '" + cmd + "'"); - + String user = contextInterpreter.getAuthenticationInfo().getUser(); + logger.info("Running Scalding command: user: {} cmd: '{}'", user, cmd); + + if (interpreter == null) { + logger.error( + "interpreter == null, open may not have been called because max.open.instances reached"); + return new InterpreterResult(Code.ERROR, + "interpreter == null\n" + + "open may not have been called because max.open.instances reached" + ); + } if (cmd == null || cmd.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); } - return interpret(cmd.split("\n"), contextInterpreter); + InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR); + if (property.getProperty(ARGS_STRING).contains("hdfs")) { + UserGroupInformation ugi = null; + try { + ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); + } catch (IOException e) { + logger.error("Error creating UserGroupInformation", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + try { + // Make variables final to avoid "local variable is accessed from within inner class; + // needs to be declared final" exception in JDK7 + final String cmd1 = cmd; + final InterpreterContext contextInterpreter1 = contextInterpreter; + PrivilegedExceptionAction action = + new PrivilegedExceptionAction() { + public InterpreterResult run() throws Exception { + return interpret(cmd1.split("\n"), contextInterpreter1); + } + }; + interpreterResult = ugi.doAs(action); + } catch (Exception e) { + logger.error("Error running command with ugi.doAs", e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + } else { + interpreterResult = interpret(cmd.split("\n"), contextInterpreter); + } + return interpreterResult; } public InterpreterResult interpret(String[] lines, InterpreterContext context) { @@ -205,8 +179,13 @@ public InterpreterResult interpretInput(String[] lines) { } linesToRun[lines.length] = "print(\"\")"; - Console.setOut((java.io.PrintStream) binder.get("out")); out.reset(); + + // Moving two lines below from open() to this function. + // If they are in open output is incomplete. + PrintStream printStream = new PrintStream(out, true); + Console.setOut(printStream); + Code r = null; String incomplete = ""; boolean inComment = false; @@ -261,7 +240,6 @@ public InterpreterResult interpretInput(String[] lines) { incomplete = ""; } } - if (r == Code.INCOMPLETE) { return new InterpreterResult(r, "Incomplete expression"); } else { @@ -306,4 +284,5 @@ public Scheduler getScheduler() { public List completion(String buf, int cursor) { return NO_COMPLETION; } + } diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala deleted file mode 100644 index bd23c4937f5..00000000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ScaldingILoop.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding; - -import java.io.{BufferedReader, File, FileReader} - -import scala.tools.nsc.GenericRunnerSettings -import scala.tools.nsc.interpreter.{ILoop, IR, JPrintWriter} - - -/** - * A class providing Scalding specific commands for inclusion in the Scalding REPL. - * This is currently forked from Scalding, but should eventually make it into Scalding itself: - * https://github.com/twitter/scalding/blob/develop/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala - */ - class ScaldingILoop(in0: Option[BufferedReader], out: JPrintWriter) - extends ILoop(in0, out) { - // def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) - // def this() = this(None, new JPrintWriter(Console.out, true)) - - settings = new GenericRunnerSettings({ s => echo(s) }) - - override def printWelcome() { - val fc = Console.YELLOW - val wc = Console.RED - def wrapFlames(s: String) = s.replaceAll("[()]+", fc + "$0" + wc) - echo(fc + - " ( \n" + - " )\\ ) ( ( \n" + - "(()/( ) )\\ )\\ ) ( ( ( \n" + - " /(_)) ( ( /( ((_)(()/( )\\ ( )\\))( \n" + - "(_)) )\\ )( )) _ ((_)(( ) )\\ ) (( ))\\ \n".replaceAll("_", wc + "_" + fc) + wc + - wrapFlames("/ __|((_) ((_)_ | | _| | (_) _(_(( (_()_) \n") + - wrapFlames("\\__ \\/ _| / _` || |/ _` | | || ' \\))/ _` \\ \n") + - "|___/\\__| \\__,_||_|\\__,_| |_||_||_| \\__, | \n" + - " |___/ ") - } - - /** - * Commands specific to the Scalding REPL. To define a new command use one of the following - * factory methods: - * - `LoopCommand.nullary` for commands that take no arguments - * - `LoopCommand.cmd` for commands that take one string argument - * - `LoopCommand.varargs` for commands that take multiple string arguments - */ - private val scaldingCommands: List[LoopCommand] = List() - - /** - * Change the shell prompt to read scalding> - * - * @return a prompt string to use for this REPL. - */ - override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET - - private[this] def addImports(ids: String*): IR.Result = - if (ids.isEmpty) IR.Success - else intp.interpret("import " + ids.mkString(", ")) - - /** - * Search for files with the given name in all directories from current directory - * up to root. - */ - private def findAllUpPath(filename: String): List[File] = - Iterator.iterate(System.getProperty("user.dir"))(new File(_).getParent) - .takeWhile(_ != "/") - .flatMap(new File(_).listFiles.filter(_.toString.endsWith(filename))) - .toList - - /** - * Gets the list of commands that this REPL supports. - * - * @return a list of the command supported by this REPL. - */ - override def commands: List[LoopCommand] = super.commands ++ scaldingCommands - - protected def imports: List[String] = List( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._", - "com.twitter.scalding.ReplState._") - - override def createInterpreter() { - super.createInterpreter() - intp.beQuietDuring { - addImports(imports: _*) - - settings match { - case s: GenericRunnerSettings => - findAllUpPath(".scalding_repl").reverse.foreach { - f => s.loadfiles.appendToValue(f.toString) - } - case _ => () - } - } - } -} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala new file mode 100644 index 00000000000..b847eba0012 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +/** + * Stores REPL state + */ + +import cascading.flow.FlowDef +import com.twitter.scalding.BaseReplState +import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } +import scala.concurrent.Future +import scala.util.{Failure, Success} + +object ZeppelinReplState extends BaseReplState { + override def shell = ZeppelinScaldingShell +} + +/** + * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly + * used everywhere. + */ +object ZeppelinReplImplicitContext { + /** Implicit execution context for using the Execution monad */ + implicit val executionContext = ConcurrentExecutionContext.global + /** Implicit repl state used for ShellPipes */ + implicit def stateImpl = ZeppelinReplState + /** Implicit flowDef for this Scalding shell session. */ + implicit def flowDefImpl = ZeppelinReplState.flowDef + /** Defaults to running in local mode if no mode is specified. */ + implicit def modeImpl = ZeppelinReplState.mode + implicit def configImpl = ZeppelinReplState.config +} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala new file mode 100644 index 00000000000..9be01998695 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +import java.io.BufferedReader +import com.twitter.scalding.ScaldingILoop + +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) + extends ScaldingILoop(in, out) { + + override protected def imports = List( + "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }", + // ReplImplicits minus fields API parts (esp FieldConversions) + """com.twitter.scalding.ReplImplicits.{ + iterableToSource, + keyedListLikeToShellTypedPipe, + typedPipeToShellTypedPipe, + valuePipeToShellValuePipe + }""", + "com.twitter.scalding.ReplImplicits", + "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._", + "org.apache.zeppelin.scalding.ZeppelinReplState", + "org.apache.zeppelin.scalding.ZeppelinReplState._" + ) + +} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala new file mode 100644 index 00000000000..29e5f835cb4 --- /dev/null +++ b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.scalding + +import com.twitter.scalding._ +import com.twitter.scalding.typed.TypedPipe +import scala.tools.nsc.{GenericRunnerCommand} +import scala.tools.nsc.interpreter._ + +/** + * TBD + */ +object ZeppelinScaldingShell extends BaseScaldingShell { + + override def replState = ZeppelinReplState + + def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { + + val argsExpanded = ExpandLibJarsGlobs(args) + val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) + + // Process command line arguments into a settings object, and use that to start the REPL. + // We ignore params we don't care about - hence error function is empty + val command = new GenericRunnerCommand(cmdArgs, _ => ()) + + // inherit defaults for embedded interpretter (needed for running with SBT) + // (TypedPipe chosen arbitrarily, just needs to be something representative) + command.settings.embeddedDefaults[TypedPipe[String]] + + // if running from the assembly, need to explicitly tell it to use java classpath + if (args.contains("--repl")) command.settings.usejavacp.value = true + + command.settings.classpath.append(System.getProperty("java.class.path")) + + // Force the repl to be synchronous, so all cmds are executed in the same thread + command.settings.Yreplsync.value = true + + val repl = new ZeppelinScaldingILoop(None, out) + scaldingREPL = Some(repl) + replState.mode = mode + replState.customConfig = replState.customConfig ++ (mode match { + case _: HadoopMode => cfg + case _ => Config.empty + }) + + // if in Hdfs mode, store the mode to enable switching between Local and Hdfs + mode match { + case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) + case _ => () + } + + repl.settings = command.settings + return repl; + + } + +} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java index 08c67dacd3b..7ffbd975b3e 100644 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java @@ -57,6 +57,7 @@ public void setUp() throws Exception { if (repl == null) { Properties p = new Properties(); + p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl"); repl = new ScaldingInterpreter(p); repl.open(); @@ -119,7 +120,7 @@ public void testBasicScalding() { "val salesPipe = TypedPipe.from(salesList)\n" + "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" + " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" + - "results.dump", + "results.dump", context).code()); }