Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 63 additions & 68 deletions spark/src/main/resources/python/zeppelin_pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
jsc = intp.getJavaSparkContext()

if jsc.version().startswith("1.2"):
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
elif jsc.version().startswith("1.3"):
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")


java_import(gateway.jvm, "scala.Tuple2")
Expand All @@ -64,77 +64,72 @@
z = intp.getZeppelinContext()

class Logger(object):
def __init__(self):
self.out = ""
def __init__(self):
self.out = ""

def write(self, message):
self.out = self.out + message
def write(self, message):
self.out = self.out + message

def get(self):
return self.out
def get(self):
return self.out

def reset(self):
self.out = ""
def reset(self):
self.out = ""

output = Logger()
sys.stdout = output
sys.stderr = output

while True :
req = intp.getStatements()
try:
stmts = req.statements().split("\n")
jobGroup = req.jobGroup()
single = None
incomplete = None
compiledCode = None

for s in stmts:
if s == None or len(s.strip()) == 0:
continue

# skip comment
if s.strip().startswith("#"):
continue

if s[0] != " " and s[0] != "\t":
req = intp.getStatements()
try:
stmts = req.statements().split("\n")
jobGroup = req.jobGroup()
single = None
incomplete = None
compiledCode = None

for s in stmts:
if s == None or len(s.strip()) == 0:
continue

# skip comment
if s.strip().startswith("#"):
continue

if compiledCode != None:
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
compiledCode = None
single = None
incomplete = None

if single == None:
single = s
else:
single += "\n" + s

try :
compiledCode = compile(single, "<string>", "single")
incomplete = None
except SyntaxError as e:
if str(e).startswith("unexpected EOF while parsing") or str(e).startswith("expected an indented block"):
# incomplete expression
incomplete = e
continue
else :
# actual error
raise e

if incomplete != None:
raise incomplete
raise incomplete

if compiledCode != None:
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
compiledCode = None
single = None
incomplete = None

if single == None:
single = s
else:
single += "\n" + s

try :
compiledCode = compile(single, "<string>", "single")
incomplete = None
except SyntaxError as e:
if str(e).startswith("unexpected EOF while parsing") :
# incomplete expression
incomplete = e
continue
else :
# actual error
raise e

if incomplete != None:
raise incomplete

if compiledCode != None:
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)

intp.setStatementsFinished(output.get(), False)
except:
intp.setStatementsFinished(str(sys.exc_info()), True)

output.reset()

sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)

intp.setStatementsFinished(output.get(), False)
except:
intp.setStatementsFinished(str(sys.exc_info()), True)

output.reset()