Skip to content

Commit

Permalink
create task
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeAeich committed Aug 3, 2024
1 parent 2429b0f commit cebb7b2
Show file tree
Hide file tree
Showing 15 changed files with 899 additions and 677 deletions.
1,033 changes: 500 additions & 533 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ version = "0.1.0"
[tool.poetry.dependencies]
boto3 = "1.34.104"
foca = "^0.13.0"
kubernetes = "^30.1.0"
kubernetes = "^29.0.0"
python = "^3.11"
requests = ">=2.20.0"
urllib3 = "^2.2.2"
Expand Down
9 changes: 6 additions & 3 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
from tesk.api.kubernetes.converter import TesKubernetesConverter
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
from tesk.api.kubernetes.convert.converter import TesKubernetesConverter
from tesk.api.kubernetes.convert.template import KubernetesTemplateSupplier
from tesk.exceptions import BadRequest, InternalServerError
from tesk.utils import get_custom_config

Expand Down Expand Up @@ -43,7 +43,10 @@ def CreateTask(**kwargs) -> dict: # type: ignore
if request_body is None:
logger("Nothing recieved in request body.")
raise BadRequest("No request body recieved.")
CreateTesTask(TesTask(**request_body)).create_task()
tes_task = TesTask(**request_body)
namespace = "tesk"
task_creater = CreateTesTask(tes_task, namespace)
task_creater.response()
except Exception as e:
raise InternalServerError from e

Expand Down
34 changes: 17 additions & 17 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.kubernetes.client_wrapper import KubernetesClientWrapper
from tesk.api.kubernetes.constants import Constants
from tesk.api.kubernetes.converter import TesKubernetesConverter
from tesk.exceptions import KubernetesError
from tesk.api.kubernetes.convert.converter import TesKubernetesConverter
from tesk.constants import TeskConstants
from tesk.exceptions import KubernetesError

logger = logging.getLogger(__name__)

Expand All @@ -33,8 +33,9 @@ def __init__(self, task: TesTask, namespace=TeskConstants.tesk_namespace):
def create_task(self):
"""Create TES task."""
attempts_no = 0
while attempts_no > self.constants.job_create_attempts_no:
while attempts_no < self.constants.job_create_attempts_no:
try:
attempts_no += 1
resources = self.task.resources

if resources and resources.ram_gb:
Expand All @@ -49,28 +50,27 @@ def create_task(self):
)
)

task_master_config_map = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_config_map(
self.task,
task_master_job,
# user
)
)
_ = self.kubernetes_client_wrapper.create_config_map(
task_master_config_map
)
created_job = self.kubernetes_client_wrapper.create_job(task_master_job)
print(task_master_config_map)
print(task_master_job)

# TODO: Create ConfigMap
# TODO: Create Job
# TODO Return created job
# task_master_config_map = converter.from_tes_task_to_k8s_config_map(
# task, user, task_master_job
# )
# created_config_map = kubernetes_client_wrapper.create_config_map(
# task_master_config_map
# )
# created_job = kubernetes_client_wrapper.create_job(task_master_job)
# return converter.from_k8s_job_to_tes_create_task_response(created_job)
return created_job.metadata.name

except KubernetesError as e:
# Handle Kubernetes specific exceptions
if (
not e.is_object_name_duplicated()
or attempts_no >= self.constants.job_create_attempts_no
):
raise e
attempts_no += 1

except Exception as exc:
logging.error("ERROR: In createTask", exc_info=True)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,40 +1,57 @@
"""Module for converting TES tasks to Kubernetes jobs."""

import base64
import gzip
import json
import logging
from decimal import Decimal
from io import BytesIO
from typing import Any

from kubernetes.client import (
V1ConfigMap,
V1ConfigMapVolumeSource,
V1Container,
V1EnvVar,
V1ObjectMeta,
V1ResourceRequirements,
V1Volume,
)
from kubernetes.client.models import V1Job
from kubernetes.utils.quantity import parse_quantity # type: ignore

from tesk.api.ga4gh.tes.models import TesExecutor, TesResources, TesTask
from tesk.api.kubernetes.constants import Constants, K8sConstants
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
from tesk.api.kubernetes.convert.data.job import Job
from tesk.api.kubernetes.convert.data.task import Task
from tesk.api.kubernetes.convert.executor_command_wrapper import ExecutorCommandWrapper
from tesk.api.kubernetes.convert.template import KubernetesTemplateSupplier
from tesk.constants import TeskConstants
from tesk.custom_config import TaskmasterEnvProperties
from tesk.utils import get_taskmaster_env_property, get_taskmaster_template
from tesk.utils import get_taskmaster_env_property, pydantic_model_list_json

logger = logging.getLogger(__name__)


class TesKubernetesConverter:
"""Convert TES requests to Kubernetes resources."""

def __init__(self, namespace=TeskConstants.tesk_namespace):
"""Initialize the converter."""
self.taskmaster_env_properties: TaskmasterEnvProperties = (
get_taskmaster_env_property()
)
self.template_supplier = KubernetesTemplateSupplier(
namespace=namespace
# security_context=security_context
)
self.constants = Constants()
self.k8s_constants = K8sConstants()
self.namespace = namespace

# TODO: Add user to the mmethod when auth implemented in FOCA
# TODO: Add user to the method when auth implemented in FOCA
def from_tes_task_to_k8s_job(self, task: TesTask):
"""Convert TES task to Kubernetes job."""
taskmsater_job: V1Job = KubernetesTemplateSupplier(
self.namespace
).task_master_template()
Expand All @@ -48,6 +65,8 @@ def from_tes_task_to_k8s_job(self, task: TesTask):
if taskmsater_job.metadata.labels is None:
taskmsater_job.metadata.labels = {}

# taskmsater_job.metadata.name = task.name

taskmsater_job.metadata.annotations[self.constants.ann_testask_name_key] = (
task.name
)
Expand Down Expand Up @@ -84,21 +103,30 @@ def from_tes_task_to_k8s_job(self, task: TesTask):
def from_tes_task_to_k8s_config_map(
self,
task: TesTask,
job: V1Job,
# user,
job,
):
) -> V1ConfigMap:
"""Create a Kubernetes ConfigMap from a TES task."""
task_master_config_map = V1ConfigMap(
metadata=V1ObjectMeta(name=job.metadata.name)
)

task_master_config_map.metadata.labels = (
task_master_config_map.metadata.labels or {}
)
task_master_config_map.metadata.annotations = (
task_master_config_map.metadata.annotations or {}
)

task_master_config_map.metadata.annotations[
self.constants.ann_testask_name_key
] = task["name"]
] = task.name
# task_master_config_map.metadata.labels[self.constants.label_userid_key] = user["username"]

if "tags" in task and "GROUP_NAME" in task["tags"]:
if "tags" in task and "GROUP_NAME" in task.tags:
task_master_config_map.metadata.labels[
self.constants.label_groupname_key
] = task["tags"]["GROUP_NAME"]
] = task.tags["GROUP_NAME"]
# elif user["is_member"]:
# task_master_config_map.metadata.labels[self.constants.label_groupname_key] = user[
# "any_group"
Expand All @@ -107,30 +135,42 @@ def from_tes_task_to_k8s_config_map(
executors_as_jobs = [
self.from_tes_executor_to_k8s_job(
task_master_config_map.metadata.name,
task["name"],
task.name,
executor,
idx,
task["resources"],
task.resources,
# user,
)
for idx, executor in enumerate(task["executors"])
for idx, executor in enumerate(task.executors)
]

task_master_input = {
"inputs": task.inputs or [],
"outputs": task.outputs or [],
task_master_input: dict[str, Any] = {
"inputs": pydantic_model_list_json(task.inputs) or [],
"outputs": pydantic_model_list_json(task.outputs) or [],
"volumes": task.volumes or [],
"resources": {"disk_gb": task.resources.disk_gb or 10.0},
"resources": {"disk_gb": float(task.resources.disk_gb) or 10.0},
}
task_master_input[self.constants.taskmaster_input_exec_key] = executors_as_jobs
task_master_input[self.constants.taskmaster_input_exec_key] = [
exec_job.to_dict() for exec_job in executors_as_jobs
]

def decimal_to_float(obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError

task_master_input_as_json = json.dumps(task_master_input)
taskmaster_input_as_json = json.loads(
json.dumps(task_master_input, default=decimal_to_float)
)
try:
with BytesIO() as obj:
with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file:
gzip_file.write(task_master_input_as_json.encode("utf-8"))
json_data = json.dumps(taskmaster_input_as_json)
gzip_file.write(json_data.encode("utf-8"))
task_master_config_map.binary_data = {
f"{self.constants.taskmaster_input}.gz": obj.getvalue()
f"{self.constants.taskmaster_input}.gz": base64.b64encode(
obj.getvalue()
).decode("utf-8")
}
except Exception as e:
logger.info(
Expand All @@ -140,7 +180,6 @@ def from_tes_task_to_k8s_config_map(

return task_master_config_map


def from_tes_executor_to_k8s_job(
self,
generated_task_id: str,
Expand All @@ -151,38 +190,58 @@ def from_tes_executor_to_k8s_job(
# user: User
) -> V1Job:
# Get new template executor Job object
job = self.executor_template_supplier()
job: V1Job = self.template_supplier.executor_template()

# Set executors name based on taskmaster's job name
Job(job).change_job_name(Task(generated_task_id).get_executor_name(executor_index))

Job(job).change_job_name(
# Task(job, generated_task_id).get_executor_name(executor_index)
"newname"
)

# Put arbitrary labels and annotations
job.metadata.labels = job.metadata.labels or {}
job.metadata.labels["taskId"] = generated_task_id
job.metadata.labels["execNo"] = str(executor_index)
job.metadata.labels["userId"] = user.username
job.metadata.labels[self.constants.label_testask_id_key] = generated_task_id
job.metadata.labels[self.constants.label_execno_key] = str(executor_index)
# job.metadata.labels[self.constants.label_userid_key] = user.username

job.metadata.annotations = job.metadata.annotations or {}
job.metadata.annotations["tesTaskName"] = tes_task_name

container = job.spec.template.spec.containers[0]

job.metadata.annotations[self.constants.ann_testask_name_key] = tes_task_name

container: V1Container = job.spec.template.spec.containers[0]

# TODO: Not sure what to do with this
# Convert potential TRS URI into docker image
container.image = self.trs_client.get_docker_image_for_tool_version_uri(executor.image)

# container.image = self.trs_client.get_docker_image_for_tool_version_uri(
# executor.image
# )

if not container.command:
container.command = []
# Map executor's command to job container's command
for command in ExecutorCommandWrapper(executor).get_commands_with_stream_redirects():
container.add_command_item(command)

for command in ExecutorCommandWrapper(
executor
).get_commands_with_stream_redirects():
container.command.append(command)

if executor.env:
container.env = [V1EnvVar(name=key, value=value) for key, value in executor.env.items()]

container.env = [
V1EnvVar(name=key, value=value) for key, value in executor.env.items()
]
else:
container.env = []

container.working_dir = executor.workdir


container.resources = V1ResourceRequirements(requests={})

if resources.cpu_cores:
container.resources.requests['cpu'] = QuantityFormatter().parse(str(resources.cpu_cores))

container.resources.requests["cpu"] = parse_quantity(
str(resources.cpu_cores)
)

if resources.ram_gb:
container.resources.requests['memory'] = QuantityFormatter().parse(f"{resources.ram_gb:.6f}Gi")

container.resources.requests["memory"] = parse_quantity(
f"{resources.ram_gb:.6f}Gi"
)

return job
Empty file.
48 changes: 48 additions & 0 deletions tesk/api/kubernetes/convert/data/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""A container for a single Kubernetes job object (can be both a taskmaster and an executor) and its list of worker pods (Kubernetes Pod objects)."""

from typing import List, Optional

from kubernetes.client import V1Job, V1Pod


class Job:
"""A container for a single Kubernetes job object (can be both a taskmaster and an executor) and its list of worker pods (Kubernetes Pod objects)"""

def __init__(self, job: V1Job):
"""Initializes the Job with a Kubernetes job object."""
self.job: V1Job = job
self.pods: List[V1Pod] = []

def get_job(self) -> V1Job:
"""Returns the Kubernetes job object."""
return self.job

def add_pod(self, pod: V1Pod):
"""Adds a single pod to the list (without any duplication checks)."""
# TODO: This doesn't take care of duplication, use set on id or name
self.pods.append(pod)

def has_pods(self) -> bool:
"""Checks if the job has any pods."""
return bool(self.pods)

def get_first_pod(self) -> Optional[V1Pod]:
"""Returns arbitrarily chosen pod from the list (currently the first one added) or None if the job has no pods."""
if not self.has_pods():
return None
return self.pods[0]

def get_pods(self) -> List[V1Pod]:
"""Returns the list of job pods in the order of addition to the list or an empty list if no pods."""
return self.pods

def change_job_name(self, new_name: str):
"""Changes the job name, as well as the names in its metadata and container specs."""
self.job.metadata.name = new_name
self.job.spec.template.metadata.name = new_name
if self.job.spec.template.spec.containers:
self.job.spec.template.spec.containers[0].name = new_name

def get_job_name(self) -> Optional[str]:
"""Returns the job name."""
return self.job.metadata.name
Loading

0 comments on commit cebb7b2

Please sign in to comment.