diff --git a/.gitignore b/.gitignore index 914e4727941..47db2488716 100644 --- a/.gitignore +++ b/.gitignore @@ -184,5 +184,3 @@ versions/run.ver # jcb checkout and symlinks ush/python/jcb -workflow/jcb -ci/scripts/jcb diff --git a/dev/parm/config/gfs/config.globus b/dev/parm/config/gfs/config.globus index 5d02be04861..22510ddcd87 100644 --- a/dev/parm/config/gfs/config.globus +++ b/dev/parm/config/gfs/config.globus @@ -12,20 +12,24 @@ echo "BEGIN: config.globus" export STAGE_DIR="${DATAROOT}/archive_rotdir/${PSLOT}" # Set variables used by the Sven and Doorman services -# General delivery location on Niagara (staging area for data) -export SERVER_HOME='/collab1/data/{{SERVER_USERNAME}}' -# Sven's dropbox -export SVEN_DROPBOX_ROOT="${DATA}/SVEN_DROPBOX" +# Server name where the doorman will run +export SERVER_NAME="mercury" -# Location of the doorman package on Niagara -export DOORMAN_ROOT="/home/David.Huber/doorman" +# Username on the server +export SERVER_USERNAME="" +# General delivery location on Mercury (staging area for data on server) +export SERVER_HOME="/collab2/data/${SERVER_USERNAME}/${PSLOT}" -# Server name (should match ~/.ssh/config) -export SERVER_NAME="niagara" +# Location of the doorman package on Mercury +export DOORMAN_ROOT="/home/David.Huber/doorman" # Server globus UUID -niagara_UUID="1bfd8a79-52b2-4589-88b2-0648e0c0b35d" -export SERVER_GLOBUS_UUID="${niagara_UUID}" +# niagara_UUID="1bfd8a79-52b2-4589-88b2-0648e0c0b35d" +mercury_UUID="e24545db-4d02-4b80-8aa0-fda791ddc604" +export SERVER_GLOBUS_UUID="${mercury_UUID}" + +# Sven's dropbox (temporary local directory) +export SVEN_DROPBOX_ROOT="${DATA}/SVEN_DROPBOX" echo "END: config.globus" diff --git a/dev/workflow/applications/applications.py b/dev/workflow/applications/applications.py index 44ae2210aa4..2ba8f515538 100644 --- a/dev/workflow/applications/applications.py +++ b/dev/workflow/applications/applications.py @@ -260,6 +260,10 @@ def _check_globus(self, conf): if "rdhpcs" in domain: rdhpcs_uid_found = True + if globus_conf.get("SERVER_USERNAME", "") == "": + raise ValueError(f"The username for {globus_conf.SERVER_NAME} was not provided. " + f"Please provide your username in {globus_conf.EXPDIR}/config.globus as SERVER_USERNAME.") + if not local_uid_found or not rdhpcs_uid_found: logger.error(f"ERROR a globus session is not yet established on {globus_conf.SERVER_NAME}. " "Please establish a globus connection!") diff --git a/dev/workflow/hosts.py b/dev/workflow/hosts.py index 70a337ff63d..91d711b7fe7 100644 --- a/dev/workflow/hosts.py +++ b/dev/workflow/hosts.py @@ -24,57 +24,60 @@ def __init__(self, host=None): raise NotImplementedError(f'{host} is not a supported host.\n' + 'Currently supported hosts are:\n' + f'{" | ".join(Host.SUPPORTED_HOSTS)}') + # If Host is instantiated with "host", use it + elif host is not None: + self.machine = host + # Otherwise, detect the host. + else: + # Detect the host if not provided + self.detect() if host is None else host - # Detect the host if not provided - detected_host = self.detect() if host is None else host - - if host is not None and host != detected_host: - raise ValueError( - f'detected host: "{detected_host}" does not match provided host: "{host}"') - - self.machine = detected_host self.info = self._get_info self.scheduler = self.info['SCHEDULER'] - @classmethod - def detect(cls): + def __str__(self) -> str: + # The string representation of the Host object is the name of the machine + return f"{self.machine}" + + def detect(self) -> None: + # Detect the machine name and store in self.machine - machine = os.getenv('MACHINE_ID', 'UNKNOWN') + machine_id = os.getenv('MACHINE_ID', 'UNKNOWN') pw_csp = os.getenv('PW_CSP', 'UNKNOWN') container = os.getenv('SINGULARITY_NAME', None) # Detect the machine since MACHINE_ID is set, # Additionaly, if PW_CSP is set, then the machine is a cloud machine - if machine != 'UNKNOWN': + if machine_id != 'UNKNOWN': if pw_csp != 'UNKNOWN': - machine = f"{pw_csp.upper()}PW" - return machine + self.machine = f"{pw_csp.upper()}PW" + return # Detect the machine since MACHINE_ID is not set if os.path.exists('/scratch1/NCEPDEV'): - machine = 'HERA' + self.machine = 'HERA' elif os.path.exists('/work/noaa'): - machine = socket.gethostname().split("-", 1)[0].upper() + self.machine = socket.gethostname().split("-", 1)[0].upper() elif os.path.exists('/lfs/f1'): - machine = 'WCOSS2' + self.machine = 'WCOSS2' elif os.path.exists('/gpfs/f5'): - machine = 'GAEAC5' + self.machine = 'GAEAC5' elif os.path.exists('/gpfs/f6'): - machine = 'GAEAC6' + self.machine = 'GAEAC6' elif container is not None: - machine = 'CONTAINER' + self.machine = 'CONTAINER' elif pw_csp is not None: if pw_csp.lower() not in ['azure', 'aws', 'google']: raise ValueError( f'cloud service provider "{pw_csp}" is not supported.') - machine = f"{pw_csp.upper()}PW" + self.machine = f"{pw_csp.upper()}PW" - if machine not in Host.SUPPORTED_HOSTS: - raise NotImplementedError(f'This machine is not a supported host.\n' + + if self.machine not in Host.SUPPORTED_HOSTS: + raise NotImplementedError('This machine is not a supported host.\n' + 'Currently supported hosts are:\n' + f'{" | ".join(Host.SUPPORTED_HOSTS)}') - return machine + return @property def _get_info(self) -> dict: @@ -88,6 +91,6 @@ def _get_info(self) -> dict: except IOError: raise IOError(f'Unable to read from {hostfile}') except Exception: - raise Exception(f'unable to get information for {self.machine}') + raise Exception(f'unable to get information for {self}') return info diff --git a/dev/workflow/rocoto/gfs_tasks.py b/dev/workflow/rocoto/gfs_tasks.py index f063d28de61..f940bd6f0af 100644 --- a/dev/workflow/rocoto/gfs_tasks.py +++ b/dev/workflow/rocoto/gfs_tasks.py @@ -1271,8 +1271,6 @@ def wavepostbndpnt(self): def wavepostbndpntbll(self): # The wavepostbndpntbll job runs on forecast hours up to FHMAX_WAV_IBP - last_fhr = self._configs['wavepostbndpntbll']['FHMAX_WAV_IBP'] - deps = [] dep_dict = {'type': 'task', 'name': f'{self.run}_wavepostbndpnt'} deps.append(rocoto.add_dependency(dep_dict)) @@ -2218,10 +2216,10 @@ def cleanup(self): deps.append(rocoto.add_dependency(dep_dict)) if self.options['do_archcom']: if self.options['do_globusarch']: - dep_dict = {'type': 'metatask', 'name': f'{self.run}_globus_arch'} + dep_dict = {'type': 'metatask', 'name': f'{self.run}_globus_earc'} else: dep_dict = {'type': 'metatask', 'name': f'{self.run}_earc_tars'} - deps.append(rocoto.add_dependency(dep_dict)) + deps.append(rocoto.add_dependency(dep_dict)) else: if self.app_config.mode in ['cycled']: diff --git a/dev/workflow/rocoto/workflow_xml.py b/dev/workflow/rocoto/workflow_xml.py index 7dc1f11ac70..d84f02c5b45 100644 --- a/dev/workflow/rocoto/workflow_xml.py +++ b/dev/workflow/rocoto/workflow_xml.py @@ -8,7 +8,7 @@ from typing import Dict from applications.applications import AppConfig from rocoto.workflow_tasks import get_wf_tasks -from wxflow import to_timedelta, which, ProcessError, mkdir +from wxflow import to_timedelta, which, mkdir import rocoto.rocoto as rocoto from abc import ABC, abstractmethod from hosts import Host @@ -233,40 +233,17 @@ def _write_server_crontab(self, cronint: int = 1): expdir = globus_conf["EXPDIR"] pslot = globus_conf["PSLOT"] - server = globus_conf["SERVER_NAME"] - server_home = globus_conf["SERVER_HOME"] - - # Get the server username from ~/.ssh/config - # TODO move this to an earlier point and actually amend config.globus with the username - ssh = which("ssh") - if ssh is None: - raise ProcessError("Failed to locate the ssh command!") - - try: - ssh_output = ssh("-G", server, output=str).split("\n") - except ProcessError: - logger.warning(f"Failed to automatically determine the username for {server}.") - ssh_output = "" - - server_username = None - for line in ssh_output: - if line.startswith("user "): - server_username = line.split()[1] - - # If ssh -G failed or the username could not be determined, ask for it - if not server_username: - server_username = input(f"Please provide your username for {server} (this is required to use globus): ") - if server_username == "": - raise ValueError("A valid username must be provided!") - - server_home = server_home.replace( - "{{SERVER_USERNAME}}", server_username - ) - - try: - replyto = os.environ['REPLYTO'] - except KeyError: - replyto = '' + server = globus_conf.get("SERVER_NAME", None) + server_home = globus_conf.get("SERVER_HOME", None) + server_username = globus_conf.get("SERVER_USERNAME", None) + + if not (server and server_home and server_username): + raise ValueError( + "ERROR: At least one server variable is missing!\n" + f"Check that SERVER_NAME, SERVER_HOME, and SERVER_USERNAME are defined in {expdir}/config.globus" + ) + + replyto = os.environ.get('REPLYTO', "") crontab_file = os.path.join(expdir, f"{pslot}.{server}.crontab") diff --git a/dev/workflow/setup_expt.py b/dev/workflow/setup_expt.py index 2913fa7328d..e22e3e480e8 100755 --- a/dev/workflow/setup_expt.py +++ b/dev/workflow/setup_expt.py @@ -56,7 +56,7 @@ def _update_defaults(dict_in: dict) -> dict: # Combine host.info and inputs_dict into a single dict, add some additional keys host_plus_inputs_dict = AttrDict(host.info, **inputs_dict_remapped) host_plus_inputs_dict.HOMEgfs = _top - host_plus_inputs_dict.MACHINE = host.machine.upper() + host_plus_inputs_dict.MACHINE = str(host).upper() # Read in the YAML file yaml_path = inputs.yaml @@ -303,11 +303,11 @@ def query_and_clean(dirname, force_clean=False): create_dir = True if os.path.exists(dirname): - logger.warning(f'directory already exists in:') + logger.warning('directory already exists in:') logger.warning(f' {dirname}') if force_clean: overwrite = "YES" - logger.warning(f'removing directory ...') + logger.warning('removing directory ...') logger.warning(f' {dirname}') else: overwrite = input('Do you wish to over-write [y/N]: ') @@ -322,7 +322,7 @@ def query_and_clean(dirname, force_clean=False): # @logit(logger) def validate_user_request(host, inputs): supp_res = host.info['SUPPORTED_RESOLUTIONS'] - machine = host.machine + machine = host for attr in ['resdetatmos', 'resensatmos']: try: expt_res = f'C{getattr(inputs, attr)}' @@ -378,10 +378,10 @@ def main(*argv): update_configs(host, user_inputs) max_len = max(len(expdir), len(rotdir)) + 8 - logger.info(f"*" * max_len) + logger.info("*" * max_len) logger.info(f'EXPDIR: {expdir}') logger.info(f'ROTDIR: {rotdir}') - logger.info(f"*" * max_len) + logger.info("*" * max_len) if __name__ == '__main__': diff --git a/docs/source/configure.rst b/docs/source/configure.rst index 1bc170fa9b7..694f572453d 100644 --- a/docs/source/configure.rst +++ b/docs/source/configure.rst @@ -55,7 +55,7 @@ The global-workflow configs contain switches that change how the system runs. Ma | | | or globus_hpss| | where the COM structure tarballs should be saved. | | | | | | Choices are 'hpss', 'local', or 'globus_hpss'. | | | | | | HPSS archiving requires a direct connection. | -| | | | | Globus-HPSS archiving uses Niagara as a server to | +| | | | | Globus-HPSS archiving uses Mercury as a server to | | | | | | archiving to HPSS. This is currently only | | | | | | supported on Hercules. Defaults are machine | | | | | | specific. | diff --git a/docs/source/globus_arch.rst b/docs/source/globus_arch.rst index 08df6b1a3e4..b79f6e78e94 100644 --- a/docs/source/globus_arch.rst +++ b/docs/source/globus_arch.rst @@ -4,13 +4,13 @@ Setup Globus Connections for HPSS ================================= - The Global Workflow archives and retrieves data from HPSS. Some systems, such as Hera and WCOSS2, have direct connections to HPSS, while others like Hercules do not. To enable HPSS transfers, RDHPCS Niagara offers temporary disk space and HPSS connections. The high-throughput Globus protocol is used to schedule and transfer data to Niagara where a service (The Doorman) runs jobs to transfer data to HPSS. To make use of this service, users must initialize their connections to Globus and Niagara. This guide provides instructions on how to enable these services. + The Global Workflow archives and retrieves data from HPSS. Some systems, such as Hera and WCOSS2, have direct connections to HPSS, while others like Hercules do not. To enable HPSS transfers, RDHPCS Mercury offers temporary disk space and HPSS connections. The high-throughput Globus protocol is used to schedule and transfer data to Mercury where a service (The Doorman) runs jobs to transfer data to HPSS. To make use of this service, users must initialize their connections to Globus and Mercury. This guide provides instructions on how to enable these services. ^^^^^^^^^^^^^^^^^ Setting Up Globus ^^^^^^^^^^^^^^^^^ -The Globus service offers extremely fast connections between MSU and RDHPCS machines. To make use of this service, you will first need to establish connections from the client (e.g. Hercules) and the server (i.e. Niagara). RDHPCS maintains a guide on this procedure, which can be found in their `Globus Guide `__. The simplest way to establish your connection is by running ``globus login`` (after loading the ``globus-cli`` module). If you have trouble with this or working through the guide, contact RDHPCS for assistance. +The Globus service offers extremely fast connections between MSU and RDHPCS machines. To make use of this service, you will first need to establish connections from the client (e.g. Hercules) and the server (i.e. Mercury). RDHPCS maintains a guide on this procedure, which can be found in their `Globus Guide `__. The simplest way to establish your connection is by running ``globus login`` (after loading the ``globus-cli`` module). If you have trouble with this or working through the guide, contact RDHPCS for assistance. Once you are logged in, verify that the Globus connection is active on the client. First, load the ``globus-cli`` module, then run ``globus session show``. You should see an entry for your RDHPCS user account. @@ -19,11 +19,11 @@ To test the connection and verify that your session is active, you can attempt a .. code-block:: bash echo "Example" > example.file # Create a test file - globus endpoint search rdhpcs#niagara # Get Niagara's Globus ID + globus endpoint search rdhpcs#mercury # Get Mercury's Globus ID globus endpoint search msuhpc2#Hercules-dtn # Get Hercules' Globus ID # Transfer the file; this will print a transfer ID if successfully initialized - globus transfer ':/full/path/to/example.file' ':/collab1/data//example.file' + globus transfer ':/full/path/to/example.file' ':/collab1/data//example.file' # Wait on the transfer to complete globus task wait @@ -34,4 +34,14 @@ If the above snippet is successful, then you are good to go. It's possible that Common Globus Issues ^^^^^^^^^^^^^^^^^^^^ -Note that the globus connection stays active for 7 days. If your experiment fails in a globus* job, then this may be the culprit. Try running the following from either an MSU or Niagara terminal: ``globus session update``. You will be prompted to enter a link into a browser and respond with the corresponding confirmation code. Once this is complete, try rebooting the failing job(s). +Note that the globus connection stays active for 7 days. If your experiment fails in a globus* job, then this may be the culprit. Try running the following from either an MSU or Mercury terminal: ``globus session update --all``. You will be prompted to enter a link into a browser and respond with the corresponding confirmation code. Once this is complete, try rebooting the failing job(s). + +For some users, the new system, Mercury, occassionally fails to add all necessary permissions necessary to run globus transfers. If you receive an error about needing to add ``data_access`` in the logs, then login to Mercury and execute + +.. code-block:: + module load globus-cli + globus session update --all + # Get the host UUID + globus endpoint search hercules # Replace Hercules with the system you are running the global workflow on + # Below, replace with the UUID found in the above command + globus session consent 'urn:globus:auth:scope:transfer.api.globus.org:all[*https://auth.globus.org/scopes//data_access]' diff --git a/docs/source/index.rst b/docs/source/index.rst index 617d01a3bdf..43556ee16fb 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -17,7 +17,6 @@ Code managers ============= * Kate Friedman - @KateFriedman-NOAA / kate.friedman@noaa.gov -* Walter Kolczynski - @WalterKolczynski-NOAA / walter.kolczynski@noaa.gov * David Huber - @DavidHuber-NOAA / david.huber@noaa.gov ============= diff --git a/docs/source/jobs.rst b/docs/source/jobs.rst index 174e68258c6..dbc7bd39298 100644 --- a/docs/source/jobs.rst +++ b/docs/source/jobs.rst @@ -47,7 +47,7 @@ Jobs in the GFS Configuration +-------------------+-----------------------------------------------------------------------------------------------------------------------+ | earcN/eamn | Archival script for EnKF that write selected EnKF output to HPSS or locally | +-------------------+-----------------------------------------------------------------------------------------------------------------------| -| globus_earcN | Additional archival script that pushes data to HPSS via Niagara. | +| globus_earcN | Additional archival script that pushes data to HPSS via Mercury. | +-------------------+-----------------------------------------------------------------------------------------------------------------------+ | ecenN/ecmn | Recenter ensemble members around hi-res deterministic analysis. GFS v16 recenters ensemble member analysis. | | | increments. | diff --git a/docs/source/setup.rst b/docs/source/setup.rst index 29e0297feed..3b700fed9cf 100644 --- a/docs/source/setup.rst +++ b/docs/source/setup.rst @@ -235,7 +235,7 @@ Go to your EXPDIR and check/change the following variables within your config.ba * HPSS_PROJECT (project on HPSS if archiving) * ATARDIR (location on HPSS or locally if archiving) -`NOTE`: If you selected ``ARCHCOM_TO='globus_hpss``, then you will need to activate your globus connections between Niagara and MSU. See :doc: globus_arch.rst for more details. +`NOTE`: If you selected ``ARCHCOM_TO='globus_hpss``, then you will need to activate your globus connections between Mercury and MSU. See :doc: globus_arch.rst for more details. Now is also the time to change any other variables/settings you wish to change in config.base or other configs. `Do that now.` Once you are done making changes to the configs in your EXPDIR, go back to your clone to run the second setup script. See :doc: configure.rst for more information on configuring your run. diff --git a/parm/config/sfs/config.globus b/parm/config/sfs/config.globus new file mode 120000 index 00000000000..acbaffeb725 --- /dev/null +++ b/parm/config/sfs/config.globus @@ -0,0 +1 @@ +../gfs/config.globus \ No newline at end of file diff --git a/parm/globus/init_xfer.sh.j2 b/parm/globus/init_xfer.sh.j2 index 2f6af51cc26..e4d1cbaa90b 100644 --- a/parm/globus/init_xfer.sh.j2 +++ b/parm/globus/init_xfer.sh.j2 @@ -19,7 +19,7 @@ do rm -f "${mkdir_req_fl}" done < <(find "{{server_home}}" -maxdepth 1 -name "req_mkdir.*" || true) -# Look for scripts +# Look for scripts and run them. while IFS= read -r dir do echo "${run_time}" > "${runtime_log}" @@ -29,6 +29,7 @@ do # Check if the corresponding log has already been written log="${script/.sh/.log}" if [[ ! -f "${log}" ]]; then + touch "${log}" "${script}" fi done < <(find "${dir}" -name "run_doorman.sh" || true) diff --git a/parm/globus/run_doorman.sh.j2 b/parm/globus/run_doorman.sh.j2 index 20b77fe128b..f15e55e2fc8 100644 --- a/parm/globus/run_doorman.sh.j2 +++ b/parm/globus/run_doorman.sh.j2 @@ -1,11 +1,11 @@ #!/usr/bin/env bash -# This script runs on Niagara to interact with the Doorman service +# This script runs on Mercury to interact with the Doorman service # Redirect all output to a local file -- it cannot be sent back directly with tty/pty script_relpath="$(dirname "${BASH_SOURCE[0]}")" log_file="${script_relpath}/run_doorman.log" cd "${script_relpath}" || exit 1 -# Initialize the shell (hpss, modules, etc) (this is a Niagara-specific path; parameterize if needed) +# Initialize the shell (hpss, modules, etc) (this is a Mercury-specific path; parameterize if needed) if [[ -f /etc/bashrc ]]; then source /etc/bashrc fi @@ -19,7 +19,7 @@ rm -f FLIST general_delivery_root="{{run_directory}}/GENERAL_DELIVERY" rm -rf "${general_delivery_root}" -# Tell the doorman where the general delivery space is on Niagara (unique for each RUN/cycle) +# Tell the doorman where the general delivery space is on Mercury (unique for each RUN/cycle) { # This is where tarballs will be received and confirmations are written and sent. echo "export GENDEL=${general_delivery_root}" diff --git a/scripts/exglobal_globus_arch.py b/scripts/exglobal_globus_arch.py index da55464986e..19698faa4c8 100755 --- a/scripts/exglobal_globus_arch.py +++ b/scripts/exglobal_globus_arch.py @@ -32,7 +32,7 @@ def main(): # Determine which tarballs to send transfer_sets = globus.configure(globus_dict) - # Send the tarballs to HPSS via Niagara. Start with non-rstprod (standard) data + # Send the tarballs to HPSS via Mercury. Start with non-rstprod (standard) data for transfer_set in ["standard", "rstprod"]: has_rstprod = transfer_set == "rstprod" globus.execute_transfer_data(transfer_sets[transfer_set], has_rstprod) diff --git a/scripts/exglobal_globus_earc.py b/scripts/exglobal_globus_earc.py index f0080f955ce..e528483c575 100755 --- a/scripts/exglobal_globus_earc.py +++ b/scripts/exglobal_globus_earc.py @@ -32,7 +32,7 @@ def main(): # Determine which tarballs to send transfer_sets = globus.configure(globus_dict) - # Send the tarballs to HPSS via Niagara. Start with non-rstprod (standard) data + # Send the tarballs to HPSS via Mercury. Start with non-rstprod (standard) data count_sets = 0 for transfer_set in ["standard", "rstprod"]: if len(transfer_sets[transfer_set]['locations']) > 0: diff --git a/ush/python/pygfs/task/archive.py b/ush/python/pygfs/task/archive.py index f80dbb6c5b0..38124737691 100644 --- a/ush/python/pygfs/task/archive.py +++ b/ush/python/pygfs/task/archive.py @@ -638,6 +638,8 @@ def _create_datasets_yaml(self, datasets, group=-1): output_yaml[dataset.name] = {"target": dataset.target, "has_rstprod": dataset.has_rstprod} + logger.debug(f"Writing the dataset YAML to {yaml_filename}") + logger.debug("YAML contents: \n" + f"{output_yaml}") save_as_yaml(output_yaml, yaml_filename) @logit(logger) diff --git a/ush/python/pygfs/task/globus_hpss.py b/ush/python/pygfs/task/globus_hpss.py index 1d7aa065afc..f147e44ea78 100644 --- a/ush/python/pygfs/task/globus_hpss.py +++ b/ush/python/pygfs/task/globus_hpss.py @@ -50,16 +50,12 @@ def __init__(self, config: Dict[str, Any]) -> None: except CommandNotFoundError: raise FileNotFoundError("FATAL ERROR Could not find the globus command!") - try: - self.ssh = which("ssh", required=True) - except CommandNotFoundError: - raise FileNotFoundError("FATAL ERROR Could not find the ssh command!") - self.wd = os.getcwd() # Prep some globus commands self.globus_rm = copy.deepcopy(self.globus) self.globus_xfr = copy.deepcopy(self.globus) + self.globus_mkdir = copy.deepcopy(self.globus) self.globus_wait = copy.deepcopy(self.globus) # Recursively remove the target, notify on failure, and ignore missing files @@ -69,35 +65,18 @@ def __init__(self, config: Dict[str, Any]) -> None: self.globus_xfr.add_default_arg(["transfer", "--notify", "failed", "--preserve-mtime", "--sync-level", "mtime", "--jmespath", "task_id", "--format=UNIX"]) + + # Make a directory on a target system via globus + self.globus_mkdir.add_default_arg(["mkdir", "--format=UNIX"]) + # Wait on a task ID to finish and output the status of the transfer when complete self.globus_wait.add_default_arg(["task", "wait", "--jmespath", "status", "--format=UNIX", "--timeout", "120"]) - # Get the user's server username from their ~/.ssh/config file - self.server_name = self.task_config.SERVER_NAME - try: - ssh_output = self.ssh("-G", f"{self.server_name}", output=str) - except ProcessError as pe: - raise ProcessError( - f"FATAL ERROR No host information on {self.server_name}!" - "\n" - f"Please add an entry for {self.server_name} into ~/.ssh/config!" - ) from pe - self.CLIENT_GLOBUS_UUID = self.task_config.CLIENT_GLOBUS_UUID self.SERVER_GLOBUS_UUID = self.task_config.SERVER_GLOBUS_UUID - # Parse the ssh output to find the user's Niagara username - ssh_output = ssh_output.split("\n") - for line in ssh_output: - if line.startswith("user "): - server_username = line.split()[1] - - # Update the home directory on the server with the username - self.server_home = self.task_config.SERVER_HOME.replace( - "{{SERVER_USERNAME}}", server_username - ) - - logger.debug(f"Server username detected as {server_username}") + self.server_home = self.task_config.SERVER_HOME + self.server_name = self.task_config.SERVER_NAME local_dict = AttrDict({ 'sven_dropbox': (f"{self.task_config.SVEN_DROPBOX_ROOT}"), @@ -113,7 +92,7 @@ def configure(self, globus_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[s send them to HPSS via Globus and verify success. There are two services running that handle passing and running scripts. - On the client (e.g. Hercules), there is Sven. On the server (i.e. Niagara), there is + On the client (e.g. Hercules), there is Sven. On the server (i.e. Mercury), there is the Doorman. Sven packages up the file list and scripts that need to run on the server and the Doorman executes the scripts on each of the files. The six files involved are @@ -232,7 +211,7 @@ def configure(self, globus_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[s @logit(logger) def execute_transfer_data(self, transfer_set: Dict[str, Any], has_rstprod: bool) -> None: - """Interface function with Sven to send tarballs to HPSS via Niagara. + """Interface function with Sven to send tarballs to HPSS via Mercury. Parameters ---------- @@ -275,7 +254,7 @@ def execute_transfer_data(self, transfer_set: Dict[str, Any], has_rstprod: bool) sven_output = self.forsven(output=str) logger.debug(sven_output) except ProcessError as pe: - raise ProcessError("FATAL ERROR Sven failed to package the request" + raise ProcessError("FATAL ERROR Sven failed to package the request " f"for {location}") from pe # Parse Sven's output to get the name of the return status file @@ -288,7 +267,7 @@ def execute_transfer_data(self, transfer_set: Dict[str, Any], has_rstprod: bool) transfer_set["completed"].append(False) transfer_set["successes"].append(False) - # Transfer the doorman script to Niagara. + # Transfer the doorman script to Mercury. # Note, this assumes we have unattended transfer capability. try: # Now transfer and rename the script @@ -302,9 +281,9 @@ def execute_transfer_data(self, transfer_set: Dict[str, Any], has_rstprod: bool) logger.debug("Successfully transferred the doorman script") except (ProcessError, ConnectionError) as pe: - raise ProcessError("FATAL ERROR Failed to send doorman run script to Niagara") from pe + raise ProcessError("FATAL ERROR Failed to send doorman run script to Mercury") from pe - # Now wait for the doorman script to run via cron on Niagara. + # Now wait for the doorman script to run via cron on Mercury. # Once complete, Sven's dropbox should fill up with status files. wait_count = 0 sleep_time = 60 # s @@ -411,6 +390,14 @@ def _init_server(self, server_dir): raise ProcessError("FATAL ERROR Failed to request a mkdir on the server!") try: + self._wait_on_task_id(self.globus_mkdir( + f"{self.CLIENT_GLOBUS_UUID}:{self.wd}", suppress_errors=True + )) + except ProcessError: + logger.info("Globus reported that it could not create the directory. This is likely because it already exists. Continuing.") + + try: + # If globus was unable to mkdir for another reason, this will fail. self._wait_on_task_id(self.globus_xfr( f"{self.CLIENT_GLOBUS_UUID}:{self.wd}/init_xfer.sh", f"{self.SERVER_GLOBUS_UUID}:{self.server_home}/init_xfer_{pslot}.sh", @@ -451,13 +438,13 @@ def _init_server(self, server_dir): logger.info("Server initialized successfully!") @logit(logger) - def _wait_on_task_id(self, task_id): + def _wait_on_task_id(self, task_id, suppress_errors=False): # The task_id usually has a newline character at the end. Strip that to begin. task_id = task_id.strip() status = self.globus_wait(task_id, output=str).strip() - if status != "SUCCEEDED": + if status != "SUCCEEDED" and not suppress_errors: raise ConnectionError(f"Globus failed on task ID {task_id}") @logit(logger) @@ -466,7 +453,7 @@ def clean(self): Remove the temporary directories/files created by the GlobusHpss task. """ - # Write requests to delete the working directories on Niagara + # Write requests to delete the working directories on Mercury req_file = f"req_rmdir.{self.task_config.jobid}" for job_dir in self._server_job_dirs: with open(req_file, "w") as rmdir_f: