From 0d591310023840c141eaafe0830743c409b08687 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 16 Nov 2020 12:08:13 +0100 Subject: [PATCH 1/3] CLN add event name in dispatcher log --- ramp-engine/ramp_engine/dispatcher.py | 48 ++++++++++++++++----------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index 9ac3fdeec..35141c13b 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -121,6 +121,7 @@ def __init__(self, config, event_config, worker=None, n_workers=1, ) for lib in ('OMP', 'MKL', 'OPENBLAS'): os.environ[lib + '_NUM_THREADS'] = str(self.n_threads) + self._logger = logger.getChild(event_config['name']) def fetch_from_db(self, session): """Fetch the submission from the database and create the workers.""" @@ -143,8 +144,10 @@ def fetch_from_db(self, session): ) self._awaiting_worker_queue.put_nowait((worker, (submission_id, submission_name))) - logger.info('Submission {} added to the queue of submission to be ' - 'processed'.format(submission_name)) + self._logger.info( + f'Submission {submission_name} added to the queue of ' + 'submission to be processed' + ) def launch_workers(self, session): """Launch the awaiting workers if possible.""" @@ -152,15 +155,16 @@ def launch_workers(self, session): not self._awaiting_worker_queue.empty()): worker, (submission_id, submission_name) = \ self._awaiting_worker_queue.get() - logger.info('Starting worker: {}'.format(worker)) + self._logger.info(f'Starting worker: {worker}') try: worker.setup() if worker.status != "error": worker.launch_submission() except Exception as e: - logger.error('Worker finished with unhandled exception:' - f' {e}') + self._logger.error( + f'Worker finished with unhandled exception:\n {e}' + ) worker.status = 'error' if worker.status == 'error': set_submission_state(session, submission_id, 'checking_error') @@ -173,8 +177,9 @@ def launch_workers(self, session): ) self._processing_worker_queue.put_nowait( (worker, (submission_id, submission_name))) - logger.info('Store the worker {} into the processing queue' - .format(worker)) + self._logger.info( + f'Store the worker {worker} into the processing queue' + ) def collect_result(self, session): """Collect result from processed workers.""" @@ -204,20 +209,21 @@ def collect_result(self, session): time.sleep(0) elif worker.status == 'retry': set_submission_state(session, submission_id, 'new') - logging.info(f'Submission: {submission_id} has been ' - 'interrupted. It will be added to queue again ' - 'and retried.') + self._logger.info( + f'Submission: {submission_id} has been interrupted. ' + 'It will be added to queue again and retried.' + ) worker.teardown() else: - logger.info(f'Collecting results from worker {worker}') + self._logger.info(f'Collecting results from worker {worker}') returncode, stderr = worker.collect_results() if returncode: if returncode == 124: - logger.info( + self._logger.info( f'Worker {worker} killed due to timeout.' ) else: - logger.info( + self._logger.info( f'Worker {worker} killed due to an error ' 'during training' ) @@ -242,8 +248,9 @@ def update_database_results(self, session): self._processed_submission_queue.get_nowait() if 'error' in get_submission_state(session, submission_id): continue - logger.info('Write info in database for submission {}' - .format(submission_name)) + self.logger.info( + f'Write info in database for submission {submission_name}' + ) path_predictions = os.path.join( self._worker_config['predictions_dir'], submission_name ) @@ -257,10 +264,11 @@ def update_database_results(self, session): set_submission_state(session, submission_id, 'scored') if make_update_leaderboard: - logger.info('Update all leaderboards') + self._logger.info('Update all leaderboards') update_leaderboards(session, self._ramp_config['event_name']) update_all_user_leaderboards(session, self._ramp_config['event_name']) + self._logger.info('Leaderboards updated') @staticmethod def _reset_submission_after_failure(session, even_name): @@ -272,10 +280,10 @@ def _reset_submission_after_failure(session, even_name): def launch(self): """Launch the dispatcher.""" - logger.info('Starting the RAMP dispatcher') + self._logger.info('Starting the RAMP dispatcher') with session_scope(self._database_config) as session: - logger.info('Open a session to the database') - logger.info( + self._logger.info('Open a session to the database') + self._logger.info( 'Reset unfinished trained submission from previous session' ) self._reset_submission_after_failure( @@ -293,4 +301,4 @@ def launch(self): self._reset_submission_after_failure( session, self._ramp_config['event_name'] ) - logger.info('Dispatcher killed by the poison pill') + self._logger.info('Dispatcher killed by the poison pill') From 29071ee7f3f6588cef11bb46efd448eada663782 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 16 Nov 2020 12:11:22 +0100 Subject: [PATCH 2/3] FIX typo in fetching event name --- ramp-engine/ramp_engine/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index 35141c13b..d5ccbedf6 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -121,7 +121,7 @@ def __init__(self, config, event_config, worker=None, n_workers=1, ) for lib in ('OMP', 'MKL', 'OPENBLAS'): os.environ[lib + '_NUM_THREADS'] = str(self.n_threads) - self._logger = logger.getChild(event_config['name']) + self._logger = logger.getChild(self._ramp_config['event_name']) def fetch_from_db(self, session): """Fetch the submission from the database and create the workers.""" From 8372e9fa397f4afce9081ffd9cb220495ad8def3 Mon Sep 17 00:00:00 2001 From: tomMoral Date: Mon, 16 Nov 2020 12:34:14 +0100 Subject: [PATCH 3/3] FIX typo in the logger name --- ramp-engine/ramp_engine/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index d5ccbedf6..a1aca3fe1 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -248,7 +248,7 @@ def update_database_results(self, session): self._processed_submission_queue.get_nowait() if 'error' in get_submission_state(session, submission_id): continue - self.logger.info( + self._logger.info( f'Write info in database for submission {submission_name}' ) path_predictions = os.path.join(