Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 7 additions & 35 deletions spark/src/main/resources/python/zeppelin_pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ def reset(self):
try:
stmts = req.statements().split("\n")
jobGroup = req.jobGroup()
single = None
incomplete = None
compiledCode = None
final_code = None

for s in stmts:
if s == None or len(s.strip()) == 0:
Expand All @@ -97,38 +95,13 @@ def reset(self):
if s.strip().startswith("#"):
continue

if s[0] != " " and s[0] != "\t":
if incomplete != None:
raise incomplete

if compiledCode != None:
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)
compiledCode = None
single = None
incomplete = None

if single == None:
single = s
if final_code:
final_code += "\n" + s
else:
single += "\n" + s

try :
compiledCode = compile(single, "<string>", "single")
incomplete = None
except SyntaxError as e:
if str(e).startswith("unexpected EOF while parsing") :
# incomplete expression
incomplete = e
continue
else :
# actual error
raise e

if incomplete != None:
raise incomplete

if compiledCode != None:
final_code = s

if final_code:
compiledCode = compile(final_code, "<string>", "exec")
sc.setJobGroup(jobGroup, "Zeppelin")
eval(compiledCode)

Expand All @@ -137,4 +110,3 @@ def reset(self):
intp.setStatementsFinished(str(sys.exc_info()), True)

output.reset()