diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 5b70d85ccfe..9f11e03bbdc 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -45,13 +45,13 @@ jsc = intp.getJavaSparkContext() if jsc.version().startswith("1.2"): - java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") + java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") + java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") elif jsc.version().startswith("1.3"): - java_import(gateway.jvm, "org.apache.spark.sql.*") - java_import(gateway.jvm, "org.apache.spark.sql.hive.*") + java_import(gateway.jvm, "org.apache.spark.sql.*") + java_import(gateway.jvm, "org.apache.spark.sql.hive.*") java_import(gateway.jvm, "scala.Tuple2") @@ -64,77 +64,72 @@ z = intp.getZeppelinContext() class Logger(object): - def __init__(self): - self.out = "" + def __init__(self): + self.out = "" - def write(self, message): - self.out = self.out + message + def write(self, message): + self.out = self.out + message - def get(self): - return self.out + def get(self): + return self.out - def reset(self): - self.out = "" + def reset(self): + self.out = "" output = Logger() sys.stdout = output sys.stderr = output while True : - req = intp.getStatements() - try: - stmts = req.statements().split("\n") - jobGroup = req.jobGroup() - single = None - incomplete = None - compiledCode = None - - for s in stmts: - if s == None or len(s.strip()) == 0: - continue - - # skip comment - if s.strip().startswith("#"): - continue - - if s[0] != " " and s[0] != "\t": + req = intp.getStatements() + try: + stmts = req.statements().split("\n") + jobGroup = req.jobGroup() + single = None + incomplete = None + compiledCode = None + + for s in stmts: + if s == None or len(s.strip()) == 0: + continue + + # skip comment + if s.strip().startswith("#"): + continue + + if compiledCode != None: + sc.setJobGroup(jobGroup, "Zeppelin") + eval(compiledCode) + compiledCode = None + single = None + incomplete = None + + if single == None: + single = s + else: + single += "\n" + s + + try : + compiledCode = compile(single, "", "single") + incomplete = None + except SyntaxError as e: + if str(e).startswith("unexpected EOF while parsing") or str(e).startswith("expected an indented block"): + # incomplete expression + incomplete = e + continue + else : + # actual error + raise e + if incomplete != None: - raise incomplete + raise incomplete if compiledCode != None: - sc.setJobGroup(jobGroup, "Zeppelin") - eval(compiledCode) - compiledCode = None - single = None - incomplete = None - - if single == None: - single = s - else: - single += "\n" + s - - try : - compiledCode = compile(single, "", "single") - incomplete = None - except SyntaxError as e: - if str(e).startswith("unexpected EOF while parsing") : - # incomplete expression - incomplete = e - continue - else : - # actual error - raise e - - if incomplete != None: - raise incomplete - - if compiledCode != None: - sc.setJobGroup(jobGroup, "Zeppelin") - eval(compiledCode) - - intp.setStatementsFinished(output.get(), False) - except: - intp.setStatementsFinished(str(sys.exc_info()), True) - - output.reset() - + sc.setJobGroup(jobGroup, "Zeppelin") + eval(compiledCode) + + intp.setStatementsFinished(output.get(), False) + except: + intp.setStatementsFinished(str(sys.exc_info()), True) + + output.reset()