From 291e3df540300fc8db86bdfaa380cdcf96b4ff4b Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Sun, 16 Feb 2025 15:17:32 -0700 Subject: [PATCH 1/9] first working version of auto nccombine execution --- diag_manager/diag_manager.F90 | 49 +- diag_manager/mppnccombine.c | 1977 +++++++++++++++++++++++++++++++++ fms2_io/netcdf_io.F90 | 27 + 3 files changed, 2051 insertions(+), 2 deletions(-) create mode 100644 diag_manager/mppnccombine.c diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index 92fdf0e122..06801bb4ac 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -208,7 +208,7 @@ MODULE diag_manager_mod & get_ticks_per_second USE mpp_mod, ONLY: mpp_get_current_pelist, mpp_pe, mpp_npes, mpp_root_pe, mpp_sum - USE mpp_mod, ONLY: input_nml_file + USE mpp_mod, ONLY: input_nml_file, mpp_sync USE fms_mod, ONLY: error_mesg, FATAL, WARNING, NOTE, stdout, stdlog, write_version_number,& & fms_error_handler, check_nml_error, lowercase @@ -239,6 +239,7 @@ MODULE diag_manager_mod USE fms_diag_outfield_mod, ONLY: fmsDiagOutfieldIndex_type, fmsDiagOutfield_type USE fms_diag_fieldbuff_update_mod, ONLY: fieldbuff_update, fieldbuff_copy_missvals, & & fieldbuff_copy_fieldvals + use netcdf_io_mod, ONLY: file_path_list, files_to_combine #ifdef use_netCDF USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR @@ -250,6 +251,10 @@ MODULE diag_manager_mod use diag_axis_mod, only: DIAG_AXIS_UGDOMAIN !---------- +use iso_fortran_env, only: compiler_version +use iso_c_binding, only : c_int, c_char, c_ptr, c_null_ptr, c_null_char +use, intrinsic :: iso_c_binding, only: c_int, c_char + IMPLICIT NONE PRIVATE @@ -258,7 +263,8 @@ MODULE diag_manager_mod & need_data, DIAG_ALL, DIAG_OCEAN, DIAG_OTHER, get_date_dif, DIAG_SECONDS,& & DIAG_MINUTES, DIAG_HOURS, DIAG_DAYS, DIAG_MONTHS, DIAG_YEARS, get_diag_global_att,& & set_diag_global_att, diag_field_add_attribute, diag_field_add_cell_measures,& - & get_diag_field_id, diag_axis_add_attribute, CMOR_MISSING_VALUE, null_axis_id + & get_diag_field_id, diag_axis_add_attribute, CMOR_MISSING_VALUE, null_axis_id,& + & exec_mppnccombine PUBLIC :: CENTER, NORTH, EAST !< Used for diag_axis_init ! Public interfaces from diag_grid_mod PUBLIC :: diag_grid_init, diag_grid_end @@ -369,6 +375,17 @@ MODULE diag_manager_mod MODULE PROCEDURE diag_field_add_attribute_i1d END INTERFACE diag_field_add_attribute +! ----- interface to the C function ----- +interface + function exec_mppnccombine(outfile, infiles) bind(C) + use, intrinsic :: iso_c_binding, only: c_int, c_char + implicit none + character(kind=c_char) :: outfile + character(kind=c_char) :: infiles + integer(c_int) :: exec_mppnccombine + endfunction exec_mppnccombine +endinterface + !> @addtogroup diag_manager_mod !> @{ CONTAINS @@ -3687,12 +3704,40 @@ SUBROUTINE diag_manager_end(time) DO file = 1, num_files CALL closing_file(file, time) END DO + + ! barrier to make sure all io PEs are done closing all files to be combined. + call mpp_sync() + + ! combine partitioned netcdf files into single file + call combine_files() + if (allocated(fileobjU)) deallocate(fileobjU) if (allocated(fileobj)) deallocate(fileobj) if (allocated(fileobjND)) deallocate(fileobjND) if (allocated(fnum_for_domain)) deallocate(fnum_for_domain) END SUBROUTINE diag_manager_end + subroutine combine_files() + integer(c_int) :: ireturn + type(file_path_list), pointer :: current + character(len=:), allocatable :: filepath + character(kind=c_char, len=256) :: outfile, infiles + + ! loop through files to combine: + if (mpp_pe() == mpp_root_pe()) then + current => files_to_combine + do while (associated(current)) + filepath = trim(adjustl(current%path)) + outfile = filepath(1:len(filepath)-5) // c_null_char + infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char + ireturn = exec_mppnccombine(outfile, infiles) + if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) + current => current%next + end do + end if + + end subroutine combine_files + !> @brief Replaces diag_manager_end; close just one file: files(file) SUBROUTINE closing_file(file, time) INTEGER, INTENT(in) :: file diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c new file mode 100644 index 0000000000..42a414fd9c --- /dev/null +++ b/diag_manager/mppnccombine.c @@ -0,0 +1,1977 @@ +/*********************************************************************** + * GNU Lesser General Public License + * + * This file is part of the GFDL FRE NetCDF tools package (FRE-NCTools). + * + * FRE-NCtools is free software: you can redistribute it and/or modify it under + * the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * FRE-NCtools is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with FRE-NCTools. If not, see + * . + **********************************************************************/ +/* + mppnccombine - joins together netCDF data files representing a decomposed + domain into a unified netCDF file. It was originally + designed to be used as a post-processor for the parallel I/O + programming interface "mpp_io_mod" + (http://www.gfdl.noaa.gov/~vb/mpp_io.html) by V. Balaji. + + V2.2.8: Hans.Vahlenkamp@noaa.gov + If the netCDF format for the output file is not chosen with the -64 + or -n4 options then automatically use the netCDF format of the first + input file; except if the first input file is netCDF classic then use + netCDF 64-bit offset instead for the output file. + V2.2.7: Hans.Vahlenkamp@noaa.gov + Synchronize output file before closing and check for errors. + V2.2.6: Seth Underwood + Updates for C99 standards. + V2.2.5: Seth Underwood + Fix for NetCDF files that do not have a time dimension. + V2.2.4: Tushar.Mohan@noaa.gov + Round memory footprint to ceiling integral value. + V2.2.3: Tushar.Mohan@noaa.gov + Fixed handling of -k when -x is set. + Print memory estimate in MB when -x is used without -v. + Fixed help message for -k and -x. + If user sets blocking factor > # records (nrecs), set bf to nrecs + V2.2.2: Tushar.Mohan@noaa.gov + Added a -x option to print estimate resident memory footprint and + exit. + Changed default blocking factor 1, so the combine behaves as the + combine of the past if no "-k" option is set. This is useful + for low-memory nodes. + V2.2.1: Do not bail out when we cannot write variables to output file. + Instead, issue a warning and set an error condition. Continue + processing. + Fixed bug in allocation of memory for decomposed variables that + only showed up in certain rare input conditions. + Added -M to show memory usage statistics. + Added -V to print version information. + V2.2: Added record blocking (see, the -k option) to the memory buffering + code. This significantly improves performance, by buffering multiple + records of decomposed variables in memory. Output I/O performance + improves, due to reduced seeks and larger contiguous writes. We also + reduce the number of file open/close operations in the input files + (by the blocking factor), thus we can expect better performance on + file-systems with expensive metadata operations (such as Lustre). + This performance improvement comes at the cost of increased resident + memory size (by the blocking factor). For questions regarding these + changes, contact: Tushar Mohan (Tushar.Mohan@noaa.gov) 08/03/2011. + Also added fixes to the following bugs: + - flush_decomp ignored errors, when it was not able to + write variables to the output file. We now exit on such errors. + - when -n and -e are used and files are not present then + the message of the number of files remaining was misleading. + - when a range is specified, and not all files are present, + then the -r option was deleting files properly only when + NumInFiles in set was correct. That may occasionally be + be wrongly set in the input. We now correctly remove files, + nevertheless. + V2.1.7: Added option to initialize output variables with a missing_value + from the variables of the first input file as suggested by + Martin Schmidt (martin.schmidt@io-warnemuende.de) and + Franz Tauber (franz.tauber@io-warnemuende.de). + V2.1.6: Bug fixes for greater than 4GB record sizes. Does not contain + V2.1.5 modifications which were a special case. + V2.1.5: Supports running in an MPI environment. Reads arguments from a + configuration file instead of from the command line; this is needed + to work around a bug in Cray's aprun. + V2.1.4: Fixed a bug with file cleanup and the debugging messages. + V2.1.3: Fixed a run-time segmentation fault with some compilers; changed + ncinfile allocation in process_file function. + V2.1.2: Fixed a bug with input files that have decomposed dimensions + defined after the variables that use them. + V2.1.1: Added option (-64) for creating output files with 64-bit offset + format requiring NetCDF 3.6.x. + V2.1: Added an option (-h) to pad the output file's header with empty space. + Added an option (-e #) to specify an ending number to a range of input + filename extensions. It no longer aborts on missing input files, but + gives error messages at the end of all the processing. + V2.0: Substantial rewrite; memory buffering increases speed several times. + V1.2: Added support for specifying the start number in filename extensions. + V1.1.1: Added a fix for dimensions that are not also variables. + V1.1: Changed loop order for increased I/O efficiency; records are now the + innermost loop then the variables loop. + V1.0: Original release. + + Written by Hans Vahlenkamp (Hans.Vahlenkamp@noaa.gov) + Geophysical Fluid Dynamics Laboratory / NOAA + Princeton Forrestal Campus +*/ + +/* Algorithm: + there are k records in a block + + for block b: 1 .. N + for file f: 1 .. n + for record r: 1 .. k + read rec (r) from file (f) + for var v: 1 .. n_vars + if var is not decomposed write to output + if var (v) is decomposed: + if not allocated, allocate memory for var (v), record (r) + write variable (v) into memory buffer + done var + done record + done file + for record r: 1 .. k + for var: 1..n_vars + if decomposed variable, flush variable v for rec r to output + done + done + free memory for all variables for all records in block + done block + */ + + #include + #include + #include + #include + #include + #include + #include + #include + #include + #include + + #ifndef MAX_BF + #define MAX_BF 100 /* maximum blocking factor */ + #endif + #ifndef DEFAULT_BF /* default blocking factor, if none set */ + #define DEFAULT_BF 1 + #endif + + /* Block size for NetCDF file open/reads */ + #ifndef NC_BLKSZ + #define NC_BLKSZ 65536 + #endif + + /* Information structure for a file */ + struct fileinfo + { + int ncfid; /* ID of the input netCDF file */ + int ndims; /* Number of dimensions */ + int nvars; /* Number of variables */ + int ngatts; /* Number of global attributes */ + int recdim; /* ID of the record dimensions */ + char varname[MAX_NC_VARS][MAX_NC_NAME]; /* Names of the variables */ + nc_type datatype[MAX_NC_VARS]; /* Data types of the variables */ + int varndims[MAX_NC_VARS]; /* Number of dimensions for each variable */ + int vardim[MAX_NC_VARS][MAX_NC_DIMS]; /* Dimensions for each variable */ + int natts[MAX_NC_VARS]; /* Number of attributes for each variable */ + unsigned char vardecomp[MAX_NC_VARS]; /* Is the variable decomposed */ + char dimname[MAX_NC_DIMS][MAX_NC_NAME]; /* Names of the dimensions */ + long dimsize[MAX_NC_DIMS]; /* Sizes of the dimensions (decomposed) */ + long dimfullsize[MAX_NC_DIMS]; /* Full sizes of the dimensions */ + long dimstart[MAX_NC_DIMS]; /* Start positions within full dimensions */ + long dimend[MAX_NC_DIMS]; /* End positions within full dimensions */ + unsigned char varmiss[MAX_NC_VARS]; /* Does variable have missing_value */ + unsigned char varmissval[MAX_NC_VARS][8]; /* missing_value per variable */ + }; + + /* Auxiliary function prototypes */ + int main_(int argc, char *argv[]); + void usage (); + int process_file (char *, unsigned char, struct fileinfo *, char *, int *, + int *, int *, int *, int, int, int, unsigned char, + unsigned char, int, int, int); + int process_vars (struct fileinfo *, struct fileinfo *, unsigned char, int *, + int *, int *, int, int, int, unsigned char, unsigned char); + +int flush_decomp (struct fileinfo *, int, int, int, unsigned char); + void print_debug (struct fileinfo *, unsigned char); + char *nc_type_to_str (nc_type); + int min (int, int); + + static void ***varbuf + = NULL; /* Buffers for multiple records of decomposed var */ + + struct rusage ruse; /* structure used to store values from getrusage() */ + static unsigned long maxrss = 0; /* maximum memory used so far in kilobytes */ + static int print_mem_usage = 0; + static unsigned long mem_allocated = 0; /* memory allocated so far */ + + static const char version[] = "2024.02"; + + static unsigned long estimated_maxrss = 0; /* see option: -x */ + static int mem_dry_run = 0; /* set if -x option is used */ + + static inline void + check_mem_usage (void) + { + static long prev_rss = 0; + static long PAGE_SIZE = 0; + long rss = 0; + if (PAGE_SIZE == 0) + PAGE_SIZE = sysconf (_SC_PAGESIZE); + if (getrusage (RUSAGE_SELF, &ruse) == 0) + rss = ruse.ru_maxrss * PAGE_SIZE; + if (rss == 0) + { + /* bug in Linux kernel means resident size is reported 0 */ + FILE *f = fopen ("/proc/self/statm", "r"); + if (f != NULL) + { + int discard; + fscanf (f, "%d %ld\n", &discard, &rss); + fclose (f); + rss *= PAGE_SIZE; + } + } + if (rss > maxrss) + maxrss = rss; + printf ("rss=%lu KB, delta=%ld KB, maxrss=%lu KB\n", rss / 1024, + (rss - prev_rss) / 1024, maxrss / 1024); + prev_rss = rss; + return; + } + + static void + print_estimated_mem_footprint (int verbose) + { + if (verbose) + { + printf ("Estimated peak memory resident size (k=1) : %.1f MB\n", + (float)estimated_maxrss / (1024 * 1024)); + printf ("For estimating RSS for a different value of k, multiply above " + "usage by k.\n"); + } + else + printf ("%.0f\n", ceil ((float)estimated_maxrss / (1024 * 1024))); + return; + } + +int iama_function(int *somearg, char *cdata, int clen) { + int i; + for( i=0; i='A' && cdata[i]<='Z') + cdata[i] = cdata[i]+*somearg; + } + *somearg = -(*somearg); // negate it + return(i); +} + +char** find_files(const char* pattern, int* count) { + /* This function uses the glob library to find files matching the given pattern. + * It returns an array of file-path strings and sets the count to the number of + * matched files. + * The caller is responsible for freeing the returned array and its elements. + * Returns NULL if no files match or an error occurs. + * -aa + */ + glob_t globbuf; + + // Initialize glob result structure + memset(&globbuf, 0, sizeof(globbuf)); + + // Perform the glob operation + int result = glob(pattern, GLOB_TILDE, NULL, &globbuf); + if (result != 0) { + // No matches or glob error + *count = 0; + globfree(&globbuf); + return NULL; + } + + *count = (int)globbuf.gl_pathc; + + // Allocate array of file-path strings + char** files = (char**)malloc((*count) * sizeof(char*)); + if (!files) { + // Allocation failure + *count = 0; + globfree(&globbuf); + return NULL; + } + + // Copy each matched path into our array + for (int i = 0; i < *count; i++) { + files[i] = strdup(globbuf.gl_pathv[i]); + if (!files[i]) { + // Handle possible allocation failure + // Free already allocated strings + for (int j = 0; j < i; j++) { + free(files[j]); + } + free(files); + globfree(&globbuf); + *count = 0; + return NULL; + } + //printf("files[%d]=%s\n", i, files[i]); + } + + // Free the memory used by glob + globfree(&globbuf); + + return files; +} + + int exec_mppnccombine(char *outfile, char *infiles){ + /* A wrapper function for main_ that takes a single string of input files + * and a single string of output file. This function is called from diag_manager.F90 + * It expands the input files if a wildcard is given and calls main_. + * It returns the return value of main_. + * -aa + */ + char *infiles_in; // Local copy of infiles argument + char **infiles_array; // Array of strings to hold the list of expanded input files + char **argv; // Array of strings to hold the list of input files + int argc = 0; // Number of arguments to pass to main_ + int file_count = 0; // Number of input files + int iret; // Return value of main_ + + // Get a local copy of infiles argument. This is needed because strtok + // leads to segfault when called on the argument directly. + infiles_in = (char *)malloc(strlen(infiles) + 1); + if (infiles_in == NULL) { + fprintf(stderr, "Error: unable to allocate memory\n"); + return 1; + } + strcpy(infiles_in, infiles); + + // if a list of files delimited by space is given, then split it into an array + if (strchr(infiles_in, ' ') != NULL) { + char *p = strtok(infiles_in, " "); + while (p != NULL) { + file_count++; + p = strtok(NULL, " "); + } + infiles_array = (char **)malloc(file_count * sizeof(char *)); + strcpy(infiles_in, infiles); + p = strtok(infiles_in, " "); + for (int i = 0; i < file_count; i++) { + infiles_array[i] = p; + p = strtok(NULL, " "); + } + } + // if a wildcard is given, then expand it into the + else if (strchr(infiles_in, '*') != NULL || strchr(infiles_in, '?') != NULL) { + infiles_array = find_files(infiles_in, &file_count); + if (infiles_array == NULL || file_count == 0) { + fprintf(stderr, "Error: no files matched or an error occurred\n"); + return 1; + } + } + // if a single file is given, then put it into the array + else { + file_count = 1; + infiles_array = (char **)malloc(1 * sizeof(char *)); + infiles_array[0] = infiles_in; + } + + argc = 2 + file_count; + + argv = (char **)malloc(argc * sizeof(char *)); + argv[0] = "mppnccombine"; + argv[1] = outfile; + for (int i = 0; i < file_count; i++) { + argv[2 + i] = infiles_array[i]; + } + + // call the main mppnccombine function + iret = main_(argc, argv); + + // if successful, remove the partitioned files + if (iret == 0) { + for (int i = 0; i < file_count; i++) { + remove(infiles_array[i]); + } + } + + // free memory + free(infiles_array); + free(infiles_in); + free(argv); + + return iret; + } + + int + main_ (int argc, char *argv[]) + { + unsigned char verbose = 0; /* Print some progress information? */ + unsigned char appendnc = 0; /* Append to an existing netCDF file? */ + unsigned char removein = 0; /* Remove the ".####" decomposed input files? */ + int nstart = 0; /* PE number of the first input netCDF file */ + int nend = (-1); /* PE number of the last input netCDF file */ + int force = 0; /* Allows combining of incomplete input file set */ + int headerpad = 16384; /* Additional padding at the end of the header */ + int format = NC_NOCLOBBER; /* Format of new netCDF output file */ + unsigned char missing = 0; /* Initialize output variables with */ + /* "missing_value" instead of 0 value? */ + int outputarg = (-1); /* Argument # of the output netCDF file */ + int inputarg = (-1); /* Argument # of first input netCDF file */ + struct stat statbuf; /* Dummy structure for file-testing "stat" call */ + struct fileinfo *ncoutfile; /* Information for the output file */ + char outfilename[2048], *strptr; /* Name of the output netCDF file */ + int outlen; /* Length of the output filename */ + char infilename[2048]; /* Name of an input file */ + unsigned char infileerror = 0; /* Errors reading an input file */ + unsigned char infileerrors = 0; /* Errors reading any input files */ + unsigned char outfileerrors = 0; /* error in output file writing */ + int nfiles = (-1); /* Number of files in the decomposed domain */ + int a, f, r, block, k, v; /* Loop variables */ + int status; /* Return status */ + int nrecs = 1; /* Number of records in each decomposed file */ + int bf = DEFAULT_BF; /* default blocking factor: bf records will be read and + written at a time */ + int nblocks + = 1; /* number of iterations of outer loop (default nrecs/bf = )*/ + int peWidth = -1; /* Width of PE number in uncombined file extension */ + size_t blksz = NC_BLKSZ; /* netCDF block size */ + int deflate = 0; /* do not deflate by default */ + int deflation = -1; /* do not deflate by default */ + int shuffle = 0; + + /* Check the command-line arguments */ + if (argc < 2) + { + usage (); + return (1); + } + for (a = 1; a < argc; a++) + { + if (!strcmp (argv[a], "-v")) + verbose = 1; + else if (!strcmp (argv[a], "-vv")) + verbose = 2; /* Hidden debug mode */ + else if (!strcmp (argv[a], "-V")) + { + //print_version ("mppnccombine"); + exit (EXIT_SUCCESS); + } + else if (!strcmp (argv[a], "-M")) + print_mem_usage = 1; + else if (!strcmp (argv[a], "-x")) + { + mem_dry_run = 1; + } + else if (!strcmp (argv[a], "-a")) + appendnc = 1; + else if (!strcmp (argv[a], "-r")) + removein = 1; + else if (!strcmp (argv[a], "-f")) + force = 1; + else if (!strcmp (argv[a], "-n")) + { + a++; + if (a < argc) + nstart = atoi (argv[a]); + else + { + usage (); + return (1); + } + } + else if (!strcmp (argv[a], "-k")) + { + a++; + if (a < argc) + bf = atoi (argv[a]); + else + { + usage (); + return (1); + } + if (mem_dry_run) + continue; + if (bf > MAX_BF) + { + fprintf (stderr, + "Warning: k is set to high. Choosing a more sane value " + "of %d.\n", + MAX_BF); + bf = MAX_BF; + } + } + else if (!strcmp (argv[a], "-e")) + { + a++; + if (a < argc) + nend = atoi (argv[a]); + else + { + usage (); + return (1); + } + } + else if (!strcmp (argv[a], "-h")) + { + a++; + if (a < argc) + headerpad = atoi (argv[a]); + else + { + usage (); + return (1); + } + } + else if (!strcmp (argv[a], "-64")) + format = (NC_NOCLOBBER | NC_64BIT_OFFSET); + else if (!strcmp (argv[a], "-n4")) + format = (NC_NOCLOBBER | NC_NETCDF4 | NC_CLASSIC_MODEL); + else if (!strcmp (argv[a], "-m")) + missing = 1; + else if (!strcmp (argv[a], "-d")) + { + a++; + if (a < argc) + { + deflate = 1; + deflation = atoi (argv[a]); + } + else + { + usage (); + return (1); + } + } + else if (!strcmp (argv[a], "-s")) + shuffle = 1; + else + { + outputarg = a; + break; + } + } + if (outputarg == (-1)) + { + usage (); + return (1); + } + if (argc - 1 > outputarg) + inputarg = outputarg + 1; + sprintf (outfilename, "%s", argv[outputarg]); + outlen = strlen (outfilename); + if (outlen > 4) + { + strptr = outfilename + outlen - 5; + if (!strcmp (strptr, ".0000")) + outfilename[outlen - 5] = '\0'; + } + + /* if -x (estimate memory usage) is set, k will be automatically set to 1 */ + if (mem_dry_run) + { + if (bf != 1) + fprintf (stderr, "-x is set, so blocking factor will be set to 1. The " + "-k option will be ignored.\n"); + bf = 1; + if (verbose) + printf ("This run will estimate peak memory resident size. No output " + "file will be created.\n"); + } + + /* Disable fatal returns from netCDF library functions */ + ncopts = 0; + + if (!mem_dry_run) + { + /* Create a new netCDF output file */ + if ((ncoutfile = (struct fileinfo *)malloc (sizeof (struct fileinfo))) + == NULL) + { + fprintf (stderr, "Error: cannot allocate enough memory!\n"); + return (1); + } + if (!appendnc) + { + if (stat (outfilename, &statbuf) == 0) + { + fprintf (stderr, "Error: output file seems to exist already!\n"); + free (ncoutfile); + return (1); + } + status + = nc__create (outfilename, format, 0, &blksz, &ncoutfile->ncfid); + if (status == (-1)) + { + fprintf (stderr, + "Error: cannot create the output netCDF file!\n"); + free (ncoutfile); + return (1); + } + ncsetfill (ncoutfile->ncfid, NC_NOFILL); + } + /* Open an existing netCDF file for appending */ + else + { + if ((ncoutfile->ncfid = ncopen (outfilename, NC_WRITE)) == (-1)) + { + fprintf (stderr, "Error: cannot open the output netCDF file for " + "appending!\n"); + free (ncoutfile); + return (1); + } + } + } + + /* No input files are specified on the command-line */ + if (inputarg == (-1)) + { + int num_infiles_used + = 0; /* we may use only a subset of the input files */ + if (nend > -1) + for (block = 0; block < nblocks; block++) + { + if (verbose) + printf ("block = %d\n", block); + f = 0; + for (a = nstart; a <= nend; a++) + { + if (peWidth < 0) + { + sprintf (infilename, "%s.%04d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 4; + } + else + { + sprintf (infilename, "%s.%06d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 6; + } + else + { + continue; + } + } + } + sprintf (infilename, "%s.%0*d", outfilename, peWidth, a); + if (stat (infilename, &statbuf) != 0) + { + if (force == 0) + { + printf ("ERROR: missing at least %s from the input " + "file set. Exiting.\n", + infilename); + unlink (outfilename); + return 9; + } + else + { + infileerrors = 1; + } + } + if (verbose) + { + if (block == 0) + printf (" n files to go... "); + else + printf (" %d files to go... ", num_infiles_used - f); + printf ("processing \"%s\"\n", infilename); + } + infileerror = process_file ( + infilename, appendnc, ncoutfile, outfilename, &nfiles, + &nrecs, &nblocks, &bf, block, f, headerpad, verbose, + missing, deflate, deflation, shuffle); + if (infileerror) + infileerrors = 1; + appendnc = 1; + f++; + if (f == nfiles || a == nend) + { + if (mem_dry_run) + { + print_estimated_mem_footprint (verbose); + exit (0); + } + if (verbose > 1) + printf (" Write variables from previous %d files\n", f); + for (r = block * bf; r < min ((block + 1) * bf, nrecs); + r++) + outfileerrors + += flush_decomp (ncoutfile, nfiles, r, bf, verbose); + num_infiles_used = f; + break; + } + } + if (num_infiles_used == 0) + { + fprintf (stderr, "Warning: No input file found in range.\n"); + unlink (outfilename); + exit (1); + } + /* free the memory before reading the next block */ + if (verbose && print_mem_usage) + printf ("freeing memory allocated in block\n"); + for (k = 0; k < bf; k++) + for (v = 0; v < NC_MAX_VARS; v++) + if (varbuf && (varbuf[k][v] != NULL)) + { + free (varbuf[k][v]); + varbuf[k][v] = NULL; + } + } + else + { + nend = nstart + 1; + for (block = 0; block < nblocks; block++) + { + if (verbose) + printf ("block = %d\n", block); + f = 0; + for (a = nstart; a < nend; a++) + { + if (peWidth < 0) + { + sprintf (infilename, "%s.%04d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 4; + } + else + { + sprintf (infilename, "%s.%06d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 6; + } + else + { + continue; + } + } + } + sprintf (infilename, "%s.%0*d", outfilename, peWidth, a); + if (stat (infilename, &statbuf) != 0) + { + if (force == 0) + { + printf ("ERROR: missing at least %s from the input " + "file set. Exiting.\n", + infilename); + unlink (outfilename); + return 9; + } + else + { + infileerrors = 1; + } + } + if (verbose) + { + if (a == nstart && block == 0) + printf (" n files to go... "); + else + printf (" %d files to go... ", nend - a); + printf ("processing \"%s\"\n", infilename); + } + infileerror = process_file ( + infilename, appendnc, ncoutfile, outfilename, &nfiles, + &nrecs, &nblocks, &bf, block, f, headerpad, verbose, + missing, deflate, deflation, shuffle); + if (infileerror) + infileerrors = 1; + if (a == nstart && nfiles > 0) + nend = nstart + nfiles; + appendnc = 1; + f++; + if (f == nfiles || a == (nend - 1)) + { + if (mem_dry_run) + { + print_estimated_mem_footprint (verbose); + exit (0); + } + if (verbose > 1) + printf (" Write variables from previous %d files\n", + f); + for (r = block * bf; r < min ((block + 1) * bf, nrecs); + r++) + outfileerrors += flush_decomp (ncoutfile, nfiles, r, + bf, verbose); + f = 0; + continue; + } + } + /* free the memory before reading the next block */ + if (verbose && print_mem_usage) + printf ("freeing memory allocated in block\n"); + for (k = 0; k < bf; k++) + for (v = 0; v < NC_MAX_VARS; v++) + if (varbuf && (varbuf[k][v] != NULL)) + { + free (varbuf[k][v]); + varbuf[k][v] = NULL; + } + } + } + } + /* Loop over all the specified input files */ + else + for (block = 0; block < nblocks; block++) + { + if (verbose) + printf ("block = %d\n", block); + f = 0; + for (a = inputarg; a < argc; a++) + { + if (verbose) + { + if ((argc - a) == 1) + printf (" 1 file to go... "); + else + printf (" %d files to go... ", argc - a); + printf ("processing \"%s\"\n", argv[a]); + } + if (stat (argv[a], &statbuf) != 0) + { + if (force == 0) + { + printf ("ERROR: missing at least %s from the input file " + "set. Exiting.\n", + argv[a]); + unlink (outfilename); + return 9; + } + else + { + infileerrors = 1; + } + } + infileerror = process_file (argv[a], appendnc, ncoutfile, + outfilename, &nfiles, &nrecs, &nblocks, + &bf, block, f, headerpad, verbose, + missing, deflate, deflation, shuffle); + if (infileerror) + infileerrors = 1; + appendnc = 1; + f++; + if (f == nfiles || a == (argc - 1)) + { + if (mem_dry_run) + { + print_estimated_mem_footprint (verbose); + exit (0); + } + if (verbose > 1) + printf (" Write variables from previous %d files\n", f); + for (r = block * bf; r < min ((block + 1) * bf, nrecs); r++) + outfileerrors + += flush_decomp (ncoutfile, nfiles, r, bf, verbose); + f = 0; + continue; + } + } + /* free the memory before reading the next block */ + if (verbose && print_mem_usage) + printf ("freeing memory allocated in block\n"); + for (k = 0; k < bf; k++) + for (v = 0; v < NC_MAX_VARS; v++) + if (varbuf && (varbuf[k][v] != NULL)) + { + free (varbuf[k][v]); + varbuf[k][v] = NULL; + } + } + + /* Cleanup and check for any input or output file errors */ + if (ncsync (ncoutfile->ncfid) == (-1)) + outfileerrors++; + if (ncclose (ncoutfile->ncfid) == (-1)) + outfileerrors++; + free (ncoutfile); + if ((!infileerrors) && (!outfileerrors)) + { + if (removein) + { + /* No input files are specified on the command-line */ + if (inputarg == (-1)) + { + f = 0; + for (a = nstart; a <= nend; a++) + { + // commenting line below, as it's a bug + // occasionally nfiles may be wrongly set to zero, + // and yet the user wants to remove the input files in a + // range. + // if (++f > nfiles) break; + if (peWidth < 0) + { + sprintf (infilename, "%s.%04d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 4; + } + else + { + sprintf (infilename, "%s.%06d", outfilename, a); + if (stat (infilename, &statbuf) == 0) + { + peWidth = 6; + } + else + { + continue; + } + } + } + sprintf (infilename, "%s.%0*d", outfilename, peWidth, a); + if (stat (infilename, &statbuf) != 0) + continue; + if (verbose) + printf ("Removing \"%s\"\n", infilename); + unlink (infilename); + } + } + /* Loop over all the specified input files */ + else + for (a = inputarg; a < argc; a++) + { + if (stat (argv[a], &statbuf) != 0) + continue; + if (verbose) + printf ("Removing \"%s\"\n", argv[a]); + unlink (argv[a]); + } + } + return (0); + } + else + { + fprintf (stderr, "Warning: output file may be incomplete!\n"); + return (1); + } + } + + /* Print the usage message for mppnccombine */ + void + usage () + { + printf ("mppnccombine %s - (written by Hans.Vahlenkamp@noaa.gov)\n\n", + version); + printf ("Usage: mppnccombine [-v] [-V] [-M] [-a] [-r] [-n #] [-k #] [-e #] " + "[-h #] [-64] [-n4 [-d #] [-s]] [-m]\n"); + printf (" output.nc [input ...]\n\n"); + printf (" -v Print some progress information.\n"); + printf (" -V Print version information.\n"); + printf (" -M Print memory usage statistics.\n"); + printf ( + " -f Force combine to happen even if input files are missing.\n"); + printf ( + " -a Append to an existing netCDF file (not heavily tested...).\n"); + printf (" -r Remove the \".####\" decomposed files after a successful " + "run.\n"); + printf (" -n # Input filename extensions start with number #### instead " + "of 0000.\n"); + printf (" -k # Blocking factor. k records are read from an input file at " + "a time.\n"); + printf (" Valid values are between 0 and %d. For a given input, the " + "maximum\n", + MAX_BF); + printf ( + " permissible value for k is min(total number of records, %d).\n", + MAX_BF); + printf (" Setting k to zero will set the blocking factor to this " + "maximum\n"); + printf (" permissible value. Setting k to a value higher than this " + "value,\n"); + printf (" will make the system implicitly set k to the highest " + "permissible value.\n"); + printf (" A value of 1 for k disables blocking. This is the default " + "behavior.\n"); + printf (" Blocking often improves performance, but increases the " + "peak memory\n"); + printf (" footprint (by the blocking factor). Beware of running out " + "of\n"); + printf (" available physical memory and causing swapping to disk due " + "to\n"); + printf (" large blocking factors and/or large input datasets.\n"); + printf ( + " A value of 10 for k has worked well on many input datasets.\n"); + printf ( + " See -x for estimating memory usage for a given input set.\n"); + printf (" -e # Ending number #### of a specified range of input filename " + "extensions.\n"); + printf (" Files within the range do not have to be consecutively " + "numbered.\n"); + printf (" -h # Add a specified number of bytes of padding at the end of " + "the header.\n"); + printf ( + " -64 Create netCDF output files with the 64-bit offset format.\n"); + printf (" -n4 Create netCDF output files in NETCDF4_CLASSIC mode (no v4 " + "enhanced features).\n"); + printf (" -d # When in NETCDF4 mode, use deflation of level #.\n"); + printf (" -s When in NETCDF4 mode, use shuffle.\n"); + printf (" -m Initialize output variables with a \"missing_value\" from " + "the variables\n"); + printf (" of the first input file instead of the default 0 value.\n"); + printf (" -x Print an estimate for peak memory resident size in (MB) " + "and exit.\n"); + printf (" No output file will be created. Setting -x automatically " + "sets\n"); + printf ( + " the blocking factor (-k) to 1. Any value set for -k on the\n"); + printf (" command-line will be ignored. To estimate memory usage for " + "a\n"); + printf (" a different blocking factor, simply multiply the estimate " + "by k.\n\n"); + printf ("mppnccombine joins together an arbitrary number of netCDF input " + "files, each\n"); + printf ("containing parts of a decomposed domain, into a unified netCDF " + "output file.\n"); + printf ("An output file must be specified and it is assumed to be the first " + "filename\n"); + printf ("argument. If the output file already exists, then it will not be " + "modified\n"); + printf ("unless the option is chosen to append to it. If no input files " + "are specified\n"); + printf ("then their names will be based on the name of the output file plus " + "the default\n"); + printf ("numeric extension \".0000\", which will increment by 1. There is " + "an option for\n"); + printf ("starting the filename extensions with an arbitrary number instead " + "of 0. There\n"); + printf ("is an option for specifying an end to the range of filename " + "extension numbers;\n"); + printf ("files within the range do not have to be consecutively numbered. " + "If input\n"); + printf ("files are specified then names will be used verbatim.\n\n"); + printf ("A value of 0 is returned if execution completed successfully; a " + "value of 1\n"); + printf ("otherwise.\n"); + } + + int + min (int a, int b) + { + if (a < b) + return a; + return b; + } + + /* Open an input file and get some information about it, define the */ + /* structure of the output file if necessary, prepare to copy all the */ + /* variables for the current block to memory (and non-decomposed variables */ + /* will be written to the output file */ + int + process_file (char *ncname, unsigned char appendnc, struct fileinfo *ncoutfile, + char *outncname, int *nfiles, int *nrecs, int *nblocks, int *bf, + int block, int f, int headerpad, unsigned char verbose, + unsigned char missing, int deflate, int deflation, int shuffle) + { + struct fileinfo *ncinfile; /* Information about an input netCDF file */ + int nfiles2; /* Number of files in the decomposed domain */ + int ncinformat, ncoutformat; /* Format of input and output netCDF files */ + int d, v, n; /* Loop variables */ + int dimid; /* ID of a dimension */ + int decomp[4]; /* "domain_decomposition = #0, #1, #2, #3" attribute */ + /* #0 starting position of original dimension */ + /* #1 ending position of original dimension */ + /* #2 starting position of decomposed dimension */ + /* #3 ending position of decomposed dimension */ + char attname[MAX_NC_NAME]; /* Name of a global or variable attribute */ + unsigned char ncinfileerror = 0; /* Were there any file errors? */ + size_t blksz = NC_BLKSZ; /* netCDF block size */ + + if (print_mem_usage) + check_mem_usage (); + + /* Information for netCDF input file */ + if ((ncinfile = (struct fileinfo *)malloc (sizeof (struct fileinfo))) + == NULL) + { + fprintf (stderr, "Error: cannot allocate enough memory!\n"); + return (1); + } + + /* Open an input netCDF file */ + if ((ncinfile->ncfid = ncopen (ncname, NC_NOWRITE)) == (-1)) + { + fprintf (stderr, "Error: cannot open input file \"%s\"\n", ncname); + free (ncinfile); + return (1); + } + + /* Determine the number of files in the decomposed domain */ + if (ncattget (ncinfile->ncfid, NC_GLOBAL, "NumFilesInSet", (void *)&nfiles2) + == (-1)) + { + if (*nfiles == 1) + { + fprintf (stderr, + "Error: missing the \"NumFilesInSet\" global attribute!\n"); + return (1); + } + else if (*nfiles == (-1)) + { + fprintf ( + stderr, + "Warning: missing the \"NumFilesInSet\" global attribute.\n"); + } + } + *nfiles = nfiles2; + + /* Get some general information about the input netCDF file */ + if (ncinquire (ncinfile->ncfid, &(ncinfile->ndims), &(ncinfile->nvars), + &(ncinfile->ngatts), &(ncinfile->recdim)) + == (-1)) + { + fprintf (stderr, "Error: cannot read the file's metadata!\n"); + ncclose (ncinfile->ncfid); + free (ncinfile); + return (1); + } + + /* Get some information about the dimensions */ + for (d = 0; d < ncinfile->ndims; d++) + { + if ((ncdiminq (ncinfile->ncfid, d, ncinfile->dimname[d], + &(ncinfile->dimsize[d]))) + == (-1)) + { + fprintf (stderr, "Error: cannot read dimension #%d's metadata!\n", + d); + ncclose (ncinfile->ncfid); + free (ncinfile); + return (1); + } + ncinfile->dimfullsize[d] = ncinfile->dimsize[d]; + ncinfile->dimstart[d] = 1; + ncinfile->dimend[d] = (-1); + } + + /* Save some information for the output file */ + if ((block == 0) && (!mem_dry_run)) + { + ncoutfile->nvars = ncinfile->nvars; + ncoutfile->recdim = ncinfile->recdim; + } + + /* Get some information about the variables */ + for (v = 0; v < ncinfile->nvars; v++) + { + if ((ncvarinq (ncinfile->ncfid, v, ncinfile->varname[v], + &(ncinfile->datatype[v]), &(ncinfile->varndims[v]), + ncinfile->vardim[v], &(ncinfile->natts[v]))) + == (-1)) + { + fprintf (stderr, "Error: cannot read variable #%d's metadata!\n", v); + ncclose (ncinfile->ncfid); + free (ncinfile); + return (1); + } + + /* If the variable is also a dimension then get decomposition info */ + if ((dimid = ncdimid (ncinfile->ncfid, ncinfile->varname[v])) != (-1)) + { + if (ncattget (ncinfile->ncfid, v, "domain_decomposition", + (void *)decomp) + != (-1)) + { + /* the dimension is decomposed */ + ncinfile->dimfullsize[dimid] = decomp[1] - decomp[0] + 1; + ncinfile->dimstart[dimid] = decomp[2] - (decomp[0] - 1); + ncinfile->dimend[dimid] = decomp[3] - (decomp[0] - 1); + } + else + { + /* the dimension is NOT decomposed */ + ncinfile->dimfullsize[dimid] = ncinfile->dimsize[dimid]; + ncinfile->dimstart[dimid] = 1; + ncinfile->dimend[dimid] = (-1); + } + } + } + + /* Get some additional information about the variables */ + for (v = 0; v < ncinfile->nvars; v++) + { + + /* start by assuming the variable has no decomposed dimension */ + ncinfile->vardecomp[v] = 0; + + /* now, iterate over the variable's dimensions and mark the */ + /* variable as a decomposed variable if any dimension of */ + /* the variable is decomposed */ + for (d = 0; d < ncinfile->varndims[v]; d++) + { + /* Does the variable have a decomposed dimension? */ + if (ncinfile->dimend[ncinfile->vardim[v][d]] != (-1)) + { + ncinfile->vardecomp[v] = 1; + break; + } + } + + /* Save some information for the output file */ + /* This only needs to be done once per output file */ + if ((!appendnc) && (!mem_dry_run)) + { + ncoutfile->varndims[v] = ncinfile->varndims[v]; + for (d = 0; d < ncinfile->ndims; d++) + ncoutfile->dimfullsize[d] = ncinfile->dimfullsize[d]; + for (d = 0; d < ncinfile->varndims[v]; d++) + ncoutfile->vardim[v][d] = ncinfile->vardim[v][d]; + ncoutfile->vardecomp[v] = ncinfile->vardecomp[v]; + strcpy (ncoutfile->varname[v], ncinfile->varname[v]); + ncoutfile->varmiss[v] = 0; + } + } + + /* If the output netCDF file was just created then define its structure */ + if ((!appendnc) && (!mem_dry_run)) + { + if (verbose) + printf (" Creating output \"%s\"\n", outncname); + + /* Determine the format of the input netCDF file */ + if (nc_inq_format (ncinfile->ncfid, &ncinformat) == (-1)) + { + fprintf (stderr, "Error: cannot read the input file format!\n"); + ncclose (ncinfile->ncfid); + free (ncinfile); + return (1); + } + + /* Determine the format of the output netCDF file */ + if (nc_inq_format (ncoutfile->ncfid, &ncoutformat) == (-1)) + { + fprintf (stderr, "Error: cannot read the output file format!\n"); + ncclose (ncinfile->ncfid); + free (ncinfile); + return (1); + } + + if (verbose) + printf (" ncinformat=%d, ncoutformat=%d\n", ncinformat, + ncoutformat); + + /* If the format option (-64 or -n4) for the output netCDF file + * is not specified then recreate the output netCDF file based + * upon the format of the input netCDF file. */ + if (ncoutformat == NC_FORMAT_CLASSIC) + { + if (ncinformat == NC_FORMAT_CLASSIC + || ncinformat == NC_FORMAT_64BIT_OFFSET) + { + ncoutformat = (NC_CLOBBER | NC_64BIT_OFFSET); + if (verbose) + printf (" ncoutformat reset to NC_64BIT_OFFSET \n"); + } + else if (ncinformat == NC_FORMAT_NETCDF4) + { + ncoutformat = (NC_CLOBBER | NC_NETCDF4); + if (verbose) + printf (" ncoutformat reset to NC_NETCDF4\n"); + } + else if (ncinformat == NC_FORMAT_NETCDF4_CLASSIC) + { + ncoutformat = (NC_CLOBBER | NC_NETCDF4 | NC_CLASSIC_MODEL); + if (verbose) + printf (" ncoutformat reset to NC_NETCDF4 with " + "NC_CLASSIC_MODEL\n"); + } + else if (ncinformat == NC_FORMAT_64BIT_DATA) + { + ncoutformat = (NC_CLOBBER | NC_64BIT_DATA); + if (verbose) + printf (" ncoutformat reset to NC_64BIT_DATA\n"); + } + /* close the file to recreate it */ + ncclose (ncoutfile->ncfid); + if (nc__create (outncname, ncoutformat, 0, &blksz, &ncoutfile->ncfid) + == (-1)) + { + fprintf (stderr, + "Error: cannot create the output netCDF file!\n"); + free (ncoutfile); + return (1); + } + ncsetfill (ncoutfile->ncfid, NC_NOFILL); + } + + /* Define the dimensions */ + for (d = 0; d < ncinfile->ndims; d++) + { + if (d == ncinfile->recdim) + ncdimdef (ncoutfile->ncfid, ncinfile->dimname[d], NC_UNLIMITED); + else + ncdimdef (ncoutfile->ncfid, ncinfile->dimname[d], + ncinfile->dimfullsize[d]); + } + + /* Define the variables and copy their attributes */ + for (v = 0; v < ncinfile->nvars; v++) + { + ncvardef (ncoutfile->ncfid, ncinfile->varname[v], + ncinfile->datatype[v], ncinfile->varndims[v], + ncinfile->vardim[v]); + for (n = 0; n < ncinfile->natts[v]; n++) + { + ncattname (ncinfile->ncfid, v, n, attname); + if (missing) + { + if (!strcmp (attname, "missing_value")) + { + ncoutfile->varmiss[v] = 1; + ncattget (ncinfile->ncfid, v, "missing_value", + (void *)(ncoutfile->varmissval[v])); + } + } + if (!strcmp (attname, "domain_decomposition")) + continue; + else + { + if (ncattcopy (ncinfile->ncfid, v, attname, ncoutfile->ncfid, + v) + == (-1)) + { + fprintf ( + stderr, + "Error: cannot copy variable \"%s\"'s attributes!\n", + ncinfile->varname[v]); + free (ncinfile); + return (1); + } + } + } + } + + /* Copy the global attributes */ + for (n = 0; n < ncinfile->ngatts; n++) + { + ncattname (ncinfile->ncfid, NC_GLOBAL, n, attname); + if (!strcmp (attname, "NumFilesInSet")) + continue; + else if (!strcmp (attname, "filename")) + ncattput (ncoutfile->ncfid, NC_GLOBAL, attname, NC_CHAR, + strlen (outncname), (void *)outncname); + else + { + if (ncattcopy (ncinfile->ncfid, NC_GLOBAL, attname, + ncoutfile->ncfid, NC_GLOBAL) + == (-1)) + { + fprintf ( + stderr, + "Error: cannot copy the file's global attributes!\n"); + return (1); + } + } + } + + if (deflate == 1 && deflation > 0) + { + for (v = 0; v < ncinfile->nvars; v++) + { + if (nc_def_var_deflate (ncoutfile->ncfid, v, shuffle, deflate, + deflation) + != NC_NOERR) + { + fprintf (stderr, "Error: nc_def_var_deflate failed! Are you " + "setting NETCDF4?\n"); + usage (); + } + } + } + + /* Definitions done */ + nc__enddef (ncoutfile->ncfid, headerpad, 4, 0, 4); + } + + /* Copy all data values of the dimensions and variables to memory */ + /* For non-decomposed variables, process_vars will write them to the */ + /* output file. Decomposed variables for N records from this file will */ + /* be written to memory, where they will eventually get merged with those */ + /* from other input files */ + int r = block * (*bf); // the position of r is absolute + unsigned long mem_for_rec, tmp_mem_alloc; + do + { + tmp_mem_alloc + = mem_allocated; /* store current memory usage in a temporary */ + ncinfileerror + += process_vars (ncinfile, ncoutfile, appendnc, nrecs, nblocks, bf, + r, *nfiles, f, verbose, missing); + mem_for_rec = mem_allocated - tmp_mem_alloc; + if (verbose && print_mem_usage) + if (mem_for_rec > 0) + printf (" mem alloc for r=%d, infile=%s is %lu KB\n", r, ncname, + mem_for_rec / 1024); + r++; + appendnc = 1; + } + while ( + r < min ( + ((block + 1) * (*bf)), + *nrecs)); // r is a minimum of the next block start point and nrecs + + /* Done */ + ncclose (ncinfile->ncfid); + free (ncinfile); + return (ncinfileerror); + } + + /* Decomposed variables from an input file and record will be written to memory + */ + /* non-decomposed variables will be written to the output file */ + int + process_vars (struct fileinfo *ncinfile, struct fileinfo *ncoutfile, + unsigned char appendnc, int *nrecs, int *nblocks, int *bf, int r, + int nfiles, int f, unsigned char verbose, unsigned char missing) + { + int v, d, i, j, k, l, s; /* Loop variables */ + int dimid; /* ID of a dimension */ + void *values = NULL; /* Current data values */ + long instart[MAX_NC_DIMS], outstart[MAX_NC_DIMS]; /* Data array sizes */ + long count[MAX_NC_DIMS]; /* " */ + long long recsize; /* Decomposed size of one record of a variable */ + long long recfullsize; /* Non-decomposed size of one record of a variable */ + int varrecdim; /* Variable's record dimension */ + static unsigned char first = 1; /* First time reading variables? */ + int imax, jmax, kmax, lmax; + long long int imaxfull, jmaxfull, kmaxfull, lmaxfull; + long long int imaxjmaxfull, imaxjmaxkmaxfull; + long long int offset, ioffset, joffset, koffset, loffset; + long long int b; + int recdimsize; /* Using a local value to correct issue when netcdf file does + not have a record dimension */ + long long varbufsize; + + if (ncinfile->recdim < 0) + recdimsize = 1; + else + recdimsize = ncinfile->dimsize[ncinfile->recdim]; + + /* Check the number of records */ + if (*nrecs == 1) + { + *nrecs = recdimsize; + + /* adjust bf */ + if ((*bf) >= 1) + { + if ((*bf) > (*nrecs)) + { + fprintf (stderr, + "blocking factor (k) > total records (%d). Setting " + "blocking factor to %d.\n", + *nrecs, *nrecs); + *bf = *nrecs; + } + } + else + { + /* bf was set to zero, so we do full buffering */ + *bf = min ( + MAX_BF, + *nrecs); // we use the maximum blocking factor in our capacity + /* normally we'll have one block, unless we hit MAX_BF */ + } + /* find nblocks */ + if (((*nrecs) % (*bf)) != 0) + *nblocks = (int)((*nrecs) / (*bf)) + 1; + else + *nblocks = (int)((*nrecs) / (*bf)); + + if (verbose) + fprintf (stderr, + "blocking factor=%d, num. blocks=%d, num. records=%d\n", *bf, + *nblocks, *nrecs); + } + else if (recdimsize != *nrecs) + { + fprintf ( + stderr, + "Error: different number of records than the first input file!\n"); + return (1); + } + + /* Allocate memory for the decomposed variables, if none has been allocated + yet We use an optimized algorithm to malloc and set up a double dimension + array using a single malloc call. We do the cross-linking after the + malloc, so the entire allocation appears to be a double-dimensional array. + The memory allocated below is a trivial amount. The real allocation will + happen later. This allocation is done exactly once in the whole program */ + if (varbuf == NULL) + { + int nbytes + = (*bf) * sizeof (void **) + ((*bf) * MAX_NC_VARS * sizeof (void *)); + if (verbose || print_mem_usage) + fprintf (stderr, + "allocating a buffer of %d bytes for the multi-dimensional " + "pointer array\n", + nbytes); + if (mem_dry_run) + estimated_maxrss += nbytes; + varbuf = (void ***)calloc (nbytes, 1); + if (varbuf == NULL) + { + fprintf (stderr, "Could not allocate a memory of size %lu bytes\n", + sizeof (void *) * (*bf) * MAX_NC_VARS); + exit (1); + } + /* now initialize the buffer to create a mult-dimensional array */ + int z; + for (z = 0; z < (*bf); z++) + { + varbuf[z] = (void **)((size_t)varbuf + (*bf) * sizeof (void **) + + z * MAX_NC_VARS * sizeof (void *)); + } + /* The nested memory alloc works fine, but it involves multiple malloc + calls. varbuf = (void ***)malloc((*bf) * sizeof(void **)); if (varbuf + == NULL) { fprintf(stderr, "Could not allocate memory\n"); exit(1); + } + int z; + for (z=0; z<(*bf); z++) { + varbuf[z] = calloc(MAX_NC_VARS*sizeof(void*),1); + if (varbuf == NULL) { + fprintf(stderr, "Could not allocate memory\n"); + exit(1); + } + } + */ + } /* end of memory allocation, done once per block */ + + /* Loop over all the variables */ + for (v = 0; v < ncinfile->nvars; v++) + { + if (verbose > 1) + printf (" variable = %s\n", ncinfile->varname[v]); + + /* Get read/write dimension sizes for the variable */ + recsize = 1; + recfullsize = 1; + varrecdim = (-1); + outstart[0] = 0; + outstart[1] = 0; + outstart[2] = 0; + outstart[3] = 0; + for (d = 0; d < ncinfile->varndims[v]; d++) + { + if (ncinfile->vardim[v][d] == ncinfile->recdim) + { + count[d] = 1; + varrecdim = d; + } + else + { + count[d] = ncinfile->dimsize[ncinfile->vardim[v][d]]; + recsize *= count[d]; + instart[d] = 0; + outstart[d] = ncinfile->dimstart[ncinfile->vardim[v][d]] - 1; + recfullsize *= ncinfile->dimfullsize[ncinfile->vardim[v][d]]; + } + if (verbose > 1) + printf (" dim %d: instart=%ld outstart=%ld count=%ld\n", d, + instart[d], outstart[d], count[d]); + } + + /* Prevent unnecessary reads/writes */ + if (r > 0) + { + /* Prevent unnecessary reads/writes of the dimensions */ + if ((dimid = ncdimid (ncinfile->ncfid, ncinfile->varname[v])) + != (-1)) + { + if (ncinfile->recdim == dimid) + { + if (f != 0) + continue; + } + else + continue; + } + /* Prevent unnecessary reads/writes of the variables */ + else + { + /* Prevent unnecessary reads/writes of non-decomposed variables + if (ncinfile->vardecomp[v]!=1 && appendnc) continue; */ + + /* Non-record variables */ + if (varrecdim == (-1)) + continue; + + /* Non-decomposed record variables */ + if (ncinfile->vardecomp[v] != 1 && f > 0) + continue; + } + } + else + { + if (ncinfile->vardecomp[v] != 1 && appendnc) + continue; + } + + /* Allocate a buffer for the variable's record */ + if ((values = malloc (nctypelen (ncinfile->datatype[v]) * recsize)) + == NULL) + { + fprintf (stderr, + "Error: cannot allocate %lld bytes for decomposed variable " + "\"%s\"'s values!\n", + nctypelen (ncinfile->datatype[v]) * recsize, + ncinfile->varname[v]); + return (1); + } + + /* Read the variable */ + if (varrecdim != (-1)) + instart[varrecdim] = outstart[varrecdim] = r; + if (ncvarget (ncinfile->ncfid, v, instart, count, values) == (-1)) + { + fprintf (stderr, "Error: cannot read variable \"%s\"'s values!\n", + ncinfile->varname[v]); + return (1); + } + + /* Write the buffered variable immediately if it's not decomposed */ + if ((ncinfile->vardecomp[v] != 1) && (!mem_dry_run)) + { + if (verbose > 1) + printf (" writing %lld bytes to file\n", + nctypelen (ncinfile->datatype[v]) * recsize); + if (ncvarput (ncoutfile->ncfid, v, outstart, count, values) == (-1)) + { + fprintf (stderr, + "Error: cannot write variable \"%s\"'s values!\n", + ncinfile->varname[v]); + return (1); + } + } + /* Save the buffer */ + else + { + /* Allocate a buffer for the variable's non-decomposed record size */ + /* rather than checking for whether this is the first file, and so */ + /* we do this once per record per variable for each block */ + /* if (f==0) */ + if (varbuf[(r % (*bf))][v] == NULL) + { + varbufsize = nctypelen (ncinfile->datatype[v]) * recfullsize; + if (verbose && print_mem_usage) + printf (" allocating %lld bytes for full domain of " + "variable %s\n", + varbufsize, ncinfile->varname[v]); + if (mem_dry_run) + { + estimated_maxrss += varbufsize; + varbuf[(r % (*bf))][v] = "deadbeef"; + continue; + } + if ((varbuf[(r % (*bf))][v] = calloc (varbufsize, 1)) == NULL) + { + fprintf (stderr, + "Error: cannot allocate %lld bytes for entire " + "variable \"%s\"'s values!\n", + varbufsize, ncinfile->varname[v]); + return (1); + } + mem_allocated += varbufsize; + if (missing && ncoutfile->varmiss[v]) + switch (ncinfile->datatype[v]) + { + case NC_BYTE: + case NC_CHAR: + for (s = 0; s < recfullsize; s++) + *((unsigned char *)(varbuf[(r % (*bf))][v]) + s) + = *((unsigned char *)(ncoutfile->varmissval[v])); + break; + case NC_SHORT: + for (s = 0; s < recfullsize; s++) + *((short *)(varbuf[(r % (*bf))][v]) + s) + = *((short *)(ncoutfile->varmissval[v])); + break; + case NC_INT: + for (s = 0; s < recfullsize; s++) + *((int *)(varbuf[(r % (*bf))][v]) + s) + = *((int *)(ncoutfile->varmissval[v])); + break; + case NC_FLOAT: + for (s = 0; s < recfullsize; s++) + *((float *)(varbuf[(r % (*bf))][v]) + s) + = *((float *)(ncoutfile->varmissval[v])); + break; + case NC_DOUBLE: + for (s = 0; s < recfullsize; s++) + *((double *)(varbuf[(r % (*bf))][v]) + s) + = *((double *)(ncoutfile->varmissval[v])); + break; + } + } + if (varbuf[(r % (*bf))][v] == NULL) + { + fprintf (stderr, "Internal memory usage error!\n"); + exit (1); + } + if (mem_dry_run) + continue; + if (verbose > 1) + printf (" writing %lld bytes to memory\n", + nctypelen (ncinfile->datatype[v]) * recsize); + + imax = ncinfile + ->dimsize[ncinfile->vardim[v][ncinfile->varndims[v] - 1]]; + if (ncinfile->varndims[v] > 1) + { + dimid = ncinfile->vardim[v][ncinfile->varndims[v] - 2]; + if (dimid == ncinfile->recdim) + jmax = 1; + else + jmax = ncinfile->dimsize[dimid]; + } + else + jmax = 1; + if (ncinfile->varndims[v] > 2) + { + dimid = ncinfile->vardim[v][ncinfile->varndims[v] - 3]; + if (dimid == ncinfile->recdim) + kmax = 1; + else + kmax = ncinfile->dimsize[dimid]; + } + else + kmax = 1; + if (ncinfile->varndims[v] > 3) + { + dimid = ncinfile->vardim[v][ncinfile->varndims[v] - 4]; + if (dimid == ncinfile->recdim) + lmax = 1; + else + lmax = ncinfile->dimsize[dimid]; + } + else + lmax = 1; + if (verbose > 1) + printf (" imap=%d jmax=%d kmax=%d lmax=%d\n", imax, jmax, + kmax, lmax); + + imaxfull + = ncinfile->dimfullsize[ncinfile->vardim[v][ncinfile->varndims[v] + - 1]]; + if (ncinfile->varndims[v] > 1) + jmaxfull = ncinfile->dimfullsize + [ncinfile->vardim[v][ncinfile->varndims[v] - 2]]; + else + jmaxfull = 1; + if (ncinfile->varndims[v] > 2) + kmaxfull = ncinfile->dimfullsize + [ncinfile->vardim[v][ncinfile->varndims[v] - 3]]; + else + kmaxfull = 1; + if (ncinfile->varndims[v] > 3) + { + if (ncinfile->vardim[v][ncinfile->varndims[v] - 4] + != ncinfile->recdim) + lmaxfull + = ncinfile->dimfullsize + [ncinfile->vardim[v][ncinfile->varndims[v] - 4]]; + else + lmaxfull = 1; + } + else + lmaxfull = 1; + if (verbose > 1) + printf (" imaxfull=%lld jmaxfull=%lld kmaxfull=%lld " + "lmaxfull=%lld\n", + imaxfull, jmaxfull, kmaxfull, lmaxfull); + imaxjmaxfull = imaxfull * jmaxfull; + imaxjmaxkmaxfull = imaxfull * jmaxfull * kmaxfull; + + ioffset = outstart[ncinfile->varndims[v] - 0 - 1]; + if (ncinfile->varndims[v] > 1) + joffset = outstart[ncinfile->varndims[v] - 1 - 1]; + else + joffset = 0; + if (ncinfile->varndims[v] > 2) + koffset = outstart[ncinfile->varndims[v] - 2 - 1]; + else + koffset = 0; + if (ncinfile->varndims[v] > 3) + loffset = outstart[ncinfile->varndims[v] - 3 - 1]; + else + loffset = 0; + if (varrecdim != (-1)) + { + switch (ncinfile->varndims[v]) + { + case 1: + ioffset = 0; + break; + case 2: + joffset = 0; + break; + case 3: + koffset = 0; + break; + case 4: + loffset = 0; + break; + } + } + if (verbose > 1) + printf (" ioffset=%lld joffset=%lld koffset=%lld " + "loffset=%lld\n", + ioffset, joffset, koffset, loffset); + switch (ncinfile->datatype[v]) + { + case NC_BYTE: + case NC_CHAR: + if (verbose > 1) + printf (" start copying byte/char\n"); + b = 0; + for (l = 0; l < lmax; l++) + for (k = 0; k < kmax; k++) + for (j = 0; j < jmax; j++) + for (i = 0; i < imax; i++) + { + offset = (i + ioffset) + (j + joffset) * imaxfull + + (k + koffset) * imaxjmaxfull + + (l + loffset) * imaxjmaxkmaxfull; + *((unsigned char *)(varbuf[(r % (*bf))][v]) + offset) + = *((unsigned char *)values + (b++)); + } + if (verbose > 1) + printf (" end copying byte/char\n"); + break; + case NC_SHORT: + if (verbose > 1) + printf (" start copying short\n"); + b = 0; + for (l = 0; l < lmax; l++) + for (k = 0; k < kmax; k++) + for (j = 0; j < jmax; j++) + for (i = 0; i < imax; i++) + { + offset = (i + ioffset) + (j + joffset) * imaxfull + + (k + koffset) * imaxjmaxfull + + (l + loffset) * imaxjmaxkmaxfull; + *((short *)(varbuf[(r % (*bf))][v]) + offset) + = *((short *)values + (b++)); + } + if (verbose > 1) + printf (" end copying short\n"); + break; + case NC_INT: + if (verbose > 1) + printf (" start copying int\n"); + b = 0; + for (l = 0; l < lmax; l++) + for (k = 0; k < kmax; k++) + for (j = 0; j < jmax; j++) + for (i = 0; i < imax; i++) + { + offset = (i + ioffset) + (j + joffset) * imaxfull + + (k + koffset) * imaxjmaxfull + + (l + loffset) * imaxjmaxkmaxfull; + *((int *)(varbuf[(r % (*bf))][v]) + offset) + = *((int *)values + (b++)); + } + if (verbose > 1) + printf (" end copying int\n"); + break; + case NC_FLOAT: + if (verbose > 1) + printf (" start copying float\n"); + b = 0; + for (l = 0; l < lmax; l++) + for (k = 0; k < kmax; k++) + for (j = 0; j < jmax; j++) + for (i = 0; i < imax; i++) + { + offset = (i + ioffset) + (j + joffset) * imaxfull + + (k + koffset) * imaxjmaxfull + + (l + loffset) * imaxjmaxkmaxfull; + *((float *)(varbuf[(r % (*bf))][v]) + offset) + = *((float *)values + (b++)); + } + if (verbose > 1) + printf (" end copying float\n"); + break; + case NC_DOUBLE: + if (verbose > 1) + printf (" start copying double\n"); + b = 0; + for (l = 0; l < lmax; l++) + for (k = 0; k < kmax; k++) + for (j = 0; j < jmax; j++) + for (i = 0; i < imax; i++) + { + offset = (i + ioffset) + (j + joffset) * imaxfull + + (k + koffset) * imaxjmaxfull + + (l + loffset) * imaxjmaxkmaxfull; + *((double *)(varbuf[(r % (*bf))][v]) + offset) + = *((double *)values + (b++)); + } + if (verbose > 1) + printf (" end copying double\n"); + break; + } + } + + /* Deallocate the decomposed variable's buffer */ + if (values != NULL) + free (values); + } + first = 0; + return (0); + } + + /* Write all the buffered decomposed variables to the output file */ + int + flush_decomp (struct fileinfo *ncoutfile, int nfiles, int r, int bf, + unsigned char verbose) + { + int v, d; /* Loop variable */ + long outstart[MAX_NC_DIMS]; /* Data array sizes */ + long count[MAX_NC_DIMS]; /* " */ + int varrecdim; /* Position of a variable's record dimension */ + + if (verbose > 1) + { + printf (" nvars=%d\n", ncoutfile->nvars); + } + + /* Write out all the decomposed variables */ + for (v = 0; v < ncoutfile->nvars; v++) + { + if (ncoutfile->vardecomp[v] == 0) + continue; + if (verbose > 1) + printf (" v=%d (%s)\n", v, ncoutfile->varname[v]); + varrecdim = (-1); + for (d = 0; d < ncoutfile->varndims[v]; d++) + { + outstart[d] = 0; + if (ncoutfile->vardim[v][d] == ncoutfile->recdim) + { + count[d] = 1; + varrecdim = d; + } + else + { + count[d] = ncoutfile->dimfullsize[ncoutfile->vardim[v][d]]; + } + if (verbose > 1) + printf (" d=%d: outstart=%ld count=%ld\n", d, outstart[d], + count[d]); + } + if (varrecdim != (-1)) + outstart[varrecdim] = r; + if (varrecdim == (-1) && r > 0) + continue; + if (verbose > 1) + printf (" writing to disk\n"); + if (ncvarput (ncoutfile->ncfid, v, outstart, count, varbuf[(r % bf)][v]) + == (-1)) + { + fprintf (stderr, "Error: cannot write variable \"%d\"'s values!\n", + v); + return (1); + } + } + return (0); + } \ No newline at end of file diff --git a/fms2_io/netcdf_io.F90 b/fms2_io/netcdf_io.F90 index b76042900a..10368ff30f 100644 --- a/fms2_io/netcdf_io.F90 +++ b/fms2_io/netcdf_io.F90 @@ -167,6 +167,12 @@ module netcdf_io_mod real(kind=r8_kind) :: missing_val !< Unpacked missing value for a variable. endtype Valid_t +! A linked list of file paths that need to be combined. +type, public :: file_path_list + character(len=256) :: path + type(file_path_list), pointer :: next => null() +end type file_path_list +type(file_path_list), pointer :: files_to_combine => null() public :: netcdf_io_init public :: netcdf_file_open @@ -240,6 +246,7 @@ module netcdf_io_mod public :: write_restart_bc public :: read_restart_bc public :: flush_file +public :: files_to_combine !> @ingroup netcdf_io_mod interface netcdf_add_restart_variable @@ -691,6 +698,26 @@ subroutine netcdf_file_close(fileobj) integer :: err integer :: i + type(file_path_list), pointer :: current + + ! Append the file to the list of files to combine if it is a partition of a netcdf file. + if ( (.not. fileobj%is_readonly) .and. fileobj%is_root .and. len_trim(fileobj%path) > 7) then + if (fileobj%path(len_trim(fileobj%path)-7:len_trim(fileobj%path)-4) == ".nc.") then + if (.not. associated(files_to_combine)) then + allocate(files_to_combine) + call string_copy(files_to_combine%path, trim(fileobj%path)) + else + current => files_to_combine + do while (associated(current%next)) + current => current%next + enddo + allocate(current%next) + call string_copy(current%next%path, trim(fileobj%path)) + endif + else if (fileobj%path(len_trim(fileobj%path)-2:len_trim(fileobj%path)) /= ".nc") then + call error("netcdf_file_close: Encountered unexpected netcdf file suffix: "//trim(fileobj%path)) + endif + endif if (fileobj%is_root) then err = nf90_close(fileobj%ncid) From 0bc2cef0e2de95610f43034bcdc27d640d53e1f6 Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Mon, 17 Feb 2025 08:14:23 -0700 Subject: [PATCH 2/9] reme existing outfile, if an old version exists --- diag_manager/mppnccombine.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index 42a414fd9c..213902908d 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -377,6 +377,11 @@ char** find_files(const char* pattern, int* count) { argv[2 + i] = infiles_array[i]; } + // if an old version exists, remove the output file + if (access(outfile, F_OK) == 0) { + remove(outfile); + } + // call the main mppnccombine function iret = main_(argc, argv); From 79131c0202118c538cbc2c0f76bc412aad073209 Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Mon, 17 Feb 2025 17:48:55 -0700 Subject: [PATCH 3/9] first complete version of automated file combiner --- diag_manager/diag_manager.F90 | 88 +++++++++++++++++++++++++++++------ diag_manager/diag_output.F90 | 2 + diag_manager/mppnccombine.c | 55 ++++++++++++++++++---- fms2_io/netcdf_io.F90 | 46 +++++++++++------- 4 files changed, 151 insertions(+), 40 deletions(-) diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index 06801bb4ac..fd21e094b4 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -239,7 +239,7 @@ MODULE diag_manager_mod USE fms_diag_outfield_mod, ONLY: fmsDiagOutfieldIndex_type, fmsDiagOutfield_type USE fms_diag_fieldbuff_update_mod, ONLY: fieldbuff_update, fieldbuff_copy_missvals, & & fieldbuff_copy_fieldvals - use netcdf_io_mod, ONLY: file_path_list, files_to_combine + use netcdf_io_mod, ONLY: filepath_list_type, partitioned_global_files, partitioned_section_files #ifdef use_netCDF USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR @@ -252,7 +252,7 @@ MODULE diag_manager_mod !---------- use iso_fortran_env, only: compiler_version -use iso_c_binding, only : c_int, c_char, c_ptr, c_null_ptr, c_null_char +use iso_c_binding, only : c_int, c_char, c_ptr, c_null_ptr, c_null_char, c_new_line use, intrinsic :: iso_c_binding, only: c_int, c_char IMPLICIT NONE @@ -384,6 +384,18 @@ function exec_mppnccombine(outfile, infiles) bind(C) character(kind=c_char) :: infiles integer(c_int) :: exec_mppnccombine endfunction exec_mppnccombine + function get_num_files(pattern) bind(C) + use, intrinsic :: iso_c_binding, only: c_int, c_char + implicit none + character(kind=c_char) :: pattern + integer(c_int) :: get_num_files + endfunction get_num_files + function smallest_pix_suffix(pattern) bind(C) + use, intrinsic :: iso_c_binding, only: c_int, c_char + implicit none + character(kind=c_char) :: pattern + integer(c_int) :: smallest_pix_suffix + endfunction smallest_pix_suffix endinterface !> @addtogroup diag_manager_mod @@ -3718,25 +3730,71 @@ SUBROUTINE diag_manager_end(time) END SUBROUTINE diag_manager_end subroutine combine_files() - integer(c_int) :: ireturn - type(file_path_list), pointer :: current + integer(c_int) :: pix ! The IO PE index within the set of IO PEs. + integer(c_int) :: smallest_pix ! The smallest IO PE index of the set of IO PEs writing the current section file. + integer(c_int) :: ireturn ! Return code from mppnccombine + integer(c_int) :: niopes ! Number of IO PEs participating in writing of global files + integer :: f + type(filepath_list_type), pointer :: current character(len=:), allocatable :: filepath character(kind=c_char, len=256) :: outfile, infiles + integer :: stdout_unit + + stdout_unit = stdout() + niopes = 0 + f = 0 + + ! Part 1 : Global diag files + ! loop through global diagnostic files to combine them in a round-robin fashion + ! where each IO PE combines one file based on the IO PE index (pix) and the file index (f) + current => partitioned_global_files + do while (associated(current)) + filepath = trim(adjustl(current%path)) + outfile = filepath(1:len(filepath)-5) // c_null_char + infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char + + ! get the number of files to combine (for the first file only). The number of files is the same for all global files. + if (niopes == 0) niopes = get_num_files(infiles) + + ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) + read(filepath(len(filepath)-3:len(filepath)),*) pix + !print *, "pix = ", pix, " filepath = ", trim(filepath), " niopes = ", niopes + + if (mod(f, niopes) == pix) then + !write(stdout_unit,*) 'Combining file' // trim(outfile) + ireturn = exec_mppnccombine(outfile, infiles) + if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) + end if + + current => current%next + f = f + 1 + end do + + ! Part 2 : Section diag files + current => partitioned_section_files + do while (associated(current)) + filepath = trim(adjustl(current%path)) + outfile = filepath(1:len(filepath)-5) // c_null_char + infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char + + ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) + read(filepath(len(filepath)-3:len(filepath)),*) pix - ! loop through files to combine: - if (mpp_pe() == mpp_root_pe()) then - current => files_to_combine - do while (associated(current)) - filepath = trim(adjustl(current%path)) - outfile = filepath(1:len(filepath)-5) // c_null_char - infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char + ! get the smallest IO PE index of the set of IO PEs writing the current section file + smallest_pix = smallest_pix_suffix(infiles) + + print *, "pix = ", pix, " filepath = ", trim(filepath), " smallest_pix = ", smallest_pix, pix == smallest_pix + + if (pix == smallest_pix) then + !write(stdout_unit,*) 'Combining file' // trim(outfile) ireturn = exec_mppnccombine(outfile, infiles) if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) - current => current%next - end do - end if + end if + + current => current%next + end do - end subroutine combine_files + end subroutine combine_files !> @brief Replaces diag_manager_end; close just one file: files(file) SUBROUTINE closing_file(file, time) diff --git a/diag_manager/diag_output.F90 b/diag_manager/diag_output.F90 index 912561421d..621f498e83 100644 --- a/diag_manager/diag_output.F90 +++ b/diag_manager/diag_output.F90 @@ -127,12 +127,14 @@ SUBROUTINE diag_output_init (file_name, file_title, file_unit,& !> Check if there is an io_domain iF ( associated(mpp_get_io_domain(domain)) ) then fileob => fileobj + fileob%extent_type = 1 if (.not.check_if_open(fileob)) call open_check(open_file(fileobj, trim(file_name)//".nc", "overwrite", & domain, is_restart=.false.)) fnum_domain = "2d" ! 2d domain file_unit = 2 elSE !< No io domain, so every core is going to write its own file. fileob => fileobjND + fileob%extent_type = 2 mype = mpp_pe() write(mype_string,'(I0.4)') mype if (.not.check_if_open(fileob)) then diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index 213902908d..ca07fc3018 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -141,6 +141,7 @@ #include #include #include + #include #ifndef MAX_BF #define MAX_BF 100 /* maximum blocking factor */ @@ -185,7 +186,6 @@ unsigned char, int, int, int); int process_vars (struct fileinfo *, struct fileinfo *, unsigned char, int *, int *, int *, int, int, int, unsigned char, unsigned char); - int flush_decomp (struct fileinfo *, int, int, int, unsigned char); void print_debug (struct fileinfo *, unsigned char); char *nc_type_to_str (nc_type); @@ -248,15 +248,31 @@ int flush_decomp (struct fileinfo *, int, int, int, unsigned char); printf ("%.0f\n", ceil ((float)estimated_maxrss / (1024 * 1024))); return; } - -int iama_function(int *somearg, char *cdata, int clen) { - int i; - for( i=0; i='A' && cdata[i]<='Z') - cdata[i] = cdata[i]+*somearg; + +int get_num_files(char* pattern) { + /* This function uses the glob library to find files matching the given pattern. + * It returns the number of matched files. + * -aa + */ + glob_t globbuf; + + // Initialize glob result structure + memset(&globbuf, 0, sizeof(globbuf)); + + // Perform the glob operation + int result = glob(pattern, GLOB_TILDE, NULL, &globbuf); + if (result != 0) { + // No matches or glob error + globfree(&globbuf); + return 0; } - *somearg = -(*somearg); // negate it - return(i); + + int count = (int)globbuf.gl_pathc; + + // Free the memory used by glob + globfree(&globbuf); + + return count; } char** find_files(const char* pattern, int* count) { @@ -315,6 +331,27 @@ char** find_files(const char* pattern, int* count) { return files; } + int smallest_pix_suffix(char* pattern){ + /* This function finds the smallest suffix of the given pattern that matches + * a file. It returns the smallest suffix as an integer. + * -aa + */ + int smallest_suffix = INT_MAX; + int count = 0; + char** files = find_files(pattern, &count); + if (files == NULL || count == 0) { + fprintf(stderr, "Error: no files matched or an error occurred\n"); + return 1; + } + for (int i = 0; i < count; i++) { + int suffix = atoi(files[i] + strlen(files[i]) - 4); + if (suffix < smallest_suffix) { + smallest_suffix = suffix; + } + } + return smallest_suffix; + } + int exec_mppnccombine(char *outfile, char *infiles){ /* A wrapper function for main_ that takes a single string of input files * and a single string of output file. This function is called from diag_manager.F90 diff --git a/fms2_io/netcdf_io.F90 b/fms2_io/netcdf_io.F90 index 10368ff30f..43e1144caa 100644 --- a/fms2_io/netcdf_io.F90 +++ b/fms2_io/netcdf_io.F90 @@ -149,7 +149,7 @@ module netcdf_io_mod character (len=20) :: time_name type(dimension_information) :: bc_dimensions ! null() -end type file_path_list -type(file_path_list), pointer :: files_to_combine => null() + type(filepath_list_type), pointer :: next => null() +end type filepath_list_type +type(filepath_list_type), pointer :: partitioned_global_files => null() +type(filepath_list_type), pointer :: partitioned_section_files => null() public :: netcdf_io_init public :: netcdf_file_open @@ -246,7 +247,8 @@ module netcdf_io_mod public :: write_restart_bc public :: read_restart_bc public :: flush_file -public :: files_to_combine +public :: partitioned_global_files +public :: partitioned_section_files !> @ingroup netcdf_io_mod interface netcdf_add_restart_variable @@ -690,6 +692,23 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do end function netcdf_file_open +subroutine append_to_filepath_list(fileobj, filepath_list) + class(FmsNetcdfFile_t), intent(in) :: fileobj !< File object. + type(filepath_list_type), pointer :: filepath_list !< File path list. + type(filepath_list_type), pointer :: current + + if (.not. associated(filepath_list)) then + allocate(filepath_list) + call string_copy(filepath_list%path, trim(fileobj%path)) + else + current => filepath_list + do while (associated(current%next)) + current => current%next + enddo + allocate(current%next) + call string_copy(current%next%path, trim(fileobj%path)) + endif +end subroutine append_to_filepath_list !> @brief Close a netcdf file. subroutine netcdf_file_close(fileobj) @@ -698,21 +717,16 @@ subroutine netcdf_file_close(fileobj) integer :: err integer :: i - type(file_path_list), pointer :: current ! Append the file to the list of files to combine if it is a partition of a netcdf file. if ( (.not. fileobj%is_readonly) .and. fileobj%is_root .and. len_trim(fileobj%path) > 7) then if (fileobj%path(len_trim(fileobj%path)-7:len_trim(fileobj%path)-4) == ".nc.") then - if (.not. associated(files_to_combine)) then - allocate(files_to_combine) - call string_copy(files_to_combine%path, trim(fileobj%path)) + if (fileobj%extent_type == 1) then + call append_to_filepath_list(fileobj, partitioned_global_files) + else if (fileobj%extent_type == 2) then + call append_to_filepath_list(fileobj, partitioned_section_files) else - current => files_to_combine - do while (associated(current%next)) - current => current%next - enddo - allocate(current%next) - call string_copy(current%next%path, trim(fileobj%path)) + call error("netcdf_file_close: Encountered unexpected extent type: "//trim(fileobj%path)) endif else if (fileobj%path(len_trim(fileobj%path)-2:len_trim(fileobj%path)) /= ".nc") then call error("netcdf_file_close: Encountered unexpected netcdf file suffix: "//trim(fileobj%path)) From 9fb9338ff65e54f12ecfdd5461a6801dd83bba2e Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Tue, 18 Feb 2025 19:41:43 -0700 Subject: [PATCH 4/9] set extent_type to 1 for fileobjects opened with open_domain_file --- diag_manager/diag_output.F90 | 6 +++--- fms2_io/fms_netcdf_domain_io.F90 | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/diag_manager/diag_output.F90 b/diag_manager/diag_output.F90 index 621f498e83..e6f247626d 100644 --- a/diag_manager/diag_output.F90 +++ b/diag_manager/diag_output.F90 @@ -42,7 +42,7 @@ MODULE diag_output_mod & get_domainUG, get_diag_axis_name USE diag_data_mod, ONLY: pack_size, diag_fieldtype, diag_global_att_type, CMOR_MISSING_VALUE, diag_atttype, files USE time_manager_mod, ONLY: get_calendar_type, valid_calendar_types - USE fms_mod, ONLY: error_mesg, mpp_pe, write_version_number, fms_error_handler, FATAL, note + USE fms_mod, ONLY: error_mesg, write_version_number, fms_error_handler, FATAL, note #ifdef use_netCDF USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR @@ -127,14 +127,14 @@ SUBROUTINE diag_output_init (file_name, file_title, file_unit,& !> Check if there is an io_domain iF ( associated(mpp_get_io_domain(domain)) ) then fileob => fileobj - fileob%extent_type = 1 + fileob%extent_type = 1 ! Extent type 1 is for domain decomposed files (needed for auto combine) if (.not.check_if_open(fileob)) call open_check(open_file(fileobj, trim(file_name)//".nc", "overwrite", & domain, is_restart=.false.)) fnum_domain = "2d" ! 2d domain file_unit = 2 elSE !< No io domain, so every core is going to write its own file. fileob => fileobjND - fileob%extent_type = 2 + fileob%extent_type = 2 ! Extent type 2 is for section files (needed for auto combine) mype = mpp_pe() write(mype_string,'(I0.4)') mype if (.not.check_if_open(fileob)) then diff --git a/fms2_io/fms_netcdf_domain_io.F90 b/fms2_io/fms_netcdf_domain_io.F90 index 57e61800c5..92a5b69e2e 100644 --- a/fms2_io/fms_netcdf_domain_io.F90 +++ b/fms2_io/fms_netcdf_domain_io.F90 @@ -383,6 +383,7 @@ function open_domain_file(fileobj, path, mode, domain, nc_format, is_restart, do allocate(pelist(pelist_size)) call mpp_get_pelist(io_domain, pelist) fileobj%adjust_indices = .true. !Set the default to true + fileobj%extent_type = 1 ! Extent type 1 is for domain decomposed files (needed for auto combine) !Open the distibuted files. success = netcdf_file_open(fileobj, distributed_filepath, mode, nc_format, pelist, & From f3235f29d4e5b78695b71b78fa0f68fedd267bc7 Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Sat, 22 Feb 2025 13:55:37 -0700 Subject: [PATCH 5/9] use select(type) block to determine netcdf file type (domain or not) and remove existing combined file when root pe opens a partitioned one. --- diag_manager/diag_output.F90 | 2 -- diag_manager/mppnccombine.c | 12 ++++++++++++ fms2_io/fms_netcdf_domain_io.F90 | 21 +++++++++++++++++---- fms2_io/netcdf_io.F90 | 14 ++++++-------- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/diag_manager/diag_output.F90 b/diag_manager/diag_output.F90 index e6f247626d..eef10ef7e4 100644 --- a/diag_manager/diag_output.F90 +++ b/diag_manager/diag_output.F90 @@ -127,14 +127,12 @@ SUBROUTINE diag_output_init (file_name, file_title, file_unit,& !> Check if there is an io_domain iF ( associated(mpp_get_io_domain(domain)) ) then fileob => fileobj - fileob%extent_type = 1 ! Extent type 1 is for domain decomposed files (needed for auto combine) if (.not.check_if_open(fileob)) call open_check(open_file(fileobj, trim(file_name)//".nc", "overwrite", & domain, is_restart=.false.)) fnum_domain = "2d" ! 2d domain file_unit = 2 elSE !< No io domain, so every core is going to write its own file. fileob => fileobjND - fileob%extent_type = 2 ! Extent type 2 is for section files (needed for auto combine) mype = mpp_pe() write(mype_string,'(I0.4)') mype if (.not.check_if_open(fileob)) then diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index ca07fc3018..7e99484559 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -275,6 +275,18 @@ int get_num_files(char* pattern) { return count; } +int delete_file(char* filename) { + /* This function deletes the given file. + * Returns 0 on success, 1 on failure. + * -aa + */ + if (remove(filename) != 0) { + fprintf(stderr, "Error: unable to delete file %s\n", filename); + return 1; + } + return 0; +} + char** find_files(const char* pattern, int* count) { /* This function uses the glob library to find files matching the given pattern. * It returns an array of file-path strings and sets the count to the number of diff --git a/fms2_io/fms_netcdf_domain_io.F90 b/fms2_io/fms_netcdf_domain_io.F90 index 92a5b69e2e..b81dc9b70e 100644 --- a/fms2_io/fms_netcdf_domain_io.F90 +++ b/fms2_io/fms_netcdf_domain_io.F90 @@ -29,6 +29,7 @@ module fms_netcdf_domain_io_mod use fms_io_utils_mod use netcdf_io_mod use platform_mod +use iso_c_binding, only: c_null_char implicit none private @@ -107,6 +108,15 @@ module fms_netcdf_domain_io_mod module procedure compute_global_checksum_4d end interface compute_global_checksum +interface + function delete_file(filename) bind(c) + use, intrinsic :: iso_c_binding, only: c_int, c_char + implicit none + character(kind=c_char) :: filename(*) + integer(kind=c_int) :: delete_file + end function delete_file +end interface + !> @addtogroup fms_netcdf_domain_io_mod !> @{ @@ -383,19 +393,22 @@ function open_domain_file(fileobj, path, mode, domain, nc_format, is_restart, do allocate(pelist(pelist_size)) call mpp_get_pelist(io_domain, pelist) fileobj%adjust_indices = .true. !Set the default to true - fileobj%extent_type = 1 ! Extent type 1 is for domain decomposed files (needed for auto combine) !Open the distibuted files. success = netcdf_file_open(fileobj, distributed_filepath, mode, nc_format, pelist, & is_restart, dont_add_res_to_filename) if (string_compare(mode, "read", .true.) .or. string_compare(mode, "append", .true.)) then if (success) then - if (.not. string_compare(distributed_filepath, combined_filepath)) then + if (mpp_pe() == mpp_root_pe() .and. .not. string_compare(distributed_filepath, combined_filepath)) then success2 = netcdf_file_open(fileobj2, combined_filepath, mode, nc_format, pelist, & is_restart, dont_add_res_to_filename) if (success2) then - call error("The domain decomposed file:"//trim(fileobj%path)// & - & " contains both combined (*.nc) and distributed files (*.nc.XXXX).") + print *, "The domain decomposed file:"//trim(fileobj%path)// & + & " contains both combined (*.nc) and distributed files (*.nc.XXXX)."// & + & " Deleting the combined file (must be left over from a previous run)." + if (delete_file(trim(combined_filepath)//c_null_char) /= 0) then + call error("Failed to delete the combined file:"//trim(combined_filepath)//c_null_char) + endif endif endif else diff --git a/fms2_io/netcdf_io.F90 b/fms2_io/netcdf_io.F90 index 43e1144caa..099382444a 100644 --- a/fms2_io/netcdf_io.F90 +++ b/fms2_io/netcdf_io.F90 @@ -149,7 +149,6 @@ module netcdf_io_mod character (len=20) :: time_name type(dimension_information) :: bc_dimensions ! 7) then if (fileobj%path(len_trim(fileobj%path)-7:len_trim(fileobj%path)-4) == ".nc.") then - if (fileobj%extent_type == 1) then - call append_to_filepath_list(fileobj, partitioned_global_files) - else if (fileobj%extent_type == 2) then - call append_to_filepath_list(fileobj, partitioned_section_files) - else - call error("netcdf_file_close: Encountered unexpected extent type: "//trim(fileobj%path)) - endif + select type(fileobj) + type is (FmsNetcdfFile_t) + call append_to_filepath_list(fileobj, partitioned_section_files) + class default ! FmsNetcdfDomainFile_t + call append_to_filepath_list(fileobj, partitioned_global_files) + end select else if (fileobj%path(len_trim(fileobj%path)-2:len_trim(fileobj%path)) /= ".nc") then call error("netcdf_file_close: Encountered unexpected netcdf file suffix: "//trim(fileobj%path)) endif From 85334222b9b85bc794dc798fc6770939a89e58db Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Tue, 4 Mar 2025 13:27:21 -0700 Subject: [PATCH 6/9] introduce auto_merge_nc nml parameter and simplify mppnccombine function interfaces --- diag_manager/diag_data.F90 | 1 + diag_manager/diag_manager.F90 | 32 +++++----- diag_manager/mppnccombine.c | 100 +++++++++++-------------------- fms2_io/fms_netcdf_domain_io.F90 | 9 ++- 4 files changed, 57 insertions(+), 85 deletions(-) diff --git a/diag_manager/diag_data.F90 b/diag_manager/diag_data.F90 index 1edaedc6b7..58a85bce91 100644 --- a/diag_manager/diag_data.F90 +++ b/diag_manager/diag_data.F90 @@ -335,6 +335,7 @@ MODULE diag_data_mod !! routine is called with the optional time_init parameter. LOGICAL :: use_mpp_io = .false. !< false is fms2_io (default); true is mpp_io LOGICAL :: use_refactored_send = .false. !< Namelist flag to use refactored send_data math funcitons. + LOGICAL :: auto_merge_nc = .false. !< Namelist flag to automatically merge netCDF files. ! diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index fd21e094b4..920c90305c 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -230,7 +230,7 @@ MODULE diag_manager_mod & use_cmor, issue_oor_warnings, oor_warnings_fatal, oor_warning, pack_size,& & max_out_per_in_field, flush_nc_files, region_out_use_alt_value, max_field_attributes, output_field_type,& & max_file_attributes, max_axis_attributes, prepend_date, DIAG_FIELD_NOT_FOUND, diag_init_time, diag_data_init,& - & use_mpp_io, use_refactored_send + & use_mpp_io, use_refactored_send, auto_merge_nc USE diag_data_mod, ONLY: fileobj, fileobjU, fnum_for_domain, fileobjND USE diag_table_mod, ONLY: parse_diag_table USE diag_output_mod, ONLY: get_diag_global_att, set_diag_global_att @@ -377,19 +377,18 @@ MODULE diag_manager_mod ! ----- interface to the C function ----- interface - function exec_mppnccombine(outfile, infiles) bind(C) + function exec_mppnccombine(outfile) bind(C) use, intrinsic :: iso_c_binding, only: c_int, c_char implicit none character(kind=c_char) :: outfile - character(kind=c_char) :: infiles integer(c_int) :: exec_mppnccombine endfunction exec_mppnccombine - function get_num_files(pattern) bind(C) + function num_partitioned_files(outfile) bind(C) use, intrinsic :: iso_c_binding, only: c_int, c_char implicit none - character(kind=c_char) :: pattern - integer(c_int) :: get_num_files - endfunction get_num_files + character(kind=c_char) :: outfile + integer(c_int) :: num_partitioned_files + endfunction num_partitioned_files function smallest_pix_suffix(pattern) bind(C) use, intrinsic :: iso_c_binding, only: c_int, c_char implicit none @@ -3721,7 +3720,7 @@ SUBROUTINE diag_manager_end(time) call mpp_sync() ! combine partitioned netcdf files into single file - call combine_files() + if ( auto_merge_nc ) call combine_files() if (allocated(fileobjU)) deallocate(fileobjU) if (allocated(fileobj)) deallocate(fileobj) @@ -3737,7 +3736,7 @@ subroutine combine_files() integer :: f type(filepath_list_type), pointer :: current character(len=:), allocatable :: filepath - character(kind=c_char, len=256) :: outfile, infiles + character(kind=c_char, len=256) :: outfile integer :: stdout_unit stdout_unit = stdout() @@ -3751,18 +3750,16 @@ subroutine combine_files() do while (associated(current)) filepath = trim(adjustl(current%path)) outfile = filepath(1:len(filepath)-5) // c_null_char - infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char ! get the number of files to combine (for the first file only). The number of files is the same for all global files. - if (niopes == 0) niopes = get_num_files(infiles) + if (niopes == 0) niopes = num_partitioned_files(outfile) ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) read(filepath(len(filepath)-3:len(filepath)),*) pix - !print *, "pix = ", pix, " filepath = ", trim(filepath), " niopes = ", niopes if (mod(f, niopes) == pix) then !write(stdout_unit,*) 'Combining file' // trim(outfile) - ireturn = exec_mppnccombine(outfile, infiles) + ireturn = exec_mppnccombine(outfile) if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) end if @@ -3775,19 +3772,18 @@ subroutine combine_files() do while (associated(current)) filepath = trim(adjustl(current%path)) outfile = filepath(1:len(filepath)-5) // c_null_char - infiles = filepath(1:len(filepath)-5) // ".*" // c_null_char ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) read(filepath(len(filepath)-3:len(filepath)),*) pix ! get the smallest IO PE index of the set of IO PEs writing the current section file - smallest_pix = smallest_pix_suffix(infiles) + smallest_pix = smallest_pix_suffix(outfile) - print *, "pix = ", pix, " filepath = ", trim(filepath), " smallest_pix = ", smallest_pix, pix == smallest_pix + !print *, "pix = ", pix, " filepath = ", trim(filepath), " smallest_pix = ", smallest_pix, pix == smallest_pix if (pix == smallest_pix) then !write(stdout_unit,*) 'Combining file' // trim(outfile) - ireturn = exec_mppnccombine(outfile, infiles) + ireturn = exec_mppnccombine(outfile) if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) end if @@ -3892,7 +3888,7 @@ SUBROUTINE diag_manager_init(diag_model_subset, time_init, err_msg) & max_num_axis_sets, max_files, use_cmor, issue_oor_warnings,& & oor_warnings_fatal, max_out_per_in_field, flush_nc_files, region_out_use_alt_value, max_field_attributes,& & max_file_attributes, max_axis_attributes, prepend_date, use_mpp_io, field_log_separator,& - & use_refactored_send + & use_refactored_send, auto_merge_nc ! If the module was already initialized do nothing IF ( module_is_initialized ) RETURN diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index 7e99484559..4253db3a49 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -249,13 +249,28 @@ int flush_decomp (struct fileinfo *, int, int, int, unsigned char); return; } -int get_num_files(char* pattern) { +int delete_file(char* filename) { + /* This function deletes the given file. + * Returns 0 on success, 1 on failure. + * -aa + */ + if (remove(filename) != 0) { + fprintf(stderr, "Error: unable to delete file %s\n", filename); + return 1; + } + return 0; +} + +int num_partitioned_files(const char* filepath) { /* This function uses the glob library to find files matching the given pattern. * It returns the number of matched files. * -aa */ + char pattern[1024]; glob_t globbuf; + snprintf(pattern, sizeof(pattern), "%s.[0-9][0-9][0-9][0-9]", filepath); + // Initialize glob result structure memset(&globbuf, 0, sizeof(globbuf)); @@ -275,19 +290,7 @@ int get_num_files(char* pattern) { return count; } -int delete_file(char* filename) { - /* This function deletes the given file. - * Returns 0 on success, 1 on failure. - * -aa - */ - if (remove(filename) != 0) { - fprintf(stderr, "Error: unable to delete file %s\n", filename); - return 1; - } - return 0; -} - -char** find_files(const char* pattern, int* count) { +char** find_partitioned_files(const char* filepath, int* count) { /* This function uses the glob library to find files matching the given pattern. * It returns an array of file-path strings and sets the count to the number of * matched files. @@ -295,8 +298,11 @@ char** find_files(const char* pattern, int* count) { * Returns NULL if no files match or an error occurs. * -aa */ + char pattern[1024]; glob_t globbuf; + snprintf(pattern, sizeof(pattern), "%s.[0-9][0-9][0-9][0-9]", filepath); + // Initialize glob result structure memset(&globbuf, 0, sizeof(globbuf)); @@ -343,14 +349,14 @@ char** find_files(const char* pattern, int* count) { return files; } - int smallest_pix_suffix(char* pattern){ + int smallest_pix_suffix(char* outfile){ /* This function finds the smallest suffix of the given pattern that matches * a file. It returns the smallest suffix as an integer. * -aa */ int smallest_suffix = INT_MAX; int count = 0; - char** files = find_files(pattern, &count); + char** files = find_partitioned_files(outfile, &count); if (files == NULL || count == 0) { fprintf(stderr, "Error: no files matched or an error occurred\n"); return 1; @@ -364,57 +370,24 @@ char** find_files(const char* pattern, int* count) { return smallest_suffix; } - int exec_mppnccombine(char *outfile, char *infiles){ + int exec_mppnccombine(char *outfile){ /* A wrapper function for main_ that takes a single string of input files * and a single string of output file. This function is called from diag_manager.F90 * It expands the input files if a wildcard is given and calls main_. * It returns the return value of main_. * -aa */ - char *infiles_in; // Local copy of infiles argument - char **infiles_array; // Array of strings to hold the list of expanded input files - char **argv; // Array of strings to hold the list of input files - int argc = 0; // Number of arguments to pass to main_ - int file_count = 0; // Number of input files - int iret; // Return value of main_ - - // Get a local copy of infiles argument. This is needed because strtok - // leads to segfault when called on the argument directly. - infiles_in = (char *)malloc(strlen(infiles) + 1); - if (infiles_in == NULL) { - fprintf(stderr, "Error: unable to allocate memory\n"); - return 1; - } - strcpy(infiles_in, infiles); + char **partitioned_files; // Array of strings to hold the list of expanded input files + char **argv; // Array of strings to hold the list of arguments to pass to main_ + int argc = 0; // Number of arguments to pass to main_ + int file_count = 0; // Number of input files + int iret; // Return value of main_ - // if a list of files delimited by space is given, then split it into an array - if (strchr(infiles_in, ' ') != NULL) { - char *p = strtok(infiles_in, " "); - while (p != NULL) { - file_count++; - p = strtok(NULL, " "); - } - infiles_array = (char **)malloc(file_count * sizeof(char *)); - strcpy(infiles_in, infiles); - p = strtok(infiles_in, " "); - for (int i = 0; i < file_count; i++) { - infiles_array[i] = p; - p = strtok(NULL, " "); - } - } - // if a wildcard is given, then expand it into the - else if (strchr(infiles_in, '*') != NULL || strchr(infiles_in, '?') != NULL) { - infiles_array = find_files(infiles_in, &file_count); - if (infiles_array == NULL || file_count == 0) { - fprintf(stderr, "Error: no files matched or an error occurred\n"); - return 1; - } - } - // if a single file is given, then put it into the array - else { - file_count = 1; - infiles_array = (char **)malloc(1 * sizeof(char *)); - infiles_array[0] = infiles_in; + // Expand the input files + partitioned_files = find_partitioned_files(outfile, &file_count); + if (partitioned_files == NULL || file_count == 0) { + fprintf(stderr, "Error: no files matched or an error occurred\n"); + return 1; } argc = 2 + file_count; @@ -423,7 +396,7 @@ char** find_files(const char* pattern, int* count) { argv[0] = "mppnccombine"; argv[1] = outfile; for (int i = 0; i < file_count; i++) { - argv[2 + i] = infiles_array[i]; + argv[2 + i] = partitioned_files[i]; } // if an old version exists, remove the output file @@ -437,13 +410,12 @@ char** find_files(const char* pattern, int* count) { // if successful, remove the partitioned files if (iret == 0) { for (int i = 0; i < file_count; i++) { - remove(infiles_array[i]); + remove(partitioned_files[i]); } } // free memory - free(infiles_array); - free(infiles_in); + free(partitioned_files); free(argv); return iret; diff --git a/fms2_io/fms_netcdf_domain_io.F90 b/fms2_io/fms_netcdf_domain_io.F90 index b81dc9b70e..30900eed3c 100644 --- a/fms2_io/fms_netcdf_domain_io.F90 +++ b/fms2_io/fms_netcdf_domain_io.F90 @@ -363,6 +363,9 @@ function open_domain_file(fileobj, path, mode, domain, nc_format, is_restart, do integer, dimension(:), allocatable :: pelist logical :: success2 type(FmsNetcdfDomainFile_t) :: fileobj2 + integer :: stdoutunit + + stdoutunit = stdout() !Get the path of a "combined" file. io_layout = mpp_get_io_domain_layout(domain) @@ -403,9 +406,9 @@ function open_domain_file(fileobj, path, mode, domain, nc_format, is_restart, do success2 = netcdf_file_open(fileobj2, combined_filepath, mode, nc_format, pelist, & is_restart, dont_add_res_to_filename) if (success2) then - print *, "The domain decomposed file:"//trim(fileobj%path)// & - & " contains both combined (*.nc) and distributed files (*.nc.XXXX)."// & - & " Deleting the combined file (must be left over from a previous run)." + write(stdoutunit, *) "The domain decomposed file:"//trim(fileobj%path)// & + & " contains both combined (*.nc) and distributed files (*.nc.XXXX)."// & + & " Deleting the distributed file (must be left over from a previous run)." if (delete_file(trim(combined_filepath)//c_null_char) /= 0) then call error("Failed to delete the combined file:"//trim(combined_filepath)//c_null_char) endif From 56a24d0b69247cad1f073b626e231969848d186f Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Mon, 17 Mar 2025 10:40:55 -0600 Subject: [PATCH 7/9] split the identification of files to combine and execution of mppnccombine via a barrier --- diag_manager/diag_manager.F90 | 36 +++++++++++++++++++++++++++-------- diag_manager/mppnccombine.c | 8 ++++---- fms2_io/netcdf_io.F90 | 13 +++++++------ 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index 920c90305c..715220ac32 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -239,7 +239,7 @@ MODULE diag_manager_mod USE fms_diag_outfield_mod, ONLY: fmsDiagOutfieldIndex_type, fmsDiagOutfield_type USE fms_diag_fieldbuff_update_mod, ONLY: fieldbuff_update, fieldbuff_copy_missvals, & & fieldbuff_copy_fieldvals - use netcdf_io_mod, ONLY: filepath_list_type, partitioned_global_files, partitioned_section_files + use netcdf_io_mod, ONLY: filepath_list_type, partitioned_global_files, partitioned_section_files, append_to_filepath_list #ifdef use_netCDF USE netcdf, ONLY: NF90_INT, NF90_FLOAT, NF90_CHAR @@ -3735,6 +3735,7 @@ subroutine combine_files() integer(c_int) :: niopes ! Number of IO PEs participating in writing of global files integer :: f type(filepath_list_type), pointer :: current + type(filepath_list_type), pointer :: files_to_combine ! list of files to combined by this PE character(len=:), allocatable :: filepath character(kind=c_char, len=256) :: outfile integer :: stdout_unit @@ -3746,6 +3747,7 @@ subroutine combine_files() ! Part 1 : Global diag files ! loop through global diagnostic files to combine them in a round-robin fashion ! where each IO PE combines one file based on the IO PE index (pix) and the file index (f) + files_to_combine => null() ! initialize the list of files to be combined by this PE current => partitioned_global_files do while (associated(current)) filepath = trim(adjustl(current%path)) @@ -3754,13 +3756,15 @@ subroutine combine_files() ! get the number of files to combine (for the first file only). The number of files is the same for all global files. if (niopes == 0) niopes = num_partitioned_files(outfile) + if (niopes == -1) then + call error_mesg('diag_manager_mod::combine_files', 'num_partitioned_files failed', FATAL) + end if + ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) read(filepath(len(filepath)-3:len(filepath)),*) pix - + if (mod(f, niopes) == pix) then - !write(stdout_unit,*) 'Combining file' // trim(outfile) - ireturn = exec_mppnccombine(outfile) - if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) + call append_to_filepath_list(outfile, files_to_combine) end if current => current%next @@ -3779,17 +3783,33 @@ subroutine combine_files() ! get the smallest IO PE index of the set of IO PEs writing the current section file smallest_pix = smallest_pix_suffix(outfile) + if (smallest_pix == -1 ) then + call error_mesg('diag_manager_mod::combine_files', 'smallest_pix_suffix failed', FATAL) + end if + !print *, "pix = ", pix, " filepath = ", trim(filepath), " smallest_pix = ", smallest_pix, pix == smallest_pix if (pix == smallest_pix) then - !write(stdout_unit,*) 'Combining file' // trim(outfile) - ireturn = exec_mppnccombine(outfile) - if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed', FATAL) + call append_to_filepath_list(outfile, files_to_combine) end if current => current%next end do + ! sync all PEs before combining files + call mpp_sync() + + ! Part 3 : Combine all files in the list + if (associated(files_to_combine)) then + current => files_to_combine + do while (associated(current)) + write(stdout_unit,*) ' Combining file ' // trim(outfile) + ireturn = exec_mppnccombine(current%path) + if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed for file ' // trim(current%path), FATAL) + current => current%next + end do + end if + end subroutine combine_files !> @brief Replaces diag_manager_end; close just one file: files(file) diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index 4253db3a49..72660d24b8 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -279,7 +279,7 @@ int num_partitioned_files(const char* filepath) { if (result != 0) { // No matches or glob error globfree(&globbuf); - return 0; + return -1; // Return -1 to indicate error } int count = (int)globbuf.gl_pathc; @@ -358,8 +358,8 @@ char** find_partitioned_files(const char* filepath, int* count) { int count = 0; char** files = find_partitioned_files(outfile, &count); if (files == NULL || count == 0) { - fprintf(stderr, "Error: no files matched or an error occurred\n"); - return 1; + fprintf(stderr, "Error: no files matched or an error occurred while looking for smallest suffix for %s\n", outfile); + return -1; // Return -1 to indicate error } for (int i = 0; i < count; i++) { int suffix = atoi(files[i] + strlen(files[i]) - 4); @@ -386,7 +386,7 @@ char** find_partitioned_files(const char* filepath, int* count) { // Expand the input files partitioned_files = find_partitioned_files(outfile, &file_count); if (partitioned_files == NULL || file_count == 0) { - fprintf(stderr, "Error: no files matched or an error occurred\n"); + fprintf(stderr, "Error: no files matched or an error occurred while finding partitioned files for %s\n", outfile); return 1; } diff --git a/fms2_io/netcdf_io.F90 b/fms2_io/netcdf_io.F90 index 099382444a..e332d5ba11 100644 --- a/fms2_io/netcdf_io.F90 +++ b/fms2_io/netcdf_io.F90 @@ -248,6 +248,7 @@ module netcdf_io_mod public :: flush_file public :: partitioned_global_files public :: partitioned_section_files +public :: append_to_filepath_list !> @ingroup netcdf_io_mod interface netcdf_add_restart_variable @@ -691,21 +692,21 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do end function netcdf_file_open -subroutine append_to_filepath_list(fileobj, filepath_list) - class(FmsNetcdfFile_t), intent(in) :: fileobj !< File object. +subroutine append_to_filepath_list(filepath, filepath_list) + character(len=*), intent(in) :: filepath type(filepath_list_type), pointer :: filepath_list !< File path list. type(filepath_list_type), pointer :: current if (.not. associated(filepath_list)) then allocate(filepath_list) - call string_copy(filepath_list%path, trim(fileobj%path)) + call string_copy(filepath_list%path, trim(filepath)) else current => filepath_list do while (associated(current%next)) current => current%next enddo allocate(current%next) - call string_copy(current%next%path, trim(fileobj%path)) + call string_copy(current%next%path, trim(filepath)) endif end subroutine append_to_filepath_list @@ -722,9 +723,9 @@ subroutine netcdf_file_close(fileobj) if (fileobj%path(len_trim(fileobj%path)-7:len_trim(fileobj%path)-4) == ".nc.") then select type(fileobj) type is (FmsNetcdfFile_t) - call append_to_filepath_list(fileobj, partitioned_section_files) + call append_to_filepath_list(fileobj%path, partitioned_section_files) class default ! FmsNetcdfDomainFile_t - call append_to_filepath_list(fileobj, partitioned_global_files) + call append_to_filepath_list(fileobj%path, partitioned_global_files) end select else if (fileobj%path(len_trim(fileobj%path)-2:len_trim(fileobj%path)) /= ".nc") then call error("netcdf_file_close: Encountered unexpected netcdf file suffix: "//trim(fileobj%path)) From e6726afc93b7507937f0aa8caa231a193ae393e2 Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Thu, 10 Apr 2025 17:35:16 -0600 Subject: [PATCH 8/9] fix round robin combination algm by determining which IO PE combines which file based on the order of IO PES (as opposed to IO PE index, which is not guaranteed to be contiguous when land block elimination is enabled) --- diag_manager/diag_manager.F90 | 73 ++++++++++++++++++++++++++++------- diag_manager/mppnccombine.c | 8 ++-- 2 files changed, 65 insertions(+), 16 deletions(-) diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index 715220ac32..20622d2dab 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -3733,7 +3733,8 @@ subroutine combine_files() integer(c_int) :: smallest_pix ! The smallest IO PE index of the set of IO PEs writing the current section file. integer(c_int) :: ireturn ! Return code from mppnccombine integer(c_int) :: niopes ! Number of IO PEs participating in writing of global files - integer :: f + integer :: f ! File index for the global diagnostic files + integer :: pix_order ! 0-based order of the IO PE in the list of all IO PEs writing the file. type(filepath_list_type), pointer :: current type(filepath_list_type), pointer :: files_to_combine ! list of files to combined by this PE character(len=:), allocatable :: filepath @@ -3753,17 +3754,17 @@ subroutine combine_files() filepath = trim(adjustl(current%path)) outfile = filepath(1:len(filepath)-5) // c_null_char - ! get the number of files to combine (for the first file only). The number of files is the same for all global files. - if (niopes == 0) niopes = num_partitioned_files(outfile) - - if (niopes == -1) then - call error_mesg('diag_manager_mod::combine_files', 'num_partitioned_files failed', FATAL) - end if - - ! Read the IO PE index (pix) from the file suffix (e.g., 0000, 0001, etc.) - read(filepath(len(filepath)-3:len(filepath)),*) pix - - if (mod(f, niopes) == pix) then + ! get the number of files to combine (for the first global file only). The number of files is the + ! same for all global files. Similarly, get pix and pix_order for the first file only, since for + ! all global files, the pix and pix_order are the same. + if (niopes == 0) then + niopes = num_partitioned_files(outfile) + if (niopes == -1) call error_mesg('diag_manager_mod::combine_files', 'num_partitioned_files failed', FATAL) + read(filepath(len(filepath)-3:len(filepath)),*) pix + pix_order = get_pix_order(filepath, niopes, pix) + endif + + if (mod(f, niopes) == pix_order) then call append_to_filepath_list(outfile, files_to_combine) end if @@ -3803,7 +3804,7 @@ subroutine combine_files() if (associated(files_to_combine)) then current => files_to_combine do while (associated(current)) - write(stdout_unit,*) ' Combining file ' // trim(outfile) + write(stdout_unit,*) ' Combining file ' // filepath(1:len(filepath)-5) ireturn = exec_mppnccombine(current%path) if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed for file ' // trim(current%path), FATAL) current => current%next @@ -3812,6 +3813,52 @@ subroutine combine_files() end subroutine combine_files + !> Given a filename and pix (process id for an IO PE), return the order of the pix + !! in the list of all IO PEs writing the file. The order is determined by the + !! order of the file suffixes (e.g., 0000, 0001, etc.) in the file name. For example, + !! In a list of files with suffixes [0001, 0003, 0004], the order of pix 0003 + !! is 1, and the order of pix 0004 is 2. + function get_pix_order(filename, num_files, pix) result(pix_order) + character(len=*), intent(in) :: filename + integer, intent(in) :: num_files + integer, intent(in) :: pix ! 0-based id of the IO PE + ! local + integer :: pix_order ! 0-based order of the pix in the list of all IO PEs writing the file + character(len=4) :: suffix ! 0000, 0001, etc. + integer :: npes ! total number of all PEs + integer :: i, f + logical :: exists + + npes = mpp_npes() + pix_order = -1 + + i = 0 + do f=0, num_files-1 + ! Increment i until a file with the suffix i is found + do while (i @brief Replaces diag_manager_end; close just one file: files(file) SUBROUTINE closing_file(file, time) INTEGER, INTENT(in) :: file diff --git a/diag_manager/mppnccombine.c b/diag_manager/mppnccombine.c index 72660d24b8..bf0546c673 100644 --- a/diag_manager/mppnccombine.c +++ b/diag_manager/mppnccombine.c @@ -390,13 +390,15 @@ char** find_partitioned_files(const char* filepath, int* count) { return 1; } - argc = 2 + file_count; + argc = 4 + file_count; argv = (char **)malloc(argc * sizeof(char *)); argv[0] = "mppnccombine"; - argv[1] = outfile; + argv[1] = "-f"; + argv[2] = "-m"; + argv[3] = outfile; for (int i = 0; i < file_count; i++) { - argv[2 + i] = partitioned_files[i]; + argv[4 + i] = partitioned_files[i]; } // if an old version exists, remove the output file From 05bb5a4a9e4c4520f268f7258734af2fc28b0643 Mon Sep 17 00:00:00 2001 From: alperaltuntas Date: Fri, 11 Apr 2025 11:11:50 -0600 Subject: [PATCH 9/9] comment out Combining file log --- diag_manager/diag_manager.F90 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diag_manager/diag_manager.F90 b/diag_manager/diag_manager.F90 index 20622d2dab..d0d6e4a3ad 100644 --- a/diag_manager/diag_manager.F90 +++ b/diag_manager/diag_manager.F90 @@ -3804,7 +3804,7 @@ subroutine combine_files() if (associated(files_to_combine)) then current => files_to_combine do while (associated(current)) - write(stdout_unit,*) ' Combining file ' // filepath(1:len(filepath)-5) + !write(stdout_unit,*) ' Combining file ' // filepath(1:len(filepath)-5) ireturn = exec_mppnccombine(current%path) if (ireturn /= 0) call error_mesg('diag_manager_mod::combine_files', 'mppnccombine failed for file ' // trim(current%path), FATAL) current => current%next