Skip to content

Commit

Permalink
refs #44: Fix hang-up bug in query-mode of C kernel
Browse files Browse the repository at this point in the history
 * Improve logging of the kernel by publishing log records to both stdout (for docker logs) and output socket (for client-side)
  • Loading branch information
achimnol committed Jun 29, 2017
1 parent 14b5a44 commit e5407a6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 41 deletions.
72 changes: 46 additions & 26 deletions base/base_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from abc import ABC, abstractmethod
import asyncio
import logging
import logging.config
from logging.handlers import QueueHandler
import os
import signal
import sys
Expand All @@ -14,6 +16,22 @@
log = logging.getLogger()


class OutsockHandler(QueueHandler):
def enqueue(self, record):
msg = self.formatter.format(record)
self.queue.write([
b'stderr',
(msg + '\n').encode('utf8'),
])


class BraceLogRecord(logging.LogRecord):
def getMessage(self):
if self.args is not None:
return self.msg.format(*self.args)
return self.msg


async def pipe_output(stream, outsock, target):
assert target in ('stdout', 'stderr')
try:
Expand All @@ -29,14 +47,14 @@ async def pipe_output(stream, outsock, target):
log.exception('unexpected error')


class BaseRun(ABC):
class BaseRunner(ABC):

insock = None
outsock = None
child_env = None
log_prefix = 'generic-kernel'

def __init__(self):
super().__init__()
self.child_env = {}
self.insock = None
self.outsock = None

@abstractmethod
async def build(self, build_cmd):
Expand All @@ -50,12 +68,6 @@ async def execute(self, exec_cmd):
async def query(self, code_text):
"""Run user code by creating a temporary file and compiling it."""

def set_child_env(self, env=None):
if not env:
log.info('subprocess environment variables are not set')
env = dict()
self.child_env = env

async def run_subproc(self, cmd):
"""A thin wrapper for an external command."""
loop = asyncio.get_event_loop()
Expand All @@ -79,34 +91,42 @@ async def run_subproc(self, cmd):
log.exception('unexpected error')

async def main_loop(self):
insock = await aiozmq.create_zmq_stream(zmq.PULL, bind='tcp://*:2000')
outsock = await aiozmq.create_zmq_stream(zmq.PUSH, bind='tcp://*:2001')
outsock.write([b'stdout', b'akdj;fajwe;f\n'])
self.insock, self.outsock = insock, outsock
self.set_child_env()
print('start serving...')
self.insock = await aiozmq.create_zmq_stream(zmq.PULL, bind='tcp://*:2000')
self.outsock = await aiozmq.create_zmq_stream(zmq.PUSH, bind='tcp://*:2001')

# configure logging to publish logs via outsock as well
logging.basicConfig(
level=logging.INFO, # NOTE: change this to DEBUG when debugging
format=self.log_prefix + ': {message}',
style='{',
handlers=[logging.StreamHandler(), OutsockHandler(self.outsock)],
)
_factory = lambda *args, **kwargs: BraceLogRecord(*args, **kwargs)
logging.setLogRecordFactory(_factory)

log.debug('start serving...')
while True:
try:
data = await insock.read()
data = await self.insock.read()
op_type = data[0].decode('ascii')
text = data[1].decode('utf8')
if op_type == 'build': # batch-mode step 1
await self.build(text)
outsock.write([b'build-finished', b''])
self.outsock.write([b'build-finished', b''])
elif op_type == 'exec': # batch-mode step 2
await self.execute(text)
outsock.write([b'finished', b''])
self.outsock.write([b'finished', b''])
elif op_type == 'input': # query-mode
await self.query(text)
outsock.write([b'finished', b''])
await outsock.drain()
self.outsock.write([b'finished', b''])
await self.outsock.drain()
except asyncio.CancelledError:
break
except:
log.exception('unexpected error')
break
insock.close()
outsock.close()
self.insock.close()
self.outsock.close()

def run(self):
# Replace stdin with a "null" file
Expand All @@ -123,7 +143,7 @@ def interrupt(loop, stopped):
stopped.set()
loop.stop()
else:
print('forced shutdown!', file=sys.stderr)
log.warning('forced shutdown!', file=sys.stderr)
sys.exit(1)

loop.add_signal_handler(signal.SIGINT, interrupt, loop, stopped)
Expand All @@ -137,4 +157,4 @@ def interrupt(loop, stopped):
loop.run_until_complete(main_task)
finally:
loop.close()
print('exit.')
log.debug('exit.')
24 changes: 9 additions & 15 deletions c/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import tempfile

sys.path.insert(0, os.path.abspath('.'))
from base_run import BaseRun
from base_run import BaseRunner
# For debugging
# sys.path.insert(0, os.path.abspath('..'))
# from base.run import BaseRun
Expand All @@ -27,7 +27,9 @@
}


class Run(BaseRun):
class CProgramRunner(BaseRunner):

log_prefix = 'c-kernel'

def __init__(self):
super().__init__()
Expand All @@ -48,10 +50,8 @@ async def build(self, build_cmd):
f'chmod 755 ./main')
await self.run_subproc(cmd)
else:
self.outsock.write([
b'stderr',
b'c-kernel: cannot find build script ("Makefile").\n',
])
log.error('cannot find build script ("Makefile") '
'or the main file ("main.c").')
else:
await self.run_subproc(build_cmd)

Expand All @@ -65,24 +65,18 @@ async def execute(self, exec_cmd):
elif Path('./a.out').is_file():
await self.run_subproc('chmod 755 ./a.out; ./a.out')
else:
self.outsock.write([
b'stderr',
b'c-kernel: cannot find executable ("a.out" or "main").\n',
])
log.error('cannot find executable ("a.out" or "main").')
else:
await self.run_subproc(exec_cmd)

async def query(self, code_text):
with tempfile.NamedTemporaryFile(suffix='.c', dir='.') as tmpf:
tmpf.write(code_text)
tmpf.write(code_text.encode('utf8'))
tmpf.flush()
cmd = (f'gcc {tmpf.name} {DEFAULT_CFLAGS} -o ./main {DEFAULT_LDFLAGS} '
f'&& chmod 755 ./main && ./main')
await self.run_subproc(cmd)


def main():
Run().run()

if __name__ == '__main__':
main()
CProgramRunner().run()

0 comments on commit e5407a6

Please sign in to comment.