Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to use Balsam native app for Globus transfers #311

Merged
merged 22 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions balsam/cmdline/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def list_table(job_qs: "JobQuery", client: "RESTClient") -> None:
@click.option("--id", type=str)
@click.option("--by-state", type=bool, default=False, is_flag=True)
@click.option("-w", "--workdir", type=str)
@click.option("-a", "--app", type=str)
@click.option("--site", "site_selector", default="")
@click.option("-v", "--verbose", is_flag=True)
def ls(
Expand All @@ -255,6 +256,7 @@ def ls(
exclude_state: Optional[JobState],
id: Optional[int],
by_state: Optional[bool],
app: Optional[str],
workdir: Optional[str],
verbose: bool,
site_selector: str,
Expand All @@ -270,20 +272,28 @@ def ls(

balsam job ls --tag experiment=XPCS --tag system=H2O

3) Select Jobs by their state
3) Select Jobs by their app

balsam job ls --app flux_capacitance

4) Select Jobs by their state

balsam job ls --state JOB_FINISHED --tag system=H2O

4) Summarize Jobs by their state
5) Summarize Jobs by their state

balsam job ls --by-state [--tag system=H20]

5) Select a specific job by ID
6) Select a specific job by ID

balsam job ls --id [id]
"""
client = load_client()
job_qs = filter_by_sites(client.Job.objects.all(), site_selector)
if app:
app_qs = filter_by_sites(client.App.objects.all(), site_selector)
appo = fetch_app(app_qs, app)
job_qs = job_qs.filter(app_id=appo.id)
if tags:
job_qs = job_qs.filter(tags=tags)
if state:
Expand All @@ -303,11 +313,36 @@ def ls(
list_table(job_qs, client)


@job.command()
@click.option("-i", "--id", "job_ids", multiple=True, type=int)
@click.option("-s", "--state", "state", type=str)
def modify(job_ids: List[int], state: JobState) -> None:
    """
    Modify Jobs

    1) Modify Job State

        balsam job modify --id 41 --id 42 --id 43 -s RESTART_READY
    """
    # Require explicit ids up front: refuse to modify every Job in the database.
    if not job_ids:
        raise click.BadParameter("Provide a list of Job ids to modify")
    client: RESTClient = load_client()
    jobs = client.Job.objects.all().filter(id=job_ids)
    count = jobs.count()
    assert count is not None
    # Update each matched Job's state individually; save() persists via the REST API.
    for job in jobs:
        job.state = state
        job.save()
    click.echo(f"Modified {count} jobs.")


@job.command()
@click.option("-i", "--id", "job_ids", multiple=True, type=int)
@click.option("-t", "--tag", "tags", multiple=True, type=str, callback=validate_tags)
@click.option("-y", "yes", is_flag=True, default=False)
@click.option("--all", is_flag=True, default=False)
def rm(job_ids: List[int], tags: List[str], all: bool) -> None:
def rm(job_ids: List[int], tags: List[str], yes: bool, all: bool) -> None:
"""
Remove Jobs

Expand Down Expand Up @@ -338,5 +373,5 @@ def rm(job_ids: List[int], tags: List[str], all: bool) -> None:
assert count is not None
if count < 1:
click.echo("No jobs match deletion query")
elif click.confirm(f"Really delete {count} jobs?"):
elif yes or click.confirm(f"Really delete {count} jobs?"):
jobs.delete()
14 changes: 10 additions & 4 deletions balsam/cmdline/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,25 @@ def sample_settings() -> None:


@site.command()
def globus_login() -> None:
@click.option("-e", "--endpoint_id", multiple=True, default=None, type=str, help="Globus Endpoint ID to enable")
def globus_login(endpoint_id: str) -> None:
"""
Get credentials for the Globus CLI

Necessary before any Globus CLI commands which require authentication will work
Necessary before any Globus CLI commands which require authentication will work.
This command directs you to the page necessary to permit the Globus CLI to make API
calls for you, and gets the OAuth2 tokens needed to use those permissions.
"""
# if not forcing, stop if user already logged in
if globus_auth.check_logged_in():
click.echo("You are already logged in!")
# user is logged in already, but let's ensure consents are in place for the
# requested endpoints
# FIXME: Since the globus API doesn't allow query of consents, we should
# store the list of successful consents so we know if this is needed
globus_auth.do_link_auth_flow(force_new_client=False, endpoint_ids=endpoint_id)
return

globus_auth.do_link_auth_flow(force_new_client=True)
globus_auth.do_link_auth_flow(force_new_client=True, endpoint_ids=endpoint_id)

click.echo("You have successfully logged in to the Globus CLI!")
click.echo("You have successfully logged in to Globus")
4 changes: 3 additions & 1 deletion balsam/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class TransferSettings(BaseSettings):
transfer_locations: Dict[str, str] = {"theta_dtn": "globus://08925f04-569f-11e7-bef8-22000b9a448b"}
max_concurrent_transfers: int = 5
globus_endpoint_id: Optional[UUID] = None
globus_endpoint_site_path: Optional[str] = None
transfer_batch_size: int = 100
num_items_query_limit: int = 2000
service_period: int = 5
Expand Down Expand Up @@ -342,8 +343,9 @@ def build_services(self) -> "List[BalsamService]":
transfer_settings = dict(self.settings.transfers)
transfer_interfaces: Dict[str, TransferInterface] = {}
endpoint_id = transfer_settings.pop("globus_endpoint_id")
endpoint_path = transfer_settings.pop("globus_endpoint_site_path", "")
if endpoint_id:
transfer_interfaces["globus"] = GlobusTransferInterface(endpoint_id)
transfer_interfaces["globus"] = GlobusTransferInterface(endpoint_id, self.site_path, endpoint_path)
transfer_service = TransferService(
client=self.client,
site_id=self.site_id,
Expand Down
6 changes: 4 additions & 2 deletions balsam/config/defaults/alcf_polaris/job-template.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
#PBS -A {{ project }}
#PBS -q {{ queue }}

export http_proxy=http://proxy:3128
export https_proxy=http://proxy:3128
export http_proxy="http://proxy:3128"
export https_proxy="http://proxy:3128"

export PYTHONPATH=/home/turam/dev/polaris/balsam:$PYTHONPATH

#remove export PMI_NO_FORK=1
export BALSAM_SITE_PATH={{balsam_site_path}}
Expand Down
40 changes: 36 additions & 4 deletions balsam/platform/transfer/globus_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import os
import subprocess
import time
from pathlib import Path
from typing import List, Sequence, Tuple, Union
from pathlib import Path, PosixPath
from typing import List, Optional, Sequence, Tuple, Union
from uuid import UUID

from globus_sdk import TransferData
Expand Down Expand Up @@ -81,8 +81,10 @@ def submit_subproc(src_endpoint: UUID, dest_endpoint: UUID, batch: List[SrcDestR


class GlobusTransferInterface(TransferInterface):
def __init__(self, endpoint_id: UUID):
def __init__(self, endpoint_id: UUID, data_path: PathLike, endpoint_path: Optional[PathLike] = None):
self.endpoint_id: UUID = UUID(str(endpoint_id))
self.data_path: PathLike = data_path
self.endpoint_path: Optional[PathLike] = endpoint_path

@staticmethod
def _state_map(status: str) -> str:
Expand All @@ -100,14 +102,44 @@ def submit_task(
transfer_paths: Sequence[Tuple[PathLike, PathLike, bool]],
) -> str:
"""Submit Transfer Task via Globus CLI"""
transfer_paths_list: List[List[PathLike, PathLike, bool]] = [] # type: ignore [type-arg]
if direction == "in":
src_endpoint, dest_endpoint = UUID(str(remote_loc)), self.endpoint_id
# modify destination path according to configured endpoint path
if self.endpoint_path:
for transfer in transfer_paths:
transfer_paths_list.append(
[
transfer[0],
PosixPath(str(transfer[1]).replace(str(self.data_path), str(self.endpoint_path))),
transfer[2],
]
)
elif direction == "out":
src_endpoint, dest_endpoint = self.endpoint_id, UUID(str(remote_loc))
# modify source path according to configured endpoint path
if self.endpoint_path:
for transfer in transfer_paths:
transfer_paths_list.append(
[
PosixPath(str(transfer[0]).replace(str(self.data_path), str(self.endpoint_path))),
transfer[1],
transfer[2],
]
)
else:
raise ValueError("direction must be in or out")
try:
task_id = submit_sdk(src_endpoint, dest_endpoint, transfer_paths)
task_id = submit_sdk(src_endpoint, dest_endpoint, transfer_paths_list) # type: ignore
except TransferSubmitError as exc:
if "ConsentRequired" in eval(exc.args[0]):
logger.warn(
f"""Missing required data_access consent for Globus transfer.
Ensure that you have given consent for Balsam to transfer with the required
endpoints by executing the following command:
balsam site globus-login -e {src_endpoint} -e {dest_endpoint}"""
)
raise
except GlobusConnectionError as exc:
raise TransferRetryableError(f"GlobusConnectionError in Transfer task submission: {exc}") from exc
return str(task_id)
Expand Down
26 changes: 18 additions & 8 deletions balsam/util/globus_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
CLIENT_ID_OPTNAME = "client_id"
CLIENT_SECRET_OPTNAME = "client_secret"
TEMPLATE_ID_OPTNAME = "template_id"
DEFAULT_TEMPLATE_ID = "95fdeba8-fac2-42bd-a357-e068d82ff78e"
DEFAULT_TEMPLATE_ID = "b1ca1cb7-ae70-451c-be61-4b2a61a02e9c"
TRANSFER_AT_EXPIRES_OPTNAME = "transfer_access_token_expires"
TRANSFER_RT_OPTNAME = "transfer_refresh_token"
TRANSFER_AT_OPTNAME = "transfer_access_token"
Expand All @@ -31,7 +31,7 @@
SCOPES = (
"openid profile email "
"urn:globus:auth:scope:auth.globus.org:view_identity_set "
"urn:globus:auth:scope:transfer.api.globus.org:all"
"urn:globus:auth:scope:transfer.api.globus.org:all "
)

T = TypeVar("T")
Expand Down Expand Up @@ -184,7 +184,7 @@ def internal_auth_client(requires_instance: bool = False, force_new_client: bool
# if we require a new client to be made
if force_new_client or (requires_instance and not existing):
# register a new instance client with auth
body = {"client": {"template_id": template_id, "name": "Globus CLI"}}
body = {"client": {"template_id": template_id, "name": "Balsam"}}
res = template_client.post("/v2/api/clients", data=body)

# get values and write to config
Expand All @@ -194,11 +194,11 @@ def internal_auth_client(requires_instance: bool = False, force_new_client: bool
write_option(CLIENT_ID_OPTNAME, client_id)
write_option(CLIENT_SECRET_OPTNAME, client_secret)

return ConfidentialAppAuthClient(client_id, client_secret, app_name="Globus CLI")
return ConfidentialAppAuthClient(client_id, client_secret, app_name="Balsam")

# if we already have a client, just return it
elif existing:
return ConfidentialAppAuthClient(client_id, client_secret, app_name="Globus CLI")
return ConfidentialAppAuthClient(client_id, client_secret, app_name="Balsam")

# fall-back to a native client to not break old logins
# TODO: eventually remove this behavior
Expand All @@ -220,7 +220,7 @@ def get_client() -> RetryingTransferClient:
on_refresh=_update_tokens,
)

return RetryingTransferClient(authorizer=authorizer, app_name="Globus CLI v2.1.0")
return RetryingTransferClient(authorizer=authorizer, app_name="Balsam")


def exchange_code_and_store_config(auth_client: EitherAuthClient, auth_code: str) -> None:
Expand Down Expand Up @@ -267,7 +267,9 @@ def _enqueue(optname: str, newval: str, revoke: bool = True) -> None:
write_option(optname, newval)


def do_link_auth_flow(session_params: Optional[Dict[str, Any]] = None, force_new_client: bool = False) -> bool:
def do_link_auth_flow(
session_params: Optional[Dict[str, Any]] = None, force_new_client: bool = False, endpoint_ids: str = ""
) -> bool:
"""
Prompts the user with a link to authenticate with globus auth
and authorize the CLI to act on their behalf.
Expand All @@ -277,11 +279,19 @@ def do_link_auth_flow(session_params: Optional[Dict[str, Any]] = None, force_new
# get the ConfidentialApp client object
auth_client = internal_auth_client(requires_instance=True, force_new_client=force_new_client)

# add optional endpoint to default scopes
scopes = SCOPES
if endpoint_ids:
for e in endpoint_ids:
scopes += (
f"urn:globus:auth:scope:transfer.api.globus.org:all[*https://auth.globus.org/scopes/{e}/data_access] "
)

# start the Confidential App Grant flow
auth_client.oauth2_start_flow(
redirect_uri=auth_client.base_url + "v2/web/auth-code",
refresh_tokens=True,
requested_scopes=SCOPES,
requested_scopes=scopes,
)

# prompt
Expand Down
13 changes: 10 additions & 3 deletions docs/user-guide/transfer.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,33 @@ Next, we configure the `transfers` section of `settings.yml`:
- `transfer_locations` should be set to a dictionary of trusted [location
aliases](./jobs.md#data-transfer). If you need to add Globus endpoints, they can
be inserted here.
- `globus_endpoint_id` should refer to the endpoint ID of the local Site. On public subscriber endpoints at large HPC facilities, this value will be set correctly in the default Balsam site configuration.
- `globus_endpoint_id` should refer to the endpoint ID of the local Site.
- `globus_endpoint_site_path` specifies the path on the Globus endpoint, which might be different from the path used on login/compute nodes (e.g. for ALCF home filesystem, paths begin with /home/${USER}, but on the dtn_home endpoint, paths begin with /${USER}.)
- `max_concurrent_transfers` determines the maximum number of in-flight
transfer tasks, where each task manages a *batch* of files for many Jobs.
- `transfer_batch_size` determines the maximum number of transfer items per transfer task. This should be tuned depending on your workload (a higher number makes sense to utilize available bandwidth for smaller files).
- `num_items_query_limit` determines the maximum number of transfer items considered in any single transfer task submission.
- `service_period` determines the interval (in seconds) between transfer task submissions.

Globus requires that you give Balsam consent to make transfers on your behalf; consent is granted for each endpoint that you intend to use. You can review your Globus consents [here](https://auth.globus.org/v2/web/consents). For any endpoints that you have configured above (including the `globus_endpoint_id`), determine the Globus endpoint id, and execute the following command:

balsam site globus-login -e ENDPOINT_ID1 -e ENDPOINT_ID2

Note that `globus_endpoint_id` in `settings.yml` will be used to stage input data in, and to stage output data out. This endpoint id will depend on the filesystem where your site is located (e.g. at ALCF, if it's in your home directory, use alcf#dtn_home; if it's on the Eagle filesystem, use alcf#eagle_dtn). Also make sure that the path to your site is set to correspond to how it is mapped on your Globus endpoint, using the `globus_endpoint_site_path` setting above.

Once `settings.yml` has been configured appropriately, be sure to restart the Balsam Site:

```bash
$ balsam site sync
```

The Site will start issuing stage in and stage out tasks immediately and
advancing Jobs as needed. We can track the state of transfers using the [Python
advancing Jobs as needed. The state of transfers can be tracked using the [Python
API](./api.md):

```python
from balsam.api import TransferItem

for item in TransferItem.objects.filter(direction="in", state="active"):
print(f"File {item.remote_path} is currently staging in via task ID: {item.task_id}")
```
```
3 changes: 2 additions & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mypy==0.971
mkdocs-material>=7.1.9,<9.0.0
coverage
black
#jupyter-black
build
twine
flake8
Expand All @@ -26,4 +27,4 @@ types-python-dateutil
types-redis
types-requests
types-ujson
types-jwt
types-jwt