diff --git a/egs/mini_librispeech/s5/local/nnet3/tuning/run_tdnn_lstm_1a.sh b/egs/mini_librispeech/s5/local/nnet3/tuning/run_tdnn_lstm_1a.sh index 48f7f6cdee5..04adba926cf 100755 --- a/egs/mini_librispeech/s5/local/nnet3/tuning/run_tdnn_lstm_1a.sh +++ b/egs/mini_librispeech/s5/local/nnet3/tuning/run_tdnn_lstm_1a.sh @@ -3,18 +3,21 @@ # This is a basic TDNN+LSTM nnet3 experiment. +# steps/info/nnet3_dir_info.pl exp/nnet3/tdnn_lstm1a_sp +# exp/nnet3/tdnn_lstm1a_sp: num-iters=32 nj=1..2 num-params=8.4M dim=40+100->2022 combine=-0.49->-0.39 loglike:train/valid[20,31,combined]=(-0.65,-0.41,-0.39/-1.03,-0.96,-0.97) accuracy:train/valid[20,31,combined]=(0.78,0.86,0.87/0.70,0.72,0.72) + # Below, comparing with the chain TDNN system. It's a little better with the # small-vocab decoding. Both systems are probably super-badly tuned, and the # chain system probably used too many jobs. # # local/nnet3/compare_wer.sh exp/chain/tdnn1a_sp exp/nnet3/tdnn_lstm1a_sp # System tdnn1a_sp tdnn_lstm1a_sp -#WER dev_clean_2 (tgsmall) 18.58 17.67 -#WER dev_clean_2 (tglarge) 13.35 13.43 -# Final train prob -0.3660 -# Final valid prob -1.0236 -# Final train acc 0.8737 -# Final valid acc 0.7222 +#WER dev_clean_2 (tgsmall) 18.43 17.37 +#WER dev_clean_2 (tglarge) 13.15 13.43 +# Final train prob -0.3933 +# Final valid prob -0.9662 +# Final train acc 0.8652 +# Final valid acc 0.7206 # Set -e here so that we catch if any executable fails immediately set -euo pipefail diff --git a/egs/wsj/s5/steps/libs/common.py b/egs/wsj/s5/steps/libs/common.py index a52b78369df..f17d271eb4d 100644 --- a/egs/wsj/s5/steps/libs/common.py +++ b/egs/wsj/s5/steps/libs/common.py @@ -2,6 +2,7 @@ # Copyright 2016 Vijayaditya Peddinti. # 2016 Vimal Manohar +# 2017 Johns Hopkins University (author: Daniel Povey) # Apache 2.0 """ This module contains several utility functions and classes that are @@ -74,187 +75,107 @@ def check_if_cuda_compiled(): return True -class KaldiCommandException(Exception): - """ An Exception class that throws an error string with the - kaldi command that caused the error and the error string captured. - """ - def __init__(self, command, err=None): - Exception.__init__(self, - "There was an error while running the command " - "{0}\n{1}\n{2}".format(command, "-"*10, - "" if err is None else err)) - - -class BackgroundProcessHandler(): - """ This class handles background processes to ensure that a top-level - script waits until all the processes end before exiting - - A top-level script is expected to instantiate an object of this class - and pass it to all calls of run_kaldi_command that are to be run in the - background. The background processes are queued and these are polled - in a parallel thread at set interval to check for failures. - The top-level script can ensure at the end ensure that all processes are - completed before exiting. - - Attributes: - __process_queue: Stores a list of process handles and command tuples - __polling_time: The time after which the processes are polled - __timer: Internal timer object - __is_running: Stores whether a timer is running - """ - - def __init__(self, polling_time=600): - self.__process_queue = [] - self.__polling_time = polling_time - self.__timer = None - self.__lock = threading.Lock() - self.__is_running = False - - def __run(self): - """ Internal function to run a poll. Calls poll(). 
""" - assert(self.__is_running) - self.__is_running = False - logger.debug("Polling...") - if self.poll(): - # If there are any more background processes running, - # start a new timer - self.start() - - def start(self): - """ Start the background process handler. - - Repeatedly calls itself through the __run() method every - __polling_time seconds. - """ - if not self.__is_running: - self.__timer = threading.Timer(self.__polling_time, self.__run) - logger.debug("Starting new timer...") - self.__is_running = True - self.__timer.start() - - def stop(self): - """ Stop the background process handler by cancelling any running timer. - """ - if self.__timer is not None: - self.__timer.cancel() - self.__is_running = False - - def poll(self): - """ Poll background processes and check their statuses. - - Returns True if any processes are still in the queue. - """ - with self.__lock: - remaining_processes = [] - for t in self.__process_queue: - if self.is_process_done(t): - self.ensure_process_is_done(t) - else: - remaining_processes.append(t) - self.__process_queue = remaining_processes - num_processes = len(self.__process_queue) - logger.debug("Number of processes remaining is {0}...".format( - num_processes)) - return (num_processes > 0) - - def add_process(self, t): - """ Add a (process handle, command) tuple to the queue. - """ - with self.__lock: - self.__process_queue.append(t) - self.start() - - def is_process_done(self, t): - p, command = t - if p.poll() is None: - return False - return True +def execute_command(command): + """ Runs a kaldi job in the foreground and waits for it to complete; raises an + exception if its return status is nonzero. The command is executed in + 'shell' mode so 'command' can involve things like pipes. Often, + 'command' will start with 'run.pl' or 'queue.pl'. The stdout and stderr + are merged with the calling process's stdout and stderr so they will + appear on the screen. - def ensure_process_is_done(self, t): - p, command = t - logger.debug("Waiting for process '{0}' to end".format(command)) - [stdout, stderr] = p.communicate() - if p.returncode is not 0: - raise KaldiCommandException(command, stderr) - - def ensure_processes_are_done(self): - self.__process_queue.reverse() - while len(self.__process_queue) > 0: - t = self.__process_queue.pop() - self.ensure_process_is_done(t) - self.stop() - - def __del__(self): - self.stop() - - def debug(self): - for p, command in self.__process_queue: - logger.info("Process '{0}' is running".format(command)) - - -def run_job(command, wait=True, background_process_handler=None): - """ Runs a kaldi job, usually using a script such as queue.pl and - run.pl, and redirects the stdout and stderr to the parent - process's streams. - These are usually a sequence of commands connected by pipes, so we use - shell=True. - - Args: - background_process_handler: An object of the BackgroundProcessHandler - class that is instantiated by the top-level script. If this is - provided, then the created process handle is added to the object. - wait: If True, wait until the process is completed. However, if the - background_process_handler is provided, this option will be - ignored and the process will be run in the background. 
+        See also: get_command_stdout, background_command
+    """
     p = subprocess.Popen(command, shell=True)
+    p.communicate()
+    if p.returncode != 0:
+        raise Exception("Command exited with status {0}: {1}".format(
+            p.returncode, command))
 
-    if background_process_handler is not None:
-        wait = False
-        background_process_handler.add_process((p, command))
 
-    if wait:
-        p.communicate()
-        if p.returncode is not 0:
-            raise KaldiCommandException(command)
-        return None
-    else:
-        return p
-
-
-def run_kaldi_command(command, wait=True, background_process_handler=None):
-    """ Runs commands frequently seen in Kaldi scripts and
-    captures the stdout and stderr.
-    These are usually a sequence of commands connected by pipes, so we use
-    shell=True.
-
-    Args:
-        background_process_handler: An object of the BackgroundProcessHandler
-            class that is instantiated by the top-level script. If this is
-            provided, then the created process handle is added to the object.
-        wait: If True, wait until the process is completed. However, if the
-            background_process_handler is provided, this option will be
-            ignored and the process will be run in the background.
+def get_command_stdout(command, require_zero_status=True):
+    """ Executes a command and returns its stdout output as a string.  The
+        command is executed with shell=True, so it may contain pipes and
+        other shell constructs.
+
+        If require_zero_status is True, this function will raise an exception if
+        the command has nonzero exit status.  If False, it just prints a warning
+        if the exit status is nonzero.
+
+        See also: execute_command, background_command
     """
     p = subprocess.Popen(command, shell=True,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
-
-    if background_process_handler is not None:
-        wait = False
-        background_process_handler.add_process((p, command))
-
-    if wait:
-        [stdout, stderr] = p.communicate()
-        if p.returncode is not 0:
-            raise KaldiCommandException(command, stderr)
-        return stdout, stderr
-    else:
-        return p
+                         stdout=subprocess.PIPE)
+
+    stdout = p.communicate()[0]
+    if p.returncode != 0:
+        msg = "Command exited with status {0}: {1}".format(
+            p.returncode, command)
+        if require_zero_status:
+            raise Exception(msg)
+        else:
+            logger.warning(msg)
+    return stdout
+
+
+def wait_for_background_commands():
+    """ This waits for all threads to exit.  You will often want to
+        run this at the end of programs that have launched background
+        threads, so that the program will wait for its child processes
+        to terminate before it dies."""
+    for t in threading.enumerate():
+        if t is not threading.current_thread():
+            t.join()
+
+
+def background_command(command, require_zero_status=False):
+    """Executes a command in a separate thread, like running with '&' in the shell.
+       If you want the program to die if the command eventually returns with
+       nonzero status, then set require_zero_status to True.  'command' will be
+       executed in 'shell' mode, so it's OK for it to contain pipes and other
+       shell constructs.
+
+       This function returns the Thread object created, just in case you want
+       to wait for that specific command to finish.  For example, you could do:
+             thread = background_command('foo | bar')
+             # do something else while waiting for it to finish
+             thread.join()
+
+       See also:
+         - wait_for_background_commands(), which can be used
+           at the end of the program to wait for all these commands to terminate.
+         - execute_command() and get_command_stdout(), which allow you to
+           execute commands in the foreground.
+
+    """
+
+    thread = threading.Thread(target=background_command_runner,
+                              args=(command, require_zero_status))
+    thread.start()
+    return thread
+
+
+def background_command_runner(command, require_zero_status):
+    """ This is the function that is called from background_command, in
+        a separate thread."""
+    p = subprocess.Popen(command, shell=True)
+
+    p.communicate()
+    if p.returncode != 0:
+        msg = "Command exited with status {0}: {1}".format(
+            p.returncode, command)
+        if require_zero_status:
+            logger.error(msg)
+            # thread.interrupt_main() sends a KeyboardInterrupt to the main
+            # thread, which will generally terminate the program.  ('thread'
+            # is the Python 2 name of the module; it is '_thread' in Python 3.)
+            import thread
+            thread.interrupt_main()
+        else:
+            logger.warning(msg)
 
 
 def get_number_of_leaves_from_tree(alidir):
-    [stdout, stderr] = run_kaldi_command(
+    stdout = get_command_stdout(
         "tree-info {0}/tree 2>/dev/null | grep num-pdfs".format(alidir))
     parts = stdout.split()
     assert(parts[0] == "num-pdfs")
@@ -265,7 +186,7 @@ def get_number_of_leaves_from_tree(alidir):
 
 
 def get_number_of_leaves_from_model(dir):
-    [stdout, stderr] = run_kaldi_command(
+    stdout = get_command_stdout(
         "am-info {0}/final.mdl 2>/dev/null | grep -w pdfs".format(dir))
     parts = stdout.split()
     # number of pdfs 7115
@@ -288,7 +209,7 @@ def get_number_of_jobs(alidir):
 
 def get_ivector_dim(ivector_dir=None):
     if ivector_dir is None:
         return 0
-    [stdout_val, stderr_val] = run_kaldi_command(
+    stdout_val = get_command_stdout(
         "feat-to-dim --print-args=false "
         "scp:{dir}/ivector_online.scp -".format(dir=ivector_dir))
     ivector_dim = int(stdout_val)
@@ -297,7 +218,7 @@ def get_ivector_extractor_id(ivector_dir=None):
     if ivector_dir is None:
         return None
-    [stdout_val, stderr_val] = run_kaldi_command(
+    stdout_val = get_command_stdout(
         "steps/nnet2/get_ivector_id.sh {dir}".format(dir=ivector_dir))
 
     if (stdout_val.strip() == "") or (stdout_val is None):
@@ -308,7 +229,7 @@ def get_feat_dim(feat_dir):
     if feat_dir is None:
         return 0
-    [stdout_val, stderr_val] = run_kaldi_command(
+    stdout_val = get_command_stdout(
         "feat-to-dim --print-args=false "
         "scp:{data}/feats.scp -".format(data=feat_dir))
     feat_dim = int(stdout_val)
@@ -316,7 +237,7 @@ def get_feat_dim_from_scp(feat_scp):
-    [stdout_val, stderr_val] = run_kaldi_command(
+    stdout_val = get_command_stdout(
         "feat-to-dim --print-args=false "
         "scp:{feat_scp} -".format(feat_scp=feat_scp))
     feat_dim = int(stdout_val)
@@ -324,9 +245,8 @@ def split_data(data, num_jobs):
-    run_kaldi_command("utils/split_data.sh {data} {num_jobs}".format(
-        data=data,
-        num_jobs=num_jobs))
+    execute_command("utils/split_data.sh {data} {num_jobs}".format(
+        data=data, num_jobs=num_jobs))
 
 
 def read_kaldi_matrix(matrix_file):
diff --git a/egs/wsj/s5/steps/libs/nnet3/report/log_parse.py b/egs/wsj/s5/steps/libs/nnet3/report/log_parse.py
index b5d3e17dded..76c8bd65082 100755
--- a/egs/wsj/s5/steps/libs/nnet3/report/log_parse.py
+++ b/egs/wsj/s5/steps/libs/nnet3/report/log_parse.py
@@ -52,7 +52,7 @@ def __init__(self, message = None):
             "There was an error while trying to parse the logs."
             " Details : \n{0}\n".format(message))
 
-# This function is used to fill stats_per_component_per_iter table with the
+# This function is used to fill stats_per_component_per_iter table with the
 # results of regular expression.
def fill_nonlin_stats_table_with_regex_result(groups, gate_index, stats_table): iteration = int(groups[0]) @@ -61,7 +61,7 @@ def fill_nonlin_stats_table_with_regex_result(groups, gate_index, stats_table): value_percentiles = groups[3+gate_index*6] value_mean = float(groups[4+gate_index*6]) value_stddev = float(groups[5+gate_index*6]) - value_percentiles_split = re.split(',| ',value_percentiles) + value_percentiles_split = re.split(',| ',value_percentiles) assert len(value_percentiles_split) == 13 value_5th = float(value_percentiles_split[4]) value_50th = float(value_percentiles_split[6]) @@ -69,7 +69,7 @@ def fill_nonlin_stats_table_with_regex_result(groups, gate_index, stats_table): deriv_percentiles = groups[6+gate_index*6] deriv_mean = float(groups[7+gate_index*6]) deriv_stddev = float(groups[8+gate_index*6]) - deriv_percentiles_split = re.split(',| ',deriv_percentiles) + deriv_percentiles_split = re.split(',| ',deriv_percentiles) assert len(deriv_percentiles_split) == 13 deriv_5th = float(deriv_percentiles_split[4]) deriv_50th = float(deriv_percentiles_split[6]) @@ -115,8 +115,9 @@ def parse_progress_logs_for_nonlinearity_stats(exp_dir): progress_log_files = "%s/log/progress.*.log" % (exp_dir) stats_per_component_per_iter = {} - progress_log_lines = common_lib.run_kaldi_command( - 'grep -e "value-avg.*deriv-avg" {0}'.format(progress_log_files))[0] + progress_log_lines = common_lib.get_command_stdout( + 'grep -e "value-avg.*deriv-avg" {0}'.format(progress_log_files), + require_zero_status = False) parse_regex = re.compile(g_normal_nonlin_regex_pattern) @@ -172,9 +173,10 @@ def parse_progress_logs_for_clipped_proportion(exp_dir): progress_log_files = "%s/log/progress.*.log" % (exp_dir) component_names = set([]) - progress_log_lines = common_lib.run_kaldi_command( + progress_log_lines = common_lib.get_command_stdout( 'grep -e "{0}" {1}'.format( - "clipped-proportion", progress_log_files))[0] + "clipped-proportion", progress_log_files), + require_zero_status=False) parse_regex = re.compile(".*progress\.([0-9]+)\.log:component " "name=(.*) type=.* " "clipped-proportion=([0-9\.e\-]+)") @@ -255,8 +257,8 @@ def parse_progress_logs_for_param_diff(exp_dir, pattern): progress_log_files = "%s/log/progress.*.log" % (exp_dir) progress_per_iter = {} component_names = set([]) - progress_log_lines = common_lib.run_kaldi_command( - 'grep -e "{0}" {1}'.format(pattern, progress_log_files))[0] + progress_log_lines = common_lib.get_command_stdout( + 'grep -e "{0}" {1}'.format(pattern, progress_log_files)) parse_regex = re.compile(".*progress\.([0-9]+)\.log:" "LOG.*{0}.*\[(.*)\]".format(pattern)) for line in progress_log_lines.split("\n"): @@ -309,8 +311,8 @@ def parse_progress_logs_for_param_diff(exp_dir, pattern): def parse_train_logs(exp_dir): train_log_files = "%s/log/train.*.log" % (exp_dir) - train_log_lines = common_lib.run_kaldi_command( - 'grep -e Accounting {0}'.format(train_log_files))[0] + train_log_lines = common_lib.get_command_stdout( + 'grep -e Accounting {0}'.format(train_log_files)) parse_regex = re.compile(".*train\.([0-9]+)\.([0-9]+)\.log:# " "Accounting: time=([0-9]+) thread.*") @@ -334,10 +336,10 @@ def parse_train_logs(exp_dir): def parse_prob_logs(exp_dir, key='accuracy', output="output"): train_prob_files = "%s/log/compute_prob_train.*.log" % (exp_dir) valid_prob_files = "%s/log/compute_prob_valid.*.log" % (exp_dir) - train_prob_strings = common_lib.run_kaldi_command( - 'grep -e {0} {1}'.format(key, train_prob_files), wait=True)[0] - valid_prob_strings = common_lib.run_kaldi_command( 
- 'grep -e {0} {1}'.format(key, valid_prob_files))[0] + train_prob_strings = common_lib.get_command_stdout( + 'grep -e {0} {1}'.format(key, train_prob_files)) + valid_prob_strings = common_lib.get_command_stdout( + 'grep -e {0} {1}'.format(key, valid_prob_files)) # LOG # (nnet3-chain-compute-prob:PrintTotalStats():nnet-chain-diagnostics.cc:149) diff --git a/egs/wsj/s5/steps/libs/nnet3/train/chain_objf/acoustic_model.py b/egs/wsj/s5/steps/libs/nnet3/train/chain_objf/acoustic_model.py index 75cba7f3657..4a8505d4f3a 100644 --- a/egs/wsj/s5/steps/libs/nnet3/train/chain_objf/acoustic_model.py +++ b/egs/wsj/s5/steps/libs/nnet3/train/chain_objf/acoustic_model.py @@ -37,7 +37,7 @@ def create_phone_lm(dir, tree_dir, run_opts, lm_opts=None): alignments=' '.join(['{0}/ali.{1}.gz'.format(tree_dir, job) for job in range(1, num_ali_jobs + 1)]) - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/make_phone_lm.log \ gunzip -c {alignments} \| \ ali-to-phones {tree_dir}/final.mdl ark:- ark:- \| \ @@ -49,10 +49,10 @@ def create_phone_lm(dir, tree_dir, run_opts, lm_opts=None): def create_denominator_fst(dir, tree_dir, run_opts): - common_lib.run_job( + common_lib.execute_command( """copy-transition-model {tree_dir}/final.mdl \ {dir}/0.trans_mdl""".format(dir=dir, tree_dir=tree_dir)) - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/make_den_fst.log \ chain-make-den-fst {dir}/tree {dir}/0.trans_mdl \ {dir}/phone_lm.fst \ @@ -75,7 +75,7 @@ def generate_chain_egs(dir, data, lat_dir, egs_dir, See options in that script. """ - common_lib.run_job( + common_lib.execute_command( """steps/nnet3/chain/get_egs.sh {egs_opts} \ --cmd "{command}" \ --cmvn-opts "{cmvn_opts}" \ @@ -151,7 +151,7 @@ def train_new_models(dir, iter, srand, num_jobs, deriv_time_opts.append("--optimization.max-deriv-time-relative={0}".format( int(max_deriv_time_relative))) - processes = [] + threads = [] for job in range(1, num_jobs+1): # k is a zero-based index that we will derive the other indexes from. 
k = num_archives_processed + job - 1 @@ -167,7 +167,7 @@ def train_new_models(dir, iter, srand, num_jobs, (" --write-cache={0}/cache.{1}".format(dir, iter + 1) if job == 1 else "")) - process_handle = common_lib.run_job( + thread = common_lib.background_command( """{command} {train_queue_opt} {dir}/log/train.{iter}.{job}.log \ nnet3-chain-train {parallel_train_opts} \ --apply-deriv-weights={app_deriv_wts} \ @@ -199,21 +199,14 @@ def train_new_models(dir, iter, srand, num_jobs, egs_dir=egs_dir, archive_index=archive_index, buf_size=shuffle_buffer_size, num_chunk_per_mb=num_chunk_per_minibatch_str), - wait=False) + require_zero_status=True) - processes.append(process_handle) + threads.append(thread) - all_success = True - for process in processes: - process.wait() - process.communicate() - if process.returncode != 0: - all_success = False - if not all_success: - open('{0}/.error'.format(dir), 'w').close() - raise Exception("There was error during training " - "iteration {0}".format(iter)) + for thread in threads: + thread.join() + def train_one_iteration(dir, iter, srand, egs_dir, @@ -226,8 +219,7 @@ def train_one_iteration(dir, iter, srand, egs_dir, leaky_hmm_coefficient, momentum, max_param_change, shuffle_buffer_size, frame_subsampling_factor, - run_opts, dropout_edit_string="", - background_process_handler=None): + run_opts, dropout_edit_string=""): """ Called from steps/nnet3/chain/train.py for one iteration for neural network training with LF-MMI objective @@ -259,13 +251,11 @@ def train_one_iteration(dir, iter, srand, egs_dir, compute_train_cv_probabilities( dir=dir, iter=iter, egs_dir=egs_dir, l2_regularize=l2_regularize, xent_regularize=xent_regularize, - leaky_hmm_coefficient=leaky_hmm_coefficient, run_opts=run_opts, - background_process_handler=background_process_handler) + leaky_hmm_coefficient=leaky_hmm_coefficient, run_opts=run_opts) if iter > 0: # Runs in the background - compute_progress(dir, iter, run_opts, - background_process_handler=background_process_handler) + compute_progress(dir, iter, run_opts) do_average = (iter > 0) @@ -378,7 +368,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, num_lda_jobs = max_lda_jobs # Write stats with the same format as stats for LDA. - common_lib.run_job( + common_lib.execute_command( """{command} JOB=1:{num_lda_jobs} {dir}/log/get_lda_stats.JOB.log \ nnet3-chain-acc-lda-stats --rand-prune={rand_prune} \ {dir}/init.raw "ark:{egs_dir}/cegs.JOB.ark" \ @@ -393,7 +383,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, lda_stat_files = map(lambda x: '{0}/{1}.lda_stats'.format(dir, x), range(1, num_lda_jobs + 1)) - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/sum_transform_stats.log \ sum-lda-accs {dir}/lda_stats {lda_stat_files}""".format( command=run_opts.command, @@ -409,7 +399,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, # in Appendix C.6 of http://arxiv.org/pdf/1410.7455v6.pdf; it's a scaled # variant of an LDA transform but without dimensionality reduction. - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/get_transform.log \ nnet-get-feature-transform {lda_opts} {dir}/lda.mat \ {dir}/lda_stats""".format( @@ -433,7 +423,7 @@ def prepare_initial_acoustic_model(dir, run_opts, srand=-1): # We ensure that they have the same mode (even if someone changed the # script to make one or both of them text mode) by copying them both # before concatenating them. 
- common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/init_mdl.log \ nnet3-am-init {dir}/0.trans_mdl {dir}/0.raw \ {dir}/0.mdl""".format(command=run_opts.command, dir=dir)) @@ -441,11 +431,10 @@ def prepare_initial_acoustic_model(dir, run_opts, srand=-1): def compute_train_cv_probabilities(dir, iter, egs_dir, l2_regularize, xent_regularize, leaky_hmm_coefficient, - run_opts, wait=False, - background_process_handler=None): + run_opts): model = '{0}/{1}.mdl'.format(dir, iter) - common_lib.run_job( + common_lib.background_command( """{command} {dir}/log/compute_prob_valid.{iter}.log \ nnet3-chain-compute-prob --l2-regularize={l2} \ --leaky-hmm-coefficient={leaky} --xent-regularize={xent_reg} \ @@ -455,10 +444,9 @@ def compute_train_cv_probabilities(dir, iter, egs_dir, l2_regularize, """.format(command=run_opts.command, dir=dir, iter=iter, model=model, l2=l2_regularize, leaky=leaky_hmm_coefficient, xent_reg=xent_regularize, - egs_dir=egs_dir), wait=wait, - background_process_handler=background_process_handler) + egs_dir=egs_dir)) - common_lib.run_job( + common_lib.background_command( """{command} {dir}/log/compute_prob_train.{iter}.log \ nnet3-chain-compute-prob --l2-regularize={l2} \ --leaky-hmm-coefficient={leaky} --xent-regularize={xent_reg} \ @@ -468,17 +456,15 @@ def compute_train_cv_probabilities(dir, iter, egs_dir, l2_regularize, """.format(command=run_opts.command, dir=dir, iter=iter, model=model, l2=l2_regularize, leaky=leaky_hmm_coefficient, xent_reg=xent_regularize, - egs_dir=egs_dir), wait=wait, - background_process_handler=background_process_handler) + egs_dir=egs_dir)) -def compute_progress(dir, iter, run_opts, wait=False, - background_process_handler=None): +def compute_progress(dir, iter, run_opts): prev_model = '{0}/{1}.mdl'.format(dir, iter - 1) model = '{0}/{1}.mdl'.format(dir, iter) - common_lib.run_job( + common_lib.background_command( """{command} {dir}/log/progress.{iter}.log \ nnet3-am-info {model} '&&' \ nnet3-show-progress --use-gpu=no \ @@ -488,14 +474,11 @@ def compute_progress(dir, iter, run_opts, wait=False, dir=dir, iter=iter, model=model, - prev_model=prev_model), wait=wait, - background_process_handler=background_process_handler) - + prev_model=prev_model)) def combine_models(dir, num_iters, models_to_combine, num_chunk_per_minibatch_str, egs_dir, leaky_hmm_coefficient, l2_regularize, xent_regularize, run_opts, - background_process_handler=None, sum_to_one_penalty=0.0): """ Function to do model combination @@ -530,7 +513,7 @@ def combine_models(dir, num_iters, models_to_combine, num_chunk_per_minibatch_st # model. 
     raw_model_strings = list(reversed(raw_model_strings))
 
-    common_lib.run_job(
+    common_lib.execute_command(
         """{command} {combine_queue_opt} {dir}/log/combine.log \
                 nnet3-chain-combine --num-iters={opt_iters} \
                 --l2-regularize={l2} --leaky-hmm-coefficient={leaky} \
@@ -563,5 +546,4 @@ def combine_models(dir, num_iters, models_to_combine, num_chunk_per_minibatch_st
         dir=dir, iter='final', egs_dir=egs_dir,
         l2_regularize=l2_regularize, xent_regularize=xent_regularize,
         leaky_hmm_coefficient=leaky_hmm_coefficient,
-        run_opts=run_opts, wait=False,
-        background_process_handler=background_process_handler)
+        run_opts=run_opts)
diff --git a/egs/wsj/s5/steps/libs/nnet3/train/common.py b/egs/wsj/s5/steps/libs/nnet3/train/common.py
index 0ba332d01bf..6c5e0d6d834 100644
--- a/egs/wsj/s5/steps/libs/nnet3/train/common.py
+++ b/egs/wsj/s5/steps/libs/nnet3/train/common.py
@@ -99,7 +99,7 @@ def get_average_nnet_model(dir, iter, nnets_list, run_opts,
     out_model = "{dir}/{next_iter}.raw".format(dir=dir,
                                                next_iter=next_iter)
 
-    common_lib.run_job(
+    common_lib.execute_command(
         """{command} {dir}/log/average.{iter}.log \
                 nnet3-average {nnets_list} \
                 {out_model}""".format(command=run_opts.command,
@@ -128,7 +128,7 @@ def get_best_nnet_model(dir, iter, best_model_index, run_opts,
     out_model = "{dir}/{next_iter}.raw".format(dir=dir,
                                                next_iter=iter + 1)
 
-    common_lib.run_job(
+    common_lib.execute_command(
         """{command} {dir}/log/select.{iter}.log \
                 nnet3-copy --scale={scale} {best_model} \
                 {out_model}""".format(command=run_opts.command,
@@ -411,7 +411,7 @@ def compute_presoftmax_prior_scale(dir, alidir, num_jobs, run_opts,
                                    presoftmax_prior_scale_power=-0.25):
 
     # getting the raw pdf count
-    common_lib.run_job(
+    common_lib.execute_command(
         """{command} JOB=1:{num_jobs} {dir}/log/acc_pdf.JOB.log \
                 ali-to-post "ark:gunzip -c {alidir}/ali.JOB.gz|" ark:- \| \
                 post-to-tacc --per-pdf=true {alidir}/final.mdl ark:- \
@@ -420,7 +420,7 @@ def compute_presoftmax_prior_scale(dir, alidir, num_jobs, run_opts,
                     dir=dir,
                     alidir=alidir))
 
-    common_lib.run_job(
+    common_lib.execute_command(
         """{command} {dir}/log/sum_pdf_counts.log \
                 vector-sum --binary=false {dir}/pdf_counts.* {dir}/pdf_counts \
         """.format(command=run_opts.command, dir=dir))
@@ -456,14 +456,14 @@ def smooth_presoftmax_prior_scale_vector(pdf_counts,
 
 def prepare_initial_network(dir, run_opts, srand=-3):
     if os.path.exists(dir+"/configs/init.config"):
-        common_lib.run_job(
+        common_lib.execute_command(
            """{command} {dir}/log/add_first_layer.log \
                    nnet3-init --srand={srand} {dir}/init.raw \
                    {dir}/configs/final.config {dir}/0.raw""".format(
                        command=run_opts.command, srand=srand,
                        dir=dir))
     else:
-        common_lib.run_job(
+        common_lib.execute_command(
            """{command} {dir}/log/init_model.log \
                    nnet3-init --srand={srand} {dir}/configs/final.config {dir}/0.raw""".format(
                        command=run_opts.command, srand=srand,
@@ -537,12 +537,12 @@ def do_shrinkage(iter, model_file, shrink_saturation_threshold,
 
         return True
 
     if get_raw_nnet_from_am:
-        output, error = common_lib.run_kaldi_command(
-            "nnet3-am-info --print-args=false {0} | "
+        output = common_lib.get_command_stdout(
+            "nnet3-am-info {0} 2>/dev/null | "
            "steps/nnet3/get_saturation.pl".format(model_file))
     else:
-        output, error = common_lib.run_kaldi_command(
-            "nnet3-info --print-args=false {0} | "
+        output = common_lib.get_command_stdout(
+            "nnet3-info {0} 2>/dev/null | "
            "steps/nnet3/get_saturation.pl".format(model_file))
     output = output.strip().split("\n")
     try:
@@ -558,8 +558,8 @@ def 
remove_nnet_egs(egs_dir): - common_lib.run_job("steps/nnet2/remove_egs.sh {egs_dir}".format( - egs_dir=egs_dir)) + common_lib.execute_command("steps/nnet2/remove_egs.sh {egs_dir}".format( + egs_dir=egs_dir)) def clean_nnet_dir(nnet_dir, num_iters, egs_dir, @@ -852,12 +852,6 @@ def __init__(self, iterations. If 0 and reporting mail has been specified then only failure notifications are sent""") - self.parser.add_argument("--background-polling-time", - dest="background_polling_time", - type=float, default=60, - help="""Polling frequency in seconds at which - the background process handler checks for - errors in the processes.""") if __name__ == '__main__': diff --git a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/acoustic_model.py b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/acoustic_model.py index 47265a19dba..99421bafdd1 100644 --- a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/acoustic_model.py +++ b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/acoustic_model.py @@ -32,7 +32,7 @@ def generate_egs(data, alidir, egs_dir, the model final.mdl and alignments. """ - common_lib.run_job( + common_lib.execute_command( """steps/nnet3/get_egs.sh {egs_opts} \ --cmd "{command}" \ --cmvn-opts "{cmvn_opts}" \ @@ -77,7 +77,7 @@ def prepare_initial_acoustic_model(dir, alidir, run_opts, srand=srand) # Convert to .mdl, train the transitions, set the priors. - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/init_mdl.log \ nnet3-am-init {alidir}/final.mdl {dir}/0.raw - \| \ nnet3-am-train-transitions - \ diff --git a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/common.py b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/common.py index cd34432a895..87ae9df7378 100644 --- a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/common.py +++ b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/common.py @@ -1,7 +1,7 @@ - -# Copyright 2016 Vijayaditya Peddinti. -# 2016 Vimal Manohar +# Copyright 2016 Vijayaditya Peddinti. +# 2016 Vimal Manohar +# 2017 Johns Hopkins University (author: Daniel Povey) # Apache 2.0. """ This is a module with methods which will be used by scripts for training of @@ -58,7 +58,7 @@ def train_new_models(dir, iter, srand, num_jobs, deriv_time_opts.append("--optimization.max-deriv-time-relative={0}".format( max_deriv_time_relative)) - processes = [] + threads = [] for job in range(1, num_jobs+1): # k is a zero-based index that we will derive the other indexes from. k = num_archives_processed + job - 1 @@ -82,7 +82,8 @@ def train_new_models(dir, iter, srand, num_jobs, else: image_augmentation_cmd = '' - process_handle = common_lib.run_job( + # note: the thread waits on that process's completion. 
+ thread = common_lib.background_command( """{command} {train_queue_opt} {dir}/log/train.{iter}.{job}.log \ nnet3-train {parallel_train_opts} {cache_io_opts} \ --print-interval=10 \ @@ -113,21 +114,13 @@ def train_new_models(dir, iter, srand, num_jobs, egs_dir=egs_dir, archive_index=archive_index, shuffle_buffer_size=shuffle_buffer_size, minibatch_size_str=minibatch_size_str, - aug_cmd=image_augmentation_cmd), wait=False) - - processes.append(process_handle) + aug_cmd=image_augmentation_cmd), + require_zero_status=True) - all_success = True - for process in processes: - process.wait() - process.communicate() - if process.returncode != 0: - all_success = False + threads.append(thread) - if not all_success: - open('{0}/.error'.format(dir), 'w').close() - raise Exception("There was error during training " - "iteration {0}".format(iter)) + for thread in threads: + thread.join() def train_one_iteration(dir, iter, srand, egs_dir, @@ -138,8 +131,7 @@ def train_one_iteration(dir, iter, srand, egs_dir, frames_per_eg=-1, min_deriv_time=None, max_deriv_time_relative=None, shrinkage_value=1.0, dropout_edit_string="", - get_raw_nnet_from_am=True, - background_process_handler=None): + get_raw_nnet_from_am=True): """ Called from steps/nnet3/train_*.py scripts for one iteration of neural network training @@ -182,16 +174,13 @@ def train_one_iteration(dir, iter, srand, egs_dir, compute_train_cv_probabilities( dir=dir, iter=iter, egs_dir=egs_dir, run_opts=run_opts, - get_raw_nnet_from_am=get_raw_nnet_from_am, wait=False, - background_process_handler=background_process_handler) + get_raw_nnet_from_am=get_raw_nnet_from_am) if iter > 0: # Runs in the background compute_progress(dir=dir, iter=iter, egs_dir=egs_dir, run_opts=run_opts, - wait=False, - get_raw_nnet_from_am=get_raw_nnet_from_am, - background_process_handler=background_process_handler) + get_raw_nnet_from_am=get_raw_nnet_from_am) do_average = (iter > 0) @@ -217,10 +206,6 @@ def train_one_iteration(dir, iter, srand, egs_dir, # keep the update stable. cur_minibatch_size_str = common_train_lib.halve_minibatch_size_str(minibatch_size_str) cur_max_param_change = float(max_param_change) / math.sqrt(2) - try: - os.remove("{0}/.error".format(dir)) - except OSError: - pass shrink_info_str = '' if shrinkage_value != 1.0: @@ -298,7 +283,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, num_lda_jobs = max_lda_jobs # Write stats with the same format as stats for LDA. - common_lib.run_job( + common_lib.execute_command( """{command} JOB=1:{num_lda_jobs} {dir}/log/get_lda_stats.JOB.log \ nnet3-acc-lda-stats --rand-prune={rand_prune} \ {dir}/init.raw "ark:{egs_dir}/egs.JOB.ark" \ @@ -313,7 +298,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, lda_stat_files = map(lambda x: '{0}/{1}.lda_stats'.format(dir, x), range(1, num_lda_jobs + 1)) - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/sum_transform_stats.log \ sum-lda-accs {dir}/lda_stats {lda_stat_files}""".format( command=run_opts.command, @@ -330,7 +315,7 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, # in Appendix C.6 of http://arxiv.org/pdf/1410.7455v6.pdf; it's a scaled # variant of an LDA transform but without dimensionality reduction. 
- common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/get_transform.log \ nnet-get-feature-transform {lda_opts} {dir}/lda.mat \ {dir}/lda_stats""".format( @@ -341,7 +326,6 @@ def compute_preconditioning_matrix(dir, egs_dir, num_lda_jobs, run_opts, def compute_train_cv_probabilities(dir, iter, egs_dir, run_opts, - wait=False, background_process_handler=None, get_raw_nnet_from_am=True): if get_raw_nnet_from_am: model = "nnet3-am-copy --raw=true {dir}/{iter}.mdl - |".format( @@ -350,7 +334,7 @@ def compute_train_cv_probabilities(dir, iter, egs_dir, run_opts, model = "{dir}/{iter}.raw".format(dir=dir, iter=iter) - common_lib.run_job( + common_lib.background_command( """ {command} {dir}/log/compute_prob_valid.{iter}.log \ nnet3-compute-prob "{model}" \ "ark,bg:nnet3-copy-egs \ @@ -360,10 +344,9 @@ def compute_train_cv_probabilities(dir, iter, egs_dir, run_opts, dir=dir, iter=iter, model=model, - egs_dir=egs_dir), - wait=wait, background_process_handler=background_process_handler) + egs_dir=egs_dir)) - common_lib.run_job( + common_lib.background_command( """{command} {dir}/log/compute_prob_train.{iter}.log \ nnet3-compute-prob "{model}" \ "ark,bg:nnet3-copy-egs \ @@ -373,12 +356,11 @@ def compute_train_cv_probabilities(dir, iter, egs_dir, run_opts, dir=dir, iter=iter, model=model, - egs_dir=egs_dir), - wait=wait, background_process_handler=background_process_handler) + egs_dir=egs_dir)) def compute_progress(dir, iter, egs_dir, - run_opts, background_process_handler=None, wait=False, + run_opts, get_raw_nnet_from_am=True): if get_raw_nnet_from_am: prev_model = "nnet3-am-copy --raw=true {0}/{1}.mdl - |".format( @@ -388,7 +370,7 @@ def compute_progress(dir, iter, egs_dir, prev_model = '{0}/{1}.raw'.format(dir, iter - 1) model = '{0}/{1}.raw'.format(dir, iter) - common_lib.run_job( + common_lib.background_command( """{command} {dir}/log/progress.{iter}.log \ nnet3-info "{model}" '&&' \ nnet3-show-progress --use-gpu=no "{prev_model}" "{model}" \ @@ -400,13 +382,12 @@ def compute_progress(dir, iter, egs_dir, iter=iter, model=model, prev_model=prev_model, - egs_dir=egs_dir), - wait=wait, background_process_handler=background_process_handler) + egs_dir=egs_dir)) def combine_models(dir, num_iters, models_to_combine, egs_dir, minibatch_size_str, - run_opts, background_process_handler=None, + run_opts, chunk_width=None, get_raw_nnet_from_am=True, sum_to_one_penalty=0.0): """ Function to do model combination @@ -449,7 +430,7 @@ def combine_models(dir, num_iters, models_to_combine, egs_dir, # model. 
raw_model_strings = list(reversed(raw_model_strings)) - common_lib.run_job( + common_lib.execute_command( """{command} {combine_queue_opt} {dir}/log/combine.log \ nnet3-combine --num-iters=80 \ --enforce-sum-to-one={hard_enforce} \ @@ -476,14 +457,11 @@ def combine_models(dir, num_iters, models_to_combine, egs_dir, if get_raw_nnet_from_am: compute_train_cv_probabilities( dir=dir, iter='combined', egs_dir=egs_dir, - run_opts=run_opts, wait=False, - background_process_handler=background_process_handler) + run_opts=run_opts) else: compute_train_cv_probabilities( dir=dir, iter='final', egs_dir=egs_dir, - run_opts=run_opts, wait=False, - background_process_handler=background_process_handler, - get_raw_nnet_from_am=False) + run_opts=run_opts, get_raw_nnet_from_am=False) def get_realign_iters(realign_times, num_iters, @@ -523,7 +501,7 @@ def align(dir, data, lang, run_opts, iter=None, transform_dir=None, logger.info("Aligning the data{gpu}with {num_jobs} jobs.".format( gpu=" using gpu " if run_opts.realign_use_gpu else " ", num_jobs=run_opts.realign_num_jobs)) - common_lib.run_job( + common_lib.execute_command( """steps/nnet3/align.sh --nj {num_jobs_align} \ --cmd "{align_cmd} {align_queue_opt}" \ --use-gpu {align_use_gpu} \ @@ -570,7 +548,7 @@ def realign(dir, iter, feat_dir, lang, prev_egs_dir, cur_egs_dir, alidir = align(dir, feat_dir, lang, run_opts, iter, transform_dir, online_ivector_dir) - common_lib.run_job( + common_lib.execute_command( """steps/nnet3/relabel_egs.sh --cmd "{command}" --iter {iter} \ {alidir} {prev_egs_dir} {cur_egs_dir}""".format( command=run_opts.command, @@ -583,7 +561,7 @@ def realign(dir, iter, feat_dir, lang, prev_egs_dir, cur_egs_dir, def adjust_am_priors(dir, input_model, avg_posterior_vector, output_model, run_opts): - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/adjust_priors.final.log \ nnet3-am-adjust-priors "{input_model}" {avg_posterior_vector} \ "{output_model}" """.format( @@ -612,7 +590,7 @@ def compute_average_posterior(dir, iter, egs_dir, num_archives, else: model = "{dir}/final.raw".format(dir=dir) - common_lib.run_job( + common_lib.execute_command( """{command} JOB=1:{num_jobs_compute_prior} {prior_queue_opt} \ {dir}/log/get_post.{iter}.JOB.log \ nnet3-copy-egs \ @@ -636,7 +614,7 @@ def compute_average_posterior(dir, iter, egs_dir, num_archives, # make sure there is time for $dir/post.{iter}.*.vec to appear. 
time.sleep(5) avg_post_vec_file = "{dir}/post.{iter}.vec".format(dir=dir, iter=iter) - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/vector_sum.{iter}.log \ vector-sum {dir}/post.{iter}.*.vec {output_file} """.format(command=run_opts.command, diff --git a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/raw_model.py b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/raw_model.py index 037abeb1dd8..270a0ddf499 100644 --- a/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/raw_model.py +++ b/egs/wsj/s5/steps/libs/nnet3/train/frame_level_objf/raw_model.py @@ -48,7 +48,7 @@ def generate_egs_using_targets(data, targets_scp, egs_dir, raise Exception("--num-targets is required if " "target-type is sparse") - common_lib.run_job( + common_lib.execute_command( """steps/nnet3/get_egs_targets.sh {egs_opts} \ --cmd "{command}" \ --cmvn-opts "{cmvn_opts}" \ diff --git a/egs/wsj/s5/steps/nnet3/chain/train.py b/egs/wsj/s5/steps/nnet3/chain/train.py index 86459c2dbaf..65b09f9f1eb 100755 --- a/egs/wsj/s5/steps/nnet3/chain/train.py +++ b/egs/wsj/s5/steps/nnet3/chain/train.py @@ -245,7 +245,7 @@ def process_args(args): return [args, run_opts] -def train(args, run_opts, background_process_handler): +def train(args, run_opts): """ The main function for training. Args: @@ -310,7 +310,7 @@ def train(args, run_opts, background_process_handler): if (args.stage <= -4): logger.info("Initializing a basic network for estimating " "preconditioning matrix") - common_lib.run_kaldi_command( + common_lib.execute_command( """{command} {dir}/log/nnet_init.log \ nnet3-init --srand=-2 {dir}/configs/init.config \ {dir}/init.raw""".format(command=run_opts.command, @@ -467,8 +467,7 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): max_param_change=args.max_param_change, shuffle_buffer_size=args.shuffle_buffer_size, frame_subsampling_factor=args.frame_subsampling_factor, - run_opts=run_opts, - background_process_handler=background_process_handler) + run_opts=run_opts) if args.cleanup: # do a clean up everythin but the last 2 models, under certain @@ -502,7 +501,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): l2_regularize=args.l2_regularize, xent_regularize=args.xent_regularize, run_opts=run_opts, - background_process_handler=background_process_handler, sum_to_one_penalty=args.combine_sum_to_one_penalty) @@ -530,26 +528,25 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): with open("{dir}/accuracy.report".format(dir=args.dir), "w") as f: f.write(report) - common_lib.run_kaldi_command("steps/info/nnet3_dir_info.pl " - "{0}".format(args.dir)) + common_lib.execute_command("steps/info/nnet3_dir_info.pl " + "{0}".format(args.dir)) def main(): [args, run_opts] = get_args() try: - background_process_handler = common_lib.BackgroundProcessHandler( - polling_time=args.background_polling_time) - train(args, run_opts, background_process_handler) - background_process_handler.ensure_processes_are_done() - except Exception as e: + train(args, run_opts) + common_lib.wait_for_background_commands() + except BaseException as e: + # look for BaseException so we catch KeyboardInterrupt, which is + # what we get when a background thread dies. 
if args.email is not None: message = ("Training session for experiment {dir} " "died due to an error.".format(dir=args.dir)) common_lib.send_mail(message, message, args.email) - traceback.print_exc() - background_process_handler.stop() - raise e - + if not isinstance(e, KeyboardInterrupt): + traceback.print_exc() + sys.exit(1) if __name__ == "__main__": main() diff --git a/egs/wsj/s5/steps/nnet3/report/generate_plots.py b/egs/wsj/s5/steps/nnet3/report/generate_plots.py index 6a652f9ec68..8ec283492ef 100755 --- a/egs/wsj/s5/steps/nnet3/report/generate_plots.py +++ b/egs/wsj/s5/steps/nnet3/report/generate_plots.py @@ -48,11 +48,12 @@ def get_args(): parser = argparse.ArgumentParser( description="""Parses the training logs and generates a variety of plots. - e.g. (deprecated): steps/nnet3/report/generate_plots.py \\ - --comparison-dir exp/nnet3/tdnn1 --comparison-dir exp/nnet3/tdnn2 \\ + e.g. (deprecated): steps/nnet3/report/generate_plots.py + --comparison-dir exp/nnet3/tdnn1 --comparison-dir exp/nnet3/tdnn2 exp/nnet3/tdnn exp/nnet3/tdnn/report - e.g. (current): steps/nnet3/report/generate_plots.py \\ - exp/nnet3/tdnn exp/nnet3/tdnn1 exp/nnet3/tdnn2 exp/nnet3/tdnn/report""") + or (current): steps/nnet3/report/generate_plots.py + exp/nnet3/tdnn exp/nnet3/tdnn1 exp/nnet3/tdnn2 exp/nnet3/tdnn/report. + Look for the report.pdf in the output (report) directory.""") parser.add_argument("--comparison-dir", type=str, action='append', help="other experiment directories for comparison. " @@ -134,7 +135,7 @@ def compile(self): lat_file.close() logger.info("Compiling the latex report.") try: - common_lib.run_kaldi_command( + common_lib.execute_command( "pdflatex -interaction=batchmode " "-output-directory={0} {1}".format(dir_name, latex_file)) except Exception as e: @@ -238,7 +239,7 @@ def plot_a_nonlin_component(fig, dirs, stat_tables_per_component_per_dir, index = 0 legend_handle = [extra, extra, extra, extra] legend_label = ["", '5th percentile', '50th percentile', '95th percentile'] - + for dir in dirs: color_val = g_plot_colors[index] index += 1 @@ -262,7 +263,7 @@ def plot_a_nonlin_component(fig, dirs, stat_tables_per_component_per_dir, linestyle='--') insert_a_column_legend(legend_handle, legend_label, lp, mp, hp, dir, prefix_length, index+1) - + ax.set_ylabel('Value-{0}'.format(component_type)) ax.grid(True) @@ -292,7 +293,7 @@ def plot_a_nonlin_component(fig, dirs, stat_tables_per_component_per_dir, # contains all the statistics in each component of each directory. # 3) The statistics of each component are stored into corresponding log files. # Each line of the log file contains the statistics of one iteration. -# 4) Plot the "Per-dimension average-(value, derivative) percentiles" figure +# 4) Plot the "Per-dimension average-(value, derivative) percentiles" figure # for each nonlinearity component. def generate_nonlin_stats_plots(exp_dir, output_dir, plot, comparison_dir=None, start_iter=1, latex_report=None): @@ -359,13 +360,13 @@ def generate_nonlin_stats_plots(exp_dir, output_dir, plot, comparison_dir=None, given experiment dirs are not the same, so comparison plots are provided only for common component names. 
Make sure that these are comparable experiments before analyzing these plots.""") - + fig = plt.figure() - + common_prefix = os.path.commonprefix(dirs) prefix_length = common_prefix.rfind('/') common_prefix = common_prefix[0:prefix_length] - + for component_name in main_component_names: if stats_per_dir[exp_dir][component_name]['type'] == 'LstmNonlinearity': for i in range(0,5): @@ -425,7 +426,7 @@ def generate_clipped_proportion_plots(exp_dir, output_dir, plot, " this might be because there are no " "ClipGradientComponents.".format(dir)) continue - if len(stats_per_dir[dir]) == 0: + if len(stats_per_dir[dir]) == 0: logger.warning("Couldn't find any rows for the" "clipped proportion plot, not generating it") try: diff --git a/egs/wsj/s5/steps/nnet3/train_dnn.py b/egs/wsj/s5/steps/nnet3/train_dnn.py index 2e838cfd9e8..7760bf85f4a 100755 --- a/egs/wsj/s5/steps/nnet3/train_dnn.py +++ b/egs/wsj/s5/steps/nnet3/train_dnn.py @@ -2,6 +2,7 @@ # Copyright 2016 Vijayaditya Peddinti. # 2016 Vimal Manohar +# 2017 Johns Hopkins University (author: Daniel Povey) # Apache 2.0. """ This script is based on steps/nnet3/tdnn/train.sh @@ -148,7 +149,7 @@ def process_args(args): return [args, run_opts] -def train(args, run_opts, background_process_handler): +def train(args, run_opts): """ The main function for training. Args: @@ -199,7 +200,7 @@ def train(args, run_opts, background_process_handler): if (args.stage <= -5) and os.path.exists(args.dir+"/configs/init.config"): logger.info("Initializing a basic network for estimating " "preconditioning matrix") - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/nnet_init.log \ nnet3-init --srand=-2 {dir}/configs/init.config \ {dir}/init.raw""".format(command=run_opts.command, @@ -322,8 +323,7 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): momentum=args.momentum, max_param_change=args.max_param_change, shuffle_buffer_size=args.shuffle_buffer_size, - run_opts=run_opts, - background_process_handler=background_process_handler) + run_opts=run_opts) if args.cleanup: # do a clean up everythin but the last 2 models, under certain @@ -352,7 +352,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): models_to_combine=models_to_combine, egs_dir=egs_dir, minibatch_size_str=args.minibatch_size, run_opts=run_opts, - background_process_handler=background_process_handler, sum_to_one_penalty=args.combine_sum_to_one_penalty) if args.stage <= num_iters + 1: @@ -393,25 +392,25 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): with open("{dir}/accuracy.report".format(dir=args.dir), "w") as f: f.write(report) - common_lib.run_job("steps/info/nnet3_dir_info.pl " - "{0}".format(args.dir)) + common_lib.execute_command("steps/info/nnet3_dir_info.pl " + "{0}".format(args.dir)) def main(): [args, run_opts] = get_args() try: - background_process_handler = common_lib.BackgroundProcessHandler( - polling_time=args.background_polling_time) - train(args, run_opts, background_process_handler) - background_process_handler.ensure_processes_are_done() - except Exception as e: + train(args, run_opts) + common_lib.wait_for_background_commands() + except BaseException as e: + # look for BaseException so we catch KeyboardInterrupt, which is + # what we get when a background thread dies. 
if args.email is not None: message = ("Training session for experiment {dir} " "died due to an error.".format(dir=args.dir)) common_lib.send_mail(message, message, args.email) - traceback.print_exc() - background_process_handler.stop() - raise e + if not isinstance(e, KeyboardInterrupt): + traceback.print_exc() + sys.exit(1) if __name__ == "__main__": diff --git a/egs/wsj/s5/steps/nnet3/train_raw_dnn.py b/egs/wsj/s5/steps/nnet3/train_raw_dnn.py index 88885660158..5ff8b548b43 100755 --- a/egs/wsj/s5/steps/nnet3/train_raw_dnn.py +++ b/egs/wsj/s5/steps/nnet3/train_raw_dnn.py @@ -153,7 +153,7 @@ def process_args(args): return [args, run_opts] -def train(args, run_opts, background_process_handler): +def train(args, run_opts): """ The main function for training. Args: @@ -200,7 +200,7 @@ def train(args, run_opts, background_process_handler): # transform. if (args.stage <= -5) and os.path.exists(args.dir+"/configs/init.config"): logger.info("Initializing the network for computing the LDA stats") - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/nnet_init.log \ nnet3-init --srand=-2 {dir}/configs/init.config \ {dir}/init.raw""".format(command=run_opts.command, @@ -337,7 +337,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): shuffle_buffer_size=args.shuffle_buffer_size, run_opts=run_opts, get_raw_nnet_from_am=False, - background_process_handler=background_process_handler, image_augmentation_opts=args.image_augmentation_opts) if args.cleanup: @@ -367,7 +366,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): dir=args.dir, num_iters=num_iters, models_to_combine=models_to_combine, egs_dir=egs_dir, minibatch_size_str=args.minibatch_size, run_opts=run_opts, - background_process_handler=background_process_handler, get_raw_nnet_from_am=False, sum_to_one_penalty=args.combine_sum_to_one_penalty) @@ -404,25 +402,25 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): with open("{dir}/accuracy.report".format(dir=args.dir), "w") as f: f.write(report) - common_lib.run_job("steps/info/nnet3_dir_info.pl " - "{0}".format(args.dir)) + common_lib.execute_command("steps/info/nnet3_dir_info.pl " + "{0}".format(args.dir)) def main(): [args, run_opts] = get_args() try: - background_process_handler = common_lib.BackgroundProcessHandler( - polling_time=args.background_polling_time) - train(args, run_opts, background_process_handler) - background_process_handler.ensure_processes_are_done() - except Exception as e: + train(args, run_opts) + common_lib.wait_for_background_commands() + except BaseException as e: + # look for BaseException so we catch KeyboardInterrupt, which is + # what we get when a background thread dies. if args.email is not None: message = ("Training session for experiment {dir} " "died due to an error.".format(dir=args.dir)) common_lib.send_mail(message, message, args.email) - traceback.print_exc() - background_process_handler.stop() - raise e + if not isinstance(e, KeyboardInterrupt): + traceback.print_exc() + sys.exit(1) if __name__ == "__main__": diff --git a/egs/wsj/s5/steps/nnet3/train_raw_rnn.py b/egs/wsj/s5/steps/nnet3/train_raw_rnn.py index 382ca069b9e..60d1c7fd5fe 100755 --- a/egs/wsj/s5/steps/nnet3/train_raw_rnn.py +++ b/egs/wsj/s5/steps/nnet3/train_raw_rnn.py @@ -3,6 +3,7 @@ # Copyright 2016 Vijayaditya Peddinti. # 2016 Vimal Manohar +# 2017 Johns Hopkins University (author: Daniel Povey) # Apache 2.0. 
""" This script is similar to steps/nnet3/train_rnn.py but trains a @@ -206,7 +207,7 @@ def process_args(args): return [args, run_opts] -def train(args, run_opts, background_process_handler): +def train(args, run_opts): """ The main function for training. Args: @@ -256,7 +257,7 @@ def train(args, run_opts, background_process_handler): if (args.stage <= -4) and os.path.exists(args.dir+"/configs/init.config"): logger.info("Initializing the network for computing the LDA stats") - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/nnet_init.log \ nnet3-init --srand=-2 {dir}/configs/init.config \ {dir}/init.raw""".format(command=run_opts.command, @@ -414,8 +415,7 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): max_param_change=args.max_param_change, shuffle_buffer_size=args.shuffle_buffer_size, run_opts=run_opts, - get_raw_nnet_from_am=False, - background_process_handler=background_process_handler) + get_raw_nnet_from_am=False) if args.cleanup: # do a clean up everythin but the last 2 models, under certain @@ -445,7 +445,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): models_to_combine=models_to_combine, egs_dir=egs_dir, minibatch_size_str=args.num_chunk_per_minibatch, run_opts=run_opts, chunk_width=args.chunk_width, - background_process_handler=background_process_handler, get_raw_nnet_from_am=False, sum_to_one_penalty=args.combine_sum_to_one_penalty) @@ -482,26 +481,25 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): with open("{dir}/accuracy.report".format(dir=args.dir), "w") as f: f.write(report) - common_lib.run_job("steps/info/nnet3_dir_info.pl " - "{0}".format(args.dir)) + common_lib.execute_command("steps/info/nnet3_dir_info.pl " + "{0}".format(args.dir)) def main(): [args, run_opts] = get_args() try: - background_process_handler = common_lib.BackgroundProcessHandler( - polling_time=args.background_polling_time) - train(args, run_opts, background_process_handler) - background_process_handler.ensure_processes_are_done() - except Exception as e: + train(args, run_opts) + common_lib.wait_for_background_commands() + except BaseException as e: + # look for BaseException so we catch KeyboardInterrupt, which is + # what we get when a background thread dies. if args.email is not None: message = ("Training session for experiment {dir} " "died due to an error.".format(dir=args.dir)) common_lib.send_mail(message, message, args.email) - traceback.print_exc() - background_process_handler.stop() - raise e - + if not isinstance(e, KeyboardInterrupt): + traceback.print_exc() + sys.exit(1) if __name__ == "__main__": main() diff --git a/egs/wsj/s5/steps/nnet3/train_rnn.py b/egs/wsj/s5/steps/nnet3/train_rnn.py index 40d544834bd..e8c044d679a 100755 --- a/egs/wsj/s5/steps/nnet3/train_rnn.py +++ b/egs/wsj/s5/steps/nnet3/train_rnn.py @@ -205,7 +205,7 @@ def process_args(args): return [args, run_opts] -def train(args, run_opts, background_process_handler): +def train(args, run_opts): """ The main function for training. 
Args: @@ -259,7 +259,7 @@ def train(args, run_opts, background_process_handler): if (args.stage <= -5): logger.info("Initializing a basic network for estimating " "preconditioning matrix") - common_lib.run_job( + common_lib.execute_command( """{command} {dir}/log/nnet_init.log \ nnet3-init --srand=-2 {dir}/configs/init.config \ {dir}/init.raw""".format(command=run_opts.command, @@ -409,8 +409,7 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): momentum=args.momentum, max_param_change=args.max_param_change, shuffle_buffer_size=args.shuffle_buffer_size, - run_opts=run_opts, - background_process_handler=background_process_handler) + run_opts=run_opts) if args.cleanup: # do a clean up everythin but the last 2 models, under certain @@ -439,7 +438,6 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): models_to_combine=models_to_combine, egs_dir=egs_dir, run_opts=run_opts, minibatch_size_str=args.num_chunk_per_minibatch, - background_process_handler=background_process_handler, chunk_width=args.chunk_width, sum_to_one_penalty=args.combine_sum_to_one_penalty) @@ -481,25 +479,25 @@ def learning_rate(iter, current_num_jobs, num_archives_processed): with open("{dir}/accuracy.report".format(dir=args.dir), "w") as f: f.write(report) - common_lib.run_job("steps/info/nnet3_dir_info.pl " - "{0}".format(args.dir)) + common_lib.execute_command("steps/info/nnet3_dir_info.pl " + "{0}".format(args.dir)) def main(): [args, run_opts] = get_args() try: - background_process_handler = common_lib.BackgroundProcessHandler( - polling_time=args.background_polling_time) - train(args, run_opts, background_process_handler) - background_process_handler.ensure_processes_are_done() - except Exception as e: + train(args, run_opts) + common_lib.wait_for_background_commands() + except BaseException as e: + # look for BaseException so we catch KeyboardInterrupt, which is + # what we get when a background thread dies. 
if args.email is not None: message = ("Training session for experiment {dir} " "died due to an error.".format(dir=args.dir)) common_lib.send_mail(message, message, args.email) - traceback.print_exc() - background_process_handler.stop() - raise e + if not isinstance(e, KeyboardInterrupt): + traceback.print_exc() + sys.exit(1) if __name__ == "__main__": diff --git a/egs/wsj/s5/steps/nnet3/xconfig_to_configs.py b/egs/wsj/s5/steps/nnet3/xconfig_to_configs.py index 63476347c12..c157beba188 100755 --- a/egs/wsj/s5/steps/nnet3/xconfig_to_configs.py +++ b/egs/wsj/s5/steps/nnet3/xconfig_to_configs.py @@ -212,10 +212,10 @@ def write_config_files(config_dir, all_layers): def add_nnet_context_info(config_dir): """This will be removed when python script refactoring is done.""" - common_lib.run_kaldi_command("nnet3-init {0}/ref.config " - "{0}/ref.raw".format(config_dir)) - out, err = common_lib.run_kaldi_command("nnet3-info {0}/ref.raw | " - "head -4".format(config_dir)) + common_lib.execute_command("nnet3-init {0}/ref.config " + "{0}/ref.raw".format(config_dir)) + out = common_lib.get_command_stdout("nnet3-info {0}/ref.raw | " + "head -4".format(config_dir)) # out looks like this # left-context: 7 # right-context: 0 @@ -241,10 +241,10 @@ def check_model_contexts(config_dir): for file_name in ['init', 'ref']: if os.path.exists('{0}/{1}.config'.format(config_dir, file_name)): contexts[file_name] = {} - common_lib.run_kaldi_command("nnet3-init {0}/{1}.config " - "{0}/{1}.raw".format(config_dir, file_name)) - out, err = common_lib.run_kaldi_command("nnet3-info {0}/{1}.raw | " - "head -4".format(config_dir, file_name)) + common_lib.execute_command("nnet3-init {0}/{1}.config " + "{0}/{1}.raw".format(config_dir, file_name)) + out = common_lib.get_command_stdout("nnet3-info {0}/{1}.raw | " + "head -4".format(config_dir, file_name)) # out looks like this # left-context: 7 # right-context: 0
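
Usage note (illustrative sketch, not part of the patch): the new helpers in
steps/libs/common.py replace run_job/run_kaldi_command and the
BackgroundProcessHandler.  A top-level script would use them roughly as
below; the data/exp paths here are hypothetical.

    import libs.common as common_lib

    # Foreground job: waits for completion, raises on nonzero exit status.
    common_lib.execute_command("utils/split_data.sh data/train 4")

    # Capture stdout of a command (shell=True, so pipes are allowed).
    stdout = common_lib.get_command_stdout(
        "feat-to-dim scp:data/train/feats.scp -")
    feat_dim = int(stdout)

    # Background job: returns a threading.Thread.  With
    # require_zero_status=True a nonzero exit status interrupts the main
    # thread, which is why the train.py scripts now catch BaseException.
    thread = common_lib.background_command(
        "run.pl exp/foo/log/progress.1.log nnet3-info exp/foo/1.raw",
        require_zero_status=True)

    # Either join that specific thread, or wait for all background
    # commands before the script exits.
    thread.join()
    common_lib.wait_for_background_commands()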