diff --git a/base-python3-minimal/base_run.py b/base-python3-minimal/base_run.py
index 33a903166..2aeb589bf 100644
--- a/base-python3-minimal/base_run.py
+++ b/base-python3-minimal/base_run.py
@@ -12,6 +12,7 @@
import aiozmq
import uvloop
import zmq
+import simplejson as json
log = logging.getLogger()
@@ -55,6 +56,7 @@ def __init__(self):
self.child_env = {}
self.insock = None
self.outsock = None
+ self.loop = None
@abstractmethod
async def build(self, build_cmd):
@@ -68,6 +70,10 @@ async def execute(self, exec_cmd):
async def query(self, code_text):
"""Run user code by creating a temporary file and compiling it."""
+ @abstractmethod
+ async def complete(self, completion_data):
+ """Return the list of strings to be shown in the auto-complete list."""
+
async def run_subproc(self, cmd):
"""A thin wrapper for an external command."""
loop = asyncio.get_event_loop()
@@ -132,6 +138,11 @@ async def main_loop(self):
elif op_type == 'input': # query-mode
await self.query(text)
self.outsock.write([b'finished', b''])
+ elif op_type == 'complete': # auto-completion
+ completion_data = json.loads(text)
+ completions = await self.complete(completion_data)
+ self.outsock.write([b'completion', json.dumps(completions)])
+ self.outsock.write([b'finished', b''])
await self.outsock.drain()
except asyncio.CancelledError:
break
@@ -151,6 +162,7 @@ def run(self):
# Initialize event loop.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop = asyncio.get_event_loop()
+ self.loop = loop
stopped = asyncio.Event()
def interrupt(loop, stopped):
diff --git a/c/Dockerfile b/c/Dockerfile
index e0320dcf3..b2ce93546 100644
--- a/c/Dockerfile
+++ b/c/Dockerfile
@@ -2,7 +2,7 @@ FROM lablup/kernel-base-python3-minimal:latest
MAINTAINER DevOps "devops@lablup.com"
# Install minimal C compile environments
-RUN apk add --no-cache gcc musl-dev
+RUN apk add --no-cache gcc musl-dev make
COPY run.py /home/sorna/run.py
COPY policy.yml /home/sorna/policy.yml
diff --git a/c/run.py b/c/run.py
index 80c5019ad..2ae8dc6e4 100644
--- a/c/run.py
+++ b/c/run.py
@@ -87,6 +87,9 @@ async def query(self, code_text):
f'./main')
await self.run_subproc(cmd)
+ async def complete(self, data):
+ return []
+
if __name__ == '__main__':
CProgramRunner().run()
diff --git a/cpp/Dockerfile b/cpp/Dockerfile
index 6440323da..aa9bf5ab6 100644
--- a/cpp/Dockerfile
+++ b/cpp/Dockerfile
@@ -2,7 +2,7 @@ FROM lablup/kernel-base-python3-minimal:latest
MAINTAINER DevOps "devops@lablup.com"
# Install minimal C++ compile environments
-RUN apk add --no-cache g++ libstdc++
+RUN apk add --no-cache g++ libstdc++ make
COPY run.py /home/sorna/run.py
COPY policy.yml /home/sorna/policy.yml
diff --git a/cpp/run.py b/cpp/run.py
index 6189e8b33..0f481631a 100644
--- a/cpp/run.py
+++ b/cpp/run.py
@@ -85,6 +85,9 @@ async def query(self, code_text):
f'&& ./main')
await self.run_subproc(cmd)
+ async def complete(self, data):
+ return []
+
if __name__ == '__main__':
CPPProgramRunner().run()
diff --git a/java8/run.py b/java8/run.py
index d5398c8fc..0e69eca06 100644
--- a/java8/run.py
+++ b/java8/run.py
@@ -102,6 +102,9 @@ async def query(self, code_text):
if filename:
os.remove(filename)
+ async def complete(self, data):
+ return []
+
def main():
JavaProgramRunner().run()
diff --git a/python3/Dockerfile b/python3/Dockerfile
index 17c2ae468..a16f44095 100644
--- a/python3/Dockerfile
+++ b/python3/Dockerfile
@@ -49,6 +49,7 @@ RUN echo 'import matplotlib.pyplot' > /tmp/matplotlib-fontcache.py \
COPY policy.yml /home/sorna/
COPY run.py /home/sorna/
+COPY inproc_run.py /home/sorna/
LABEL io.sorna.envs.corecount="OPENBLAS_NUM_THREADS,NPROC"
LABEL io.sorna.features "batch query uid-match user-input"
diff --git a/python3/inproc_run.py b/python3/inproc_run.py
new file mode 100644
index 000000000..0fabea058
--- /dev/null
+++ b/python3/inproc_run.py
@@ -0,0 +1,172 @@
+import builtins as builtin_mod
+import code
+import enum
+from functools import partial
+import logging
+import os
+import sys
+import time
+import traceback
+import types
+
+import simplejson as json
+from IPython.core.completer import Completer
+
+import getpass
+
+from sorna.types import (
+ InputRequest, ControlRecord, ConsoleRecord, MediaRecord, HTMLRecord, CompletionRecord,
+)
+
+
+log = logging.getLogger()
+
+class StreamToEmitter:
+
+ def __init__(self, emitter, stream_type):
+ self.emit = emitter
+ self.stream_type = stream_type
+
+ def write(self, s):
+ self.emit(ConsoleRecord(self.stream_type, s))
+
+ def flush(self):
+ pass
+
+
+class PythonInprocRunner:
+ '''
+ A thin wrapper for REPL.
+
+ It creates a dummy module that user codes run and keeps the references to user-created objects
+ (e.g., variables and functions).
+ '''
+
+ def __init__(self, runner):
+ self.runner = runner
+ self.insock = runner.insock
+ self.outsock = runner.outsock
+
+ self.stdout_emitter = StreamToEmitter(self.emit, 'stdout')
+ self.stderr_emitter = StreamToEmitter(self.emit, 'stderr')
+
+ # Initialize user module and namespaces.
+ user_module = types.ModuleType('__main__',
+ doc='Automatically created module for the interactive shell.')
+ user_module.__dict__.setdefault('__builtin__', builtin_mod)
+ user_module.__dict__.setdefault('__builtins__', builtin_mod)
+ self.user_module = user_module
+ self.user_ns = user_module.__dict__
+
+ self.completer = Completer(namespace=self.user_ns, global_namespace={})
+ self.completer.limit_to__all__ = True
+
+ def handle_input(self, prompt=None, password=False):
+ if prompt is None:
+ prompt = 'Password: ' if password else ''
+ # Use synchronous version of ZeroMQ sockets
+ raw_insock = self.insock.transport._zmq_sock
+ raw_outsock = self.outsock.transport._zmq_sock
+ raw_outsock.send_multipart([
+ b'stdout',
+ prompt.encode('utf8'),
+ ])
+ raw_outsock.send_multipart([
+ b'waiting-input',
+ json.dumps({'is_password': password}).encode('utf8'),
+ ])
+ data = raw_insock.recv_multipart()
+ return data[1].decode('utf8')
+
+ def get_completions(self, data):
+ state = 0
+ matches = []
+ while True:
+ ret = self.completer.complete(data['line'], state)
+ if ret is None:
+ break
+ matches.append(ret)
+ state += 1
+ return matches
+
+ def emit(self, record):
+ if isinstance(record, ConsoleRecord):
+ assert record.target in ('stdout', 'stderr')
+ self.outsock.write([
+ record.target.encode('ascii'),
+ record.data.encode('utf8'),
+ ])
+ elif isinstance(record, MediaRecord):
+ self.outsock.write([
+ b'media',
+ json.dumps({
+ 'type': record.type,
+ 'data': record.data,
+ }).encode('utf8'),
+ ])
+ elif isinstance(record, HTMLRecord):
+ self.outsock.write([
+ b'html',
+ record.html.encode('utf8'),
+ ])
+ elif isinstance(record, InputRequest):
+ self.outsock.write([
+ b'waiting-input',
+ json.dumps({
+ 'is_password': record.is_password,
+ }).encode('utf8'),
+ ])
+ elif isinstance(record, CompletionRecord):
+ self.outsock.write([
+ b'completion',
+ json.dumps(record.matches).encode('utf8'),
+ ])
+ elif isinstance(record, ControlRecord):
+ self.outsock.write([
+ record.event.encode('ascii'),
+ b'',
+ ])
+ else:
+ raise TypeError('Unsupported record type.')
+
+ @staticmethod
+ def strip_traceback(tb):
+ while tb is not None:
+ frame_summary = traceback.extract_tb(tb, limit=1)[0]
+ if frame_summary[0] == '':
+ break
+ tb = tb.tb_next
+ return tb
+
+ def query(self, code_text):
+ # Set Sorna Media handler
+ self.user_module.__builtins__._sorna_emit = self.emit
+
+ # Override interactive input functions
+ self.user_module.__builtins__.input = self.handle_input
+ getpass.getpass = partial(self.handle_input, password=True)
+
+ try:
+ code_obj = code.compile_command(code_text, symbol='exec')
+ except (OverflowError, IndentationError, SyntaxError,
+ ValueError, TypeError, MemoryError) as e:
+ exc_type, exc_val, tb = sys.exc_info()
+ user_tb = type(self).strip_traceback(tb)
+ err_str = ''.join(traceback.format_exception(exc_type, exc_val, user_tb))
+ hdr_str = 'Traceback (most recent call last):\n' if not err_str.startswith('Traceback ') else ''
+ self.emit(ConsoleRecord('stderr', hdr_str + err_str))
+ self.emit(ControlRecord('finished'))
+ else:
+ sys.stdout, orig_stdout = self.stdout_emitter, sys.stdout
+ sys.stderr, orig_stderr = self.stderr_emitter, sys.stderr
+ try:
+ exec(code_obj, self.user_ns)
+ except Exception as e:
+ # strip the first frame
+ exc_type, exc_val, tb = sys.exc_info()
+ user_tb = type(self).strip_traceback(tb)
+ traceback.print_exception(exc_type, exc_val, user_tb)
+ finally:
+ self.emit(ControlRecord('finished'))
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
diff --git a/python3/run.py b/python3/run.py
index dfe285dc2..57c54c461 100644
--- a/python3/run.py
+++ b/python3/run.py
@@ -1,211 +1,75 @@
#! /usr/bin/env python
-import builtins as builtin_mod
-import code
-import enum
-from functools import partial
import logging
import os
+from pathlib import Path
import sys
-import time
-import traceback
-import types
-from namedlist import namedtuple, namedlist
-import simplejson as json
-import zmq
-from IPython.core.completer import Completer
-
-import getpass
-
-from sorna.types import (
- InputRequest, ControlRecord, ConsoleRecord, MediaRecord, HTMLRecord, CompletionRecord,
-)
-
-log = logging.getLogger('code-runner')
-
-
-class StreamToEmitter:
-
- def __init__(self, emitter, stream_type):
- self.emit = emitter
- self.stream_type = stream_type
-
- def write(self, s):
- self.emit(ConsoleRecord(self.stream_type, s))
-
- def flush(self):
- pass
-
-
-class CodeRunner:
- '''
- A thin wrapper for REPL.
-
- It creates a dummy module that user codes run and keeps the references to user-created objects
- (e.g., variables and functions).
- '''
-
- def __init__(self, api_version=1):
- self.api_version = api_version
- self.input_supported = (api_version >= 2)
-
- ctx = zmq.Context.instance()
- self.input_stream = ctx.socket(zmq.PULL)
- self.input_stream.bind('tcp://*:2000')
- self.output_stream = ctx.socket(zmq.PUSH)
- self.output_stream.bind('tcp://*:2001')
-
- self.stdout_emitter = StreamToEmitter(self.emit, 'stdout')
- self.stderr_emitter = StreamToEmitter(self.emit, 'stderr')
-
- # Initialize user module and namespaces.
- user_module = types.ModuleType('__main__',
- doc='Automatically created module for the interactive shell.')
- user_module.__dict__.setdefault('__builtin__', builtin_mod)
- user_module.__dict__.setdefault('__builtins__', builtin_mod)
- self.user_module = user_module
- self.user_ns = user_module.__dict__
-
- self.completer = Completer(namespace=self.user_ns, global_namespace={})
- self.completer.limit_to__all__ = True
-
- def handle_input(self, prompt=None, password=False):
- if prompt is None:
- prompt = 'Password: ' if password else ''
- self.emit(ConsoleRecord('stdout', prompt))
- self.emit(InputRequest(is_password=password))
- data = self.input_stream.recv_multipart()
- return data[1].decode('utf8')
-
- def handle_complete(self, data):
- args = json.loads(data)
- state = 0
- matches = []
- while True:
- ret = self.completer.complete(args['line'], state)
- if ret is None:
- break
- matches.append(ret)
- state += 1
- return matches
-
- def emit(self, record):
- if isinstance(record, ConsoleRecord):
- assert record.target in ('stdout', 'stderr')
- self.output_stream.send_multipart([
- record.target.encode('ascii'),
- record.data.encode('utf8'),
- ])
- elif isinstance(record, MediaRecord):
- self.output_stream.send_multipart([
- b'media',
- json.dumps({
- 'type': record.type,
- 'data': record.data,
- }).encode('utf8'),
- ])
- elif isinstance(record, HTMLRecord):
- self.output_stream.send_multipart([
- b'html',
- record.html.encode('utf8'),
- ])
- elif isinstance(record, InputRequest):
- self.output_stream.send_multipart([
- b'waiting-input',
- json.dumps({
- 'is_password': record.is_password,
- }).encode('utf8'),
- ])
- elif isinstance(record, CompletionRecord):
- self.output_stream.send_multipart([
- b'completion',
- json.dumps(record.matches).encode('utf8'),
- ])
- elif isinstance(record, ControlRecord):
- self.output_stream.send_multipart([
- record.event.encode('ascii'),
- b'',
- ])
+sys.path.insert(0, os.path.abspath('.'))
+from base_run import BaseRunner
+from inproc_run import PythonInprocRunner
+
+log = logging.getLogger()
+
+DEFAULT_PYFLAGS = ''
+CHILD_ENV = {
+ 'TERM': 'xterm',
+ 'LANG': 'C.UTF-8',
+ 'SHELL': '/bin/ash',
+ 'USER': 'work',
+ 'HOME': '/home/work',
+ 'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
+ 'LD_PRELOAD': '/home/sorna/patch-libs.so',
+}
+
+
+class PythonProgramRunner(BaseRunner):
+
+ log_prefix = 'python-kernel'
+
+ def __init__(self):
+ super().__init__()
+ self.child_env.update(CHILD_ENV)
+ self.inproc_runner = None
+
+ async def build(self, build_cmd):
+ if build_cmd is None or build_cmd == '':
+ # skipped
+ return
+ elif build_cmd == '*':
+ if Path('setup.py').is_file():
+ cmd = f'python {DEFAULT_PYFLAGS} setup.py develop'
+ await self.run_subproc(cmd)
+ else:
+ log.warning('skipping build phase due to missing "setup.py" file')
else:
- raise TypeError('Unsupported record type.')
-
- @staticmethod
- def strip_traceback(tb):
- while tb is not None:
- frame_summary = traceback.extract_tb(tb, limit=1)[0]
- if frame_summary[0] == '':
- break
- tb = tb.tb_next
- return tb
-
- def run(self):
- json_opts = {'namedtuple_as_object': False}
- while True:
- data = self.input_stream.recv_multipart()
- code_id = data[0].decode('ascii')
- code_text = data[1].decode('utf8')
- log.debug(f'recv input: {code_id}, {code_text}')
- if code_id == 'complete':
- try:
- completions = self.handle_complete(code_text)
- self.emit(CompletionRecord(completions))
- log.debug('completion-sent')
- except:
- log.exception('Unexpected error during handle_complete()')
- finally:
- self.emit(ControlRecord('finished'))
- log.debug('completion-finished')
- continue
- self.user_module.__builtins__._sorna_emit = self.emit
- if self.input_supported:
- self.user_module.__builtins__.input = self.handle_input
- getpass.getpass = partial(self.handle_input, password=True)
- try:
- code_obj = code.compile_command(code_text, symbol='exec')
- except (OverflowError, IndentationError, SyntaxError,
- ValueError, TypeError, MemoryError) as e:
- exc_type, exc_val, tb = sys.exc_info()
- user_tb = type(self).strip_traceback(tb)
- err_str = ''.join(traceback.format_exception(exc_type, exc_val, user_tb))
- hdr_str = 'Traceback (most recent call last):\n' if not err_str.startswith('Traceback ') else ''
- self.emit(ConsoleRecord('stderr', hdr_str + err_str))
- self.emit(ControlRecord('finished'))
+ await self.run_subproc(build_cmd)
+
+ async def execute(self, exec_cmd):
+ if exec_cmd is None or exec_cmd == '':
+ # skipped
+ return
+ elif exec_cmd == '*':
+ if Path('main.py').is_file():
+ cmd = f'python {DEFAULT_PYFLAGS} main.py'
+ await self.run_subproc(cmd)
else:
- sys.stdout, orig_stdout = self.stdout_emitter, sys.stdout
- sys.stderr, orig_stderr = self.stderr_emitter, sys.stderr
- try:
- exec(code_obj, self.user_ns)
- except Exception as e:
- # strip the first frame
- exc_type, exc_val, tb = sys.exc_info()
- user_tb = type(self).strip_traceback(tb)
- traceback.print_exception(exc_type, exc_val, user_tb)
- finally:
- self.emit(ControlRecord('finished'))
- sys.stdout = orig_stdout
- sys.stderr = orig_stderr
-
-
-def main():
- log = logging.getLogger('main')
+ log.error('cannot find the main script ("main.py").')
+ else:
+ await self.run_subproc(exec_cmd)
- # Replace stdin with a "null" file
- # (trying to read stdin will raise EOFError immediately afterwards.)
- sys.stdin = open(os.devnull, 'rb')
+ async def query(self, code_text):
+ if self.inproc_runner is None:
+ self.inproc_runner = PythonInprocRunner(self)
+ # NOTE: In-process code execution is a blocking operation.
+ self.inproc_runner.query(code_text)
- # Initialize context object.
- runner = CodeRunner(api_version=2)
- try:
- runner.run()
- except (KeyboardInterrupt, SystemExit):
- pass
- except:
- log.exception('unexpected error')
- finally:
- print('exit.')
+ async def complete(self, data):
+ try:
+ return self.inproc_runner.get_completions(data)
+ except:
+ log.exception('unexpected error')
if __name__ == '__main__':
- logging.basicConfig(level=logging.DEBUG)
- main()
+ PythonProgramRunner().run()