diff --git a/dynamic/client.py b/dynamic/client.py index 353a481..a81039b 100644 --- a/dynamic/client.py +++ b/dynamic/client.py @@ -149,6 +149,20 @@ def patch(self, resource, body=None, name=None, namespace=None, **kwargs): return self.request('patch', path, body=body, content_type=content_type, **kwargs) + def server_side_apply(self, resource, body=None, name=None, namespace=None, force_conflicts=None, **kwargs): + body = self.serialize_body(body) + name = name or body.get('metadata', {}).get('name') + if not name: + raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind)) + if resource.namespaced: + namespace = self.ensure_namespace(resource, namespace, body) + + # force content type to 'application/apply-patch+yaml' + kwargs.update({'content_type': 'application/apply-patch+yaml'}) + path = resource.path(name=name, namespace=namespace) + + return self.request('patch', path, body=body, force_conflicts=force_conflicts, **kwargs) + def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None): """ Stream events for a resource from the Kubernetes API @@ -227,6 +241,10 @@ def request(self, method, path, body=None, **params): query_params.append(('orphanDependents', params['orphan_dependents'])) if params.get('dry_run') is not None: query_params.append(('dryRun', params['dry_run'])) + if params.get('field_manager') is not None: + query_params.append(('fieldManager', params['field_manager'])) + if params.get('force_conflicts') is not None: + query_params.append(('force', params['force_conflicts'])) header_params = params.get('header_params', {}) form_params = [] diff --git a/dynamic/test_client.py b/dynamic/test_client.py index ab1df93..c31270b 100644 --- a/dynamic/test_client.py +++ b/dynamic/test_client.py @@ -15,6 +15,7 @@ import time import unittest import uuid +import json from kubernetes.e2e_test import base from kubernetes.client import api_client @@ -394,6 +395,29 @@ def test_node_apis_partial_object_metadata(self): self.assertEqual('PartialObjectMetadataList', resp.kind) self.assertEqual('meta.k8s.io/v1', resp.apiVersion) + def test_server_side_apply_api(self): + client = DynamicClient(api_client.ApiClient(configuration=self.config)) + api = client.resources.get( + api_version='v1', kind='Pod') + + name = 'pod-' + short_uuid() + pod_manifest = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': {'labels': {'name': name}, + 'name': name}, + 'spec': {'containers': [{ + 'image': 'nginx', + 'name': 'nginx', + 'ports': [{'containerPort': 80, + 'protocol': 'TCP'}]}]}} + + body = json.dumps(pod_manifest).encode() + resp = api.server_side_apply( + name=name, namespace='default', body=body, + field_manager='kubernetes-unittests', dry_run="All") + self.assertEqual('kubernetes-unittests', resp.metadata.managedFields[0].manager) + class TestDynamicClientSerialization(unittest.TestCase):