-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
* This stdout/stderr-capturing wrapper kernel will become the base for compiled language environments such as C/C++, Go, and Rust.
- Loading branch information
Showing
6 changed files
with
299 additions
and
127 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,132 +1,119 @@ | ||
#! /usr/bin/env python | ||
import asyncio | ||
import io | ||
from namedlist import namedtuple, namedlist | ||
import logging | ||
import os | ||
import subprocess | ||
import signal | ||
import sys | ||
import uuid | ||
import zmq | ||
try: | ||
import simplejson | ||
has_simplejson = True | ||
except ImportError: | ||
has_simplejson = False | ||
|
||
ExceptionInfo = namedtuple('ExceptionInfo', [ | ||
'exc', | ||
('args', tuple()), | ||
('raised_before_exec', False), | ||
('traceback', None), | ||
]) | ||
|
||
Result = namedlist('Result', [ | ||
('stdout', ''), | ||
('stderr', ''), | ||
('media', None), | ||
]) | ||
|
||
|
||
@staticmethod | ||
def _create_excinfo(e, raised_before_exec, tb): | ||
assert isinstance(e, Exception) | ||
return ExceptionInfo(type(e).__name__, e.args, raised_before_exec, tb) | ||
ExceptionInfo.create = _create_excinfo | ||
|
||
|
||
class SockWriter(object): | ||
def __init__(self, sock, cell_id): | ||
self.cell_id_encoded = '{0}'.format(cell_id).encode('ascii') | ||
self.sock = sock | ||
self.buffer = io.StringIO() | ||
|
||
def write(self, s): | ||
if '\n' in s: # flush on occurrence of a newline. | ||
s1, s2 = s.split('\n', maxsplit=1) | ||
s0 = self.buffer.getvalue() | ||
self.sock.send_multipart([self.cell_id_encoded, (s0 + s1 + '\n').encode('utf8')]) | ||
self.buffer.seek(0) | ||
self.buffer.truncate(0) | ||
self.buffer.write(s2) | ||
else: | ||
self.buffer.write(s) | ||
if self.buffer.tell() > 1024: # flush if the buffer is too large. | ||
s0 = self.buffer.getvalue() | ||
self.sock.send_multipart([self.cell_id_encoded, s0.encode('utf8')]) | ||
self.buffer.seek(0) | ||
self.buffer.truncate(0) | ||
# TODO: timeout to flush? | ||
|
||
|
||
class CodeRunner(object): | ||
''' | ||
A thin wrapper for haskell compile & runner. | ||
It creates a temporary file with user haskell code, run it with ``runhaskell``, and | ||
returns the outputs of the execution. | ||
''' | ||
def execute(self, cell_id, src): | ||
# TODO: exception handling. needed? | ||
exceptions = [] | ||
result = Result() | ||
before_exec = True | ||
|
||
def my_excepthook(type_, value, tb): | ||
exceptions.append(ExceptionInfo.create(value, before_exec, tb)) | ||
sys.excepthook = my_excepthook | ||
|
||
# Save haskell code to a temporary file | ||
tmp_fname = "./tmp-code-{}.hs".format(str(uuid.uuid4())) | ||
with open(tmp_fname, 'w') as f: | ||
f.write(src) | ||
import tempfile | ||
|
||
# Compile and run the saved haskell code. | ||
p = subprocess.run("runhaskell {}".format(tmp_fname), shell=True, | ||
stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
result.stdout = p.stdout.decode("utf-8") if p.stdout else '' | ||
result.stderr = p.stderr.decode("utf-8") if p.stderr else '' | ||
from namedlist import namedtuple, namedlist | ||
import simplejson as json | ||
import uvloop | ||
import zmq, aiozmq | ||
|
||
# Delete temporary haskell codes | ||
if os.path.exists(tmp_fname): | ||
os.remove(tmp_fname) | ||
|
||
sys.excepthook = sys.__excepthook__ | ||
log = logging.getLogger() | ||
|
||
return exceptions, result | ||
cmdspec = 'runhaskell {mainpath}' | ||
|
||
|
||
if __name__ == '__main__': | ||
# Use the "confined" working directory | ||
os.chdir('/home/work') | ||
''' | ||
A thin wrapper for an external command. | ||
It creates a temporary file with user haskell code, run it with ``runhaskell``, and | ||
returns the outputs of the execution. | ||
''' | ||
async def execute(insock, outsock, code_id, code_data): | ||
loop = asyncio.get_event_loop() | ||
|
||
# Save haskell code to a temporary file | ||
tmpf = tempfile.NamedTemporaryFile() | ||
tmpf.write(code_data) | ||
tmpf.flush() | ||
|
||
try: | ||
# Compile and run the saved haskell code. | ||
proc = await asyncio.create_subprocess_shell(cmdspec.format(mainpath=tmpf.name), | ||
stdin=None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) | ||
pipe_tasks = [ | ||
loop.create_task(pipe_output(proc.stdout, outsock, 'stdout')), | ||
loop.create_task(pipe_output(proc.stderr, outsock, 'stderr')), | ||
] | ||
await proc.wait() | ||
for t in pipe_tasks: | ||
t.cancel() | ||
await t | ||
except: | ||
log.exception() | ||
finally: | ||
# Close and delete the temporary file. | ||
tmpf.close() | ||
|
||
|
||
async def pipe_output(stream, outsock, target): | ||
assert target in ('stdout', 'stderr') | ||
try: | ||
while True: | ||
data = await stream.read(4096) | ||
if not data: | ||
break | ||
outsock.write([target.encode('ascii'), data]) | ||
await outsock.drain() | ||
except (aiozmq.ZmqStreamClosed, asyncio.CancelledError): | ||
pass | ||
|
||
async def main_loop(): | ||
insock = await aiozmq.create_zmq_stream(zmq.PULL, bind='tcp://*:2000') | ||
outsock = await aiozmq.create_zmq_stream(zmq.PUSH, bind='tcp://*:2001') | ||
print('start serving...') | ||
while True: | ||
try: | ||
data = await insock.read() | ||
code_id = data[0].decode('ascii') | ||
code_data = data[1] | ||
await execute(insock, outsock, code_id, code_data) | ||
outsock.write([b'finished', b'']) | ||
await outsock.drain() | ||
except asyncio.CancelledError: | ||
break | ||
except: | ||
log.exception() | ||
break | ||
insock.close() | ||
outsock.close() | ||
|
||
|
||
def main(): | ||
# Replace stdin with a "null" file | ||
# (trying to read stdin will raise EOFError immediately afterwards.) | ||
sys.stdin = open(os.devnull, 'rb') | ||
|
||
# Initialize context object. | ||
runner = CodeRunner() | ||
# Initialize event loop. | ||
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | ||
loop = asyncio.get_event_loop() | ||
stopped = asyncio.Event() | ||
|
||
# Initialize minimal ZMQ server socket. | ||
ctx = zmq.Context(io_threads=1) | ||
sock = ctx.socket(zmq.REP) | ||
sock.bind('tcp://*:2001') | ||
print('serving at port 2001...') | ||
def interrupt(loop, stopped): | ||
if not stopped.is_set(): | ||
stopped.set() | ||
loop.stop() | ||
else: | ||
print('forced shutdown!', file=sys.stderr) | ||
sys.exit(1) | ||
|
||
loop.add_signal_handler(signal.SIGINT, interrupt, loop, stopped) | ||
loop.add_signal_handler(signal.SIGTERM, interrupt, loop, stopped) | ||
|
||
try: | ||
while True: | ||
data = sock.recv_multipart() | ||
exceptions, result = runner.execute(data[0].decode('ascii'), | ||
data[1].decode('utf8')) | ||
response = { | ||
'stdout': result.stdout, | ||
'stderr': result.stderr, | ||
'media': result.media, | ||
'exceptions': exceptions, | ||
} | ||
json_opts = {} | ||
if has_simplejson: | ||
json_opts['namedtuple_as_object'] = False | ||
sock.send_json(response, **json_opts) | ||
except (KeyboardInterrupt, SystemExit): | ||
pass | ||
main_task = loop.create_task(main_loop()) | ||
loop.run_forever() | ||
# interrupted | ||
main_task.cancel() | ||
loop.run_until_complete(main_task) | ||
finally: | ||
sock.close() | ||
loop.close() | ||
print('exit.') | ||
|
||
if __name__ == '__main__': | ||
main() |
Empty file.
Oops, something went wrong.