diff --git a/configs/edison/job_script.edison.bash b/configs/edison/job_script.edison.bash index 874aeb329..300731b0b 100644 --- a/configs/edison/job_script.edison.bash +++ b/configs/edison/job_script.edison.bash @@ -8,7 +8,7 @@ ##SBATCH --partition=debug # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#SBATCH --nodes=10 +#SBATCH --nodes=1 #SBATCH --time=1:00:00 #SBATCH --account=acme #SBATCH --job-name=mpas_analysis @@ -23,7 +23,6 @@ export OMP_NUM_THREADS=1 module unload python python/base module use /global/project/projectdirs/acme/software/modulefiles/all module load python/anaconda-2.7-acme -export PATH=/global/homes/z/zender/bin_${NERSC_HOST}:${PATH} # MPAS/ACME job to be analyzed, including paths to simulation data and # observations. Change this name and path as needed @@ -34,7 +33,9 @@ command_prefix="srun -N 1 -n 1" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$SLURM_JOB_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -58,12 +59,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/configs/job_script.default.bash b/configs/job_script.default.bash index 544a0518a..565d423ab 100755 --- a/configs/job_script.default.bash +++ b/configs/job_script.default.bash @@ -11,6 +11,8 @@ mpas_analysis_dir="." # the number of parallel tasks (anything between 1 and the total number # of tasks to run) parallel_task_count=8 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -33,13 +35,14 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file # comment this out if you want to keep the config file, e.g.
for debugging diff --git a/configs/lanl/job_script.lanl.bash b/configs/lanl/job_script.lanl.bash index cc6e07927..428522253 100644 --- a/configs/lanl/job_script.lanl.bash +++ b/configs/lanl/job_script.lanl.bash @@ -2,7 +2,7 @@ # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#SBATCH --nodes=10 +#SBATCH --nodes=1 #SBATCH --time=1:00:00 #SBATCH --account=climateacme #SBATCH --job-name=mpas_analysis @@ -12,6 +12,8 @@ cd $SLURM_SUBMIT_DIR # optional, since this is the default behavior +export OMP_NUM_THREADS=1 + module unload python module use /usr/projects/climate/SHARED_CLIMATE/modulefiles/all/ module load python/anaconda-2.7-climate @@ -25,7 +27,9 @@ command_prefix="" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$SLURM_JOB_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -49,12 +53,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology. +ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/configs/olcf/job_script.olcf.bash b/configs/olcf/job_script.olcf.bash index 575df9a43..a3aed1262 100644 --- a/configs/olcf/job_script.olcf.bash +++ b/configs/olcf/job_script.olcf.bash @@ -6,7 +6,7 @@ ##PBS -q debug # change number of nodes to change the number of parallel tasks # (anything between 1 and the total number of tasks to run) -#PBS -l nodes=10 +#PBS -l nodes=1 #PBS -l walltime=1:00:00 #PBS -A cli115 #PBS -N mpas_analysis @@ -28,7 +28,9 @@ command_prefix="aprun -b -N 1 -n 1" # containing run_mpas_analysis mpas_analysis_dir="." # one parallel task per node by default -parallel_task_count=$PBS_NUM_NODES +parallel_task_count=12 +# ncclimo can run with 1 (serial) or 12 (bck) threads +ncclimo_mode=bck if [ ! -f $run_config_file ]; then echo "File $run_config_file not found!" @@ -52,12 +54,13 @@ cat <<EOF > $job_config_file # the number of parallel tasks (1 means tasks run in serial, the default) parallelTaskCount = $parallel_task_count -# Prefix on the commnd line before a parallel task (e.g. 'srun -n 1 python') -# Default is no prefix (run_mpas_analysis is executed directly) -commandPrefix = $command_prefix +# the parallelism mode in ncclimo ("serial" or "bck") +# Set this to "bck" (background parallelism) if running on a machine that can +# handle 12 simultaneous processes, one for each monthly climatology.
+ncclimoParallelMode = $ncclimo_mode EOF -$mpas_analysis_dir/run_mpas_analysis $run_config_file \ +$command_prefix $mpas_analysis_dir/run_mpas_analysis $run_config_file \ $job_config_file diff --git a/docs/api.rst b/docs/api.rst index 57347458d..3623557ec 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -13,9 +13,7 @@ Top-level script: run_mpas_analysis run_mpas_analysis.update_generate run_mpas_analysis.run_parallel_tasks - run_mpas_analysis.launch_tasks run_mpas_analysis.wait_for_task - run_mpas_analysis.is_running run_mpas_analysis.build_analysis_list run_mpas_analysis.determine_analyses_to_generate run_mpas_analysis.run_analysis @@ -34,6 +32,7 @@ Base Class AnalysisTask AnalysisTask.setup_and_check + AnalysisTask.run_analysis AnalysisTask.run AnalysisTask.check_generate AnalysisTask.check_analysis_enabled diff --git a/mpas_analysis/analysis_task_template.py b/mpas_analysis/analysis_task_template.py index 689c0256f..17dd3bb4a 100644 --- a/mpas_analysis/analysis_task_template.py +++ b/mpas_analysis/analysis_task_template.py @@ -19,7 +19,7 @@ 5. add new analysis task to run_mpas_analysis under build_analysis_list: analyses.append(.MyTask(config, myArg='argValue')) This will add a new object of the MyTask class to a list of analysis tasks - created in build_analysis_list. Later on in run_analysis, it will first + created in build_analysis_list. Later on in run_task, it will first go through the list to make sure each task needs to be generated (by calling check_generate, which is defined in AnalysisTask), then, will call setup_and_check on each task (to make sure the appropriate AM is on @@ -220,7 +220,7 @@ def setup_and_check(self): # {{{ self.endDate)) # For climatologies, update the start and end year based on the files - # that are actually available + # that are actually available # If not analyzing climatologies, delete this line changed, self.startYear, self.endYear, self.startDate, self.endDate = \ update_climatology_bounds_from_file_names(self.inputFiles, @@ -234,19 +234,19 @@ def setup_and_check(self): # {{{ # images should appear on the webpage. # Note: because of the way parallel tasks are handled in MPAS-Analysis, - # we can't be sure that run() will be called (it might be launched - # as a completely separate process) so it is not safe to store a list - # of xml files from within run(). The recommended procedure is to - # create a list of XML files here during setup_and_check() and possibly - # use them during run() + # we can't be sure that run_task() will be called (it might be + # launched as a completely separate process) so it is not safe to store + # a list of xml files from within run_task(). The recommended + # procedure is to create a list of XML files here during + # setup_and_check() and possibly use them during run_task() self.xmlFileNames = [] - + # we also show how to store file prefixes for later use in creating # plots self.filePrefixes = {} - # plotParameters is a list of parameters, a stand-ins for whatever + # plotParameters is a list of parameters, a stand-ins for whatever # you might want to include in each plot name, for example, seasons or # types of observation. self.plotParameters = self.config.getExpression(self.taskName, @@ -261,10 +261,9 @@ def setup_and_check(self): # {{{ filePrefix)) self.filePrefixes[plotParameter] = filePrefix - # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' The main method of the task that performs the analysis task. 
@@ -275,9 +274,9 @@ def run(self): # {{{ # Add the main contents of the analysis task below - # No need to call AnalysisTask.run() because it doesn't do anything, - # so we don't call super(MyTask, self).run(), as we do for other - # methods above. + # No need to call AnalysisTask.run_task() because it doesn't do + # anything, so we don't call super(MyTask, self).run_task(), as we + # do for other methods above. # Here is an example of a call to a local helper method (function), # one for each of our plotParameters (e.g. seasons) @@ -296,15 +295,15 @@ def run(self): # {{{ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ ''' Make a simple plot - + Parameters ---------- plotParameter : str The name of a parameter that is specific to this plot - + optionalArgument : , optional An optional argument - + ''' @@ -320,20 +319,20 @@ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ # get the file name based on the plot parameter filePrefix = self.filePrefixes[plotParameter] outFileName = '{}/{}.png'.format(self.plotsDirectory, filePrefix) - + # make the plot x = numpy.linspace(0, 1, 1000) plt.plot(x, x**2) # save the plot to the output file plt.savefig(outFileName) - + # here's an example of how you would create an XML file for this plot # with the appropriate entries. Some notes: # * Gallery groups typically represent all the analysis from a task, # or sometimes from multiple tasks # * A gallery might be for just for one set of observations, one # season, etc., depending on what makes sense - # * Within each gallery, there is one plot for each value in + # * Within each gallery, there is one plot for each value in # 'plotParameters', with a corresponding caption and short thumbnail # description caption = 'Plot of x^2 with plotParamter: {}'.format(plotParameter) @@ -350,8 +349,8 @@ def _make_plot(self, plotParameter, optionalArgument=None): # {{{ imageDescription=caption, imageCaption=caption) - - # + + # # }}} # }}} diff --git a/mpas_analysis/ocean/climatology_map.py b/mpas_analysis/ocean/climatology_map.py index 7df02d9d3..e0eb148d0 100644 --- a/mpas_analysis/ocean/climatology_map.py +++ b/mpas_analysis/ocean/climatology_map.py @@ -13,13 +13,13 @@ import numpy as np import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting import plot_global_comparison, \ setup_colormap from ..shared.constants import constants -from ..shared.io.utility import build_config_full_path +from ..shared.io.utility import build_config_full_path, make_directories from ..shared.io import write_netcdf from ..shared.html import write_image_xml @@ -105,7 +105,8 @@ def setup_and_check(self): # {{{ config=config, sourceDescriptor=mpasDescriptor, comparisonDescriptor=comparisonDescriptor, mappingFilePrefix=mappingFilePrefix, - method=config.get('climatology', 'mpasInterpolationMethod')) + method=config.get('climatology', 'mpasInterpolationMethod'), + logger=self.logger) obsDescriptor = LatLonGridDescriptor.read(fileName=self.obsFileName, latVarName='lat', @@ -127,7 +128,8 @@ def setup_and_check(self): # {{{ comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map_obs_{}'.format(fieldName), method=config.get('oceanObservations', - 'interpolationMethod')) + 'interpolationMethod'), + logger=self.logger) self.xmlFileNames = [] self.filePrefixes = {} @@ -140,9 +142,16 @@ def setup_and_check(self): # {{{ filePrefix)) self.filePrefixes[season] = filePrefix + # make the mapping directory, because doing so within each process + # 
seems to be giving ESMF_RegridWeightGen some trouble + mappingSubdirectory = \ + build_config_full_path(self.config, 'output', + 'mappingSubdirectory') + make_directories(mappingSubdirectory) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Plots a comparison of ACME/MPAS output to SST, MLD or SSS observations @@ -151,17 +160,17 @@ def run(self): # {{{ Luke Van Roekel, Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting 2-d maps of {} climatologies...".format( - self.fieldNameInTitle) + self.logger.info("\nPlotting 2-d maps of {} climatologies...".format( + self.fieldNameInTitle)) # get local versions of member variables for convenience config = self.config fieldName = self.fieldName - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') @@ -213,7 +222,8 @@ def run(self): # {{{ variableList=[self.mpasFieldName], modelName=modelName, seasons=outputTimes, - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) dsObs = None @@ -254,10 +264,12 @@ def run(self): # {{{ write_netcdf(climatology, maskedClimatologyFileName) if not os.path.exists(remappedFileName): + self.mpasRemapper.remap_file( inFileName=maskedClimatologyFileName, outFileName=remappedFileName, - overwrite=True) + overwrite=True, + logger=self.logger) remappedClimatology = xr.open_dataset(remappedFileName) @@ -292,7 +304,8 @@ def run(self): # {{{ remappedClimatology = \ remap_and_write_climatology( config, seasonalClimatology, climatologyFileName, - remappedFileName, self.obsRemapper) + remappedFileName, self.obsRemapper, + logger=self.logger) else: diff --git a/mpas_analysis/ocean/index_nino34.py b/mpas_analysis/ocean/index_nino34.py index 8d3987f69..321119cf4 100644 --- a/mpas_analysis/ocean/index_nino34.py +++ b/mpas_analysis/ocean/index_nino34.py @@ -17,7 +17,7 @@ from ..shared.plot.plotting import plot_xtick_format, plot_size_y_axis -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.html import write_image_xml @@ -94,7 +94,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' Computes NINO34 index and plots the time series and power spectrum with 95 and 99% confidence bounds @@ -104,9 +104,10 @@ def run(self): # {{{ Luke Van Roekel, Xylar Asay-Davis ''' - print "\nPlotting Nino3.4 time series and power spectrum...." + self.logger.info("\nPlotting Nino3.4 time series and power " + "spectrum....") - print ' Load SST data...' + self.logger.info(' Load SST data...') fieldName = 'nino' simulationStartTime = get_simulation_start_time(self.runStreams) @@ -127,10 +128,10 @@ def run(self): # {{{ dataPath = "{}/ERS_SSTv4_nino34.nc".format(observationsDirectory) obsTitle = 'ERS SSTv4' - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') # regionIndex should correspond to NINO34 in surface weighted Average @@ -155,14 +156,14 @@ def run(self): # {{{ dsObs = xr.open_dataset(dataPath) nino34Obs = dsObs.sst - print ' Compute NINO3.4 index...' 
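(A note on the pattern here and in the hunks that follow: bare print statements like the removed line above are replaced with calls to self.logger, which the reworked AnalysisTask base class, further down in this diff, points either at a per-task log file or at stdout. Below is a minimal, self-contained sketch of that wiring using only the standard library; the task name 'indexNino34' is just an illustrative placeholder.

    import logging
    import sys

    # one named logger per task, message-only formatting, INFO level --
    # roughly what the AnalysisTask base class sets up before the task runs
    logger = logging.getLogger('indexNino34')
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    # task code then logs instead of printing:
    logger.info('  Compute NINO3.4 index...')

The matching self.logger.info() replacement line continues below.)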
+ self.logger.info(' Compute NINO3.4 index...') regionSST = ds[varName].isel(nOceanRegions=regionIndex) nino34 = self._compute_nino34_index(regionSST, calendar) # Compute the observational index over the entire time range # nino34Obs = compute_nino34_index(dsObs.sst, calendar) - print ' Computing NINO3.4 power spectra...' + self.logger.info(' Computing NINO3.4 power spectra...') f, spectra, conf99, conf95, redNoise = \ self._compute_nino34_spectra(nino34) @@ -183,7 +184,7 @@ def run(self): # {{{ fObs = 1.0 / (constants.eps + fObs*constants.sec_per_year) f30 = 1.0 / (constants.eps + f30*constants.sec_per_year) - print ' Plot NINO3.4 index and spectra...' + self.logger.info(' Plot NINO3.4 index and spectra...') figureName = '{}/NINO34_{}.png'.format(self.plotsDirectory, mainRunName) @@ -282,7 +283,6 @@ def _compute_nino34_spectra(self, nino34Index): # {{{ nino34Index power spectra that has been smoothed with a modified Daniell window (https://www.ncl.ucar.edu/Document/Functions/Built-in/specx_anal.shtml) - f : numpy.array array of frequencies corresponding to the center of the spectral bins resulting from the analysis @@ -493,7 +493,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if title is not None: fig.suptitle(title, y=0.92, **title_font) - ax1 = plt.subplot(3, 1, 1) + plt.subplot(3, 1, 1) plt.plot(fObs[2:-3], ninoObs[2:-3], 'k', linewidth=linewidths) plt.plot(fObs[2:-3], redNoiseObs[2:-3], 'r', linewidth=linewidths) @@ -519,7 +519,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if ylabel is not None: plt.ylabel(ylabel, **axis_font) - ax2 = plt.subplot(3, 1, 2) + plt.subplot(3, 1, 2) plt.plot(f30[2:-3], nino30yr[2:-3], 'k', linewidth=linewidths) plt.plot(f30[2:-3], redNoise30[2:-3], 'r', linewidth=linewidths) @@ -539,7 +539,7 @@ def _nino34_spectra_plot(self, config, f, ninoSpectra, if ylabel is not None: plt.ylabel(ylabel, **axis_font) - ax3 = plt.subplot(3, 1, 3) + plt.subplot(3, 1, 3) plt.plot(f[2:-3], ninoSpectra[2:-3], 'k', linewidth=linewidths) plt.plot(f[2:-3], redNoiseSpectra[2:-3], 'r', linewidth=linewidths) plt.plot(f[2:-3], confidence95[2:-3], 'b', linewidth=linewidths) diff --git a/mpas_analysis/ocean/meridional_heat_transport.py b/mpas_analysis/ocean/meridional_heat_transport.py index 499a722a4..aeeb7347e 100644 --- a/mpas_analysis/ocean/meridional_heat_transport.py +++ b/mpas_analysis/ocean/meridional_heat_transport.py @@ -16,7 +16,7 @@ compute_climatologies_with_ncclimo, \ get_ncclimo_season_file_name -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.html import write_image_xml @@ -150,7 +150,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Process MHT analysis member data if available. Plots MHT as: @@ -161,7 +161,7 @@ def run(self): # {{{ ------- Mark Petersen, Milena Veneziani, Xylar Asay-Davis """ - print "\nPlotting meridional heat transport (MHT)..." + self.logger.info("\nPlotting meridional heat transport (MHT)...") config = self.config @@ -174,7 +174,7 @@ def run(self): # {{{ # Depth is from refZMid, also in # mpaso.hist.am.meridionalHeatTransport.*.nc - print ' Read in depth and latitude...' 
+ self.logger.info(' Read in depth and latitude...') ncFile = netCDF4.Dataset(self.mhtFile, mode='r') # reference depth [m] refZMid = ncFile.variables['refZMid'][:] @@ -201,19 +201,20 @@ def run(self): # {{{ variableList = ['timeMonthly_avg_meridionalHeatTransportLat', 'timeMonthly_avg_meridionalHeatTransportLatZ'] - print '\n Compute and plot global meridional heat transport' + self.logger.info('\n Compute and plot global meridional heat ' + 'transport') outputRoot = build_config_full_path(config, 'output', 'mpasClimatologySubdirectory') outputDirectory = '{}/mht'.format(outputRoot) - print '\n List of files for climatologies:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n List of files for climatologies:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) - print ' Load data...' + self.logger.info(' Load data...') climatologyFileName = get_ncclimo_season_file_name(outputDirectory, 'mpaso', 'ANN', @@ -233,7 +234,8 @@ def run(self): # {{{ variableList=variableList, modelName='mpaso', seasons=['ANN'], - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) annualClimatology = xr.open_dataset(climatologyFileName) annualClimatology = annualClimatology.isel(Time=0) @@ -245,7 +247,7 @@ def run(self): # {{{ depthLimGlobal = config.getExpression(self.sectionName, 'depthLimGlobal') - print ' Plot global MHT...' + self.logger.info(' Plot global MHT...') # Plot 1D MHT (zonally averaged, depth integrated) x = binBoundaryMerHeatTrans y = annualClimatology.timeMonthly_avg_meridionalHeatTransportLat diff --git a/mpas_analysis/ocean/streamfunction_moc.py b/mpas_analysis/ocean/streamfunction_moc.py index 3e6aeaa55..d4c409d7a 100644 --- a/mpas_analysis/ocean/streamfunction_moc.py +++ b/mpas_analysis/ocean/streamfunction_moc.py @@ -22,7 +22,7 @@ compute_climatologies_with_ncclimo, \ get_ncclimo_season_file_name -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.time_series import cache_time_series from ..shared.html import write_image_xml @@ -166,7 +166,7 @@ def setup_and_check(self): # {{{ # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' Process MOC analysis member data if available, or compute MOC at post-processing if not. Plots streamfunction climatolgoical sections @@ -178,18 +178,18 @@ def run(self): # {{{ Milena Veneziani, Mark Petersen, Phillip J. Wolfram, Xylar Asay-Davis ''' - print "\nPlotting streamfunction of Meridional Overturning " \ - "Circulation (MOC)..." 
+ self.logger.info("\nPlotting streamfunction of Meridional Overturning " + "Circulation (MOC)...") - print '\n List of files for climatologies:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFilesClimo[0]), - os.path.basename(self.inputFilesClimo[-1])) + self.logger.info('\n List of files for climatologies:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFilesClimo[0]), + os.path.basename(self.inputFilesClimo[-1]))) - print '\n List of files for time series:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFilesTseries[0]), - os.path.basename(self.inputFilesTseries[-1])) + self.logger.info('\n List of files for time series:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFilesTseries[0]), + os.path.basename(self.inputFilesTseries[-1]))) config = self.config @@ -197,7 +197,7 @@ def run(self): # {{{ # Check whether MOC Analysis Member is enabled if self.mocAnalysisMemberEnabled: # Add a moc_analisysMember_processing - print '*** MOC Analysis Member is on ***' + self.logger.info('*** MOC Analysis Member is on ***') # (mocDictClimo, mocDictTseries) = \ # self._compute_moc_analysismember(config, streams, calendar, # sectionName, dictClimo, @@ -220,7 +220,7 @@ def run(self): # {{{ yLabel = 'depth [m]' for region in self.regionNames: - print ' Plot climatological {} MOC...'.format(region) + self.logger.info(' Plot climatological {} MOC...'.format(region)) title = '{} MOC (ANN, years {:04d}-{:04d})\n {}'.format( region, self.startYearClimo, self.endYearClimo, @@ -256,7 +256,7 @@ def run(self): # {{{ imageCaption=caption) # }}} # Plot time series - print ' Plot time series of max Atlantic MOC at 26.5N...' + self.logger.info(' Plot time series of max Atlantic MOC at 26.5N...') xLabel = 'Time [years]' yLabel = '[Sv]' title = 'Max Atlantic MOC at $26.5^\circ$N\n {}'.format(mainRunName) @@ -341,7 +341,8 @@ def _compute_velocity_climatologies(self): # {{{ variableList=variableList, modelName='mpaso', seasons=['ANN'], - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) # }}} def _compute_moc_climo_postprocess(self): # {{{ @@ -365,8 +366,8 @@ def _compute_moc_climo_postprocess(self): # {{{ iRegion = 0 self.dictRegion = {} for region in self.regionNames: - print '\n Reading region and transect mask for ' \ - '{}...'.format(region) + self.logger.info('\n Reading region and transect mask for ' + '{}...'.format(region)) ncFileRegional = netCDF4.Dataset(regionMaskFiles, mode='r') maxEdgesInTransect = \ ncFileRegional.dimensions['maxEdgesInTransect'].size @@ -394,8 +395,8 @@ def _compute_moc_climo_postprocess(self): # {{{ self.regionNames.append('Global') # Compute and plot annual climatology of MOC streamfunction - print '\n Compute and/or plot post-processed MOC climatological '\ - 'streamfunction...' + self.logger.info('\n Compute and/or plot post-processed MOC ' + 'climatological streamfunction...') outputDirectory = build_config_full_path(config, 'output', 'mpasClimatologySubdirectory') @@ -405,7 +406,7 @@ def _compute_moc_climo_postprocess(self): # {{{ outputDirectory, self.startYearClimo, self.endYearClimo) if not os.path.exists(outputFileClimo): - print ' Load data...' 
+ self.logger.info(' Load data...') annualClimatology = xr.open_dataset(self.velClimoFile) # rename some variables for convenience @@ -426,9 +427,9 @@ def _compute_moc_climo_postprocess(self): # {{{ self.lat = {} self.moc = {} for region in self.regionNames: - print ' Compute {} MOC...'.format(region) - print ' Compute transport through region southern ' \ - 'transect...' + self.logger.info(' Compute {} MOC...'.format(region)) + self.logger.info(' Compute transport through region ' + 'southern transect...') if region == 'Global': transportZ = np.zeros(nVertLevels) else: @@ -465,7 +466,7 @@ def _compute_moc_climo_postprocess(self): # {{{ self.moc[region] = mocTop # Save to file - print ' Save global and regional MOC to file...' + self.logger.info(' Save global and regional MOC to file...') ncFile = netCDF4.Dataset(outputFileClimo, mode='w') # create dimensions ncFile.createDimension('nz', len(refTopDepth)) @@ -494,7 +495,8 @@ def _compute_moc_climo_postprocess(self): # {{{ ncFile.close() else: # Read from file - print ' Read previously computed MOC streamfunction from file...' + self.logger.info(' Read previously computed MOC streamfunction ' + 'from file...') ncFile = netCDF4.Dataset(outputFileClimo, mode='r') self.depth = ncFile.variables['depth'][:] self.lat = {} @@ -510,9 +512,9 @@ def _compute_moc_time_series_postprocess(self): # {{{ '''compute MOC time series as a post-process''' # Compute and plot time series of Atlantic MOC at 26.5N (RAPID array) - print '\n Compute and/or plot post-processed Atlantic MOC '\ - 'time series...' - print ' Load data...' + self.logger.info('\n Compute and/or plot post-processed Atlantic MOC ' + 'time series...') + self.logger.info(' Load data...') config = self.config @@ -559,7 +561,7 @@ def _compute_moc_time_series_postprocess(self): # {{{ continueOutput = os.path.exists(outputFileTseries) if continueOutput: - print ' Read in previously computed MOC time series' + self.logger.info(' Read in previously computed MOC time series') # add all the other arguments to the function comp_moc_part = partial(self._compute_moc_time_series_part, ds, @@ -570,7 +572,7 @@ def _compute_moc_time_series_postprocess(self): # {{{ dsMOCTimeSeries = cache_time_series( ds.Time.values, comp_moc_part, outputFileTseries, - self.calendar, yearsPerCacheUpdate=1, printProgress=False) + self.calendar, yearsPerCacheUpdate=1, logger=self.logger) return dsMOCTimeSeries # }}} @@ -583,7 +585,7 @@ def _compute_moc_time_series_part(self, ds, areaCell, latCell, indlat26, # computes a subset of the MOC time series if firstCall: - print ' Process and save time series' + self.logger.info(' Process and save time series') times = ds.Time[timeIndices].values mocRegion = np.zeros(timeIndices.shape) @@ -593,7 +595,8 @@ def _compute_moc_time_series_part(self, ds, areaCell, latCell, indlat26, dsLocal = ds.isel(Time=timeIndex) date = days_to_datetime(time, calendar=self.calendar) - print ' date: {:04d}-{:02d}'.format(date.year, date.month) + self.logger.info(' date: {:04d}-{:02d}'.format(date.year, + date.month)) horizontalVel = dsLocal.timeMonthly_avg_normalVelocity.values verticalVel = dsLocal.timeMonthly_avg_vertVelocityTop.values diff --git a/mpas_analysis/ocean/time_series_ohc.py b/mpas_analysis/ocean/time_series_ohc.py index 628b851c1..72980cfda 100644 --- a/mpas_analysis/ocean/time_series_ohc.py +++ b/mpas_analysis/ocean/time_series_ohc.py @@ -3,7 +3,7 @@ import netCDF4 import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting 
import timeseries_analysis_plot, \ plot_vertical_section, setup_colormap @@ -123,7 +123,7 @@ def setup_and_check(self): # {{{ return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of ocean heat content (OHC) from time-series output. @@ -132,7 +132,8 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani, Greg Streletz """ - print "\nPlotting OHC time series and T, S, and OHC vertical trends..." + self.logger.info("\nPlotting OHC time series and T, S, and OHC " + "vertical trends...") simulationStartTime = get_simulation_start_time(self.runStreams) config = self.config @@ -181,13 +182,14 @@ def run(self): # {{{ raise IOError('No MPAS-O restart file found: need at least one ' 'restart file for OHC calculation') - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) # Define/read in general variables - print ' Read in depth and compute specific depth indexes...' + self.logger.info(' Read in depth and compute specific depth ' + 'indexes...') ncFile = netCDF4.Dataset(restartFile, mode='r') # reference depth [m] depth = ncFile.variables['refBottomDepth'][:] @@ -199,7 +201,7 @@ def run(self): # {{{ kbtm = len(depth)-1 # Load data - print ' Load ocean data...' + self.logger.info(' Load ocean data...') avgTemperatureVarName = \ 'timeMonthly_avg_avgValueWithinOceanLayerRegion_avgLayerTemperature' avgSalinityVarName = \ @@ -268,7 +270,7 @@ def run(self): # {{{ firstYearAvgLayerSalinity = \ firstYearAvgLayerSalinity.mean('Time') - print ' Compute temperature and salinity anomalies...' + self.logger.info(' Compute temperature and salinity anomalies...') ds['avgLayerTemperatureAnomaly'] = (ds[avgTemperatureVarName] - firstYearAvgLayerTemperature) @@ -282,7 +284,8 @@ def run(self): # {{{ calendar=calendar) if preprocessedReferenceRunName != 'None': - print ' Load in OHC from preprocessed reference run...' + self.logger.info(' Load in OHC from preprocessed reference ' + 'run...') inFilesPreprocessed = '{}/OHC.{}.year*.nc'.format( preprocessedInputDirectory, preprocessedReferenceRunName) dsPreprocessed = open_multifile_dataset( @@ -297,8 +300,9 @@ def run(self): # {{{ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print ' Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' + self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' cacheFileName = '{}/ohcTimeSeries.nc'.format(outputDirectory) @@ -310,11 +314,11 @@ def run(self): # {{{ self._compute_ohc_part, cacheFileName, calendar, yearsPerCacheUpdate=10, - printProgress=True) + logger=self.logger) unitsScalefactor = 1e-22 - print ' Compute OHC and make plots...' 
+ self.logger.info('  Compute OHC and make plots...') for regionIndex in regionIndicesToPlot: region = regions[regionIndex] diff --git a/mpas_analysis/ocean/time_series_sst.py b/mpas_analysis/ocean/time_series_sst.py index 44a3d604c..c39e0aacd 100644 --- a/mpas_analysis/ocean/time_series_sst.py +++ b/mpas_analysis/ocean/time_series_sst.py @@ -1,6 +1,6 @@ import os -from ..shared.analysis_task import AnalysisTask +from ..shared import AnalysisTask from ..shared.plot.plotting import timeseries_analysis_plot @@ -112,7 +112,7 @@ def setup_and_check(self): # {{{ return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of the time-series output of sea-surface temperature (SST). @@ -122,18 +122,18 @@ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting SST time series..." + self.logger.info("\nPlotting SST time series...") - print '  Load SST data...' + self.logger.info('  Load SST data...') simulationStartTime = get_simulation_start_time(self.runStreams) config = self.config calendar = self.calendar - print '\n  Reading files:\n' \ - '  {} through\n  {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n  Reading files:\n' + '  {} through\n  {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) mainRunName = config.get('runs', 'mainRunName') preprocessedReferenceRunName = \ config.get('runs', 'preprocessedReferenceRunName') @@ -179,7 +179,8 @@ def run(self): # {{{ calendar=calendar) if preprocessedReferenceRunName != 'None': - print '  Load in SST for a preprocesses reference run...' + self.logger.info('  Load in SST for a preprocessed reference ' + 'run...') inFilesPreprocessed = '{}/SST.{}.year*.nc'.format( preprocessedInputDirectory, preprocessedReferenceRunName) dsPreprocessed = open_multifile_dataset( @@ -194,8 +195,9 @@ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print '  Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' + self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' cacheFileName = '{}/sstTimeSeries.nc'.format(outputDirectory) @@ -206,9 +208,9 @@ self._compute_sst_part, cacheFileName, calendar, yearsPerCacheUpdate=10, - printProgress=True) + logger=self.logger) - print '  Make plots...'
+ self.logger.info(' Make plots...') for regionIndex in regionIndicesToPlot: region = regions[regionIndex] diff --git a/mpas_analysis/sea_ice/climatology_map.py b/mpas_analysis/sea_ice/climatology_map.py index b6e4cc40f..9e09ded86 100644 --- a/mpas_analysis/sea_ice/climatology_map.py +++ b/mpas_analysis/sea_ice/climatology_map.py @@ -6,8 +6,6 @@ import xarray as xr -from ..shared.constants import constants - from ..shared.climatology import get_lat_lon_comparison_descriptor, \ get_remapper, get_mpas_climatology_dir_name, \ get_observation_climatology_file_names, \ @@ -21,7 +19,7 @@ from ..shared.plot.plotting import plot_polar_comparison, \ setup_colormap -from ..shared.io.utility import build_config_full_path +from ..shared.io.utility import build_config_full_path, make_directories from ..shared.io import write_netcdf from ..shared.html import write_image_xml @@ -88,7 +86,8 @@ def setup_and_check(self): # {{{ config=self.config, sourceDescriptor=mpasDescriptor, comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map', - method=self.config.get('climatology', 'mpasInterpolationMethod')) + method=self.config.get('climatology', 'mpasInterpolationMethod'), + logger=self.logger) info = self.obsAndPlotInfo[0] season = info['season'] @@ -108,7 +107,8 @@ def setup_and_check(self): # {{{ comparisonDescriptor=comparisonDescriptor, mappingFilePrefix='map_obs_{}'.format(fieldName), method=config.get('seaIceObservations', - 'interpolationMethod')) + 'interpolationMethod'), + logger=self.logger) self.xmlFileNames = [] @@ -123,9 +123,16 @@ def setup_and_check(self): # {{{ filePrefix)) info['filePrefix'] = filePrefix + # make the mapping directory, because doing so within each process + # seems to be giving ESMF_RegridWeightGen some trouble + mappingSubdirectory = \ + build_config_full_path(self.config, 'output', + 'mappingSubdirectory') + make_directories(mappingSubdirectory) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of sea-ice properties by comparing with previous model results and/or observations. @@ -135,13 +142,13 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting 2-d maps of {} climatologies...".format( - self.fieldNameInTitle) + self.logger.info("\nPlotting 2-d maps of {} climatologies...".format( + self.fieldNameInTitle)) - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) self._compute_seasonal_climatologies() @@ -157,7 +164,7 @@ def _compute_and_plot(self): # {{{ Xylar Asay-Davis, Milena Veneziani ''' - print ' Make ice concentration plots...' 
+ self.logger.info(' Make ice concentration plots...') config = self.config @@ -232,7 +239,8 @@ def _compute_and_plot(self): # {{{ remap_and_write_climatology( config, seasonalClimatology, obsClimatologyFileName, - obsRemappedFileName, self.obsRemapper) + obsRemappedFileName, self.obsRemapper, + logger=self.logger) else: @@ -338,7 +346,8 @@ def _compute_seasonal_climatologies(self): # {{{ 'timeMonthly_avg_iceVolumeCell'], modelName=modelName, seasons=self.seasons, - decemberMode='sdd') + decemberMode='sdd', + logger=self.logger) self._remap_seasonal_climatology() @@ -379,7 +388,8 @@ def _remap_seasonal_climatology(self): # {{{ self.mpasRemapper.remap_file( inFileName=maskedClimatologyFileName, outFileName=remappedFileName, - overwrite=True) + overwrite=True, + logger=self.logger) # }}} # }}} diff --git a/mpas_analysis/sea_ice/time_series.py b/mpas_analysis/sea_ice/time_series.py index 856aeb4c9..e4e2049b0 100644 --- a/mpas_analysis/sea_ice/time_series.py +++ b/mpas_analysis/sea_ice/time_series.py @@ -137,7 +137,7 @@ def setup_and_check(self): # {{{ self.xmlFileNames.extend(polarXMLFileNames) return # }}} - def run(self): # {{{ + def run_task(self): # {{{ """ Performs analysis of time series of sea-ice properties. @@ -146,15 +146,15 @@ def run(self): # {{{ Xylar Asay-Davis, Milena Veneziani """ - print "\nPlotting sea-ice area and volume time series..." + self.logger.info("\nPlotting sea-ice area and volume time series...") config = self.config calendar = self.calendar - print '\n Reading files:\n' \ - ' {} through\n {}'.format( - os.path.basename(self.inputFiles[0]), - os.path.basename(self.inputFiles[-1])) + self.logger.info('\n Reading files:\n' + ' {} through\n {}'.format( + os.path.basename(self.inputFiles[0]), + os.path.basename(self.inputFiles[-1]))) plotTitles = {'iceArea': 'Sea-ice area', 'iceVolume': 'Sea-ice volume', @@ -200,7 +200,7 @@ def run(self): # {{{ make_directories(outputDirectory) - print ' Load sea-ice data...' + self.logger.info(' Load sea-ice data...') # Load mesh self.dsMesh = xr.open_dataset(self.restartFileName) self.dsMesh = subset_variables(self.dsMesh, @@ -240,8 +240,9 @@ def run(self): # {{{ dsPreprocessedTimeSlice = \ dsPreprocessed.sel(Time=slice(timeStart, timeEnd)) else: - print ' Warning: Preprocessed time series ends before the ' \ - 'timeSeries startYear and will not be plotted.' 
+ self.logger.warning('Preprocessed time series ends before the ' + 'timeSeries startYear and will not be ' + 'plotted.') preprocessedReferenceRunName = 'None' norm = {'iceArea': 1e-6, # m^2 to km^2 @@ -262,7 +263,7 @@ plotVars = {} for hemisphere in ['NH', 'SH']: - print '  Caching {} data'.format(hemisphere) + self.logger.info('  Caching {} data'.format(hemisphere)) cacheFileName = '{}/seaIceAreaVolumeTimeSeries_{}.nc'.format( outputDirectory, hemisphere) @@ -271,9 +272,9 @@ self.ds = ds dsTimeSeries[hemisphere] = cache_time_series( ds.Time.values, self._compute_area_vol_part, cacheFileName, - calendar, yearsPerCacheUpdate=10, printProgress=True) + calendar, yearsPerCacheUpdate=10, logger=self.logger) - print '  Make {} plots...'.format(hemisphere) + self.logger.info('  Make {} plots...'.format(hemisphere)) for variableName in ['iceArea', 'iceVolume']: key = (hemisphere, variableName) diff --git a/mpas_analysis/shared/__init__.py b/mpas_analysis/shared/__init__.py index e69de29bb..0999164d4 100644 --- a/mpas_analysis/shared/__init__.py +++ b/mpas_analysis/shared/__init__.py @@ -0,0 +1 @@ +from analysis_task import AnalysisTask diff --git a/mpas_analysis/shared/analysis_task.py b/mpas_analysis/shared/analysis_task.py index 5402c242f..a2cb5e827 100644 --- a/mpas_analysis/shared/analysis_task.py +++ b/mpas_analysis/shared/analysis_task.py @@ -8,29 +8,34 @@ ''' import warnings +from multiprocessing import Process, Value +import time +import traceback +import logging +import sys from .io import NameList, StreamsFile from .io.utility import build_config_full_path, make_directories -class AnalysisTask(object): # {{{ +class AnalysisTask(Process): # {{{ ''' The base class for analysis tasks. Attributes ---------- - config : instance of MpasAnalysisConfigParser + config : ``MpasAnalysisConfigParser`` Contains configuration options - taskName : str + taskName : str The name of the task, typically the same as the class name except starting with lowercase (e.g. 'myTask' for class 'MyTask') - componentName : {'ocean', 'seaIce'} + componentName : {'ocean', 'seaIce'} The name of the component (same as the folder where the task resides) - tags : list of str + tags : list of str Tags used to describe the task (e.g. 'timeSeries', 'climatology', horizontalMap', 'index', 'transect'). These are used to determine which tasks are generated (e.g. 'all_transect' or 'no_climatology' @@ -46,24 +51,37 @@ class AnalysisTask(object): # {{{ The directory for writing plots (which is also created if it doesn't exist) - namelist : ``shared.io.NameList`` object + namelist : ``shared.io.NameList`` the namelist reader - runStreams : ``shared.io.StreamsFile`` object + runStreams : ``shared.io.StreamsFile`` the streams file reader for streams in the run directory (e.g. restart files) - historyStreams : ``shared.io.StreamsFile`` object + historyStreams : ``shared.io.StreamsFile`` the streams file reader for streams in the history directory (most streams other than restart files) - calendar : the name of the calendar ('gregorian' or 'gregoraian_noleap') + calendar : {'gregorian', 'gregorian_noleap'} + The calendar used in the MPAS run + + logger : ``logging.Logger`` + A logger for output during the run phase of an analysis task Authors ------- Xylar Asay-Davis ''' + + # flags for run status + UNSET = 0 + READY = 1 + BLOCKED = 2 + RUNNING = 3 + SUCCESS = 4 + FAIL = 5 + def __init__(self, config, taskName, componentName, tags=[]): # {{{ ''' Construct the analysis task.
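(The hunks that follow are the core of this restructuring: AnalysisTask is now a multiprocessing.Process subclass, the framework-owned run() method handles logging and status bookkeeping, and task authors override run_task() instead. Below is a toy sketch of that division of labor, with MiniTask as a hypothetical stand-in, not part of the patch.

    from multiprocessing import Process, Value


    class MiniTask(Process):
        # status flags, mirroring AnalysisTask.UNSET/SUCCESS/FAIL above
        UNSET, SUCCESS, FAIL = 0, 4, 5

        def __init__(self, name):
            super(MiniTask, self).__init__(name=name)
            # a shared integer so the parent process can read the status
            self._runStatus = Value('i', MiniTask.UNSET)

        def run(self):
            # the framework wrapper records success or failure of the body
            try:
                self.run_task()
                self._runStatus.value = MiniTask.SUCCESS
            except Exception:
                self._runStatus.value = MiniTask.FAIL

        def run_task(self):
            pass  # subclasses do their analysis work here


    if __name__ == '__main__':
        task = MiniTask('myTask')
        task.start()
        task.join()
        assert task._runStatus.value == MiniTask.SUCCESS

The constructor hunk continues below.)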
@@ -95,10 +113,24 @@ def __init__(self, config, taskName, componentName, tags=[]): # {{{ ------- Xylar Asay-Davis ''' + # This will include a subtask name as well in the future + self.fullTaskName = taskName + + # call the constructor from the base class (Process) + super(AnalysisTask, self).__init__(name=self.fullTaskName) + self.config = config self.taskName = taskName self.componentName = componentName - self.tags = tags # }}} + self.tags = tags + self.logger = None + + # non-public attributes related to multiprocessing and logging + self.daemon = True + self._runStatus = Value('i', AnalysisTask.UNSET) + self._stackTrace = None + self._logFileName = None + # }}} def setup_and_check(self): # {{{ ''' @@ -157,15 +189,19 @@ def setup_and_check(self): # {{{ if tag in self.tags: self.set_start_end_date(section=tag) + # redirect output to a log file + logsDirectory = build_config_full_path(self.config, 'output', + 'logsSubdirectory') + + self._logFileName = '{}/{}.log'.format(logsDirectory, + self.fullTaskName) + # }}} - def run(self): # {{{ + def run_task(self): # {{{ ''' - Runs the analysis task. - - Individual tasks (children classes of this base class) should first - call this method to perform any common steps in an analysis task, - then, perform the steps required to run the analysis task. + Run the analysis. Each task should override this function to do the + work of computing and/or plotting analysis Authors ------- @@ -173,6 +209,71 @@ def run(self): # {{{ ''' return # }}} + def run(self, writeLogFile=True): # {{{ + ''' + Sets up logging and then runs the analysis task. + + Authors + ------- + Xylar Asay-Davis + ''' + # redirect output to a log file + logsDirectory = build_config_full_path(self.config, 'output', + 'logsSubdirectory') + + configFileName = '{}/configs/config.{}'.format(logsDirectory, + self.fullTaskName) + configFile = open(configFileName, 'w') + self.config.write(configFile) + configFile.close() + + if writeLogFile: + self.logger = logging.getLogger(self.fullTaskName) + handler = logging.FileHandler(self._logFileName) + else: + self.logger = logging.getLogger() + handler = logging.StreamHandler(sys.stdout) + + formatter = AnalysisFormatter() + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(logging.INFO) + + if writeLogFile: + oldStdout = sys.stdout + oldStderr = sys.stderr + sys.stdout = StreamToLogger(self.logger, logging.INFO) + sys.stderr = StreamToLogger(self.logger, logging.ERROR) + + self._runStatus.value = AnalysisTask.RUNNING + startTime = time.time() + try: + self.run_task() + self._runStatus.value = AnalysisTask.SUCCESS + except (Exception, BaseException) as e: + if isinstance(e, KeyboardInterrupt): + raise e + self._stackTrace = traceback.format_exc() + self.logger.error("analysis task {} failed during run \n" + "{}".format(self.taskName, self._stackTrace)) + self._runStatus.value = AnalysisTask.FAIL + + runDuration = time.time() - startTime + m, s = divmod(runDuration, 60) + h, m = divmod(int(m), 60) + self.logger.info('Execution time: {}:{:02d}:{:05.2f}'.format(h, m, s)) + + if writeLogFile: + # restore stdout and stderr + sys.stdout = oldStdout + sys.stderr = oldStderr + + # remove the handlers from the logger (probably only necessary if + # writeLogFile==False) + self.logger.handlers = [] + + # }}} + def check_generate(self): # {{{ ''' @@ -319,4 +420,78 @@ def set_start_end_date(self, section): # {{{ # }}} +class AnalysisFormatter(logging.Formatter): # {{{ + """ + A custom formatter for logging + + Modified from: 
+ https://stackoverflow.com/a/8349076/7728169 + + Authors + ------- + Xylar Asay-Davis + """ + + # printing error messages without a prefix because they are sometimes + # errors and sometimes only warnings sent to stderr + err_fmt = "%(msg)s" + dbg_fmt = "DEBUG: %(module)s: %(lineno)d: %(msg)s" + info_fmt = "%(msg)s" + + def __init__(self, fmt="%(levelno)s: %(msg)s"): + logging.Formatter.__init__(self, fmt) + + def format(self, record): + + # Save the original format configured by the user + # when the logger formatter was instantiated + format_orig = self._fmt + + # Replace the original format with one customized by logging level + if record.levelno == logging.DEBUG: + self._fmt = AnalysisFormatter.dbg_fmt + + elif record.levelno == logging.INFO: + self._fmt = AnalysisFormatter.info_fmt + + elif record.levelno == logging.ERROR: + self._fmt = AnalysisFormatter.err_fmt + + # Call the original formatter class to do the grunt work + result = logging.Formatter.format(self, record) + + # Restore the original format configured by the user + self._fmt = format_orig + + return result +# }}} + + +class StreamToLogger(object): # {{{ + """ + Modified based on code by: + https://www.electricmonk.nl/log/2011/08/14/redirect-stdout-and-stderr-to-a-logger-in-python/ + + Copyright (C) 2011 Ferry Boender + + License: "available under the GPL" (the author does not provide more + details) + + Fake file-like stream object that redirects writes to a logger instance. + """ + def __init__(self, logger, log_level=logging.INFO): + self.logger = logger + self.log_level = log_level + self.linebuf = '' + + def write(self, buf): + for line in buf.rstrip().splitlines(): + self.logger.log(self.log_level, line.rstrip()) + + def flush(self): + pass + + # }}} + + # vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python diff --git a/mpas_analysis/shared/climatology/climatology.py b/mpas_analysis/shared/climatology/climatology.py index 14b3acb59..75e8343e8 100644 --- a/mpas_analysis/shared/climatology/climatology.py +++ b/mpas_analysis/shared/climatology/climatology.py @@ -64,7 +64,7 @@ def get_lat_lon_comparison_descriptor(config): # {{{ def get_remapper(config, sourceDescriptor, comparisonDescriptor, - mappingFilePrefix, method): # {{{ + mappingFilePrefix, method, logger=None): # {{{ """ Given config options and descriptions of the source and comparison grids, returns a ``Remapper`` object that can be used to remap from source files @@ -90,6 +90,9 @@ def get_remapper(config, sourceDescriptor, comparisonDescriptor, method : {'bilinear', 'neareststod', 'conserve'} The method of interpolation used. + logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Returns ------- remapper : ``Remapper`` object @@ -135,7 +138,7 @@ def get_remapper(config, sourceDescriptor, comparisonDescriptor, remapper = Remapper(sourceDescriptor, comparisonDescriptor, mappingFileName) - remapper.build_mapping_file(method=method) + remapper.build_mapping_file(method=method, logger=logger) return remapper # }}} @@ -258,7 +261,8 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, seasons='none', decemberMode='sdd', remapper=None, - remappedDirectory=None): # {{{ + remappedDirectory=None, + logger=None): # {{{ ''' Uses ncclimo to compute monthly, seasonal (DJF, MAM, JJA, SON) and annual climatologies. @@ -306,6 +310,9 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, climatologies on the source grid. Has no effect if ``remapper`` is ``None``. 
+ logger : ``logging.Logger``, optional + A logger to which ncclimo output should be redirected + Raises ------ OSError @@ -346,19 +353,38 @@ def compute_climatologies_with_ncclimo(config, inDirectory, outDirectory, if remappedDirectory is not None: args.extend(['-O', remappedDirectory]) - print 'running: {}'.format(' '.join(args)) - - # make sure any output is flushed before we add output from the - # subprocess - sys.stdout.flush() - sys.stderr.flush() - # set an environment variable to make sure we're not using czender's # local version of NCO instead of one we have intentionally loaded env = os.environ.copy() env['NCO_PATH_OVERRIDE'] = 'No' - subprocess.check_call(args, env=env) # }}} + if logger is None: + print 'running: {}'.format(' '.join(args)) + # make sure any output is flushed before we add output from the + # subprocess + sys.stdout.flush() + sys.stderr.flush() + + subprocess.check_call(args, env=env) + else: + logger.info('running: {}'.format(' '.join(args))) + for handler in logger.handlers: + handler.flush() + + process = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + stdout, stderr = process.communicate() + + if stdout: + logger.info(stdout) + if stderr: + for line in stderr.split('\n'): + logger.error(line) + + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, + ' '.join(args)) + # }}} def get_observation_climatology_file_names(config, fieldName, monthNames, @@ -751,7 +777,7 @@ def add_years_months_days_in_month(ds, calendar=None): # {{{ def remap_and_write_climatology(config, climatologyDataSet, climatologyFileName, remappedFileName, - remapper): # {{{ + remapper, logger=None): # {{{ """ Given a field in a climatology data set, use the ``remapper`` to remap horizontal dimensions of all fields, write the results to an output file, @@ -785,6 +811,9 @@ def remap_and_write_climatology(config, climatologyDataSet, A remapper that can be used to remap files or data sets to a comparison grid. 
+ logger : ``logging.Logger``, optional + A logger to which ncremap output should be redirected + Returns ------- remappedClimatology : ``xarray.DataSet`` or ``xarray.DataArray`` object @@ -808,7 +837,8 @@ def remap_and_write_climatology(config, climatologyDataSet, remapper.remap_file(inFileName=climatologyFileName, outFileName=remappedFileName, overwrite=True, - renormalize=renormalizationThreshold) + renormalize=renormalizationThreshold, + logger=logger) remappedClimatology = xr.open_dataset(remappedFileName) else: diff --git a/mpas_analysis/shared/interpolation/remapper.py b/mpas_analysis/shared/interpolation/remapper.py index eb55dd1d1..f73d25ec0 100644 --- a/mpas_analysis/shared/interpolation/remapper.py +++ b/mpas_analysis/shared/interpolation/remapper.py @@ -86,7 +86,7 @@ def __init__(self, sourceDescriptor, destinationDescriptor, # }}} def build_mapping_file(self, method='bilinear', - additionalArgs=None): # {{{ + additionalArgs=None, logger=None): # {{{ ''' Given a source file defining either an MPAS mesh or a lat-lon grid and a destination file or set of arrays defining a lat-lon grid, constructs @@ -102,6 +102,9 @@ def build_mapping_file(self, method='bilinear', additionalArgs : list of str, optional A list of additional arguments to ``ESMF_RegridWeightGen`` + logger : ``logging.Logger``, optional + A logger to which ESMF_RegridWeightGen output should be redirected + Raises ------ OSError @@ -153,16 +156,36 @@ def build_mapping_file(self, method='bilinear', if additionalArgs is not None: args.extend(additionalArgs) - # make sure any output is flushed before we add output from the - # subprocess - sys.stdout.flush() - sys.stderr.flush() + if logger is None: + print 'running: {}'.format(' '.join(args)) + # make sure any output is flushed before we add output from the + # subprocess + sys.stdout.flush() + sys.stderr.flush() + + # throw out the standard output from ESMF_RegridWeightGen, as it's + # rather verbose but keep stderr + DEVNULL = open(os.devnull, 'wb') + subprocess.check_call(args, stdout=DEVNULL) + + else: + logger.info('running: {}'.format(' '.join(args))) + for handler in logger.handlers: + handler.flush() + + process = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = process.communicate() - # throw out the standard output from ESMF_RegridWeightGen, as it's - # rather verbose but keep stderr - DEVNULL = open(os.devnull, 'wb') + # throw out the standard output from ESMF_RegridWeightGen, as it's + # rather verbose but keep stderr + if stderr: + for line in stderr.split('\n'): + logger.error(line) - subprocess.check_call(args, stdout=DEVNULL) + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, + ' '.join(args)) # remove the temporary SCRIP files os.remove(self.sourceDescriptor.scripFileName) @@ -171,7 +194,7 @@ # }}} def remap_file(self, inFileName, outFileName, variableList=None, - overwrite=False, renormalize=None): # {{{ + overwrite=False, renormalize=None, logger=None): # {{{ ''' Given a source file defining either an MPAS mesh or a lat-lon grid and a destination file or set of arrays defining a lat-lon grid, constructs @@ -198,6 +221,9 @@ def remap_file(self, inFileName, outFileName, variableList=None, renormalize : float, optional A threshold to use to renormalize the data + logger : ``logging.Logger``, optional + A logger to which ncremap output should be redirected + Raises ------ OSError @@ -267,18 +293,38 @@ def remap_file(self, inFileName,
outFileName, variableList=None, if variableList is not None: args.extend(['-v', ','.join(variableList)]) - - # make sure any output is flushed before we add output from the - # subprocess - sys.stdout.flush() - sys.stderr.flush() - # set an environment variable to make sure we're not using czender's # local version of NCO instead of one we have intentionally loaded env = os.environ.copy() env['NCO_PATH_OVERRIDE'] = 'No' - subprocess.check_call(args, env=env) # }}} + if logger is None: + print 'running: {}'.format(' '.join(args)) + # make sure any output is flushed before we add output from the + # subprocess + sys.stdout.flush() + sys.stderr.flush() + + subprocess.check_call(args, env=env) + else: + logger.info('running: {}'.format(' '.join(args))) + for handler in logger.handlers: + handler.flush() + + process = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + stdout, stderr = process.communicate() + + if stdout: + logger.info(stdout) + if stderr: + for line in stderr.split('\n'): + logger.error(line) + + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, + ' '.join(args)) + # }}} def remap(self, ds, renormalizationThreshold=None): # {{{ ''' diff --git a/mpas_analysis/shared/time_series/time_series.py b/mpas_analysis/shared/time_series/time_series.py index 3163abc6e..c27de8530 100644 --- a/mpas_analysis/shared/time_series/time_series.py +++ b/mpas_analysis/shared/time_series/time_series.py @@ -16,7 +16,7 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName, calendar, yearsPerCacheUpdate=1, - printProgress=False): # {{{ + logger=None): # {{{ ''' Create or update a NetCDF file ``cacheFileName`` containing the given time series, calculated with ``timeSeriesCalcFunction`` over the given times, @@ -52,9 +52,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName, output the file frequently. If not, there will be needless overhead in caching the file too frequently. - printProgress: bool, optional - Whether progress messages should be printed as the climatology is - computed + logger : ``logging.Logger``, optional + A logger to which to write output as the time series is computed Returns ------- @@ -73,8 +72,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName, continueOutput = os.path.exists(cacheFileName) cacheDataSetExists = False if continueOutput: - if printProgress: - print ' Read in previously computed time series' + if logger is not None: + logger.info(' Read in previously computed time series') # read in what we have so far try: @@ -84,7 +83,10 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName, # assuming the cache file is corrupt, so deleting it. 
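(The subprocess handling added above in climatology.py and remapper.py follows one pattern: when a logger is supplied, capture the child process's stdout and stderr, forward them to the logger, and still fail loudly on a nonzero exit code. Below is a condensed, standalone sketch of that pattern in the Python 2 style of the surrounding modules; check_call_with_logger is a hypothetical helper name.

    import subprocess


    def check_call_with_logger(args, logger):
        # log the command line, then run it with captured output
        logger.info('running: {}'.format(' '.join(args)))
        process = subprocess.Popen(args, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        if stdout:
            logger.info(stdout)
        if stderr:
            for line in stderr.split('\n'):
                logger.error(line)
        # preserve subprocess.check_call's fail-loudly behavior
        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode,
                                                ' '.join(args))

The time_series.py hunk continues below.)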
diff --git a/mpas_analysis/shared/time_series/time_series.py b/mpas_analysis/shared/time_series/time_series.py
index 3163abc6e..c27de8530 100644
--- a/mpas_analysis/shared/time_series/time_series.py
+++ b/mpas_analysis/shared/time_series/time_series.py
@@ -16,7 +16,7 @@
 def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
                       calendar, yearsPerCacheUpdate=1,
-                      printProgress=False):  # {{{
+                      logger=None):  # {{{
     '''
     Create or update a NetCDF file ``cacheFileName`` containing the given time
     series, calculated with ``timeSeriesCalcFunction`` over the given times,
@@ -52,9 +52,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
         output the file frequently. If not, there will be needless overhead
         in caching the file too frequently.

-    printProgress: bool, optional
-        Whether progress messages should be printed as the climatology is
-        computed
+    logger : ``logging.Logger``, optional
+        A logger to which to write output as the time series is computed

     Returns
     -------
@@ -73,8 +72,8 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
     continueOutput = os.path.exists(cacheFileName)
     cacheDataSetExists = False
     if continueOutput:
-        if printProgress:
-            print ' Read in previously computed time series'
+        if logger is not None:
+            logger.info(' Read in previously computed time series')

         # read in what we have so far
         try:
@@ -84,7 +83,10 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
             # assuming the cache file is corrupt, so deleting it.
             message = 'Deleting cache file {}, which appears to have ' \
                       'been corrupted.'.format(cacheFileName)
-            warnings.warn(message)
+            if logger is None:
+                warnings.warn(message)
+            else:
+                logger.warning(message)
             os.remove(cacheFileName)

     if cacheDataSetExists:
@@ -116,13 +118,13 @@ def cache_time_series(timesInDataSet, timeSeriesCalcFunction, cacheFileName,
             # no unprocessed time entries in this data range
             continue

-        if printProgress:
+        if logger is not None:
             if firstProcessed:
-                print ' Process and save time series'
+                logger.info(' Process and save time series')
             if yearsPerCacheUpdate == 1:
-                print ' {:04d}'.format(years[0])
+                logger.info(' {:04d}'.format(years[0]))
             else:
-                print ' {:04d}-{:04d}'.format(years[0], years[-1])
+                logger.info(' {:04d}-{:04d}'.format(years[0], years[-1]))

         ds = timeSeriesCalcFunction(timeIndices, firstProcessed)
         firstProcessed = False
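Callers that previously passed printProgress=True now pass their logger instead; with logger=None the function stays quiet apart from the corruption warning. A hypothetical call, where the logger name, file names, and input variables are placeholders for whatever the calling task already has:

    import logging

    logger = logging.getLogger('timeSeriesSST')  # hypothetical task logger
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.FileHandler('timeSeriesSST.log'))

    # timesInDataSet, calc_sst_series and calendar are assumed inputs from
    # the calling task
    ds = cache_time_series(timesInDataSet, calc_sst_series,
                           'sstTimeSeries.nc', calendar,
                           yearsPerCacheUpdate=10, logger=logger)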
diff --git a/mpas_analysis/test/test_analysis_task.py b/mpas_analysis/test/test_analysis_task.py
index 4ea0cec67..689d17bd8 100644
--- a/mpas_analysis/test/test_analysis_task.py
+++ b/mpas_analysis/test/test_analysis_task.py
@@ -1,5 +1,5 @@
 """
-Unit tests for utility functions in run_analysis
+Unit tests for utility functions in run_mpas_analysis

 Xylar Asay-Davis
 """
@@ -33,7 +33,7 @@ def doTest(generate, expectedResults):
     # a list of analyses to generate. Valid names are:
     #   'timeSeriesOHC', 'timeSeriesSST', 'climatologyMapSST',
     #   'climatologyMapSSS', 'climatologyMapMLD', 'timeSeriesSeaIceAreaVol',
-    #   'climatologyMapSeaIceConcNH', 'climatologyMapSeaIceConcSH', 
+    #   'climatologyMapSeaIceConcNH', 'climatologyMapSeaIceConcSH',
     #   'climatologyMapSeaIceThickNH', 'climatologyMapSeaIceThickSH'
     # the following shortcuts exist:
     #   'all' -- all analyses will be run
diff --git a/run_mpas_analysis b/run_mpas_analysis
index c5219b3de..e2fac1b9b 100755
--- a/run_mpas_analysis
+++ b/run_mpas_analysis
@@ -9,15 +9,13 @@ Authors
 Xylar Asay-Davis, Phillip J. Wolfram
 """

-import os
 import matplotlib as mpl
 import argparse
 import traceback
 import sys
-import subprocess
-import time
 import pkg_resources
 import shutil
+import os

 from mpas_analysis.configuration import MpasAnalysisConfigParser

@@ -26,6 +24,8 @@ from mpas_analysis.shared.io.utility import build_config_full_path, \

 from mpas_analysis.shared.html import generate_html

+from mpas_analysis.shared import AnalysisTask
+

 def update_generate(config, generate):  # {{{
     """
@@ -55,7 +55,7 @@ def update_generate(config, generate):  # {{{
     config.set('output', 'generate', generateString)  # }}}


-def run_parallel_tasks(config, analyses, configFiles, taskCount):
+def run_parallel_tasks(config, analyses, taskCount):
     # {{{
     """
     Launch new processes for parallel tasks, allowing up to ``taskCount``
@@ -69,9 +69,6 @@ def run_parallel_tasks(config, analyses, configFiles, taskCount):
     analyses : list of ``AnalysisTask`` objects
         A list of analysis tasks to run

-    configFiles : list of str
-        A list of config files, passed on to each parallel task
-
     taskCount : int
         The maximum number of tasks that are allowed to run at once

@@ -80,34 +77,39 @@
     Xylar Asay-Davis
     """

-    taskNames = [analysisTask.taskName for analysisTask in analyses]
+    taskCount = min(taskCount, len(analyses))

-    taskCount = min(taskCount, len(taskNames))
+    runningTasks = {}
+    for analysisTask in analyses[0:taskCount]:
+        print 'Running {}'.format(analysisTask.taskName)
+        analysisTask.start()
+        runningTasks[analysisTask.taskName] = analysisTask

-    (processes, logs) = launch_tasks(taskNames[0:taskCount], config,
-                                     configFiles)
-    remainingTasks = taskNames[taskCount:]
+    remainingTasks = analyses[taskCount:]

     tasksWithErrors = []
-    while len(processes) > 0:
-        (taskName, process) = wait_for_task(processes)
-        if process.returncode == 0:
-            print "Task {} has finished successfully.".format(taskName)
-        else:
+    while len(runningTasks.keys()) > 0:
+        analysisTask = wait_for_task(runningTasks)
+        taskName = analysisTask.taskName
+        if analysisTask._runStatus.value == AnalysisTask.SUCCESS:
+            print " Task {} has finished successfully.".format(taskName)
+        elif analysisTask._runStatus.value == AnalysisTask.FAIL:
             print "ERROR in task {}. See log file {} for details".format(
-                taskName, logs[taskName].name)
+                taskName, analysisTask._logFileName)
             tasksWithErrors.append(taskName)
-            logs[taskName].close()
+        else:
+            print "Unexpected status from task {}. This may be a " \
+                "bug.".format(taskName)

         # remove the process from the process dictionary (no need to bother)
-        processes.pop(taskName)
+        runningTasks.pop(taskName)

         if len(remainingTasks) > 0:
-            (process, log) = launch_tasks(remainingTasks[0:1], config,
-                                          configFiles)
-            # merge the new process and log into these dictionaries
-            processes.update(process)
-            logs.update(log)
+            analysisTask = remainingTasks[0]
             remainingTasks = remainingTasks[1:]
+            print 'Running {}'.format(analysisTask.taskName)
+            analysisTask.start()
+            runningTasks[analysisTask.taskName] = analysisTask
+

     # raise the last exception so the process exits with an error
     errorCount = len(tasksWithErrors)
     if errorCount == 1:
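The rewritten run_parallel_tasks drives the AnalysisTask objects directly; the diff implies they behave like multiprocessing.Process (start(), join(), is_alive()) with a shared _runStatus value. A self-contained toy sketch of the same scheduling idea, starting the first taskCount processes and launching one more each time one finishes, built on plain multiprocessing.Process (all names here are illustrative, not part of the change):

    import multiprocessing
    import time

    def fake_analysis():
        time.sleep(0.1)  # stand-in for real analysis work

    def run_all(tasks, taskCount):
        # start the first taskCount tasks; start another as each one ends
        taskCount = min(taskCount, len(tasks))
        running = {}
        for task in tasks[0:taskCount]:
            task.start()
            running[task.name] = task
        remaining = tasks[taskCount:]
        while len(running) > 0:
            # poll with a short join() so Ctrl-C is delivered promptly
            finished = None
            while finished is None:
                for task in running.values():
                    task.join(timeout=0.1)
                    if not task.is_alive():
                        finished = task
                        break
            running.pop(finished.name)
            if len(remaining) > 0:
                task = remaining[0]
                remaining = remaining[1:]
                task.start()
                running[task.name] = task

    if __name__ == '__main__':
        tasks = [multiprocessing.Process(name='task{}'.format(index),
                                         target=fake_analysis)
                 for index in range(8)]
        run_all(tasks, taskCount=4)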
@@ -120,119 +122,33 @@ def run_parallel_tasks(config, analyses, taskCount):
     # }}}


-def launch_tasks(taskNames, config, configFiles):  # {{{
+def wait_for_task(runningTasks, timeout=0.1):  # {{{
     """
-    Launch one or more tasks
+    Wait for the next running task to finish and return it.

     Parameters
     ----------
-    taskNames : list of str
-        the names of the tasks to launch
-
-    config : ``MpasAnalysisConfigParser`` object
-        contains config options
-
-    configFiles : list of str
-        A list of config files, passed along when each task is launched
-
-    Authors
-    -------
-    Xylar Asay-Davis
-    """
-    thisFile = os.path.realpath(__file__)
-
-    commandPrefix = config.getWithDefault('execute', 'commandPrefix',
-                                          default='')
-    if commandPrefix == '':
-        commandPrefix = []
-    else:
-        commandPrefix = commandPrefix.split(' ')
-
-    processes = {}
-    logs = {}
-    for taskName in taskNames:
-        args = commandPrefix + \
-            [thisFile, '--subtask', '--generate', taskName] + configFiles
-
-        logFileName = '{}/{}.log'.format(logsDirectory, taskName)
-
-        # write the command to the log file
-        logFile = open(logFileName, 'w')
-        logFile.write('Command: {}\n'.format(' '.join(args)))
-        # make sure the command gets written before the rest of the log
-        logFile.flush()
-        print 'Running {}'.format(taskName)
-        process = subprocess.Popen(args, stdout=logFile,
-                                   stderr=subprocess.STDOUT)
-        processes[taskName] = process
-        logs[taskName] = logFile
-
-    return (processes, logs)  # }}}
-
-
-def wait_for_task(processes):  # {{{
-    """
-    Wait for the next process to finish and check its status. Returns both the
-    task name and the process that finished.
-
-    Parameters
-    ----------
-    processes : list of ``subprocess.Popen`` objects
-        Processes to wait for
-
+    runningTasks : dict of ``AnalysisTasks``
+        The tasks that are currently running, with task names as keys

     Returns
     -------
-    taskName : str
-        The name of the task that finished
-
-    process : ``subprocess.Popen`` object
-        The process that finished
+    analysisTask : ``AnalysisTasks``
+        A task that finished

     Authors
     -------
     Xylar Asay-Davis
     """
-
-    # first, check if any process has already finished
-    for taskName, process in processes.iteritems():  # python 2.7!
-        if(not is_running(process)):
-            return (taskName, process)
-
-    # No process has already finished, so wait for the next one
-    (pid, status) = os.waitpid(-1, 0)
-    for taskName, process in processes.iteritems():
-        if pid == process.pid:
-            process.returncode = status
-            # since we used waitpid, this won't happen automatically
-            return (taskName, process)  # }}}
-
-
-def is_running(process):  # {{{
-    """
-    Returns whether a given process is currently running
-
-    Parameters
-    ----------
-    process : ``subprocess.Popen`` object
-        The process to check
-
-    Returns
-    -------
-    isRunning : bool
-        whether the process is running
-
-    Authors
-    -------
-    Xylar Asay-Davis
-    """
-
-    try:
-        os.kill(process.pid, 0)
-    except OSError:
-        return False
-    else:
-        return True  # }}}
+    # necessary to have a timeout so we can kill the whole thing
+    # with a keyboard interrupt
+    while True:
+        for analysisTask in runningTasks.itervalues():
+            analysisTask.join(timeout=timeout)
+            if not analysisTask.is_alive():
+                return analysisTask  # }}}


 def build_analysis_list(config):  # {{{
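The polling loop above, and the _runStatus checks in run_parallel_tasks, assume each AnalysisTask publishes its outcome in shared memory that the parent can read after the child exits. A toy sketch of that reporting pattern (the class, constants, and do_analysis placeholder are illustrative; the real AnalysisTask lives in mpas_analysis.shared and may work differently):

    import multiprocessing
    import traceback

    class ToyTask(multiprocessing.Process):
        # a multiprocessing.Value is visible to both parent and child, so
        # the parent can read the outcome after the child process exits
        SUCCESS = 0
        FAIL = 1

        def __init__(self, name):
            super(ToyTask, self).__init__(name=name)
            self._runStatus = multiprocessing.Value('i', -1)

        def run(self):
            try:
                self.do_analysis()  # placeholder for the real work
                self._runStatus.value = ToyTask.SUCCESS
            except (Exception, BaseException):
                traceback.print_exc()
                self._runStatus.value = ToyTask.FAIL

        def do_analysis(self):
            pass  # a real task would compute climatologies, plot, etc.

    if __name__ == '__main__':
        task = ToyTask('example')
        task.start()
        task.join()
        print 'status: {}'.format(task._runStatus.value)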
@@ -357,36 +273,16 @@ def run_analysis(config, analyses):  # {{{
     tasksWithErrors = []
     lastStacktrace = None
     for analysisTask in analyses:
-        # write out a copy of the configuration to document the run
-        logsDirectory = build_config_full_path(config, 'output',
-                                               'logsSubdirectory')
-        try:
-            startTime = time.clock()
-            analysisTask.run()
-            runDuration = time.clock() - startTime
-            m, s = divmod(runDuration, 60)
-            h, m = divmod(int(m), 60)
-            print 'Execution time: {}:{:02d}:{:05.2f}'.format(h, m, s)
-        except (Exception, BaseException) as e:
-            if isinstance(e, KeyboardInterrupt):
-                raise e
-            lastStacktrace = traceback.format_exc()
-            print "ERROR: analysis task {} failed during run".format(
-                analysisTask.taskName)
-            print lastStacktrace
+        analysisTask.run(writeLogFile=False)
+        if analysisTask._runStatus.value == AnalysisTask.FAIL:
+            lastStacktrace = analysisTask._stackTrace
             tasksWithErrors.append(analysisTask.taskName)

-        configFileName = '{}/configs/config.{}'.format(logsDirectory,
-                                                       analysisTask.taskName)
-        configFile = open(configFileName, 'w')
-        config.write(configFile)
-        configFile.close()
-
     if config.getboolean('plot', 'displayToScreen'):
         import matplotlib.pyplot as plt
         plt.show()

-    # raise the last exception so the process exits with an error
+    # See if there were errors; exit(1) if so
     errorCount = len(tasksWithErrors)
     if errorCount == 1:
@@ -408,9 +304,6 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         description=__doc__,
         formatter_class=argparse.RawTextHelpFormatter)
-    parser.add_argument("--subtask", dest="subtask", action='store_true',
-                        help="If this is a subtask when running parallel "
-                             "tasks")
     parser.add_argument("--setup_only", dest="setup_only",
                         action='store_true',
                         help="If only the setup phase, not the run or HTML "
                              "generation phases, should be executed.")
@@ -479,10 +372,9 @@ if __name__ == "__main__":
     if parallelTaskCount <= 1 or len(analyses) == 1:
         run_analysis(config, analyses)
     else:
-        run_parallel_tasks(config, analyses, configFiles,
-                           parallelTaskCount)
+        run_parallel_tasks(config, analyses, parallelTaskCount)

-    if not args.subtask and not args.setup_only:
+    if not args.setup_only:
         generate_html(config, analyses)

 # vim: foldmethod=marker ai ts=4 sts=4 et sw=4 ft=python