Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Initialization of support for S3 #540

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pydra/engine/helpers_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""List of helper methods and clients."""
# TODO It would be nice to rewrite this to some class (e.g. AwsManager).

try:
import boto3
except ImportError:
pass

import logging

logger = logging.getLogger("pydra")

s3_client: boto3.client = None


def get_s3_client():
"""Lazy getter for S3 client."""

global s3_client

if not s3_client:
s3_client = boto3.client("s3")

return s3_client
6 changes: 6 additions & 0 deletions pydra/engine/helpers_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,3 +811,9 @@ def is_existing_file(value):
return Path(value).exists()
except TypeError:
return False


def is_s3_file(f):
from .specs import S3File

return isinstance(f, S3File)
43 changes: 42 additions & 1 deletion pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ class File:
"""An :obj:`os.pathlike` object, designating a file."""


class S3File(File):
"""Remote file in AWS S3 block storage."""

bucket_name: str = None
obj_key: str = None

@classmethod
def converter(cls, value):

# from helpers_aws import get_s3_client
# s3_client = get_s3_client()

# Upload file

raise NotImplementedError("S3File Converter")


class Directory:
"""An :obj:`os.pathlike` object, designating a folder."""

Expand Down Expand Up @@ -183,7 +200,7 @@ def check_fields_input_spec(self):
require_to_check[fld.name] = mdata["requires"]

if (
fld.type in [File, Directory]
fld.type in [File, Directory, S3File]
or "pydra.engine.specs.File" in str(fld.type)
or "pydra.engine.specs.Directory" in str(fld.type)
):
Expand All @@ -196,6 +213,10 @@ def check_fields_input_spec(self):

def _file_check(self, field):
"""checking if the file exists"""

if field.type is S3File:
return self._s3_file_exists(field)

if isinstance(getattr(self, field.name), list):
# if value is a list and type is a list of Files/Directory, checking all elements
if field.type in [ty.List[File], ty.List[Directory]]:
Expand All @@ -213,6 +234,26 @@ def _file_check(self, field):
f"the file {file} from the {field.name} input does not exist"
)

def _s3_file_exists(self, field):
"""Checks if the file exists in the bucket and is accessible."""

assert isinstance(field.type, S3File), f"Field {field} is not of type S3File."

try:
from botocore.exceptions import ClientError
from helpers_aws import get_s3_client
except ImportError:
pass

s3_client = get_s3_client()

bucket, key = field.bucket, field.obj_key
try:
s3_client.head_object(Bucket=bucket, Key=key)
return True
except ClientError:
return False

def check_metadata(self):
"""Check contained metadata."""

Expand Down
6 changes: 6 additions & 0 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,9 @@ def container_args(self):
cargs.extend(["--pwd", str(self.output_cpath)])
cargs.append(self.inputs.image)
return cargs


class ServerlessTask(ShellCommandTask):
"""Placeholder for a type of Tasks running using a FaaS (aka Serverless) platform."""

pass
13 changes: 13 additions & 0 deletions pydra/engine/tests/test_helpers_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import botocore

from ..helpers_aws import (
s3_client,
get_s3_client,
)


def test_get_s3_client():
assert s3_client is None
r = get_s3_client()

assert isinstance(r, botocore.client.BaseClient)
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ test_requires =
tornado
boutiques
pympler
boto3
packages = find:
include_package_data = True

Expand Down Expand Up @@ -84,6 +85,7 @@ dev =
%(test)s
black==21.4b2
pre-commit
boto3
dask =
%(test)s
dask
Expand Down