From 82d680c6cb08eefc58c1a19a5ec5dc2c52869f02 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Wed, 30 Aug 2017 23:30:19 +0900 Subject: [PATCH] refs #34, #1, #44, #45, #46: Update base runner and all descendant images * Base runner now offers the auto-completion handler stub. * "python3" kernel now uses the base runner and thus support the batch mode with the default build command: "python setup.py develop" (skipped with a warning when setup.py is not uploaded together) * C/C++ kernels now have the make utility. Ooops. --- base-python3-minimal/base_run.py | 12 ++ c/Dockerfile | 2 +- c/run.py | 3 + cpp/Dockerfile | 2 +- cpp/run.py | 3 + java8/run.py | 3 + python3/Dockerfile | 1 + python3/inproc_run.py | 172 ++++++++++++++++++++ python3/run.py | 260 ++++++++----------------------- 9 files changed, 258 insertions(+), 200 deletions(-) create mode 100644 python3/inproc_run.py diff --git a/base-python3-minimal/base_run.py b/base-python3-minimal/base_run.py index 33a903166..2aeb589bf 100644 --- a/base-python3-minimal/base_run.py +++ b/base-python3-minimal/base_run.py @@ -12,6 +12,7 @@ import aiozmq import uvloop import zmq +import simplejson as json log = logging.getLogger() @@ -55,6 +56,7 @@ def __init__(self): self.child_env = {} self.insock = None self.outsock = None + self.loop = None @abstractmethod async def build(self, build_cmd): @@ -68,6 +70,10 @@ async def execute(self, exec_cmd): async def query(self, code_text): """Run user code by creating a temporary file and compiling it.""" + @abstractmethod + async def complete(self, completion_data): + """Return the list of strings to be shown in the auto-complete list.""" + async def run_subproc(self, cmd): """A thin wrapper for an external command.""" loop = asyncio.get_event_loop() @@ -132,6 +138,11 @@ async def main_loop(self): elif op_type == 'input': # query-mode await self.query(text) self.outsock.write([b'finished', b'']) + elif op_type == 'complete': # auto-completion + completion_data = json.loads(text) + completions = await self.complete(completion_data) + self.outsock.write([b'completion', json.dumps(completions)]) + self.outsock.write([b'finished', b'']) await self.outsock.drain() except asyncio.CancelledError: break @@ -151,6 +162,7 @@ def run(self): # Initialize event loop. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) loop = asyncio.get_event_loop() + self.loop = loop stopped = asyncio.Event() def interrupt(loop, stopped): diff --git a/c/Dockerfile b/c/Dockerfile index e0320dcf3..b2ce93546 100644 --- a/c/Dockerfile +++ b/c/Dockerfile @@ -2,7 +2,7 @@ FROM lablup/kernel-base-python3-minimal:latest MAINTAINER DevOps "devops@lablup.com" # Install minimal C compile environments -RUN apk add --no-cache gcc musl-dev +RUN apk add --no-cache gcc musl-dev make COPY run.py /home/sorna/run.py COPY policy.yml /home/sorna/policy.yml diff --git a/c/run.py b/c/run.py index 80c5019ad..2ae8dc6e4 100644 --- a/c/run.py +++ b/c/run.py @@ -87,6 +87,9 @@ async def query(self, code_text): f'./main') await self.run_subproc(cmd) + async def complete(self, data): + return [] + if __name__ == '__main__': CProgramRunner().run() diff --git a/cpp/Dockerfile b/cpp/Dockerfile index 6440323da..aa9bf5ab6 100644 --- a/cpp/Dockerfile +++ b/cpp/Dockerfile @@ -2,7 +2,7 @@ FROM lablup/kernel-base-python3-minimal:latest MAINTAINER DevOps "devops@lablup.com" # Install minimal C++ compile environments -RUN apk add --no-cache g++ libstdc++ +RUN apk add --no-cache g++ libstdc++ make COPY run.py /home/sorna/run.py COPY policy.yml /home/sorna/policy.yml diff --git a/cpp/run.py b/cpp/run.py index 6189e8b33..0f481631a 100644 --- a/cpp/run.py +++ b/cpp/run.py @@ -85,6 +85,9 @@ async def query(self, code_text): f'&& ./main') await self.run_subproc(cmd) + async def complete(self, data): + return [] + if __name__ == '__main__': CPPProgramRunner().run() diff --git a/java8/run.py b/java8/run.py index d5398c8fc..0e69eca06 100644 --- a/java8/run.py +++ b/java8/run.py @@ -102,6 +102,9 @@ async def query(self, code_text): if filename: os.remove(filename) + async def complete(self, data): + return [] + def main(): JavaProgramRunner().run() diff --git a/python3/Dockerfile b/python3/Dockerfile index 17c2ae468..a16f44095 100644 --- a/python3/Dockerfile +++ b/python3/Dockerfile @@ -49,6 +49,7 @@ RUN echo 'import matplotlib.pyplot' > /tmp/matplotlib-fontcache.py \ COPY policy.yml /home/sorna/ COPY run.py /home/sorna/ +COPY inproc_run.py /home/sorna/ LABEL io.sorna.envs.corecount="OPENBLAS_NUM_THREADS,NPROC" LABEL io.sorna.features "batch query uid-match user-input" diff --git a/python3/inproc_run.py b/python3/inproc_run.py new file mode 100644 index 000000000..0fabea058 --- /dev/null +++ b/python3/inproc_run.py @@ -0,0 +1,172 @@ +import builtins as builtin_mod +import code +import enum +from functools import partial +import logging +import os +import sys +import time +import traceback +import types + +import simplejson as json +from IPython.core.completer import Completer + +import getpass + +from sorna.types import ( + InputRequest, ControlRecord, ConsoleRecord, MediaRecord, HTMLRecord, CompletionRecord, +) + + +log = logging.getLogger() + +class StreamToEmitter: + + def __init__(self, emitter, stream_type): + self.emit = emitter + self.stream_type = stream_type + + def write(self, s): + self.emit(ConsoleRecord(self.stream_type, s)) + + def flush(self): + pass + + +class PythonInprocRunner: + ''' + A thin wrapper for REPL. + + It creates a dummy module that user codes run and keeps the references to user-created objects + (e.g., variables and functions). + ''' + + def __init__(self, runner): + self.runner = runner + self.insock = runner.insock + self.outsock = runner.outsock + + self.stdout_emitter = StreamToEmitter(self.emit, 'stdout') + self.stderr_emitter = StreamToEmitter(self.emit, 'stderr') + + # Initialize user module and namespaces. + user_module = types.ModuleType('__main__', + doc='Automatically created module for the interactive shell.') + user_module.__dict__.setdefault('__builtin__', builtin_mod) + user_module.__dict__.setdefault('__builtins__', builtin_mod) + self.user_module = user_module + self.user_ns = user_module.__dict__ + + self.completer = Completer(namespace=self.user_ns, global_namespace={}) + self.completer.limit_to__all__ = True + + def handle_input(self, prompt=None, password=False): + if prompt is None: + prompt = 'Password: ' if password else '' + # Use synchronous version of ZeroMQ sockets + raw_insock = self.insock.transport._zmq_sock + raw_outsock = self.outsock.transport._zmq_sock + raw_outsock.send_multipart([ + b'stdout', + prompt.encode('utf8'), + ]) + raw_outsock.send_multipart([ + b'waiting-input', + json.dumps({'is_password': password}).encode('utf8'), + ]) + data = raw_insock.recv_multipart() + return data[1].decode('utf8') + + def get_completions(self, data): + state = 0 + matches = [] + while True: + ret = self.completer.complete(data['line'], state) + if ret is None: + break + matches.append(ret) + state += 1 + return matches + + def emit(self, record): + if isinstance(record, ConsoleRecord): + assert record.target in ('stdout', 'stderr') + self.outsock.write([ + record.target.encode('ascii'), + record.data.encode('utf8'), + ]) + elif isinstance(record, MediaRecord): + self.outsock.write([ + b'media', + json.dumps({ + 'type': record.type, + 'data': record.data, + }).encode('utf8'), + ]) + elif isinstance(record, HTMLRecord): + self.outsock.write([ + b'html', + record.html.encode('utf8'), + ]) + elif isinstance(record, InputRequest): + self.outsock.write([ + b'waiting-input', + json.dumps({ + 'is_password': record.is_password, + }).encode('utf8'), + ]) + elif isinstance(record, CompletionRecord): + self.outsock.write([ + b'completion', + json.dumps(record.matches).encode('utf8'), + ]) + elif isinstance(record, ControlRecord): + self.outsock.write([ + record.event.encode('ascii'), + b'', + ]) + else: + raise TypeError('Unsupported record type.') + + @staticmethod + def strip_traceback(tb): + while tb is not None: + frame_summary = traceback.extract_tb(tb, limit=1)[0] + if frame_summary[0] == '': + break + tb = tb.tb_next + return tb + + def query(self, code_text): + # Set Sorna Media handler + self.user_module.__builtins__._sorna_emit = self.emit + + # Override interactive input functions + self.user_module.__builtins__.input = self.handle_input + getpass.getpass = partial(self.handle_input, password=True) + + try: + code_obj = code.compile_command(code_text, symbol='exec') + except (OverflowError, IndentationError, SyntaxError, + ValueError, TypeError, MemoryError) as e: + exc_type, exc_val, tb = sys.exc_info() + user_tb = type(self).strip_traceback(tb) + err_str = ''.join(traceback.format_exception(exc_type, exc_val, user_tb)) + hdr_str = 'Traceback (most recent call last):\n' if not err_str.startswith('Traceback ') else '' + self.emit(ConsoleRecord('stderr', hdr_str + err_str)) + self.emit(ControlRecord('finished')) + else: + sys.stdout, orig_stdout = self.stdout_emitter, sys.stdout + sys.stderr, orig_stderr = self.stderr_emitter, sys.stderr + try: + exec(code_obj, self.user_ns) + except Exception as e: + # strip the first frame + exc_type, exc_val, tb = sys.exc_info() + user_tb = type(self).strip_traceback(tb) + traceback.print_exception(exc_type, exc_val, user_tb) + finally: + self.emit(ControlRecord('finished')) + sys.stdout = orig_stdout + sys.stderr = orig_stderr diff --git a/python3/run.py b/python3/run.py index dfe285dc2..57c54c461 100644 --- a/python3/run.py +++ b/python3/run.py @@ -1,211 +1,75 @@ #! /usr/bin/env python -import builtins as builtin_mod -import code -import enum -from functools import partial import logging import os +from pathlib import Path import sys -import time -import traceback -import types -from namedlist import namedtuple, namedlist -import simplejson as json -import zmq -from IPython.core.completer import Completer - -import getpass - -from sorna.types import ( - InputRequest, ControlRecord, ConsoleRecord, MediaRecord, HTMLRecord, CompletionRecord, -) - -log = logging.getLogger('code-runner') - - -class StreamToEmitter: - - def __init__(self, emitter, stream_type): - self.emit = emitter - self.stream_type = stream_type - - def write(self, s): - self.emit(ConsoleRecord(self.stream_type, s)) - - def flush(self): - pass - - -class CodeRunner: - ''' - A thin wrapper for REPL. - - It creates a dummy module that user codes run and keeps the references to user-created objects - (e.g., variables and functions). - ''' - - def __init__(self, api_version=1): - self.api_version = api_version - self.input_supported = (api_version >= 2) - - ctx = zmq.Context.instance() - self.input_stream = ctx.socket(zmq.PULL) - self.input_stream.bind('tcp://*:2000') - self.output_stream = ctx.socket(zmq.PUSH) - self.output_stream.bind('tcp://*:2001') - - self.stdout_emitter = StreamToEmitter(self.emit, 'stdout') - self.stderr_emitter = StreamToEmitter(self.emit, 'stderr') - - # Initialize user module and namespaces. - user_module = types.ModuleType('__main__', - doc='Automatically created module for the interactive shell.') - user_module.__dict__.setdefault('__builtin__', builtin_mod) - user_module.__dict__.setdefault('__builtins__', builtin_mod) - self.user_module = user_module - self.user_ns = user_module.__dict__ - - self.completer = Completer(namespace=self.user_ns, global_namespace={}) - self.completer.limit_to__all__ = True - - def handle_input(self, prompt=None, password=False): - if prompt is None: - prompt = 'Password: ' if password else '' - self.emit(ConsoleRecord('stdout', prompt)) - self.emit(InputRequest(is_password=password)) - data = self.input_stream.recv_multipart() - return data[1].decode('utf8') - - def handle_complete(self, data): - args = json.loads(data) - state = 0 - matches = [] - while True: - ret = self.completer.complete(args['line'], state) - if ret is None: - break - matches.append(ret) - state += 1 - return matches - - def emit(self, record): - if isinstance(record, ConsoleRecord): - assert record.target in ('stdout', 'stderr') - self.output_stream.send_multipart([ - record.target.encode('ascii'), - record.data.encode('utf8'), - ]) - elif isinstance(record, MediaRecord): - self.output_stream.send_multipart([ - b'media', - json.dumps({ - 'type': record.type, - 'data': record.data, - }).encode('utf8'), - ]) - elif isinstance(record, HTMLRecord): - self.output_stream.send_multipart([ - b'html', - record.html.encode('utf8'), - ]) - elif isinstance(record, InputRequest): - self.output_stream.send_multipart([ - b'waiting-input', - json.dumps({ - 'is_password': record.is_password, - }).encode('utf8'), - ]) - elif isinstance(record, CompletionRecord): - self.output_stream.send_multipart([ - b'completion', - json.dumps(record.matches).encode('utf8'), - ]) - elif isinstance(record, ControlRecord): - self.output_stream.send_multipart([ - record.event.encode('ascii'), - b'', - ]) +sys.path.insert(0, os.path.abspath('.')) +from base_run import BaseRunner +from inproc_run import PythonInprocRunner + +log = logging.getLogger() + +DEFAULT_PYFLAGS = '' +CHILD_ENV = { + 'TERM': 'xterm', + 'LANG': 'C.UTF-8', + 'SHELL': '/bin/ash', + 'USER': 'work', + 'HOME': '/home/work', + 'PATH': '/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin', + 'LD_PRELOAD': '/home/sorna/patch-libs.so', +} + + +class PythonProgramRunner(BaseRunner): + + log_prefix = 'python-kernel' + + def __init__(self): + super().__init__() + self.child_env.update(CHILD_ENV) + self.inproc_runner = None + + async def build(self, build_cmd): + if build_cmd is None or build_cmd == '': + # skipped + return + elif build_cmd == '*': + if Path('setup.py').is_file(): + cmd = f'python {DEFAULT_PYFLAGS} setup.py develop' + await self.run_subproc(cmd) + else: + log.warning('skipping build phase due to missing "setup.py" file') else: - raise TypeError('Unsupported record type.') - - @staticmethod - def strip_traceback(tb): - while tb is not None: - frame_summary = traceback.extract_tb(tb, limit=1)[0] - if frame_summary[0] == '': - break - tb = tb.tb_next - return tb - - def run(self): - json_opts = {'namedtuple_as_object': False} - while True: - data = self.input_stream.recv_multipart() - code_id = data[0].decode('ascii') - code_text = data[1].decode('utf8') - log.debug(f'recv input: {code_id}, {code_text}') - if code_id == 'complete': - try: - completions = self.handle_complete(code_text) - self.emit(CompletionRecord(completions)) - log.debug('completion-sent') - except: - log.exception('Unexpected error during handle_complete()') - finally: - self.emit(ControlRecord('finished')) - log.debug('completion-finished') - continue - self.user_module.__builtins__._sorna_emit = self.emit - if self.input_supported: - self.user_module.__builtins__.input = self.handle_input - getpass.getpass = partial(self.handle_input, password=True) - try: - code_obj = code.compile_command(code_text, symbol='exec') - except (OverflowError, IndentationError, SyntaxError, - ValueError, TypeError, MemoryError) as e: - exc_type, exc_val, tb = sys.exc_info() - user_tb = type(self).strip_traceback(tb) - err_str = ''.join(traceback.format_exception(exc_type, exc_val, user_tb)) - hdr_str = 'Traceback (most recent call last):\n' if not err_str.startswith('Traceback ') else '' - self.emit(ConsoleRecord('stderr', hdr_str + err_str)) - self.emit(ControlRecord('finished')) + await self.run_subproc(build_cmd) + + async def execute(self, exec_cmd): + if exec_cmd is None or exec_cmd == '': + # skipped + return + elif exec_cmd == '*': + if Path('main.py').is_file(): + cmd = f'python {DEFAULT_PYFLAGS} main.py' + await self.run_subproc(cmd) else: - sys.stdout, orig_stdout = self.stdout_emitter, sys.stdout - sys.stderr, orig_stderr = self.stderr_emitter, sys.stderr - try: - exec(code_obj, self.user_ns) - except Exception as e: - # strip the first frame - exc_type, exc_val, tb = sys.exc_info() - user_tb = type(self).strip_traceback(tb) - traceback.print_exception(exc_type, exc_val, user_tb) - finally: - self.emit(ControlRecord('finished')) - sys.stdout = orig_stdout - sys.stderr = orig_stderr - - -def main(): - log = logging.getLogger('main') + log.error('cannot find the main script ("main.py").') + else: + await self.run_subproc(exec_cmd) - # Replace stdin with a "null" file - # (trying to read stdin will raise EOFError immediately afterwards.) - sys.stdin = open(os.devnull, 'rb') + async def query(self, code_text): + if self.inproc_runner is None: + self.inproc_runner = PythonInprocRunner(self) + # NOTE: In-process code execution is a blocking operation. + self.inproc_runner.query(code_text) - # Initialize context object. - runner = CodeRunner(api_version=2) - try: - runner.run() - except (KeyboardInterrupt, SystemExit): - pass - except: - log.exception('unexpected error') - finally: - print('exit.') + async def complete(self, data): + try: + return self.inproc_runner.get_completions(data) + except: + log.exception('unexpected error') if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - main() + PythonProgramRunner().run()