Skip to content

Commit

Permalink
Add watch to dynamic client (#221)
Browse files Browse the repository at this point in the history
* First stab at watch implementation

* add docs

* remove extraneous newline

* Fixes #203 fix query parameter checking to prevent filtering out valid falsey values

* Add docstring for watch
  • Loading branch information
fabianvf authored Nov 2, 2018
1 parent 7dcc0a7 commit 2bf10ca
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 10 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ v1_services.replace(body=body, namespace='test')

The `replace` implementation is the same for `*List` kinds, except that each definition in the list will be replaced separately.

#### `watch(namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None)`

```python
v1_services = dyn_client.resources.get(api_version='v1', kind='Service')

# Prints the resource that triggered each event related to Services in the 'test' namespace
for event in v1_services.watch(namespace='test'):
print(event['object'])
```

### DEPRECATED Generated client usage

To work with a K8s object, use the K8s client, and to work with an OpenShift specific object, use the OpenShift client. For example, the following uses the K8s client to create a new Service object:
Expand Down
59 changes: 49 additions & 10 deletions openshift/dynamic/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import yaml
from pprint import pformat

from kubernetes import config
from kubernetes import config, watch
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

Expand Down Expand Up @@ -209,6 +209,45 @@ def patch(self, resource, body=None, name=None, namespace=None, **kwargs):

return self.request('patch', path, body=body, content_type=content_type, **kwargs)

def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None):
"""
Stream events for a resource from the Kubernetes API
:param resource: The API resource object that will be used to query the API
:param namespace: The namespace to query
:param name: The name of the resource instance to query
:param label_selector: The label selector with which to filter results
:param label_selector: The field selector with which to filter results
:param resource_version: The version with which to filter results. Only events with
a resource_version greater than this value will be returned
:param timeout: The amount of time in seconds to wait before terminating the stream
:return: Event object with these keys:
'type': The type of event such as "ADDED", "DELETED", etc.
'raw_object': a dict representing the watched object.
'object': A ResourceInstance wrapping raw_object.
Example:
client = DynamicClient(k8s_client)
v1_pods = client.resources.get(api_version='v1', kind='Pod')
for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5):
print(e['type'])
print(e['object'].metadata)
"""
watcher = watch.Watch()
for event in watcher.stream(
resource.get,
namespace=namespace,
name=name,
field_selector=field_selector,
label_selector=label_selector,
resource_version=resource_version,
serialize=False,
timeout_seconds=timeout
):
event['object'] = ResourceInstance(resource, event['object'])
yield event

def request(self, method, path, body=None, **params):

Expand All @@ -217,23 +256,23 @@ def request(self, method, path, body=None, **params):

path_params = params.get('path_params', {})
query_params = params.get('query_params', [])
if params.get('pretty'):
if params.get('pretty') is not None:
query_params.append(('pretty', params['pretty']))
if params.get('_continue'):
if params.get('_continue') is not None:
query_params.append(('continue', params['_continue']))
if params.get('include_uninitialized'):
if params.get('include_uninitialized') is not None:
query_params.append(('includeUninitialized', params['include_uninitialized']))
if params.get('field_selector'):
if params.get('field_selector') is not None:
query_params.append(('fieldSelector', params['field_selector']))
if params.get('label_selector'):
if params.get('label_selector') is not None:
query_params.append(('labelSelector', params['label_selector']))
if params.get('limit'):
if params.get('limit') is not None:
query_params.append(('limit', params['limit']))
if params.get('resource_version'):
if params.get('resource_version') is not None:
query_params.append(('resourceVersion', params['resource_version']))
if params.get('timeout_seconds'):
if params.get('timeout_seconds') is not None:
query_params.append(('timeoutSeconds', params['timeout_seconds']))
if params.get('watch'):
if params.get('watch') is not None:
query_params.append(('watch', params['watch']))

header_params = params.get('header_params', {})
Expand Down

0 comments on commit 2bf10ca

Please sign in to comment.