From 16081b8ed1969e60c819f9d03e56def73097a543 Mon Sep 17 00:00:00 2001 From: Xylar Asay-Davis Date: Fri, 22 Sep 2017 13:05:34 -0600 Subject: [PATCH] Switch to using multiprocessing instead of subprocess To support better data availability between tasks (e.g. if tasks depend on data from the setup of other tasks), tasks are now subclasses of multiprocessing.Process. This work is needed as a precursor to adding support for subtasks (e.g. for producing climatologies, or for generating each plot from a task). Logging is also altered so that each task has a logger, to which info, errors, and warnings should be logged. Because multiprocessing.Process already defines a method called run(), the run() method of AnalysisTask and its derived classes had to be renamed to run_task(). Job scripts have been updated to run on 1 node. The default ncclimo mode is background (12 threads). The path to @czender's NCO installation has been removed from the edison job script. It is no longer needed for the latest NCO and MPAS-Analysis, which includes a flag to ignore @czender's path on LCF machines. The LANL job script has been updated to use 1 OpenMP thread instead of 2. This is needed (as on NERSC) to prevent NCO problems when running with the default of 2 threads. (The cause is not yet understood.) --- configs/edison/job_script.edison.bash | 16 +- configs/job_script.default.bash | 11 +- configs/lanl/job_script.lanl.bash | 17 +- configs/olcf/job_script.olcf.bash | 15 +- docs/api.rst | 3 +- mpas_analysis/analysis_task_template.py | 43 ++-- mpas_analysis/ocean/climatology_map.py | 41 ++-- mpas_analysis/ocean/index_nino34.py | 30 +-- .../ocean/meridional_heat_transport.py | 26 ++- mpas_analysis/ocean/streamfunction_moc.py | 69 +++--- mpas_analysis/ocean/time_series_ohc.py | 34 +-- mpas_analysis/ocean/time_series_sst.py | 28 +-- mpas_analysis/sea_ice/climatology_map.py | 42 ++-- mpas_analysis/sea_ice/time_series.py | 25 ++- mpas_analysis/shared/__init__.py | 1 + mpas_analysis/shared/analysis_task.py | 207 ++++++++++++++++-- .../shared/climatology/climatology.py | 56 +++-- .../shared/interpolation/remapper.py | 80 +++++-- .../shared/time_series/time_series.py | 24 +- mpas_analysis/test/test_analysis_task.py | 4 +- run_mpas_analysis | 204 ++++------------- 21 files changed, 584 insertions(+), 392 deletions(-) diff --git a/configs/edison/job_script.edison.bash b/configs/edison/job_script.edison.bash index 874aeb329..300731b0b 100644 --- a/configs/edison/job_script.edison.bash +++ b/configs/edison/job_script.edison.bash @@ -8,7 +8,7 @@ ##SBATCH --partition=debug # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#SBATCH --nodes=10 +#SBATCH --nodes=1 #SBATCH --time=1:00:00 #SBATCH --account=acme #SBATCH --job-name=mpas_analysis @@ -23,7 +23,6 @@ export OMP_NUM_THREADS=1 module unload python python/base module use /global/project/projectdirs/acme/software/modulefiles/all module load python/anaconda-2.7-acme -export PATH=/global/homes/z/zender/bin_${NERSC_HOST}:${PATH} # MPAS/ACME job to be analyzed, including paths to simulation data and # observations. Change this name and path as needed @@ -34,7 +33,9 @@ command_prefix="srun -N 1 -n 1" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$SLURM_JOB_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!"
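The hunks below drop the commandPrefix option from the generated job config file and add ncclimoParallelMode alongside parallelTaskCount; run_mpas_analysis reads these options back at startup. As a minimal sketch of how such options are consumed (the '[execute]' section name is an assumption based on the MPAS-Analysis config layout; the section header is elided from the heredocs shown in this patch):

# Sketch only: assumes the options land in an '[execute]' config section.
from ConfigParser import ConfigParser  # 'configparser' on Python 3

config = ConfigParser()
config.read('config.myrun')

# how many analysis tasks may run at once (now one process per task)
taskCount = config.getint('execute', 'parallelTaskCount')
# 'serial' (1 thread) or 'bck' (12 background processes, one per month)
ncclimoMode = config.get('execute', 'ncclimoParallelMode')
print('{} tasks, ncclimo mode {}'.format(taskCount, ncclimoMode))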
@@ -58,12 +59,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/configs/job_script.default.bash b/configs/job_script.default.bash index 544a0518a..565d423ab 100755 --- a/configs/job_script.default.bash +++ b/configs/job_script.default.bash @@ -11,6 +11,8 @@ mpas_analysis_dir="." # the number of parallel tasks (anything between 1 and the total number # of tasks to run) parallel_task_count=8 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -33,13 +35,14 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file # comment this out if you want to keep the config file, e.g. for debugging diff --git a/configs/lanl/job_script.lanl.bash b/configs/lanl/job_script.lanl.bash index cc6e07927..428522253 100644 --- a/configs/lanl/job_script.lanl.bash +++ b/configs/lanl/job_script.lanl.bash @@ -2,7 +2,7 @@ # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#SBATCH --nodes=10 +#SBATCH --nodes=1 #SBATCH --time=1:00:00 #SBATCH --account=climateacme #SBATCH --job-name=mpas_analysis @@ -12,6 +12,8 @@ cd $SLURM_SUBMIT_DIR # optional, since this is the default behavior +export OMP_NUM_THREADS=1 + module unload python module use /usr/projects/climate/SHARED_CLIMATE/modulefiles/all/ module load python/anaconda-2.7-climate @@ -25,7 +27,9 @@ command_prefix="" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$SLURM_JOB_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -49,12 +53,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 
'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/configs/olcf/job_script.olcf.bash b/configs/olcf/job_script.olcf.bash index 575df9a43..a3aed1262 100644 --- a/configs/olcf/job_script.olcf.bash +++ b/configs/olcf/job_script.olcf.bash @@ -6,7 +6,7 @@ ##PBS -q debug # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#PBS -l nodes=10 +#PBS -l nodes=1 #PBS -l walltime=1:00:00 #PBS -A cli115 #PBS -N mpas_analysis @@ -28,7 +28,9 @@ command_prefix="aprun -b -N 1 -n 1" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$PBS_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -52,12 +54,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/docs/api.rst b/docs/api.rst index 57347458d..3623557ec 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -13,9 +13,7 @@ Top-level script: run_mpas_analysis run_mpas_analysis.update_generate run_mpas_analysis.run_parallel_tasks - run_mpas_analysis.launch_tasks run_mpas_analysis.wait_for_task - run_mpas_analysis.is_running run_mpas_analysis.build_analysis_list run_mpas_analysis.determine_analyses_to_generate run_mpas_analysis.run_analysis @@ -34,6 +32,7 @@ Base Class AnalysisTask AnalysisTask.setup_and_check + AnalysisTask.run_task AnalysisTask.run AnalysisTask.check_generate AnalysisTask.check_analysis_enabled diff --git a/mpas_analysis/analysis_task_template.py b/mpas_analysis/analysis_task_template.py index 689c0256f..17dd3bb4a 100644 --- a/mpas_analysis/analysis_task_template.py +++ b/mpas_analysis/analysis_task_template.py @@ -19,7 +19,7 @@ 5. add new analysis task to run_mpas_analysis under build_analysis_list: analyses.append(.MyTask(config, myArg='argValue')) This will add a new object of the MyTask class to a list of analysis tasks - created in build_analysis_list. 
Later on in run_task, it will first go through the list to make sure each task needs to be generated (by calling check_generate, which is defined in AnalysisTask), then, will call setup_and_check on each task (to make sure the appropriate AM is on @@ -220,7 +220,7 @@ def setup_and_check(self): # {{{ self.endDate)) # For climatologies, update the start and end year based on the files - # that are actually available + # that are actually available # If not analyzing climatologies, delete this line changed, self.startYear, self.endYear, self.startDate, self.endDate = \ update_climatology_bounds_from_file_names(self.inputFiles, @@ -234,19 +234,19 @@ def setup_and_check(self): # {{{ # images should appear on the webpage. # Note: because of the way parallel tasks are handled in MPAS-Analysis, - # we can't be sure that run() will be called (it might be launched - # as a completely separate process) so it is not safe to store a list - # of xml files from within run(). The recommended procedure is to - # create a list of XML files here during setup_and_check() and possibly - # use them during run() + # we can't be sure that run_task() will be called (it might be + # launched as a completely separate process) so it is not safe to store + # a list of xml files from within run_task(). The recommended + # procedure is to create a list of XML files here during + # setup_and_check() and possibly use them during run_task() self.xmlFileNames = [] - + # we also show how to store file prefixes for later use in creating # plots self.filePrefixes = {} - # plotParameters is a list of parameters, a stand-ins for whatever + # plotParameters is a list of parameters, a stand-ins for whatever # you might want to include in each plot name, for example, seasons or # types of observation. self.plotParameters = self.config.getExpression(self.taskName, @@ -261,10 +261,9 @@ def setup_and_check(self): # {{{ filePrefix)) self.filePrefixes[plotParameter] = filePrefix - # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' The main method of the task that performs the analysis task. @@ -275,9 +274,9 @@ def run(self): # {{{ # Add the main contents of the analysis task below - # No need to call AnalysisTask.run() because it doesn't do anything, - # so we don't call super(MyTask, self).run(), as we do for other - # methods above. + # No need to call AnalysisTask.run_task() because it doesn't do + # anything, so we don't call super(MyTask, self).run_task(), as we + # do for other methods above. # Here is an example of a call to a local helper method (function), # one for each of our plotParameters (e.g. seasons) @@ -296,15 +295,15 @@ def run(self): # {{{ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ ''' Make a simple plot - + Parameters ---------- plotParameter : str The name of a parameter that is specific to this plot - + optionalArgument : , optional An optional argument - + ''' @@ -320,20 +319,20 @@ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ # get the file name based on the plot parameter filePrefix = self.filePrefixes[plotParameter] outFileName = '{}/{}.png'.format(self.plotsDirectory, filePrefix) - + # make the plot x = numpy.linspace(0, 1, 1000) plt.plot(x, x**2) # save the plot to the output file plt.savefig(outFileName) - + # here's an example of how you would create an XML file for this plot # with the appropriate entries. 
Some notes: # * Gallery groups typically represent all the analysis from a task, # or sometimes from multiple tasks # * A gallery might be for just for one set of observations, one # season, etc., depending on what makes sense - # * Within each gallery, there is one plot for each value in + # * Within each gallery, there is one plot for each value in # 'plotParameters', with a corresponding caption and short thumbnail # description caption = 'Plot of x^2 with plotParamter: {}'.format(plotParameter) @@ -350,8 +349,8 @@ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ imageDescription=caption, imageCaption=caption) - - # + + # # }}} # }}} diff --git a/mpas_analysis/ocean/climatology_map.py b/mpas_analysis/ocean/climatology_map.py index 7df02d9d3..e0eb148d0 100644 --- a/mpas_analysis/ocean/climatology_map.py +++ b/mpas_analysis/ocean/climatology_map.py @@ -13,13 +13,13 @@ import numpy as np import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting import plot_global_comparison, \ setup_colormap from ..shared.constants import constants -from ..shared.io.utility import build_config_full_path +from ..shared.io.utility import build_config_full_path, make_directories from ..shared.io import write_netcdf from ..shared.html import write_image_xml @@ -105,7 +105,8 @@ def setup_and_check(self): # {{{ config=config, sourceDescriptor=mpasDescriptor, comparisonDescriptor=comparisonDescriptor, mappingFilePrefix=mappingFilePrefix, - method=config.get('climatology', 'mpasInterpolationMethod')) + method=config.get('climatology', 'mpasInterpolationMethod'), + logger=self.logger) obsDescriptor = LatLonGridDescriptor.read(fileName=self.obsFileName, latVarName='lat', @@ -127,7 +128,8 @@ def setup_and_check(self): # {{{ comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map_obs_{}'.format(fieldName), method=config.get('oceanObservations', - 'interpolationMethod')) + 'interpolationMethod'), + logger=self.logger) self.xmlFileNames = [] self.filePrefixes = {} @@ -140,9 +142,16 @@ def setup_and_check(self): # {{{ filePrefix)) self.filePrefixes[season] = filePrefix + # make the mapping directory, because doing so within each process + # seems to be giving ESMF_RegridWeightGen some trouble + mappingSubdirectory = \ + build_config_full_path(self.config, 'output', + 'mappingSubdirectory') + make_directories(mappingSubdirectory) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Plots a comparison of ACME/MPAS output to SST, MLD or SSS observations @@ -151,17 +160,17 @@ def run(self): # {{{ Luke Van Roekel, Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting 2-d maps of {} climatologies...".format( - self.fieldNameInTitle) + self.logger.info("\nPlotting 2-d maps of {} climatologies...".format( + self.fieldNameInTitle)) # get local versions of member variables for convenience config = self.config fieldName = self.fieldName - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') @@ -213,7 +222,8 @@ def run(self): # {{{ variableList=[self.mpasFieldName], modelName=modelName, seasons=outputTimes, - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) dsObs = None @@ -254,10 +264,12 @@ def run(self): # {{{ 
write_netcdf(climatology, maskedClimatologyFileName) if not os.path.exists(remappedFileName): + self.mpasRemapper.remap_file( inFileName=maskedClimatologyFileName, outFileName=remappedFileName, - overwrite=True) + overwrite=True, + logger=self.logger) remappedClimatology = xr.open_dataset(remappedFileName) @@ -292,7 +304,8 @@ def run(self): # {{{ remappedClimatology = \ remap_and_write_climatology( config, seasonalClimatology, climatologyFileName, - remappedFileName, self.obsRemapper) + remappedFileName, self.obsRemapper, + logger=self.logger) else: diff --git a/mpas_analysis/ocean/index_nino34.py b/mpas_analysis/ocean/index_nino34.py index 8d3987f69..321119cf4 100644 --- a/mpas_analysis/ocean/index_nino34.py +++ b/mpas_analysis/ocean/index_nino34.py @@ -17,7 +17,7 @@ from ..shared.plot.plotting import plot_xtick_format, plot_size_y_axis -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.html import write_image_xml @@ -94,7 +94,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' Computes NINO34 index and plots the time series and power spectrum with 95 and 99% confidence bounds @@ -104,9 +104,10 @@ def run(self): # {{{ Luke Van Roekel, Xylar Asay-Davis ''' - print "\nPlotting Nino3.4 time series and power spectrum...." + self.logger.info("\nPlotting Nino3.4 time series and power " + "spectrum....") - print ' Load SST data...' + self.logger.info(' Load SST data...') fieldName = 'nino' simulationStartTime = get_simulation_start_time(self.runStreams) @@ -127,10 +128,10 @@ def run(self): # {{{ dataPath = "{}/ERS_SSTv4_nino34.nc".format(observationsDirectory) obsTitle = 'ERS SSTv4' - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') # regionIndex should correspond to NINO34 in surface weighted Average @@ -155,14 +156,14 @@ def run(self): # {{{ dsObs = xr.open_dataset(dataPath) nino34Obs = dsObs.sst - print ' Compute NINO3.4 index...' + self.logger.info(' Compute NINO3.4 index...') regionSST = ds[varName].isel(nOceanRegions=regionIndex) nino34 = self._compute_nino34_index(regionSST, calendar) # Compute the observational index over the entire time range # nino34Obs = compute_nino34_index(dsObs.sst, calendar) - print ' Computing NINO3.4 power spectra...' + self.logger.info(' Computing NINO3.4 power spectra...') f, spectra, conf99, conf95, redNoise = \ self._compute_nino34_spectra(nino34) @@ -183,7 +184,7 @@ def run(self): # {{{ fObs = 1.0 / (constants.eps + fObs*constants.sec_per_year) f30 = 1.0 / (constants.eps + f30*constants.sec_per_year) - print ' Plot NINO3.4 index and spectra...' 
+ self.logger.info(' Plot NINO3.4 index and spectra...') figureName = '{}/NINO34_{}.png'.format(self.plotsDirectory, mainRunName) @@ -282,7 +283,6 @@ def _compute_nino34_spectra(self, nino34Index): # {{{ nino34Index power spectra that has been smoothed with a modified Daniell window (https://www.ncl.ucar.edu/Document/Functions/Built-in/specx_anal.shtml) - f : numpy.array array of frequencies corresponding to the center of the spectral bins resulting from the analysis @@ -493,7 +493,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if title is not None: fig.suptitle(title, y=0.92, **title_font) - ax1 = plt.subplot(3, 1, 1) + plt.subplot(3, 1, 1) plt.plot(fObs[2:-3], ninoObs[2:-3], 'k', linewidth=linewidths) plt.plot(fObs[2:-3], redNoiseObs[2:-3], 'r', linewidth=linewidths) @@ -519,7 +519,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if ylabel is not None: plt.ylabel(ylabel, **axis_font) - ax2 = plt.subplot(3, 1, 2) + plt.subplot(3, 1, 2) plt.plot(f30[2:-3], nino30yr[2:-3], 'k', linewidth=linewidths) plt.plot(f30[2:-3], redNoise30[2:-3], 'r', linewidth=linewidths) @@ -539,7 +539,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if ylabel is not None: plt.ylabel(ylabel, **axis_font) - ax3 = plt.subplot(3, 1, 3) + plt.subplot(3, 1, 3) plt.plot(f[2:-3], ninoSpectra[2:-3], 'k', linewidth=linewidths) plt.plot(f[2:-3], redNoiseSpectra[2:-3], 'r', linewidth=linewidths) plt.plot(f[2:-3], confidence95[2:-3], 'b', linewidth=linewidths) diff --git a/mpas_analysis/ocean/meridional_heat_transport.py b/mpas_analysis/ocean/meridional_heat_transport.py index 499a722a4..aeeb7347e 100644 --- a/mpas_analysis/ocean/meridional_heat_transport.py +++ b/mpas_analysis/ocean/meridional_heat_transport.py @@ -16,7 +16,7 @@ compute_climatologies_with_ncclimo, \ get_ncclimo_season_file_name -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.html import write_image_xml @@ -150,7 +150,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Process MHT analysis member data if available. Plots MHT as: @@ -161,7 +161,7 @@ def run(self): # {{{ ------- Mark Petersen, Milena Veneziani, Xylar Asay-Davis """ - print "\nPlotting meridional heat transport (MHT)..." + self.logger.info("\nPlotting meridional heat transport (MHT)...") config = self.config @@ -174,7 +174,7 @@ def run(self): # {{{ # Depth is from refZMid, also in # mpaso.hist.am.meridionalHeatTransport.*.nc - print ' Read in depth and latitude...' + self.logger.info(' Read in depth and latitude...') ncFile = netCDF4.Dataset(self.mhtFile, mode='r') # reference depth [m] refZMid = ncFile.variables['refZMid'][:] @@ -201,19 +201,20 @@ def run(self): # {{{ variableList = ['timeMonthly_avg_meridionalHeatTransportLat', 'timeMonthly_avg_meridionalHeatTransportLatZ'] - print '\n Compute and plot global meridional heat transport' + self.logger.info('\n Compute and plot global meridional heat ' + 'transport') outputRoot = build_config_full_path(config, 'output', 'mpasClimatologySubdirectory') outputDirectory = '{}/mht'.format(outputRoot) - print '\n List of files for climatologies:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n List of files for climatologies:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) - print ' Load data...' 
+ self.logger.info(' Load data...') climatologyFileName = get_ncclimo_season_file_name(outputDirectory, 'mpaso', 'ANN', @@ -233,7 +234,8 @@ def run(self): # {{{ variableList=variableList, modelName='mpaso', seasons=['ANN'], - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) annualClimatology = xr.open_dataset(climatologyFileName) annualClimatology = annualClimatology.isel(Time=0) @@ -245,7 +247,7 @@ def run(self): # {{{ depthLimGlobal = config.getExpression(self.sectionName, 'depthLimGlobal') - print ' Plot global MHT...' + self.logger.info(' Plot global MHT...') # Plot 1D MHT (zonally averaged, depth integrated) x = binBoundaryMerHeatTrans y = annualClimatology.timeMonthly_avg_meridionalHeatTransportLat diff --git a/mpas_analysis/ocean/streamfunction_moc.py b/mpas_analysis/ocean/streamfunction_moc.py index 3e6aeaa55..d4c409d7a 100644 --- a/mpas_analysis/ocean/streamfunction_moc.py +++ b/mpas_analysis/ocean/streamfunction_moc.py @@ -22,7 +22,7 @@ compute_climatologies_with_ncclimo, \ get_ncclimo_season_file_name -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.time_series import cache_time_series from ..shared.html import write_image_xml @@ -166,7 +166,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' Process MOC analysis member data if available, or compute MOC at post-processing if not. Plots streamfunction climatolgoical sections @@ -178,18 +178,18 @@ def run(self): # {{{ Milena Veneziani, Mark Petersen, Phillip J. Wolfram, Xylar Asay-Davis ''' - print "\nPlotting streamfunction of Meridional Overturning " \ - "Circulation (MOC)..." + self.logger.info("\nPlotting streamfunction of Meridional Overturning " + "Circulation (MOC)...") - print '\n List of files for climatologies:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFilesClimo[0]), - os.path.basename(self.inputFilesClimo[-1])) + self.logger.info('\n List of files for climatologies:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFilesClimo[0]), + os.path.basename(self.inputFilesClimo[-1]))) - print '\n List of files for time series:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFilesTseries[0]), - os.path.basename(self.inputFilesTseries[-1])) + self.logger.info('\n List of files for time series:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFilesTseries[0]), + os.path.basename(self.inputFilesTseries[-1]))) config = self.config @@ -197,7 +197,7 @@ def run(self): # {{{ # Check whether MOC Analysis Member is enabled if self.mocAnalysisMemberEnabled: # Add a moc_analisysMember_processing - print '*** MOC Analysis Member is on ***' + self.logger.info('*** MOC Analysis Member is on ***') # (mocDictClimo, mocDictTseries) = \ # self._compute_moc_analysismember(config, streams, calendar, # sectionName, dictClimo, @@ -220,7 +220,7 @@ def run(self): # {{{ yLabel = 'depth [m]' for region in self.regionNames: - print ' Plot climatological {} MOC...'.format(region) + self.logger.info(' Plot climatological {} MOC...'.format(region)) title = '{} MOC (ANN, years {:04d}-{:04d})\n {}'.format( region, self.startYearClimo, self.endYearClimo, @@ -256,7 +256,7 @@ def run(self): # {{{ imageCaption=caption) # }}} # Plot time series - print ' Plot time series of max Atlantic MOC at 26.5N...' 
+ self.logger.info(' Plot time series of max Atlantic MOC at 26.5N...') xLabel = 'Time [years]' yLabel = '[Sv]' title = 'Max Atlantic MOC at $26.5^\circ$N\n {}'.format(mainRunName) @@ -341,7 +341,8 @@ def _compute_velocity_climatologies(self): # {{{ variableList=variableList, modelName='mpaso', seasons=['ANN'], - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) # }}} def _compute_moc_climo_postprocess(self): # {{{ @@ -365,8 +366,8 @@ def _compute_moc_climo_postprocess(self): # {{{ iRegion = 0 self.dictRegion = {} for region in self.regionNames: - print '\n Reading region and transect mask for ' \ - '{}...'.format(region) + self.logger.info('\n Reading region and transect mask for ' + '{}...'.format(region)) ncFileRegional = netCDF4.Dataset(regionMaskFiles, mode='r') maxEdgesInTransect = \ ncFileRegional.dimensions['maxEdgesInTransect'].size @@ -394,8 +395,8 @@ def _compute_moc_climo_postprocess(self): # {{{ self.regionNames.append('Global') # Compute and plot annual climatology of MOC streamfunction - print '\n Compute and/or plot post-processed MOC climatological '\ - 'streamfunction...' + self.logger.info('\n Compute and/or plot post-processed MOC ' + 'climatological streamfunction...') outputDirectory = build_config_full_path(config, 'output', 'mpasClimatologySubdirectory') @@ -405,7 +406,7 @@ def _compute_moc_climo_postprocess(self): # {{{ outputDirectory, self.startYearClimo, self.endYearClimo) if not os.path.exists(outputFileClimo): - print ' Load data...' + self.logger.info(' Load data...') annualClimatology = xr.open_dataset(self.velClimoFile) # rename some variables for convenience @@ -426,9 +427,9 @@ def _compute_moc_climo_postprocess(self): # {{{ self.lat = {} self.moc = {} for region in self.regionNames: - print ' Compute {} MOC...'.format(region) - print ' Compute transport through region southern ' \ - 'transect...' + self.logger.info(' Compute {} MOC...'.format(region)) + self.logger.info(' Compute transport through region ' + 'southern transect...') if region == 'Global': transportZ = np.zeros(nVertLevels) else: @@ -465,7 +466,7 @@ def _compute_moc_climo_postprocess(self): # {{{ self.moc[region] = mocTop # Save to file - print ' Save global and regional MOC to file...' + self.logger.info(' Save global and regional MOC to file...') ncFile = netCDF4.Dataset(outputFileClimo, mode='w') # create dimensions ncFile.createDimension('nz', len(refTopDepth)) @@ -494,7 +495,8 @@ def _compute_moc_climo_postprocess(self): # {{{ ncFile.close() else: # Read from file - print ' Read previously computed MOC streamfunction from file...' + self.logger.info(' Read previously computed MOC streamfunction ' + 'from file...') ncFile = netCDF4.Dataset(outputFileClimo, mode='r') self.depth = ncFile.variables['depth'][:] self.lat = {} @@ -510,9 +512,9 @@ def _compute_moc_time_series_postprocess(self): # {{{ '''compute MOC time series as a post-process''' # Compute and plot time series of Atlantic MOC at 26.5N (RAPID array) - print '\n Compute and/or plot post-processed Atlantic MOC '\ - 'time series...' - print ' Load data...' 
+ self.logger.info('\n Compute and/or plot post-processed Atlantic MOC ' + 'time series...') + self.logger.info(' Load data...') config = self.config @@ -559,7 +561,7 @@ def _compute_moc_time_series_postprocess(self): # {{{ continueOutput = os.path.exists(outputFileTseries) if continueOutput: - print ' Read in previously computed MOC time series' + self.logger.info(' Read in previously computed MOC time series') # add all the other arguments to the function comp_moc_part = partial(self._compute_moc_time_series_part, ds, @@ -570,7 +572,7 @@ def _compute_moc_time_series_postprocess(self): # {{{ dsMOCTimeSeries = cache_time_series( ds.Time.values, comp_moc_part, outputFileTseries, - self.calendar, yearsPerCacheUpdate=1, printProgress=False) + self.calendar, yearsPerCacheUpdate=1, logger=self.logger) return dsMOCTimeSeries # }}} @@ -583,7 +585,7 @@ def _compute_moc_time_series_part(self, ds, areaCell, latCell, indlat26, # computes a subset of the MOC time series if firstCall: - print ' Process and save time series' + self.logger.info(' Process and save time series') times = ds.Time[timeIndices].values mocRegion = np.zeros(timeIndices.shape) @@ -593,7 +595,8 @@ def _compute_moc_time_series_part(self, ds, areaCell, latCell, indlat26, dsLocal = ds.isel(Time=timeIndex) date = days_to_datetime(time, calendar=self.calendar) - print ' date: {:04d}-{:02d}'.format(date.year, date.month) + self.logger.info(' date: {:04d}-{:02d}'.format(date.year, + date.month)) horizontalVel = dsLocal.timeMonthly_avg_normalVelocity.values verticalVel = dsLocal.timeMonthly_avg_vertVelocityTop.values diff --git a/mpas_analysis/ocean/time_series_ohc.py b/mpas_analysis/ocean/time_series_ohc.py index 628b851c1..72980cfda 100644 --- a/mpas_analysis/ocean/time_series_ohc.py +++ b/mpas_analysis/ocean/time_series_ohc.py @@ -3,7 +3,7 @@ import netCDF4 import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting import timeseries_analysis_plot, \ plot_vertical_section, setup_colormap @@ -123,7 +123,7 @@ def setup_and_check(self): # {{{ return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of ocean heat content (OHC) from time-series output. @@ -132,7 +132,8 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani, Greg Streletz """ - print "\nPlotting OHC time series and T, S, and OHC vertical trends..." + self.logger.info("\nPlotting OHC time series and T, S, and OHC " + "vertical trends...") simulationStartTime = get_simulation_start_time(self.runStreams) config = self.config @@ -181,13 +182,14 @@ def run(self): # {{{ raise IOError('No MPAS-O restart file found: need at least one ' 'restart file for OHC calculation') - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) # Define/read in general variables - print ' Read in depth and compute specific depth indexes...' + self.logger.info(' Read in depth and compute specific depth ' + 'indexes...') ncFile = netCDF4.Dataset(restartFile, mode='r') # reference depth [m] depth = ncFile.variables['refBottomDepth'][:] @@ -199,7 +201,7 @@ def run(self): # {{{ kbtm = len(depth)-1 # Load data - print ' Load ocean data...' 
+ self.logger.info(' Load ocean data...') avgTemperatureVarName = \ 'timeMonthly_avg_avgValueWithinOceanLayerRegion_avgLayerTemperature' avgSalinityVarName = \ @@ -268,7 +270,7 @@ def run(self): # {{{ firstYearAvgLayerSalinity = \ firstYearAvgLayerSalinity.mean('Time') - print ' Compute temperature and salinity anomalies...' + self.logger.info(' Compute temperature and salinity anomalies...') ds['avgLayerTemperatureAnomaly'] = (ds[avgTemperatureVarName] - firstYearAvgLayerTemperature) @@ -282,7 +284,8 @@ def run(self): # {{{ calendar=calendar) if preprocessedReferenceRunName != 'None': - print ' Load in OHC from preprocessed reference run...' + self.logger.info(' Load in OHC from preprocessed reference ' + 'run...') inFilesPreprocessed = '{}/OHC.{}.year*.nc'.format( preprocessedInputDirectory, preprocessedReferenceRunName) dsPreprocessed = open_multifile_dataset( @@ -297,8 +300,9 @@ def run(self): # {{{ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print ' Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' + self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' cacheFileName = '{}/ohcTimeSeries.nc'.format(outputDirectory) @@ -310,11 +314,11 @@ def run(self): # {{{ self._compute_ohc_part, cacheFileName, calendar, yearsPerCacheUpdate=10, - printProgress=True) + logger=self.logger) unitsScalefactor = 1e-22 - print ' Compute OHC and make plots...' + self.logger.info(' Compute OHC and make plots...') for regionIndex in regionIndicesToPlot: region = regions[regionIndex] diff --git a/mpas_analysis/ocean/time_series_sst.py b/mpas_analysis/ocean/time_series_sst.py index 44a3d604c..c39e0aacd 100644 --- a/mpas_analysis/ocean/time_series_sst.py +++ b/mpas_analysis/ocean/time_series_sst.py @@ -1,6 +1,6 @@ import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting import timeseries_analysis_plot @@ -112,7 +112,7 @@ def setup_and_check(self): # {{{ return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of the time-series output of sea-surface temperature (SST). @@ -122,18 +122,18 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting SST time series..." + self.logger.info("\nPlotting SST time series...") - print ' Load SST data...' + self.logger.info(' Load SST data...') simulationStartTime = get_simulation_start_time(self.runStreams) config = self.config calendar = self.calendar - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') preprocessedReferenceRunName = \ @@ -179,7 +179,8 @@ def run(self): # {{{ calendar=calendar) if preprocessedReferenceRunName != 'None': - print ' Load in SST for a preprocesses reference run...' 
+ self.logger.info(' Load in SST for a preprocesses reference ' + 'run...') inFilesPreprocessed = '{}/SST.{}.year*.nc'.format( preprocessedInputDirectory, preprocessedReferenceRunName) dsPreprocessed = open_multifile_dataset( @@ -194,8 +195,9 @@ def run(self): # {{{ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print ' Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' + self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' cacheFileName = '{}/sstTimeSeries.nc'.format(outputDirectory) @@ -206,9 +208,9 @@ def run(self): # {{{ self._compute_sst_part, cacheFileName, calendar, yearsPerCacheUpdate=10, - printProgress=True) + logger=self.logger) - print ' Make plots...' + self.logger.info(' Make plots...') for regionIndex in regionIndicesToPlot: region = regions[regionIndex] diff --git a/mpas_analysis/sea_ice/climatology_map.py b/mpas_analysis/sea_ice/climatology_map.py index b6e4cc40f..9e09ded86 100644 --- a/mpas_analysis/sea_ice/climatology_map.py +++ b/mpas_analysis/sea_ice/climatology_map.py @@ -6,8 +6,6 @@ import xarray as xr -from ..shared.constants import constants - from ..shared.climatology import get_lat_lon_comparison_descriptor, \ get_remapper, get_mpas_climatology_dir_name, \ get_observation_climatology_file_names, \ @@ -21,7 +19,7 @@ from ..shared.plot.plotting import plot_polar_comparison, \ setup_colormap -from ..shared.io.utility import build_config_full_path +from ..shared.io.utility import build_config_full_path, make_directories from ..shared.io import write_netcdf from ..shared.html import write_image_xml @@ -88,7 +86,8 @@ def setup_and_check(self): # {{{ config=self.config, sourceDescriptor=mpasDescriptor, comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map', - method=self.config.get('climatology', 'mpasInterpolationMethod')) + method=self.config.get('climatology', 'mpasInterpolationMethod'), + logger=self.logger) info = self.obsAndPlotInfo[0] season = info['season'] @@ -108,7 +107,8 @@ def setup_and_check(self): # {{{ comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map_obs_{}'.format(fieldName), method=config.get('seaIceObservations', - 'interpolationMethod')) + 'interpolationMethod'), + logger=self.logger) self.xmlFileNames = [] @@ -123,9 +123,16 @@ def setup_and_check(self): # {{{ filePrefix)) info['filePrefix'] = filePrefix + # make the mapping directory, because doing so within each process + # seems to be giving ESMF_RegridWeightGen some trouble + mappingSubdirectory = \ + build_config_full_path(self.config, 'output', + 'mappingSubdirectory') + make_directories(mappingSubdirectory) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of sea-ice properties by comparing with previous model results and/or observations. 
@@ -135,13 +142,13 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting 2-d maps of {} climatologies...".format( - self.fieldNameInTitle) + self.logger.info("\nPlotting 2-d maps of {} climatologies...".format( + self.fieldNameInTitle)) - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) self._compute_seasonal_climatologies() @@ -157,7 +164,7 @@ def _compute_and_plot(self): # {{{ Xylar Asay-Davis, Milena Veneziani ''' - print ' Make ice concentration plots...' + self.logger.info(' Make ice concentration plots...') config = self.config @@ -232,7 +239,8 @@ def _compute_and_plot(self): # {{{ remap_and_write_climatology( config, seasonalClimatology, obsClimatologyFileName, - obsRemappedFileName, self.obsRemapper) + obsRemappedFileName, self.obsRemapper, + logger=self.logger) else: @@ -338,7 +346,8 @@ def _compute_seasonal_climatologies(self): # {{{ 'timeMonthly_avg_iceVolumeCell'], modelName=modelName, seasons=self.seasons, - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) self._remap_seasonal_climatology() @@ -379,7 +388,8 @@ def _remap_seasonal_climatology(self): # {{{ self.mpasRemapper.remap_file( inFileName=maskedClimatologyFileName, outFileName=remappedFileName, - overwrite=True) + overwrite=True, + logger=self.logger) # }}} # }}} diff --git a/mpas_analysis/sea_ice/time_series.py b/mpas_analysis/sea_ice/time_series.py index 856aeb4c9..e4e2049b0 100644 --- a/mpas_analysis/sea_ice/time_series.py +++ b/mpas_analysis/sea_ice/time_series.py @@ -137,7 +137,7 @@ def setup_and_check(self): # {{{ self.xmlFileNames.extend(polarXMLFileNames) return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of time series of sea-ice properties. @@ -146,15 +146,15 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting sea-ice area and volume time series..." + self.logger.info("\nPlotting sea-ice area and volume time series...") config = self.config calendar = self.calendar - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) plotTitles = {'iceArea': 'Sea-ice area', 'iceVolume': 'Sea-ice volume', @@ -200,7 +200,7 @@ def run(self): # {{{ make_directories(outputDirectory) - print ' Load sea-ice data...' + self.logger.info(' Load sea-ice data...') # Load mesh self.dsMesh = xr.open_dataset(self.restartFileName) self.dsMesh = subset_variables(self.dsMesh, @@ -240,8 +240,9 @@ def run(self): # {{{ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print ' Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' 
+ self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' norm = {'iceArea': 1e-6, # m^2 to km^2 @@ -262,7 +263,7 @@ plotVars = {} for hemisphere in ['NH', 'SH']: - print ' Caching {} data'.format(hemisphere) + self.logger.info(' Caching {} data'.format(hemisphere)) cacheFileName = '{}/seaIceAreaVolumeTimeSeries_{}.nc'.format( outputDirectory, hemisphere) @@ -271,9 +272,9 @@ self.ds = ds dsTimeSeries[hemisphere] = cache_time_series( ds.Time.values, self._compute_area_vol_part, cacheFileName, - calendar, yearsPerCacheUpdate=10, printProgress=True) + calendar, yearsPerCacheUpdate=10, logger=self.logger) - print ' Make {} plots...'.format(hemisphere) + self.logger.info(' Make {} plots...'.format(hemisphere)) for variableName in ['iceArea', 'iceVolume']: key = (hemisphere, variableName) diff --git a/mpas_analysis/shared/__init__.py b/mpas_analysis/shared/__init__.py index e69de29bb..0999164d4 100644 --- a/mpas_analysis/shared/__init__.py +++ b/mpas_analysis/shared/__init__.py @@ -0,0 +1 @@ +from analysis_task import AnalysisTask diff --git a/mpas_analysis/shared/analysis_task.py b/mpas_analysis/shared/analysis_task.py index 5402c242f..a2cb5e827 100644 --- a/mpas_analysis/shared/analysis_task.py +++ b/mpas_analysis/shared/analysis_task.py @@ -8,29 +8,34 @@ ''' import warnings +from multiprocessing import Process, Value +import time +import traceback +import logging +import sys from .io import NameList, StreamsFile from .io.utility import build_config_full_path, make_directories -class AnalysisTask(object): # {{{ +class AnalysisTask(Process): # {{{ ''' The base class for analysis tasks. Attributes ---------- - config : instance of MpasAnalysisConfigParser + config : ``MpasAnalysisConfigParser`` Contains configuration options - taskName : str + taskName : str The name of the task, typically the same as the class name except starting with lowercase (e.g. 'myTask' for class 'MyTask') - componentName : {'ocean', 'seaIce'} + componentName : {'ocean', 'seaIce'} The name of the component (same as the folder where the task resides) - tags : list of str + tags : list of str Tags used to describe the task (e.g. 'timeSeries', 'climatology', 'horizontalMap', 'index', 'transect'). These are used to determine which tasks are generated (e.g. 'all_transect' or 'no_climatology' @@ -46,24 +51,37 @@ class AnalysisTask(object): # {{{ The directory for writing plots (which is also created if it doesn't exist) - namelist : ``shared.io.NameList`` object + namelist : ``shared.io.NameList`` the namelist reader - runStreams : ``shared.io.StreamsFile`` object + runStreams : ``shared.io.StreamsFile`` the streams file reader for streams in the run directory (e.g. restart files) - historyStreams : ``shared.io.StreamsFile`` object + historyStreams : ``shared.io.StreamsFile`` the streams file reader for streams in the history directory (most streams other than restart files) - calendar : the name of the calendar ('gregorian' or 'gregoraian_noleap') + calendar : {'gregorian', 'gregorian_noleap'} + The calendar used in the MPAS run + + logger : ``logging.Logger`` + A logger for output during the run phase of an analysis task Authors ------- Xylar Asay-Davis ''' + + # flags for run status + UNSET = 0 + READY = 1 + BLOCKED = 2 + RUNNING = 3 + SUCCESS = 4 + FAIL = 5 + def __init__(self, config, taskName, componentName, tags=[]): # {{{ ''' Construct the analysis task. 
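The hunks that follow implement the new process machinery: run() (invoked in the child process by Process.start()) writes the task's config to the logs directory, redirects output to a per-task log file, calls the renamed run_task(), and records the outcome in a shared status flag that the parent can poll. A condensed, standalone sketch of the same pattern, not the actual class (console logging only, and only the last three status flags):

import logging
import sys
import traceback
from multiprocessing import Process, Value

class MinimalTask(Process):
    # simplified status flags (the real class also has UNSET, READY, BLOCKED)
    RUNNING, SUCCESS, FAIL = 3, 4, 5

    def __init__(self, taskName):
        super(MinimalTask, self).__init__(name=taskName)
        self.taskName = taskName
        self.logger = None
        # shared integer so the parent process can poll the run status
        self._runStatus = Value('i', MinimalTask.RUNNING)

    def run_task(self):
        # subclasses override this instead of run()
        self.logger.info('running analysis for {}'.format(self.taskName))

    def run(self):
        # executed in the child process when the parent calls start()
        self.logger = logging.getLogger(self.taskName)
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.setLevel(logging.INFO)
        self._runStatus.value = MinimalTask.RUNNING
        try:
            self.run_task()
            self._runStatus.value = MinimalTask.SUCCESS
        except Exception:
            self.logger.error(traceback.format_exc())
            self._runStatus.value = MinimalTask.FAIL

if __name__ == '__main__':
    task = MinimalTask('exampleTask')
    task.start()  # spawns the child, which calls run()
    task.join()
    print(task._runStatus.value == MinimalTask.SUCCESS)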
@@ -95,10 +113,24 @@ def __init__(self, config, taskName, componentName, tags=[]): # {{{ ------- Xylar Asay-Davis ''' + # This will include a subtask name as well in the future + self.fullTaskName = taskName + + # call the constructor from the base class (Process) + super(AnalysisTask, self).__init__(name=self.fullTaskName) + self.config = config self.taskName = taskName self.componentName = componentName - self.tags = tags # }}} + self.tags = tags + self.logger = None + + # non-public attributes related to multiprocessing and logging + self.daemon = True + self._runStatus = Value('i', AnalysisTask.UNSET) + self._stackTrace = None + self._logFileName = None + # }}} def setup_and_check(self): # {{{ ''' @@ -157,15 +189,19 @@ def setup_and_check(self): # {{{ if tag in self.tags: self.set_start_end_date(section=tag) + # redirect output to a log file + logsDirectory = build_config_full_path(self.config, 'output', + 'logsSubdirectory') + + self._logFileName = '{}/{}.log'.format(logsDirectory, + self.fullTaskName) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' - Runs the analysis task. - - Individual tasks (children classes of this base class) should first - call this method to perform any common steps in an analysis task, - then, perform the steps required to run the analysis task. + Run the analysis. Each task should override this function to do the + work of computing and/or plotting analysis Authors ------- @@ -173,6 +209,71 @@ def run(self): # {{{ ''' return # }}} + def run(self, writeLogFile=True): # {{{ + ''' + Sets up logging and then runs the analysis task. + + Authors + ------- + Xylar Asay-Davis + ''' + # redirect output to a log file + logsDirectory = build_config_full_path(self.config, 'output', + 'logsSubdirectory') + + configFileName = '{}/configs/config.{}'.format(logsDirectory, + self.fullTaskName) + configFile = open(configFileName, 'w') + self.config.write(configFile) + configFile.close() + + if writeLogFile: + self.logger = logging.getLogger(self.fullTaskName) + handler = logging.FileHandler(self._logFileName) + else: + self.logger = logging.getLogger() + handler = logging.StreamHandler(sys.stdout) + + formatter = AnalysisFormatter() + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + if writeLogFile: + oldStdout = sys.stdout + oldStderr = sys.stderr + sys.stdout = StreamToLogger(self.logger, logging.INFO) + sys.stderr = StreamToLogger(self.logger, logging.ERROR) + + self._runStatus.value = AnalysisTask.RUNNING + startTime = time.time() + try: + self.run_task() + self._runStatus.value = AnalysisTask.SUCCESS + except (Exception, BaseException) as e: + if isinstance(e, KeyboardInterrupt): + raise e + self._stackTrace = traceback.format_exc() + self.logger.error("analysis task {} failed during run \n" + "{}".format(self.taskName, self._stackTrace)) + self._runStatus.value = AnalysisTask.FAIL + + runDuration = time.time() - startTime + m, s = divmod(runDuration, 60) + h, m = divmod(int(m), 60) + self.logger.info('Execution time: {}:{:02d}:{:05.2f}'.format(h, m, s)) + + if writeLogFile: + # restore stdout and stderr + sys.stdout = oldStdout + sys.stderr = oldStderr + + # remove the handlers from the logger (probably only necessary if + # writeLogFile==False) + self.logger.handlers = [] + + # }}} + def check_generate(self): # {{{ ''' @@ -319,4 +420,78 @@ def set_start_end_date(self, section): # {{{ # }}} +class AnalysisFormatter(logging.Formatter): # {{{ + """ + A custom formatter for logging + + Modified from: 
+ https://stackoverflow.com/a/8349076/7728169 + + Authors + ------- + Xylar Asay-Davis + """ + + # printing error messages without a prefix because they are sometimes + # errors and sometimes only warnings sent to stderr + err_fmt = "%(msg)s" + dbg_fmt = "DEBUG: %(module)s: %(lineno)d: %(msg)s" + info_fmt = "%(msg)s" + + def __init__(self, fmt="%(levelno)s: %(msg)s"): + logging.Formatter.__init__(self, fmt) + + def format(self, record): + + # Save the original format configured by the user + # when the logger formatter was instantiated + format_orig = self._fmt + + # Replace the original format with one customized by logging level + if record.levelno == logging.DEBUG: + self._fmt = AnalysisFormatter.dbg_fmt + + elif record.levelno == logging.INFO: + self._fmt = AnalysisFormatter.info_fmt + + elif record.levelno == logging.ERROR: + self._fmt = AnalysisFormatter.err_fmt + + # Call the original formatter class to do the grunt work + result = logging.Formatter.format(self, record) + + # Restore the original format configured by the user + self._fmt = format_orig + + return result +# }}} + + +class StreamToLogger(object): # {{{ + """ + Modified based on code by: + https://www.electricmonk.nl/log/2011/08/14/redirect-stdout-and-stderr-to-a-logger-in-python/ + + Copyright (C) 2011 Ferry Boender + + License: "available under the GPL" (the author does not provide more + details) + + Fake file-like stream object that redirects writes to a logger instance. + """ + def __init__(self, logger, log_level=logging.INFO): + self.logger = logger + self.log_level = log_level + self.linebuf = '' + + def write(self, buf): + for line in buf.rstrip().splitlines(): + self.logger.log(self.log_level, line.rstrip()) + + def flush(self): + pass + + # }}} + + # vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python diff --git a/mpas_analysis/shared/climatology/climatology.py b/mpas_analysis/shared/climatology/climatology.py index 14b3acb59..75e8343e8 100644 --- a/mpas_analysis/shared/climatology/climatology.py +++ b/mpas_analysis/shared/climatology/climatology.py @@ -64,7 +64,7 @@ def get_lat_lon_comparison_descriptor(config): # {{{ def get_remapper(config, sourceDescriptor, comparisonDescriptor, - mappingFilePrefix, method): # {{{ + mappingFilePrefix, method, logger=None): # {{{ """ Given config options and descriptions of the source and comparison grids, returns a ``Remapper`` object that can be used to remap from source files @@ -90,6 +90,9 @@ def get_remapper(config, sourceDescriptor, comparisonDescriptor, method : {'bilinear', 'neareststod', 'conserve'} The method of interpolation used. + logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Returns ------- remapper : ``Remapper`` object @@ -135,7 +138,7 @@ def get_remapper(config, sourceDescriptor, comparisonDescriptor, remapper = Remapper(sourceDescriptor, comparisonDescriptor, mappingFileName) - remapper.build_mapping_file(method=method) + remapper.build_mapping_file(method=method, logger=logger) return remapper # }}} @@ -258,7 +261,8 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, seasons='none', decemberMode='sdd', remapper=None, - remappedDirectory=None): # {{{ + remappedDirectory=None, + logger=None): # {{{ ''' Uses ncclimo to compute monthly, seasonal (DJF, MAM, JJA, SON) and annual climatologies. @@ -306,6 +310,9 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, climatologies on the source grid. Has no effect if ``remapper`` is ``None``. 
+ logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Raises ------ OSError @@ -346,19 +353,38 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, if remappedDirectory is not None: args.extend(['-O', remappedDirectory]) - print 'running: {}'.format(' '.join(args)) - - # make sure any output is flushed before we add output from the - # subprocess - sys.stdout.flush() - sys.stderr.flush() - # set an environment variable to make sure we're not using czender's # local version of NCO instead of one we have intentionally loaded env = os.environ.copy() env['NCO_PATH_OVERRIDE'] = 'No' - subprocess.check_call(args, env=env) # }}} + if logger is None: + print 'running: {}'.format(' '.join(args)) + # make sure any output is flushed before we add output from the + # subprocess + sys.stdout.flush() + sys.stderr.flush() + + subprocess.check_call(args, env=env) + else: + logger.info('running: {}'.format(' '.join(args))) + for handler in logger.handlers: + handler.flush() + + process = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + stdout, stderr = process.communicate() + + if stdout: + logger.info(stdout) + if stderr: + for line in stderr.split('\n'): + logger.error(line) + + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, + ' '.join(args)) + # }}} def get_observation_climatology_file_names(config, fieldName, monthNames, @@ -751,7 +777,7 @@ def add_years_months_days_in_month(ds, calendar=None): # {{{ def remap_and_write_climatology(config, climatologyDataSet, climatologyFileName, remappedFileName, - remapper): # {{{ + remapper, logger=None): # {{{ """ Given a field in a climatology data set, use the ``remapper`` to remap horizontal dimensions of all fields, write the results to an output file, @@ -785,6 +811,9 @@ def remap_and_write_climatology(config, climatologyDataSet, A remapper that can be used to remap files or data sets to a comparison grid. 
+ logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Returns ------- remappedClimatology : ``xarray.DataSet`` or ``xarray.DataArray`` object @@ -808,7 +837,8 @@ def remap_and_write_climatology(config, climatologyDataSet, remapper.remap_file(inFileName=climatologyFileName, outFileName=remappedFileName, overwrite=True, - renormalize=renormalizationThreshold) + renormalize=renormalizationThreshold, + logger=logger) remappedClimatology = xr.open_dataset(remappedFileName) else: diff --git a/mpas_analysis/shared/interpolation/remapper.py b/mpas_analysis/shared/interpolation/remapper.py index eb55dd1d1..f73d25ec0 100644 --- a/mpas_analysis/shared/interpolation/remapper.py +++ b/mpas_analysis/shared/interpolation/remapper.py @@ -86,7 +86,7 @@ def __init__(self, sourceDescriptor, destinationDescriptor, # }}} def build_mapping_file(self, method='bilinear', - additionalArgs=None): # {{{ + additionalArgs=None, logger=None): # {{{ ''' Given a source file defining either an MPAS mesh or a lat-lon grid and a destination file or set of arrays defining a lat-lon grid, constructs @@ -102,6 +102,9 @@ def build_mapping_file(self, method='bilinear', additionalArgs : list of str, optional A list of additional arguments to ``ESMF_RegridWeightGen`` + logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Raises ------ OSError @@ -153,16 +156,36 @@ def build_mapping_file(self, method='bilinear', if additionalArgs is not None: args.extend(additionalArgs) - # make sure any output is flushed before we add output from the - # subprocess - sys.stdout.flush() - sys.stderr.flush() + if logger is None: + print 'running: {}'.format(' '.join(args)) + # make sure any output is flushed before we add output from the + # subprocess + sys.stdout.flush() + sys.stderr.flush() + + # throw out the standard output from ESMF_RegridWeightGen, as it's + # rather verbose but keep stderr + DEVNULL = open(os.devnull, 'wb') + subprocess.check_call(args, stdout=DEVNULL) + + else: + logger.info('running: {}'.format(' '.join(args))) + for handler in logger.handlers: + handler.flush() + + process = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() - # throw out the standard output from ESMF_RegridWeightGen, as it's - # rather verbose but keep stderr - DEVNULL = open(os.devnull, 'wb') + # throw out the standard output from ESMF_RegridWeightGen, as it's + # rather verbose but keep stderr + if stderr: + for line in stderr.split('\n'): + logger.error(line) - subprocess.check_call(args, stdout=DEVNULL) + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, + ' '.join(args)) # remove the temporary SCRIP files os.remove(self.sourceDescriptor.scripFileName) @@ -171,7 +194,7 @@ def build_mapping_file(self, method='bilinear', # }}} def remap_file(self, inFileName, outFileName, variableList=None, - overwrite=False, renormalize=None): # {{{ + overwrite=False, renormalize=None, logger=None): # {{{ ''' Given a source file defining either an MPAS mesh or a lat-lon grid and a destination file or set of arrays defining a lat-lon grid, constructs @@ -198,6 +221,9 @@ def remap_file(self, inFileName, outFileName, variableList=None, renormalize : float, optional A threshold to use to renormalize the data + logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Raises ------ OSError @@ -267,18 +293,38 @@ def remap_file(self, inFileName, 
         if variableList is not None:
             args.extend(['-v', ','.join(variableList)])
-
-        # make sure any output is flushed before we add output from the
-        # subprocess
-        sys.stdout.flush()
-        sys.stderr.flush()
-
         # set an environment variable to make sure we're not using czender's
         # local version of NCO instead of one we have intentionally loaded
         env = os.environ.copy()
         env['NCO_PATH_OVERRIDE'] = 'No'
 
-        subprocess.check_call(args, env=env)  # }}}
+        if logger is None:
+            print 'running: {}'.format(' '.join(args))
+            # make sure any output is flushed before we add output from the
+            # subprocess
+            sys.stdout.flush()
+            sys.stderr.flush()
+
+            subprocess.check_call(args, env=env)
+        else:
+            logger.info('running: {}'.format(' '.join(args)))
+            for handler in logger.handlers:
+                handler.flush()
+
+            process = subprocess.Popen(args, stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE, env=env)
+            stdout, stderr = process.communicate()
+
+            if stdout:
+                logger.info(stdout)
+            if stderr:
+                for line in stderr.split('\n'):
+                    logger.error(line)
+
+            if process.returncode != 0:
+                raise subprocess.CalledProcessError(process.returncode,
+                                                    ' '.join(args))
+        # }}}
 
     def remap(self, ds, renormalizationThreshold=None):  # {{{
         '''
diff --git a/mpas_analysis/shared/time_series/time_series.py b/mpas_analysis/shared/time_series/time_series.py
index 3163abc6e..c27de8530 100644
--- a/mpas_analysis/shared/time_series/time_series.py
+++ b/mpas_analysis/shared/time_series/time_series.py
@@ -16,7 +16,7 @@
 
 def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
                       calendar, yearsPerCacheUpdate=1,
-                      printProgress=False):  # {{{
+                      logger=None):  # {{{
     '''
     Create or update a NetCDF file ``cacheFileName`` containing the given time
     series, calculated with ``timeSeriesCalcFunction`` over the given times,
@@ -52,9 +52,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
         output the file frequently. If not, there will be needless overhead
         in caching the file too frequently.
 
-    printProgress: bool, optional
-        Whether progress messages should be printed as the climatology is
-        computed
+    logger : ``logging.Logger``, optional
+        A logger to which to write output as the time series is computed
 
     Returns
     -------
@@ -73,8 +72,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
     continueOutput = os.path.exists(cacheFileName)
     cacheDataSetExists = False
     if continueOutput:
-        if printProgress:
-            print ' Read in previously computed time series'
+        if logger is not None:
+            logger.info(' Read in previously computed time series')
 
         # read in what we have so far
         try:
@@ -84,7 +83,10 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
             # assuming the cache file is corrupt, so deleting it.
             message = 'Deleting cache file {}, which appears to have ' \
                       'been corrupted.'.format(cacheFileName)
-            warnings.warn(message)
+            if logger is None:
+                warnings.warn(message)
+            else:
+                logger.warning(message)
             os.remove(cacheFileName)
 
     if cacheDataSetExists:
@@ -116,13 +118,13 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
             # no unprocessed time entries in this data range
             continue
 
-        if printProgress:
+        if logger is not None:
             if firstProcessed:
-                print ' Process and save time series'
+                logger.info(' Process and save time series')
             if yearsPerCacheUpdate == 1:
-                print ' {:04d}'.format(years[0])
+                logger.info(' {:04d}'.format(years[0]))
             else:
-                print ' {:04d}-{:04d}'.format(years[0], years[-1])
+                logger.info(' {:04d}-{:04d}'.format(years[0], years[-1]))
 
         ds = timeSeriesCalcFunction(timeIndices, firstProcessed)
         firstProcessed = False
diff --git a/mpas_analysis/test/test_analysis_task.py b/mpas_analysis/test/test_analysis_task.py
index 4ea0cec67..689d17bd8 100644
--- a/mpas_analysis/test/test_analysis_task.py
+++ b/mpas_analysis/test/test_analysis_task.py
@@ -1,5 +1,5 @@
 """
-Unit tests for utility functions in run_analysis
+Unit tests for utility functions in run_mpas_analysis
 
 Xylar Asay-Davis
 """
@@ -33,7 +33,7 @@ def doTest(generate, expectedResults):
     # a list of analyses to generate. Valid names are:
     # 'timeSeriesOHC', 'timeSeriesSST', 'climatologyMapSST',
     # 'climatologyMapSSS', 'climatologyMapMLD', 'timeSeriesSeaIceAreaVol',
-    # 'climatologyMapSeaIceConcNH', 'climatologyMapSeaIceConcSH', 
+    # 'climatologyMapSeaIceConcNH', 'climatologyMapSeaIceConcSH',
     # 'climatologyMapSeaIceThickNH', 'climatologyMapSeaIceThickSH'
     # the following shortcuts exist:
     # 'all' -- all analyses will be run
diff --git a/run_mpas_analysis b/run_mpas_analysis
index c5219b3de..e2fac1b9b 100755
--- a/run_mpas_analysis
+++ b/run_mpas_analysis
@@ -9,15 +9,13 @@ Authors
 Xylar Asay-Davis, Phillip J. Wolfram
 """
 
-import os
 import matplotlib as mpl
 import argparse
 import traceback
 import sys
-import subprocess
-import time
 import pkg_resources
 import shutil
+import os
 
 from mpas_analysis.configuration import MpasAnalysisConfigParser
@@ -26,6 +24,8 @@ from mpas_analysis.shared.io.utility import build_config_full_path, \
 
 from mpas_analysis.shared.html import generate_html
 
+from mpas_analysis.shared import AnalysisTask
+
 
 def update_generate(config, generate):  # {{{
     """
@@ -55,7 +55,7 @@ def update_generate(config, generate):  # {{{
     config.set('output', 'generate', generateString)  # }}}
 
 
-def run_parallel_tasks(config, analyses, configFiles, taskCount):
+def run_parallel_tasks(config, analyses, taskCount):
     # {{{
     """
     Launch new processes for parallel tasks, allowing up to ``taskCount``
@@ -69,9 +69,6 @@ def run_parallel_tasks(config, analyses, configFiles, taskCount):
     analyses : list of ``AnalysisTask`` objects
         A list of analysis tasks to run
 
-    configFiles : list of str
-        A list of config files, passed on to each parallel task
-
     taskCount : int
         The maximum number of tasks that are allowed to run at once
 
@@ -80,34 +77,39 @@
     Xylar Asay-Davis
     """
-    taskNames = [analysisTask.taskName for analysisTask in analyses]
+    taskCount = min(taskCount, len(analyses))
 
-    taskCount = min(taskCount, len(taskNames))
+    runningTasks = {}
+    for analysisTask in analyses[0:taskCount]:
+        print 'Running {}'.format(analysisTask.taskName)
+        analysisTask.start()
+        runningTasks[analysisTask.taskName] = analysisTask
 
-    (processes, logs) = launch_tasks(taskNames[0:taskCount], config,
-                                     configFiles)
-    remainingTasks = taskNames[taskCount:]
+    remainingTasks = analyses[taskCount:]
 
     tasksWithErrors = []
-    while len(processes) > 0:
-        (taskName, process) = wait_for_task(processes)
-        if process.returncode == 0:
-            print "Task {} has finished successfully.".format(taskName)
-        else:
+    while len(runningTasks.keys()) > 0:
+        analysisTask = wait_for_task(runningTasks)
+        taskName = analysisTask.taskName
+        if analysisTask._runStatus.value == AnalysisTask.SUCCESS:
+            print " Task {} has finished successfully.".format(taskName)
+        elif analysisTask._runStatus.value == AnalysisTask.FAIL:
             print "ERROR in task {}. See log file {} for details".format(
-                taskName, logs[taskName].name)
+                taskName, analysisTask._logFileName)
             tasksWithErrors.append(taskName)
-        logs[taskName].close()
+        else:
+            print "Unexpected status from task {}. This may be a " \
+                "bug.".format(taskName)
 
         # remove the process from the process dictionary (no need to bother)
-        processes.pop(taskName)
+        runningTasks.pop(taskName)
 
         if len(remainingTasks) > 0:
-            (process, log) = launch_tasks(remainingTasks[0:1], config,
-                                          configFiles)
-            # merge the new process and log into these dictionaries
-            processes.update(process)
-            logs.update(log)
+            analysisTask = remainingTasks[0]
             remainingTasks = remainingTasks[1:]
+            print 'Running {}'.format(analysisTask.taskName)
+            analysisTask.start()
+            runningTasks[analysisTask.taskName] = analysisTask
+
     # raise the last exception so the process exits with an error
     errorCount = len(tasksWithErrors)
     if errorCount == 1:
@@ -120,119 +122,33 @@
 
 
-def launch_tasks(taskNames, config, configFiles):  # {{{
+def wait_for_task(runningTasks, timeout=0.1):  # {{{
     """
-    Launch one or more tasks
+    Wait for the next running task to finish and return it.
+    A short timeout is used when joining each task so that a keyboard
+    interrupt can still be handled.
 
     Parameters
     ----------
-    taskNames : list of str
-        the names of the tasks to launch
-
-    config : ``MpasAnalysisConfigParser`` object
-        contains config options
-
-    configFiles : list of str
-        A list of config files, passed along when each task is launched
-
-    Authors
-    -------
-    Xylar Asay-Davis
-    """
-    thisFile = os.path.realpath(__file__)
-
-    commandPrefix = config.getWithDefault('execute', 'commandPrefix',
-                                          default='')
-    if commandPrefix == '':
-        commandPrefix = []
-    else:
-        commandPrefix = commandPrefix.split(' ')
-
-    processes = {}
-    logs = {}
-    for taskName in taskNames:
-        args = commandPrefix + \
-            [thisFile, '--subtask', '--generate', taskName] + configFiles
-
-        logFileName = '{}/{}.log'.format(logsDirectory, taskName)
-
-        # write the command to the log file
-        logFile = open(logFileName, 'w')
-        logFile.write('Command: {}\n'.format(' '.join(args)))
-        # make sure the command gets written before the rest of the log
-        logFile.flush()
-        print 'Running {}'.format(taskName)
-        process = subprocess.Popen(args, stdout=logFile,
-                                   stderr=subprocess.STDOUT)
-        processes[taskName] = process
-        logs[taskName] = logFile
-
-    return (processes, logs)  # }}}
-
-
-def wait_for_task(processes):  # {{{
-    """
-    Wait for the next process to finish and check its status. Returns both the
-    task name and the process that finished.
-
-    Parameters
-    ----------
-    processes : list of ``subprocess.Popen`` objects
-        Processes to wait for
-
+    runningTasks : dict of ``AnalysisTask``
+        The tasks that are currently running, with task names as keys
 
     Returns
     -------
-    taskName : str
-        The name of the task that finished
-
-    process : ``subprocess.Popen`` object
-        The process that finished
+    analysisTask : ``AnalysisTask``
+        A task that finished
 
     Authors
     -------
     Xylar Asay-Davis
     """
-
-    # first, check if any process has already finished
-    for taskName, process in processes.iteritems():  # python 2.7!
-        if(not is_running(process)):
-            return (taskName, process)
-
-    # No process has already finished, so wait for the next one
-    (pid, status) = os.waitpid(-1, 0)
-    for taskName, process in processes.iteritems():
-        if pid == process.pid:
-            process.returncode = status
-            # since we used waitpid, this won't happen automatically
-            return (taskName, process)  # }}}
+    # necessary to have a timeout so we can kill the whole thing
+    # with a keyboard interrupt
+    while True:
+        for analysisTask in runningTasks.itervalues():
+            analysisTask.join(timeout=timeout)
+            if not analysisTask.is_alive():
+                return analysisTask  # }}}
 
 
 def build_analysis_list(config):  # {{{
@@ -357,36 +273,16 @@ def run_analysis(config, analyses):  # {{{
     tasksWithErrors = []
     lastStacktrace = None
     for analysisTask in analyses:
-        # write out a copy of the configuration to document the run
-        logsDirectory = build_config_full_path(config, 'output',
-                                               'logsSubdirectory')
-        try:
-            startTime = time.clock()
-            analysisTask.run()
-            runDuration = time.clock() - startTime
-            m, s = divmod(runDuration, 60)
-            h, m = divmod(int(m), 60)
-            print 'Execution time: {}:{:02d}:{:05.2f}'.format(h, m, s)
-        except (Exception, BaseException) as e:
-            if isinstance(e, KeyboardInterrupt):
-                raise e
-            lastStacktrace = traceback.format_exc()
-            print "ERROR: analysis task {} failed during run".format(
-                analysisTask.taskName)
-            print lastStacktrace
+        analysisTask.run(writeLogFile=False)
+        if analysisTask._runStatus.value == AnalysisTask.FAIL:
+            lastStacktrace = analysisTask._stackTrace
             tasksWithErrors.append(analysisTask.taskName)
 
-        configFileName = '{}/configs/config.{}'.format(logsDirectory,
-                                                       analysisTask.taskName)
-        configFile = open(configFileName, 'w')
-        config.write(configFile)
-        configFile.close()
-
     if config.getboolean('plot', 'displayToScreen'):
         import matplotlib.pyplot as plt
         plt.show()
 
-    # raise the last exception so the process exits with an error
+    # See if there were errors; exit(1) if so
     errorCount = len(tasksWithErrors)
     if errorCount == 1:
         if len(analyses) > 1:
@@ -408,9 +304,6 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         description=__doc__,
         formatter_class=argparse.RawTextHelpFormatter)
-    parser.add_argument("--subtask", dest="subtask", action='store_true',
-                        help="If this is a subtask when running parallel "
-                             "tasks")
    parser.add_argument("--setup_only", dest="setup_only",
                        action='store_true',
                        help="If only the setup phase, not the run or HTML "
                             "generation phases, should be executed.")
@@ -479,10 +372,9 @@ if __name__ == "__main__":
         if parallelTaskCount <= 1 or len(analyses) == 1:
             run_analysis(config, analyses)
         else:
-            run_parallel_tasks(config, analyses, configFiles,
-                               parallelTaskCount)
+            run_parallel_tasks(config, analyses, parallelTaskCount)
 
-    if not args.subtask and not args.setup_only:
+    if not args.setup_only:
         generate_html(config, analyses)
 
# vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python
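
Note: for reference, the core pattern this patch introduces can be sketched in a few lines:
a task subclasses multiprocessing.Process, overrides run() to call the renamed run_task(),
and records success or failure in a shared status value that the parent process polls with a
short join() timeout. The ExampleTask class below is a simplified, hypothetical illustration,
not the actual AnalysisTask, which additionally manages per-task loggers, setup and
prerequisite checks.

import multiprocessing
import traceback


class ExampleTask(multiprocessing.Process):
    # possible values of the shared run status, mirroring the constants
    # on AnalysisTask
    READY = 0
    SUCCESS = 1
    FAIL = 2

    def __init__(self, taskName):
        super(ExampleTask, self).__init__(name=taskName)
        self.taskName = taskName
        # a shared integer ('i') so the parent process can read the child's
        # status after it exits
        self._runStatus = multiprocessing.Value('i', ExampleTask.READY)

    def run_task(self):
        # the actual analysis work would go here
        pass

    def run(self):
        # multiprocessing.Process.run() executes in the child process once
        # start() is called, which is why run_task() had to be renamed
        try:
            self.run_task()
            self._runStatus.value = ExampleTask.SUCCESS
        except (Exception, BaseException):
            traceback.print_exc()
            self._runStatus.value = ExampleTask.FAIL


if __name__ == '__main__':
    task = ExampleTask('exampleTask')
    task.start()
    # poll with a timeout, as in wait_for_task(), so a keyboard interrupt
    # can still be handled
    while task.is_alive():
        task.join(timeout=0.1)
    print('{}: status {}'.format(task.taskName, task._runStatus.value))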