Add HPA for paasta services (k8s deployments) (#2362)
Add HPA for paasta services
mzq592 authored Aug 29, 2019
1 parent 7ff3d04 commit e55bf81
Showing 4 changed files with 456 additions and 69 deletions.
130 changes: 107 additions & 23 deletions paasta_tools/kubernetes/application/controller_wrappers.py
@@ -18,7 +18,6 @@
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfig
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfigDict
from paasta_tools.kubernetes_tools import load_kubernetes_service_config_no_cache
from paasta_tools.kubernetes_tools import max_unavailable
from paasta_tools.kubernetes_tools import pod_disruption_budget_for_service_instance
from paasta_tools.kubernetes_tools import update_deployment
from paasta_tools.kubernetes_tools import update_stateful_set
@@ -31,6 +30,14 @@ def __init__(
item: Union[V1Deployment, V1StatefulSet],
logging=logging.getLogger(__name__),
) -> None:
"""
This Application wrapper is an interface for creating/deleting k8s deployments and statefulsets.
soa_config is a KubernetesDeploymentConfig. It is not loaded in __init__ because it is not always required.
:param item: Kubernetes object (V1Deployment/V1StatefulSet) that has already been filled in.
:param logging: where logs go
"""
if not item.metadata.namespace:
item.metadata.namespace = "paasta"
self.kube_deployment = KubeDeployment(
service=item.metadata.labels["yelp.com/paasta_service"],
instance=item.metadata.labels["yelp.com/paasta_instance"],
@@ -103,7 +110,7 @@ def delete_pod_disruption_budget(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted pod disruption budget/{} from namespace/{}".format(
@@ -117,10 +124,8 @@ def ensure_pod_disruption_budget(
pdr = pod_disruption_budget_for_service_instance(
service=self.kube_deployment.service,
instance=self.kube_deployment.instance,
min_instances=self.soa_config.get_desired_instances()
- max_unavailable(
instance_count=self.soa_config.get_desired_instances(),
bounce_margin_factor=self.soa_config.get_bounce_margin_factor(),
max_unavailable="{}%".format(
int((1 - self.soa_config.get_bounce_margin_factor()) * 100)
),
)
try:
@@ -134,21 +139,10 @@
raise

if existing_pdr:
if existing_pdr.spec.min_available != pdr.spec.min_available:
# poddisruptionbudget objects are not mutable like most things in the kubernetes api,
# so we have to do a delete/replace.
# unfortunately we can't really do this transactionally, but I guess we'll just hope for the best?
logging.debug(
f"existing poddisruptionbudget {pdr.metadata.name} is out of date; deleting"
)
kube_client.policy.delete_namespaced_pod_disruption_budget(
name=pdr.metadata.name,
namespace=pdr.metadata.namespace,
body=V1DeleteOptions(),
)
logging.debug(f"creating poddisruptionbudget {pdr.metadata.name}")
return create_pod_disruption_budget(
kube_client=kube_client, pod_disruption_budget=pdr
if existing_pdr.spec.max_unavailable != pdr.spec.max_unavailable:
logging.debug(f"Updating poddisruptionbudget {pdr.metadata.name}")
return kube_client.policy.patch_namespaced_pod_disruption_budget(
name=pdr.metadata.name, namespace=pdr.metadata.namespace, body=pdr
)
else:
logging.debug(f"poddisruptionbudget {pdr.metadata.name} up to date")
@@ -180,22 +174,112 @@ def deep_delete(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted deploy/{} from namespace/{}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)
self.delete_pod_disruption_budget(kube_client)
self.delete_horizontal_pod_autoscaler(kube_client)

def get_existing_app(self, kube_client: KubeClient):
return kube_client.deployments.read_namespaced_deployment(
name=self.item.metadata.name, namespace=self.item.metadata.namespace
)

def create(self, kube_client: KubeClient) -> None:
create_deployment(kube_client=kube_client, formatted_deployment=self.item)
self.ensure_pod_disruption_budget(kube_client)
self.sync_horizontal_pod_autoscaler(kube_client)

def update(self, kube_client: KubeClient) -> None:
# If autoscaling is enabled, do not update replicas.
# In all other cases, the replica count is set to max(instances, min_instances).
if not self.get_soa_config().get("instances"):
self.item.spec.replicas = self.get_existing_app(kube_client).spec.replicas
update_deployment(kube_client=kube_client, formatted_deployment=self.item)
self.ensure_pod_disruption_budget(kube_client)
self.sync_horizontal_pod_autoscaler(kube_client)
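update() above deliberately carries the live replica count forward whenever a fixed instances count is not configured, so a new deploy does not clobber the value the HPA has scaled the Deployment to. A minimal sketch of that rule, assuming soa_config behaves like a plain dict of the instance's settings:

# Sketch of the replica-preservation rule in update(): with autoscaling in
# charge (no fixed "instances"), keep the live Deployment's replica count;
# otherwise use the configured count.
def next_replicas(soa_config: dict, live_replicas: int, configured_replicas: int) -> int:
    if soa_config.get("instances"):
        return configured_replicas  # fixed instance count wins
    return live_replicas            # preserve the HPA-managed value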

def sync_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
"""
For autoscaling to work, both min_instances and max_instances must be configured,
and instances must not be set.
"""
self.logging.info(
f"Syncing HPA setting for {self.item.metadata.name}/name in {self.item.metadata.namespace}"
)
hpa_exists = self.exists_hpa(kube_client)
# NO autoscaling
if self.get_soa_config().get("instances"):
# Remove HPA if autoscaling is disabled
if hpa_exists:
self.delete_horizontal_pod_autoscaler(kube_client)
return
body = self.soa_config.get_autoscaling_metric_spec(
name=self.item.metadata.name, namespace=self.item.metadata.namespace
)
if not body:
raise Exception(
f"CRIT: autoscaling misconfigured for {self.kube_deployment.service}.\
{self.kube_deployment.instance}. Please correct the configuration and update pre-commit hook."
)
self.logging.debug(body)
if not hpa_exists:
self.logging.info(
f"Creating new HPA for {self.item.metadata.name}/name in {self.item.metadata.namespace}"
)
kube_client.autoscaling.create_namespaced_horizontal_pod_autoscaler(
namespace=self.item.metadata.namespace, body=body, pretty=True
)
else:
self.logging.info(
f"Updating new HPA for {self.item.metadata.name}/name in {self.item.metadata.namespace}/namespace"
)
kube_client.autoscaling.patch_namespaced_horizontal_pod_autoscaler(
name=self.item.metadata.name,
namespace=self.item.metadata.namespace,
body=body,
pretty=True,
)
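get_autoscaling_metric_spec is defined in kubernetes_tools and is not part of this file, so the exact HPA body is not shown here. Purely as an illustration, a body targeting the Deployment by name and scaling on CPU utilization could be assembled from the kubernetes client's V2beta1 models along these lines (the class names, replica bounds, and CPU metric are assumptions of this sketch, not taken from the diff):

from kubernetes.client import (
    V1ObjectMeta,
    V2beta1CrossVersionObjectReference,
    V2beta1HorizontalPodAutoscaler,
    V2beta1HorizontalPodAutoscalerSpec,
    V2beta1MetricSpec,
    V2beta1ResourceMetricSource,
)


def example_hpa_body(name: str, namespace: str) -> V2beta1HorizontalPodAutoscaler:
    # Hypothetical HPA body: scale the Deployment `name` between 1 and 10
    # replicas, aiming for ~50% average CPU utilization across its pods.
    return V2beta1HorizontalPodAutoscaler(
        kind="HorizontalPodAutoscaler",
        metadata=V1ObjectMeta(name=name, namespace=namespace),
        spec=V2beta1HorizontalPodAutoscalerSpec(
            min_replicas=1,
            max_replicas=10,
            scale_target_ref=V2beta1CrossVersionObjectReference(
                api_version="apps/v1", kind="Deployment", name=name
            ),
            metrics=[
                V2beta1MetricSpec(
                    type="Resource",
                    resource=V2beta1ResourceMetricSource(
                        name="cpu", target_average_utilization=50
                    ),
                )
            ],
        ),
    )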

def exists_hpa(self, kube_client: KubeClient) -> bool:
return (
len(
kube_client.autoscaling.list_namespaced_horizontal_pod_autoscaler(
field_selector=f"metadata.name={self.item.metadata.name}",
namespace=self.item.metadata.namespace,
).items
)
> 0
)
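exists_hpa uses a list call with a field selector on metadata.name, which simply returns an empty list when no HPA exists and avoids wrapping a read in a 404 handler. A hypothetical equivalent using a direct read would look like this:

from kubernetes.client.rest import ApiException


def hpa_exists_by_read(kube_client, name: str, namespace: str) -> bool:
    # Alternative existence check: read the HPA directly and treat a 404 as
    # "does not exist"; functionally equivalent to the field-selector list.
    try:
        kube_client.autoscaling.read_namespaced_horizontal_pod_autoscaler(
            name=name, namespace=namespace
        )
    except ApiException as e:
        if e.status == 404:
            return False
        raise
    return True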

def delete_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
try:
kube_client.autoscaling.delete_namespaced_horizontal_pod_autoscaler(
name=self.item.metadata.name,
namespace=self.item.metadata.namespace,
body=V1DeleteOptions(),
)
except ApiException as e:
if e.status == 404:
# HPA does not exist, nothing to delete but
# we can consider this a success.
self.logging.debug(
"not deleting nonexistent HPA/{self.item.metadata.name} from namespace/{self.item.metadata.namespace}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)
else:
raise
else:
self.logging.info(
"deleted HPA/{} from namespace/{}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)
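Taken together, create(), update() and sync_horizontal_pod_autoscaler() only attach an HPA when the instance is configured for autoscaling. A minimal sketch of that gate, assuming the relevant YAML keys are instances, min_instances and max_instances (the exact key names are an assumption of this note):

# Sketch of the autoscaling gate described in sync_horizontal_pod_autoscaler's
# docstring: a fixed "instances" count disables autoscaling; otherwise both
# bounds are expected before an HPA is created or patched.
def autoscaling_enabled(soa_config: dict) -> bool:
    if soa_config.get("instances"):
        return False
    return "min_instances" in soa_config and "max_instances" in soa_config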


class StatefulSetWrapper(Application):
@@ -219,7 +303,7 @@ def deep_delete(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted statefulset/{} from namespace/{}".format(