Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cln improve dispatcher log #468

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions ramp-engine/ramp_engine/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self, config, event_config, worker=None, n_workers=1,
)
for lib in ('OMP', 'MKL', 'OPENBLAS'):
os.environ[lib + '_NUM_THREADS'] = str(self.n_threads)
self._logger = logger.getChild(self._ramp_config['event_name'])

def fetch_from_db(self, session):
"""Fetch the submission from the database and create the workers."""
Expand All @@ -143,24 +144,27 @@ def fetch_from_db(self, session):
)
self._awaiting_worker_queue.put_nowait((worker, (submission_id,
submission_name)))
logger.info('Submission {} added to the queue of submission to be '
'processed'.format(submission_name))
self._logger.info(
f'Submission {submission_name} added to the queue of '
'submission to be processed'
)

def launch_workers(self, session):
"""Launch the awaiting workers if possible."""
while (not self._processing_worker_queue.full() and
not self._awaiting_worker_queue.empty()):
worker, (submission_id, submission_name) = \
self._awaiting_worker_queue.get()
logger.info('Starting worker: {}'.format(worker))
self._logger.info(f'Starting worker: {worker}')

try:
worker.setup()
if worker.status != "error":
worker.launch_submission()
except Exception as e:
logger.error('Worker finished with unhandled exception:'
f' {e}')
self._logger.error(
f'Worker finished with unhandled exception:\n {e}'
)
worker.status = 'error'
if worker.status == 'error':
set_submission_state(session, submission_id, 'checking_error')
Expand All @@ -173,8 +177,9 @@ def launch_workers(self, session):
)
self._processing_worker_queue.put_nowait(
(worker, (submission_id, submission_name)))
logger.info('Store the worker {} into the processing queue'
.format(worker))
self._logger.info(
f'Store the worker {worker} into the processing queue'
)

def collect_result(self, session):
"""Collect result from processed workers."""
Expand Down Expand Up @@ -204,20 +209,21 @@ def collect_result(self, session):
time.sleep(0)
elif worker.status == 'retry':
set_submission_state(session, submission_id, 'new')
logging.info(f'Submission: {submission_id} has been '
'interrupted. It will be added to queue again '
'and retried.')
self._logger.info(
f'Submission: {submission_id} has been interrupted. '
'It will be added to queue again and retried.'
)
worker.teardown()
else:
logger.info(f'Collecting results from worker {worker}')
self._logger.info(f'Collecting results from worker {worker}')
returncode, stderr = worker.collect_results()
if returncode:
if returncode == 124:
logger.info(
self._logger.info(
f'Worker {worker} killed due to timeout.'
)
else:
logger.info(
self._logger.info(
f'Worker {worker} killed due to an error '
'during training'
)
Expand All @@ -242,8 +248,9 @@ def update_database_results(self, session):
self._processed_submission_queue.get_nowait()
if 'error' in get_submission_state(session, submission_id):
continue
logger.info('Write info in database for submission {}'
.format(submission_name))
self._logger.info(
f'Write info in database for submission {submission_name}'
)
path_predictions = os.path.join(
self._worker_config['predictions_dir'], submission_name
)
Expand All @@ -257,10 +264,11 @@ def update_database_results(self, session):
set_submission_state(session, submission_id, 'scored')

if make_update_leaderboard:
logger.info('Update all leaderboards')
self._logger.info('Update all leaderboards')
update_leaderboards(session, self._ramp_config['event_name'])
update_all_user_leaderboards(session,
self._ramp_config['event_name'])
self._logger.info('Leaderboards updated')

@staticmethod
def _reset_submission_after_failure(session, even_name):
Expand All @@ -272,10 +280,10 @@ def _reset_submission_after_failure(session, even_name):

def launch(self):
"""Launch the dispatcher."""
logger.info('Starting the RAMP dispatcher')
self._logger.info('Starting the RAMP dispatcher')
with session_scope(self._database_config) as session:
logger.info('Open a session to the database')
logger.info(
self._logger.info('Open a session to the database')
self._logger.info(
'Reset unfinished trained submission from previous session'
)
self._reset_submission_after_failure(
Expand All @@ -293,4 +301,4 @@ def launch(self):
self._reset_submission_after_failure(
session, self._ramp_config['event_name']
)
logger.info('Dispatcher killed by the poison pill')
self._logger.info('Dispatcher killed by the poison pill')