Commit

installation: use kubernetes official client
* Upgrades code to support Kubernetes 1.9.4 (already 1.10 compatible).

* Uses restartPolicy Never, so each retry starts a fresh pod instead of
  restarting the same one over and over. Even though it appears to be fixed
  (kubernetes/kubernetes#54870 (comment)), in Kubernetes 1.9.4 backoffLimit
  is ignored with the OnFailure policy; see the sketch below.
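
A minimal sketch of the resulting Job manifest, written as the kind of Python
dict the job controller builds (the job name, image and command below are
illustrative placeholders, not values from this commit):

    job = {
        'apiVersion': 'batch/v1',
        'kind': 'Job',
        'metadata': {'name': 'example-job'},
        'spec': {
            # Retry up to three times, each retry creating a brand new pod.
            'backoffLimit': 3,
            'template': {
                'spec': {
                    'containers': [{'name': 'job',
                                    'image': 'busybox',
                                    'command': ['false']}],
                    'restartPolicy': 'Never'
                }
            }
        }
    }

With restartPolicy OnFailure the kubelet restarts the container inside the
same pod, and the commit message above notes that on Kubernetes 1.9.4 such
restarts are not counted against backoffLimit.
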
Diego Rodriguez committed Jun 6, 2018
1 parent 067050b commit 8a3619f
Showing 5 changed files with 104 additions and 167 deletions.
15 changes: 4 additions & 11 deletions reana_job_controller/app.py
@@ -34,7 +34,7 @@
 from reana_commons.database import Session
 
 from reana_job_controller.k8s import (create_api_client, instantiate_job,
-                                      watch_jobs, watch_pods)
+                                      watch_jobs)
 from reana_job_controller.schemas import Job, JobRequest
 from reana_job_controller.spec import build_openapi_spec

@@ -238,8 +238,8 @@ def create_job(): # noqa
     job['status'] = 'started'
     job['restart_count'] = 0
     job['max_restart_count'] = 3
-    job['obj'] = job_obj
     job['deleted'] = False
+    job['obj'] = job_obj
     JOB_DB[str(job['job_id'])] = job
 
     job_db_entry = JobTable(
@@ -379,19 +379,12 @@ def get_openapi_spec():
 
 with app.app_context():
     app.config['OPENAPI_SPEC'] = build_openapi_spec()
-    app.config['PYKUBE_CLIENT'] = create_api_client(
-        app.config['PYKUBE_API'])
+    app.config['KUBERNETES_CLIENT'] = create_api_client()
 
     job_event_reader_thread = threading.Thread(target=watch_jobs,
-                                               args=(JOB_DB,
-                                                     app.config['PYKUBE_API']))
+                                               args=(JOB_DB,))
 
     job_event_reader_thread.start()
-    pod_event_reader_thread = threading.Thread(target=watch_pods,
-                                               args=(JOB_DB,
-                                                     app.config['PYKUBE_API']))
-
-    pod_event_reader_thread.start()
 
     app.run(debug=True, port=5000,
             host='0.0.0.0')
5 changes: 2 additions & 3 deletions reana_job_controller/config.py
@@ -22,6 +22,5 @@
 
 """Flask application configuration."""
 
-import pykube
-
-PYKUBE_API = pykube.KubeConfig.from_service_account()
+MAX_JOB_RESTARTS = 3
+"""Number of retries for a job before considering it as failed."""
242 changes: 91 additions & 151 deletions reana_job_controller/k8s.py
@@ -22,22 +22,36 @@
 
 """Kubernetes wrapper."""
 
-import json
 import logging
 import os
 import time
+import traceback
 
-import pykube
-from reana_commons.models import Job
-from reana_commons.database import Session
 from flask import current_app as app
+from kubernetes import client, watch, config as k8s_config
+from kubernetes.client.models.v1_delete_options import V1DeleteOptions
+from reana_commons.database import Session
+from reana_commons.models import Job
+from reana_job_controller import config, volume_templates
 
-from reana_job_controller import volume_templates
 
-def create_api_client(config):
-    """Create pykube HTTPClient using config."""
-    api_client = pykube.HTTPClient(config)
-    api_client.session.verify = False
+def create_api_client(api='BatchV1'):
+    """Create pykube HTTPClient using config.
+
+    :param api: String which represents which Kubernetes API to spawn. By
+        default BatchV1.
+    :returns: Kubernetes python client object for a specific API i.e. BatchV1.
+    """
+    k8s_config.load_incluster_config()
+    api_configuration = client.Configuration()
+    api_configuration.verify_ssl = False
+    if api == 'CoreV1':
+        api_client = client.CoreV1Api()
+    else:
+        api_client = client.BatchV1Api()
     return api_client


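As a usage sketch (not part of the diff), the new factory above would be
called as follows; the variable names are illustrative, and the calls only
work inside a cluster because the helper relies on load_incluster_config():

    from reana_job_controller.k8s import create_api_client

    batchv1_client = create_api_client()           # client.BatchV1Api() by default
    corev1_client = create_api_client('CoreV1')    # client.CoreV1Api() for pods and logs
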
@@ -85,6 +99,7 @@ def instantiate_job(job_id, docker_img, cmd, cvmfs_repos, env_vars, namespace,
             'namespace': namespace
         },
         'spec': {
+            'backoffLimit': app.config['MAX_JOB_RESTARTS'],
             'autoSelector': True,
             'template': {
                 'metadata': {
@@ -100,7 +115,7 @@
                        },
                    ],
                    'volumes': [],
-                   'restartPolicy': 'OnFailure'
+                   'restartPolicy': 'Never'
                }
            }
        }
@@ -134,156 +149,81 @@ def instantiate_job(job_id, docker_img, cmd, cvmfs_repos, env_vars, namespace,
 
     # add better handling
     try:
-        job_obj = pykube.Job(app.config['PYKUBE_CLIENT'], job)
-        job_obj.create()
-        return job_obj
-    except pykube.exceptions.HTTPError:
-        return None
+        api_response = \
+            app.config['KUBERNETES_CLIENT'].create_namespaced_job(
+                namespace=namespace, body=job)
+        return api_response.to_str()
+    except client.rest.ApiException as e:
+        logging.debug("Error while connecting to Kubernetes API: {}".format(e))
+    except Exception as e:
+        logging.error(traceback.format_exc())
+        logging.debug("Unexpected error: {}".format(e))
 
 
-def watch_jobs(job_db, config):
-    """Open stream connection to k8s apiserver to watch all jobs status.
-
-    :param job_db: Dictionary which contains all current jobs.
-    :param config: configuration to connect to k8s apiserver.
-    """
-    api_client = create_api_client(config)
-    while True:
-        logging.debug('Starting a new stream request to watch Jobs')
-        stream = pykube.Job.objects(
-            api_client).filter(namespace=pykube.all).watch()
-        for event in stream:
-            logging.info('New Job event received')
-            job = event.object
-            unended_jobs = [j for j in job_db.keys()
-                            if not job_db[j]['deleted']]
-
-            if job.name in unended_jobs and event.type == 'DELETED':
-                while not job_db[job.name].get('pod'):
-                    time.sleep(5)
-                    logging.warn(
-                        'Job {} Pod still not known'.format(job.name)
-                    )
-                pod = job_db[job.name].get('pod')
-                while job.exists():
-                    logging.warn(
-                        'Waiting for Job {} to be cleaned'.format(
-                            job.name
-                        )
-                    )
-                    time.sleep(5)
-                logging.info(
-                    'Getting {} logs'.format(pod.name)
-                )
-                job_db[job.name]['log'] = pod.logs()
-                logging.info(
-                    'Deleting {}\'s pod -> {}'.format(
-                        job.name, job_db[job.name]['pod'].name
-                    )
-                )
-                job_db[job.name]['pod'].delete()
-                job_db[job.name]['deleted'] = True
-            elif (job.name in unended_jobs and
-                  job.obj['status'].get('succeeded')):
-                logging.info(
-                    'Job {} successfuly ended. Cleaning...'.format(job.name)
-                )
-                job_db[job.name]['status'] = 'succeeded'
-                job.delete()
-
-            # with the current k8s implementation this is never
-            # going to happen...
-            elif job.name in unended_jobs and job.obj['status'].get('failed'):
-                logging.info('Job {} failed. Cleaning...'.format(job.name))
-                job_db[job['metadata']['name']]['status'] = 'failed'
-                job.delete()
-
-
-def watch_pods(job_db, config):
-    """Open stream connection to k8s apiserver to watch all pods status.
-
-    :param job_db: Dictionary which contains all current jobs.
-    :param config: configuration to connect to k8s apiserver.
-    """
-    api_client = create_api_client(config)
-    while True:
-        logging.info('Starting a new stream request to watch Pods')
-        stream = pykube.Pod.objects(
-            api_client).filter(namespace=pykube.all).watch()
-        for event in stream:
-            logging.info('New Pod event received')
-            pod = event.object
-            unended_jobs = [j for j in job_db.keys()
-                            if not job_db[j]['deleted'] and
-                            job_db[j]['status'] != 'failed']
-            # FIXME: watch out here, if they change the naming convention at
-            # some point the following line won't work. Get job name from API.
-            job_name = '-'.join(pod.name.split('-')[:-1])
-            # Store existing job pod if not done yet
-            if job_name in job_db:
-                if job_db[job_name].get('pod'):
-                    logging.info('checking the pod logs')
-                    try:
-                        job_db[job_name]['log'] = pod.logs()
-                        logging.info('Storing job logs: {}'.
-                                     format(job_db[job_name]['log']))
-                        Session.query(Job).filter_by(id_=job_name).\
-                            update(dict(logs=job_db[job_name]['log']))
-                        Session.commit()
-
-                    except Exception as e:
-                        logging.debug('Could not retrieve'
-                                      ' logs for object: {}'.
-                                      format(pod))
-                        logging.debug('Exception: {}'.format(str(e)))
-                else:
-                    # Store job's pod
-                    logging.info(
-                        'Storing {} as Job {} Pod'.format(pod.name, job_name)
-                    )
-                    job_db[job_name]['pod'] = pod
-
-            # Take note of the related Pod
-            if job_name in unended_jobs:
-                try:
-                    restarts = (pod.obj['status']['containerStatuses'][0]
-                                ['restartCount'])
-                    exit_code = (pod.obj['status']
-                                 ['containerStatuses'][0]
-                                 ['state'].get('terminated', {})
-                                 .get('exitCode'))
-                    logging.info(
-                        pod.obj['status']['containerStatuses'][0]['state'].
-                        get('terminated', {})
-                    )
-
-                    logging.info(
-                        'Updating Pod {} restarts to {}'.format(
-                            pod.name, restarts
-                        )
-                    )
-
-                    job_db[job_name]['restart_count'] = restarts
-
-                    if restarts >= job_db[job_name]['max_restart_count'] and \
-                       exit_code != 0:
-
-                        logging.info(
-                            'Job {} reached max restarts...'.format(job_name)
-                        )
-
-                        logging.info(
-                            'Getting {} logs'.format(pod.name)
-                        )
-                        job_db[job_name]['log'] = pod.logs()
-                        logging.info(
-                            'Cleaning Job {}'.format(job_name)
-                        )
-                        job_db[job_name]['status'] = 'failed'
-                        job_db[job_name]['obj'].delete()
-
-                except KeyError as e:
-                    logging.debug('Skipping event because: {}'.format(e))
-                    logging.debug(
-                        'Event: {}\nObject:\n{}'.format(event.type, pod.obj)
-                    )
+def watch_jobs(job_db):
+    """Open stream connection to k8s apiserver to watch all jobs status.
+
+    :param job_db: Dictionary which contains all current jobs.
+    """
+    batchv1_api_client = create_api_client()
+    corev1_api_client = create_api_client('CoreV1')
+    while True:
+        logging.debug('Starting a new stream request to watch Jobs')
+        try:
+            w = watch.Watch()
+            for event in w.stream(
+                    batchv1_api_client.list_job_for_all_namespaces,
+                    _request_timeout=60):
+                logging.info('New Job event received: {0}'.format(event['type']))
+                job = event['object']
+
+                # Taking note of the remaining jobs since deletion might not
+                # happend straight away.
+                remaining_jobs = [j for j in job_db.keys()
+                                  if not job_db[j]['deleted']]
+                if (not job_db.get(job.metadata.name) or
+                        not job.metadata.name in remaining_jobs):
+                    # Ignore jobs not created by this specific instance
+                    # or already deleted jobs.
+                    continue
+                elif job.status.succeeded:
+                    logging.info(
+                        'Job {} succeeded.'.format(
+                            job.metadata.name)
+                    )
+                    job_db[job.metadata.name]['status'] = 'succeeded'
+                elif (job.status.failed and
+                      job.status.failed >= config.MAX_JOB_RESTARTS):
+                    logging.info('Job {} failed.'.format(
+                        job.metadata.name))
+                    job_db[job.metadata.name]['status'] = 'failed'
+                else:
+                    continue
+                # Grab logs when job either succeeds or fails.
+                logging.info('Getting last spawned pod for job {}'.format(
+                    job.metadata.name))
+                last_spawned_pod = corev1_api_client.list_namespaced_pod(
+                    job.metadata.namespace,
+                    label_selector='job-name={job_name}'.format(
+                        job_name=job.metadata.name)).items[-1]
+                logging.info('Grabbing pod {} logs...'.format(
+                    last_spawned_pod.metadata.name))
+                job_db[job.metadata.name]['log'] = \
+                    corev1_api_client.read_namespaced_pod_log(
+                        namespace=last_spawned_pod.metadata.namespace,
+                        name=last_spawned_pod.metadata.name)
+                logging.info('Cleaning job {} ...'.format(
+                    job.metadata.name))
+                # Delete all depending pods.
+                delete_options = V1DeleteOptions(
+                    propagation_policy='Foreground')
+                batchv1_api_client.delete_namespaced_job(
+                    job.metadata.name, job.metadata.namespace, delete_options)
+                job_db[job.metadata.name]['deleted'] = True
+        except client.rest.ApiException as e:
+            logging.debug(
+                "Error while connecting to Kubernetes API: {}".format(e))
+        except Exception as e:
+            logging.error(traceback.format_exc())
+            logging.debug("Unexpected error: {}".format(e))
4 changes: 4 additions & 0 deletions requirements-dev.txt
@@ -0,0 +1,4 @@
+wdb
+ipdb
+Flask-DebugToolbar
+git+git://github.com/reanahub/reana-commons.git#egg=reana-commons
5 changes: 3 additions & 2 deletions setup.py
@@ -65,10 +65,11 @@
 ]
 
 install_requires = [
-    'Flask>=0.11',
-    'pykube>=0.14.0',
     'apispec>=0.21.0',
+    'Flask>=0.11',
+    'kubernetes>=6.0.0',
     'marshmallow>=2.13',
     'pika==0.11.2',
     'reana-commons>=0.1.0',
 ]
