Skip to content

Commit

Permalink
ENH Cl improve dispatcher log (#468)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomMoral authored and glemaitre committed Nov 16, 2020
1 parent 65adad5 commit 6b7e3f9
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions ramp-engine/ramp_engine/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, config, event_config, worker=None, n_workers=1,
)
for lib in ('OMP', 'MKL', 'OPENBLAS'):
os.environ[lib + '_NUM_THREADS'] = str(self.n_threads)
self._logger = logger.getChild(self._ramp_config['event_name'])

def fetch_from_db(self, session):
"""Fetch the submission from the database and create the workers."""
Expand All @@ -143,24 +144,27 @@ def fetch_from_db(self, session):
)
self._awaiting_worker_queue.put_nowait((worker, (submission_id,
submission_name)))
logger.info('Submission {} added to the queue of submission to be '
'processed'.format(submission_name))
self._logger.info(
f'Submission {submission_name} added to the queue of '
'submission to be processed'
)

def launch_workers(self, session):
"""Launch the awaiting workers if possible."""
while (not self._processing_worker_queue.full() and
not self._awaiting_worker_queue.empty()):
worker, (submission_id, submission_name) = \
self._awaiting_worker_queue.get()
logger.info('Starting worker: {}'.format(worker))
self._logger.info(f'Starting worker: {worker}')

try:
worker.setup()
if worker.status != "error":
worker.launch_submission()
except Exception as e:
logger.error('Worker finished with unhandled exception:'
f' {e}')
self._logger.error(
f'Worker finished with unhandled exception:\n {e}'
)
worker.status = 'error'
if worker.status == 'error':
set_submission_state(session, submission_id, 'checking_error')
Expand All @@ -173,8 +177,9 @@ def launch_workers(self, session):
)
self._processing_worker_queue.put_nowait(
(worker, (submission_id, submission_name)))
logger.info('Store the worker {} into the processing queue'
.format(worker))
self._logger.info(
f'Store the worker {worker} into the processing queue'
)

def collect_result(self, session):
"""Collect result from processed workers."""
Expand Down Expand Up @@ -204,20 +209,21 @@ def collect_result(self, session):
time.sleep(0)
elif worker.status == 'retry':
set_submission_state(session, submission_id, 'new')
logging.info(f'Submission: {submission_id} has been '
'interrupted. It will be added to queue again '
'and retried.')
self._logger.info(
f'Submission: {submission_id} has been interrupted. '
'It will be added to queue again and retried.'
)
worker.teardown()
else:
logger.info(f'Collecting results from worker {worker}')
self._logger.info(f'Collecting results from worker {worker}')
returncode, stderr = worker.collect_results()
if returncode:
if returncode == 124:
logger.info(
self._logger.info(
f'Worker {worker} killed due to timeout.'
)
else:
logger.info(
self._logger.info(
f'Worker {worker} killed due to an error '
'during training'
)
Expand All @@ -242,8 +248,9 @@ def update_database_results(self, session):
self._processed_submission_queue.get_nowait()
if 'error' in get_submission_state(session, submission_id):
continue
logger.info('Write info in database for submission {}'
.format(submission_name))
self._logger.info(
f'Write info in database for submission {submission_name}'
)
path_predictions = os.path.join(
self._worker_config['predictions_dir'], submission_name
)
Expand All @@ -257,10 +264,11 @@ def update_database_results(self, session):
set_submission_state(session, submission_id, 'scored')

if make_update_leaderboard:
logger.info('Update all leaderboards')
self._logger.info('Update all leaderboards')
update_leaderboards(session, self._ramp_config['event_name'])
update_all_user_leaderboards(session,
self._ramp_config['event_name'])
self._logger.info('Leaderboards updated')

@staticmethod
def _reset_submission_after_failure(session, even_name):
Expand All @@ -272,10 +280,10 @@ def _reset_submission_after_failure(session, even_name):

def launch(self):
"""Launch the dispatcher."""
logger.info('Starting the RAMP dispatcher')
self._logger.info('Starting the RAMP dispatcher')
with session_scope(self._database_config) as session:
logger.info('Open a session to the database')
logger.info(
self._logger.info('Open a session to the database')
self._logger.info(
'Reset unfinished trained submission from previous session'
)
self._reset_submission_after_failure(
Expand All @@ -293,4 +301,4 @@ def launch(self):
self._reset_submission_after_failure(
session, self._ramp_config['event_name']
)
logger.info('Dispatcher killed by the poison pill')
self._logger.info('Dispatcher killed by the poison pill')

0 comments on commit 6b7e3f9

Please sign in to comment.