diff --git a/pro_tes/ga4gh/tes/models.py b/pro_tes/ga4gh/tes/models.py index 4b9fd9d..96672fc 100644 --- a/pro_tes/ga4gh/tes/models.py +++ b/pro_tes/ga4gh/tes/models.py @@ -666,7 +666,7 @@ class TesEndpoint(CustomBaseModel): Args: host: Host at which the TES API is served that is processing this request; note that this should include the path information but - *not* the base path path defined in the TES API specification; + *not* the base path defined in the TES API specification; e.g., specify https://my.tes.com/api if the actual API is hosted at https://my.tes.com/api/ga4gh/tes/v1. base_path: Override the default path suffix defined in the TES API diff --git a/pro_tes/ga4gh/tes/task_runs.py b/pro_tes/ga4gh/tes/task_runs.py index bf30dad..8c220ee 100644 --- a/pro_tes/ga4gh/tes/task_runs.py +++ b/pro_tes/ga4gh/tes/task_runs.py @@ -453,7 +453,16 @@ def _set_projection(self, view: str) -> Dict: def _update_task_incoming( self, payload: dict, db_document: DbDocument, start_time: str, **kwargs ) -> DbDocument: - """Update the task incoming object.""" + """Update the task incoming object. + + Args: + payload: A dictionary containing the payload for the update. + db_document: The document in the database to be updated. + start_time: The starting time of the incoming tes request. + + Returns: + DbDocument: The updated database document. + """ logs = self._set_logs( payloads=deepcopy(payload), start_time=start_time ) @@ -467,7 +476,15 @@ def _update_task_incoming( return db_document def _set_logs(self, payloads: dict, start_time: str) -> Dict: - """Set up the logs for the incoming request.""" + """Set up the logs for the incoming request. + + Args: + payloads: A dictionary containing the payload for the update. + start_time: The starting time of the incoming tes request. + + Returns: + Dict: The updated logs. + """ if "logs" not in payloads.keys(): logs = [ { @@ -491,7 +508,16 @@ def _update_doc_in_db( tes_uri: str, remote_task_id: str, ) -> DbDocument: - """Update the document in the database.""" + """Update the document in the database. + + Args: + db_connector: The database connector. + tes_uri: The tes uri where the task if forwarded. + remote_task_id: Task identifier at the remote tes instance. + + Returns: + DbDocument: The updated database document. + """ time_now = datetime.now().strftime("%m-%d-%Y %H:%M:%S") tes_endpoint_dict = {"host": tes_uri, "base_path": ""} db_document = db_connector.upsert_fields_in_root_object( @@ -529,7 +555,16 @@ def _update_doc_in_db( def _update_task_metadata( self, db_document: DbDocument, tes_uri: str, remote_task_id: str ) -> DbDocument: - """Update the task metadata.""" + """Update the task metadata. + + Args: + db_document: The document in the database to be updated. + tes_uri: The tes uri where the task if forwarded. + remote_task_id: Task identifier at the remote tes instance. + + Returns: + DbDocument: The updated database document. + """ for logs in db_document.task_incoming.logs: tesNextTes_obj = TesNextTes(id=remote_task_id, url=tes_uri) if logs.metadata.forwarded_to is None: diff --git a/pro_tes/middleware/middleware.py b/pro_tes/middleware/middleware.py index c2e3f94..f38312f 100644 --- a/pro_tes/middleware/middleware.py +++ b/pro_tes/middleware/middleware.py @@ -14,7 +14,14 @@ class AbstractMiddleware(metaclass=abc.ABCMeta): @abc.abstractmethod def modify_request(self, request): - """Modify the request before it is sent to the TES instance.""" + """Abstract method to modify the incoming request. + + Args: + request: The incoming request object. + + Returns: + The modified request object. + """ class TaskDistributionMiddleware(AbstractMiddleware): @@ -22,6 +29,7 @@ class TaskDistributionMiddleware(AbstractMiddleware): Attributes: tes_uri: TES instance best suited for TES task. + input_uris: A list of input URIs in the incoming request. """ def __init__(self) -> None: @@ -36,7 +44,7 @@ def modify_request(self, request): request: Incoming request object. Returns: - Tuple of modified request object. + The modified request object. """ if "inputs" in request.json.keys(): for index in range(len(request.json["inputs"])): diff --git a/pro_tes/middleware/task_distribution/distance.py b/pro_tes/middleware/task_distribution/distance.py index 545e8f9..8f13ccd 100644 --- a/pro_tes/middleware/task_distribution/distance.py +++ b/pro_tes/middleware/task_distribution/distance.py @@ -26,14 +26,12 @@ def task_distribution(input_uri: List) -> List: """Task distributor. - Distributes task by selecting the TES instance having minimum - distance between the input files and TES Instance. - Args: input_uri: List of inputs of a TES task request Returns: - A list of ranked TES instance. + A list of ranked TES instances, ordered by the minimum distance + between the input files and each TES instance. """ foca_conf = current_app.config.foca tes_uri: List[str] = foca_conf.tes["service_list"] @@ -71,7 +69,21 @@ def get_uri_combination( Args: input_uri: List of input uris of TES request. tes_uri: List of TES instance. + Returns: + An AccessUriCombination object, consisting of the following keys: + + - "task_params": A dictionary with a single key "input_uri", + whose value is a list of the input URIs. + + - "tes_deployments": A list of dictionaries, one for each TES + instance,with the following keys: + + - "tes_uri": The URI of the TES instance. + - "stats": A dictionary with a single key "total_distance", + whose value is initially None. + + Examples: A AccessUriCombination object of the form like: { "task_params": { @@ -116,11 +128,21 @@ def ip_combination(input_uri: List[str], tes_uri: List[str]) -> Dict: """Create a pair of TES IP and Input IP. Args: - input_uri: List of input uris of TES request. - tes_uri: List of TES instance. + input_uri: A list of input URIs for a TES task request. + tes_uri: A list of TES instances to choose from. Returns: - A dictionary of combination of tes ip with all the input ips. + A dictionary where the keys are tuples representing the combination + of TES instance and input URI, and the values are tuples containing + the IP addresses of the TES instance and input URI. + + Example: + { + (0, 0): ('10.0.0.1', '192.168.0.1'), + (0, 1): ('10.0.0.1', '192.168.0.2'), + (1, 0): ('10.0.0.2', '192.168.0.1'), + (1, 1): ('10.0.0.2', '192.168.0.2') + } """ ips = {} @@ -149,17 +171,21 @@ def ip_distance( ) -> Dict[str, Dict]: """Compute ip distance between ip pairs. - :param *args: IP addresses of the form '8.8.8.8' without schema and - suffixes. + Args: + *args: IP addresses of the form '8.8.8.8' without schema and + suffixes. - :return: A dictionary with a key for each IP address, pointing to a - dictionary containing city, region and country information for the - IP address, as well as a key "distances" pointing to a dictionary - indicating the distances, in kilometers, between all pairs of IPs, - with the tuple of IPs as the keys. IPs that cannot be located are - skipped from the resulting dictionary. - :raises ValueError: No args were passed. + Returns: + A dictionary with a key for each IP address, pointing to a + dictionary containing city, region and country information for the + IP address, as well as a key "distances" pointing to a dictionary + indicating the distances, in kilometers, between all pairs of IPs, + with the tuple of IPs as the keys. IPs that cannot be located are + skipped from the resulting dictionary. + + Raises: + ValueError: No args were passed. """ if not args: raise ValueError("Expected at least one URI or IP address.") @@ -195,7 +221,8 @@ def ip_distance( def calculate_distance( - ips_unique: Dict[Set[str], List[Tuple[int, str]]], tes_uri: List[str] + ips_unique: Dict[Set[str], List[Tuple[int, str]]], + tes_uri: List[str], ) -> Dict[Set[str], float]: """Calculate distances between all IPs. @@ -204,7 +231,9 @@ def calculate_distance( tes_uri: List of TES instance. Returns: - A dictionary of distances between all ips. + A dictionary of distances between all IP addresses. + The keys are sets of IP addresses, and the values are the distances + between them as floats. """ distances_unique: Dict[Set[str], float] = {} ips_all = frozenset().union(*list(ips_unique.keys())) # type: ignore @@ -246,7 +275,7 @@ def calculate_distance( def rank_tes_instances( access_uri_combination: AccessUriCombination, ) -> List[str]: - """Rank the tes instance based on the total distance. + """Rank TES instances in increasing order of total distance. Args: access_uri_combination: Combination of task_params and tes_deployments. diff --git a/pro_tes/tasks/track_task_progress.py b/pro_tes/tasks/track_task_progress.py index bf4f387..6a8191e 100644 --- a/pro_tes/tasks/track_task_progress.py +++ b/pro_tes/tasks/track_task_progress.py @@ -50,9 +50,6 @@ def task__track_task_progress( # pylint: disable=too-many-arguments remote_task_id: task run identifier on remote tes service. user: User name for basic authentication. password: Password for basic authentication. - - Returns: - Task identifier. """ foca_config: Config = current_app.config.foca controller_config: Dict = foca_config.controllers["post_task"] diff --git a/pro_tes/utils/models.py b/pro_tes/utils/models.py index b03b58a..32803a4 100644 --- a/pro_tes/utils/models.py +++ b/pro_tes/utils/models.py @@ -20,7 +20,7 @@ class TaskModelConverter: - """Convert py-tes to proTES to proTES TES task model. + """Convert py-tes to proTES TES task model. Convert :class:`tes.models.Task` to :class:`pro_tes.ga4gh.tes.models.TesTask` @@ -37,7 +37,7 @@ def __init__(self, task: Task) -> None: self.task: Task = task def convert_task(self) -> TesTask: - """Convert py-tes to proTES TES task to proTES TES task. + """Convert py-tes to proTES TES task. Returns: Instance of :class:`pro_tes.ga4gh.tes.models.TesTask` @@ -66,7 +66,7 @@ def convert_task(self) -> TesTask: ) def convert_state(self) -> TesState: - """Convert py-tes to proTES TES task state to proTES TES task state. + """Convert py-tes to proTES TES task state. Returns: Instance of :class:`pro_tes.ga4gh.tes.models.TesState`