Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
JaeAeich committed Aug 2, 2024
1 parent 84dfc46 commit 2429b0f
Show file tree
Hide file tree
Showing 9 changed files with 427 additions and 248 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ After the last executor, the `filer` is called once more to process the outputs
and push them to remote locations from the PVC. The PVC is the scrubbed, deleted
and the taskmaster ends, completing the task.

┌─────────────────────────────────────────────────────────┐
│ Kubernetes │
│ │
│ ┌────────────────────────────┐ ┌───────────────────┐ │
│ │ Secret: ftp-secret │ │ ConfigMap/PVC │ │
│ │ - username │ │ - JSON_INPUT.gz │ │
│ │ - password │ │ │ │
│ └──────────▲─────────────────┘ └───────▲───────────┘ │
│ │ | │
│ │ | │
│ │ | │
│ ┌─────────┴────────────────────────────┴────────────┐ │
│ │ Job: taskmaster │ │
│ │ ┌───────────────────────────────────────────────┐ │ │
│ │ │ Pod: taskmaster │ │ │
│ │ │ - Container: taskmaster │ │ │
│ │ │ - Env: TESK_FTP_USERNAME │ │ │
│ │ │ - Env: TESK_FTP_PASSWORD │ │ │
│ │ │ - Args: -f /jsoninput/JSON_INPUT.gz │ │ │
│ │ │ - Mounts: /podinfo │ │ │
│ │ │ /jsoninput │ │ │
│ │ └───────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

## Requirements

- A working [Kubernetes](https://kubernetes.io/) cluster version 1.9 and later.
Expand Down
2 changes: 1 addition & 1 deletion deployment/charts/tesk/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ host_name: ""
#

# 'openstack' or 's3'
storage: none
storage: s3

# Configurable storage class.
storageClass:
Expand Down
369 changes: 184 additions & 185 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions tesk/api/ga4gh/tes/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.service_info.service_info import ServiceInfo
from tesk.api.ga4gh.tes.task.create_task import CreateTesTask
from tesk.api.kubernetes.converter import TesKubernetesConverter
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
from tesk.exceptions import BadRequest, InternalServerError
Expand Down Expand Up @@ -42,6 +43,7 @@ def CreateTask(**kwargs) -> dict: # type: ignore
if request_body is None:
logger("Nothing recieved in request body.")
raise BadRequest("No request body recieved.")
CreateTesTask(TesTask(**request_body)).create_task()
except Exception as e:
raise InternalServerError from e

Expand Down
6 changes: 4 additions & 2 deletions tesk/api/ga4gh/tes/task/create_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ def __init__(self, task: TesTask, namespace=TeskConstants.tesk_namespace):
Args:
task: TES task to create.
user: User who creates the task.
namespace: Kubernetes namespace where the task is created.
"""
self.task = task
Expand All @@ -45,10 +44,13 @@ def create_task(self):

task_master_job = (
self.tes_kubernetes_converter.from_tes_task_to_k8s_job(
self.task, self.user
self.task,
# self.user
)
)

print(task_master_job)

# TODO: Create ConfigMap
# TODO: Create Job
# TODO Return created job
Expand Down
159 changes: 106 additions & 53 deletions tesk/api/kubernetes/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
)
from kubernetes.client.models import V1Job

from tesk.api.ga4gh.tes.models import TesTask
from tesk.api.ga4gh.tes.models import TesExecutor, TesResources, TesTask
from tesk.api.kubernetes.constants import Constants, K8sConstants
from tesk.api.kubernetes.template import KubernetesTemplateSupplier
from tesk.constants import TeskConstants
Expand All @@ -26,7 +26,6 @@
class TesKubernetesConverter:
def __init__(self, namespace=TeskConstants.tesk_namespace):
"""Initialize the converter."""
self.taskmaster_template: V1Job = get_taskmaster_template()
self.taskmaster_env_properties: TaskmasterEnvProperties = (
get_taskmaster_env_property()
)
Expand Down Expand Up @@ -82,54 +81,108 @@ def from_tes_task_to_k8s_job(self, task: TesTask):

return taskmsater_job

# def from_tes_task_to_k8s_config_map(self, task, user, job):
# task_master_config_map = V1ConfigMap(
# metadata=V1ObjectMeta(name=job.metadata.name)
# )
# task_master_config_map.metadata.annotations[ANN_TESTASK_NAME_KEY] = task["name"]
# task_master_config_map.metadata.labels[LABEL_USERID_KEY] = user["username"]

# if "tags" in task and "GROUP_NAME" in task["tags"]:
# task_master_config_map.metadata.labels[LABEL_GROUPNAME_KEY] = task["tags"][
# "GROUP_NAME"
# ]
# elif user["is_member"]:
# task_master_config_map.metadata.labels[LABEL_GROUPNAME_KEY] = user[
# "any_group"
# ]

# executors_as_jobs = [
# self.from_tes_executor_to_k8s_job(
# task_master_config_map.metadata.name,
# task["name"],
# executor,
# idx,
# task["resources"],
# user,
# )
# for idx, executor in enumerate(task["executors"])
# ]

# task_master_input = {
# "inputs": task.get("inputs", []),
# "outputs": task.get("outputs", []),
# "volumes": task.get("volumes", []),
# "resources": {"disk_gb": task["resources"].get("disk_gb", 10.0)},
# }
# task_master_input[TASKMASTER_INPUT_EXEC_KEY] = executors_as_jobs

# task_master_input_as_json = json.dumps(task_master_input)
# try:
# with BytesIO() as obj:
# with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file:
# gzip_file.write(task_master_input_as_json.encode("utf-8"))
# task_master_config_map.binary_data = {
# f"{TASKMASTER_INPUT}.gz": obj.getvalue()
# }
# except Exception as e:
# logger.info(
# f"Compression of task {task_master_config_map.metadata.name} JSON configmap failed",
# e,
# )

# return task_master_config_map
def from_tes_task_to_k8s_config_map(
self,
task: TesTask,
# user,
job,
):
task_master_config_map = V1ConfigMap(
metadata=V1ObjectMeta(name=job.metadata.name)
)
task_master_config_map.metadata.annotations[
self.constants.ann_testask_name_key
] = task["name"]
# task_master_config_map.metadata.labels[self.constants.label_userid_key] = user["username"]

if "tags" in task and "GROUP_NAME" in task["tags"]:
task_master_config_map.metadata.labels[
self.constants.label_groupname_key
] = task["tags"]["GROUP_NAME"]
# elif user["is_member"]:
# task_master_config_map.metadata.labels[self.constants.label_groupname_key] = user[
# "any_group"
# ]

executors_as_jobs = [
self.from_tes_executor_to_k8s_job(
task_master_config_map.metadata.name,
task["name"],
executor,
idx,
task["resources"],
# user,
)
for idx, executor in enumerate(task["executors"])
]

task_master_input = {
"inputs": task.inputs or [],
"outputs": task.outputs or [],
"volumes": task.volumes or [],
"resources": {"disk_gb": task.resources.disk_gb or 10.0},
}
task_master_input[self.constants.taskmaster_input_exec_key] = executors_as_jobs

task_master_input_as_json = json.dumps(task_master_input)
try:
with BytesIO() as obj:
with gzip.GzipFile(fileobj=obj, mode="wb") as gzip_file:
gzip_file.write(task_master_input_as_json.encode("utf-8"))
task_master_config_map.binary_data = {
f"{self.constants.taskmaster_input}.gz": obj.getvalue()
}
except Exception as e:
logger.info(
f"Compression of task {task_master_config_map.metadata.name} JSON configmap failed",
e,
)

return task_master_config_map


def from_tes_executor_to_k8s_job(
self,
generated_task_id: str,
tes_task_name: str,
executor: TesExecutor,
executor_index: int,
resources: TesResources,
# user: User
) -> V1Job:
# Get new template executor Job object
job = self.executor_template_supplier()

# Set executors name based on taskmaster's job name
Job(job).change_job_name(Task(generated_task_id).get_executor_name(executor_index))

# Put arbitrary labels and annotations
job.metadata.labels = job.metadata.labels or {}
job.metadata.labels["taskId"] = generated_task_id
job.metadata.labels["execNo"] = str(executor_index)
job.metadata.labels["userId"] = user.username

job.metadata.annotations = job.metadata.annotations or {}
job.metadata.annotations["tesTaskName"] = tes_task_name

container = job.spec.template.spec.containers[0]

# Convert potential TRS URI into docker image
container.image = self.trs_client.get_docker_image_for_tool_version_uri(executor.image)

# Map executor's command to job container's command
for command in ExecutorCommandWrapper(executor).get_commands_with_stream_redirects():
container.add_command_item(command)

if executor.env:
container.env = [V1EnvVar(name=key, value=value) for key, value in executor.env.items()]

container.working_dir = executor.workdir

if resources.cpu_cores:
container.resources.requests['cpu'] = QuantityFormatter().parse(str(resources.cpu_cores))

if resources.ram_gb:
container.resources.requests['memory'] = QuantityFormatter().parse(f"{resources.ram_gb:.6f}Gi")

return job
57 changes: 56 additions & 1 deletion tesk/api/kubernetes/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@
import uuid

from kubernetes.client import (
V1Container,
V1EnvVar,
V1JobSpec,
V1ObjectMeta,
V1PodSpec,
V1PodTemplateSpec,
V1ResourceRequirements,
V1SecretVolumeSource,
V1Volume,
V1VolumeMount,
)
from kubernetes.client.models import V1Job

Expand All @@ -19,7 +28,7 @@
class KubernetesTemplateSupplier:
"""Templates for tasmaster's and executor's job object.."""

def __init__(self, namespace=TeskConstants.tesk_namespace):
def __init__(self, namespace=TeskConstants.tesk_namespace, security_context = None):
"""Initialize the converter."""
self.taskmaster_template: V1Job = get_taskmaster_template()
self.taskmaster_env_properties: TaskmasterEnvProperties = (
Expand All @@ -28,6 +37,7 @@ def __init__(self, namespace=TeskConstants.tesk_namespace):
self.constants = Constants()
self.k8s_constants = K8sConstants()
self.namespace = namespace
self.security_context= security_context

def get_task_master_name(self) -> str:
"""Generate a unique name for the taskmaster job."""
Expand Down Expand Up @@ -89,3 +99,48 @@ def task_master_template(self) -> V1Job:
)

return job

def executor_template(self):
container = V1Container(resources=V1ResourceRequirements())

if self.taskmaster_env_properties.executorSecret is not None:
container.volume_mounts = [
V1VolumeMount(
read_only=True,
name=self.taskmaster_env_properties.executorSecret.name,
mount_path=self.taskmaster_env_properties.executorSecret.mountPath,
)
]

pod_spec = V1PodSpec(
containers=[container],
restart_policy=self.k8s_constants.job_restart_policy,
)

if self.security_context:
pod_spec.security_context = self.security_context

job = V1Job(
api_version=self.k8s_constants.k8s_batch_api_version,
kind=self.k8s_constants.k8s_batch_api_job_type,
metadata=V1ObjectMeta(
labels={
self.constants.label_jobtype_key: self.constants.label_jobtype_value_exec
}
),
spec=V1JobSpec(
template=V1PodTemplateSpec(metadata=V1ObjectMeta(), spec=pod_spec)
),
)

if self.taskmaster_env_properties.executorSecret is not None:
job.spec.template.spec.volumes = [
V1Volume(
name=self.taskmaster_env_properties.executorSecret.name,
secret=V1SecretVolumeSource(
secret_name=self.taskmaster_env_properties.executorSecret.name
),
)
]

return job
25 changes: 19 additions & 6 deletions tesk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,27 @@


class TeskConstants:
"""Tesk scoped constants."""
"""Tesk's K8s scoped constants."""

filer_image_name: str = os.getenv(
"FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer"
"TESK_API_TASKMASTER_FILER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-filer"
)
filer_image_version: str = os.getenv(
"TESK_API_TASKMASTER_FILER_IMAGE_VERSION", "latest"
)
filer_image_version: str = os.getenv("FILER_IMAGE_VERSION", "latest")
taskmaster_image_name: str = os.getenv(
"TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster"
"TESK_API_TASKMASTER_IMAGE_NAME", "docker.io/elixircloud/tesk-core-taskmaster"
)
taskmaster_image_version: str = os.getenv(
"TESK_API_TASKMASTER_IMAGE_VERSION", "latest"
)
tesk_namespace: str = os.getenv("TESK_API_K8S_NAMESPACE", "tesk")
taskmaster_service_account_name: str = os.getenv(
"TESK_API_TASKMASTER_SERVICE_ACCOUNT_NAME", "taskmaster"
)
taskmaster_environement_executor_backoff_limit: str = os.getenv(
"ENVIRONMENT_EXECUTOR_BACKOFF_LIMIT", "2"
)
filer_environment_filer_backoff_limit: str = os.getenv(
"TESK_API_TASKMASTER_ENVIRONMENT_FILER_BACKOFF_LIMIT", "2"
)
taskmaster_image_version: str = os.getenv("TASKMASTER_IMAGE_VERSION", "latest")
tesk_namespace: str = os.getenv("TESK_NAMESPACE", "tesk")
Loading

0 comments on commit 2429b0f

Please sign in to comment.