Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/KubernetesClient/GenericClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,34 @@ public async Task<T> ReplaceNamespacedAsync<T>(T obj, string ns, string name, Ca
return KubernetesJson.Deserialize<T>(resp.Body.ToString());
}

public IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T>(Action<Exception> onError = null, CancellationToken cancel = default)
where T : IKubernetesObject
{
var respTask = kubernetes.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(group, version, plural, watch: true, cancellationToken: cancel);
return respTask.WatchAsync<T, object>();
}

public IAsyncEnumerable<(WatchEventType, T)> WatchNamespacedAsync<T>(string ns, Action<Exception> onError = null, CancellationToken cancel = default)
where T : IKubernetesObject
{
var respTask = kubernetes.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(group, version, ns, plural, watch: true, cancellationToken: cancel);
return respTask.WatchAsync<T, object>();
}

public Watcher<T> Watch<T>(Action<WatchEventType, T> onEvent, Action<Exception> onError = null, Action onClosed = null)
where T : IKubernetesObject
{
var respTask = kubernetes.CustomObjects.ListClusterCustomObjectWithHttpMessagesAsync(group, version, plural, watch: true);
return respTask.Watch(onEvent, onError, onClosed);
}

public Watcher<T> WatchNamespaced<T>(string ns, Action<WatchEventType, T> onEvent, Action<Exception> onError = null, Action onClosed = null)
where T : IKubernetesObject
{
var respTask = kubernetes.CustomObjects.ListNamespacedCustomObjectWithHttpMessagesAsync(group, version, ns, plural, watch: true);
return respTask.Watch(onEvent, onError, onClosed);
}

public void Dispose()
{
Dispose(true);
Expand Down
7 changes: 5 additions & 2 deletions src/KubernetesClient/WatcherExt.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using k8s.Autorest;
using k8s.Exceptions;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace k8s
Expand Down Expand Up @@ -72,12 +73,14 @@ public static Watcher<T> Watch<T, L>(
/// <typeparam name="L">type of the HttpOperationResponse object</typeparam>
/// <param name="responseTask">the api response</param>
/// <param name="onError">a callbak when any exception was caught during watching</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>IAsyncEnumerable of watch events</returns>
public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T, L>(
this Task<HttpOperationResponse<L>> responseTask,
Action<Exception> onError = null)
Action<Exception> onError = null,
CancellationToken cancellationToken = default)
{
return Watcher<T>.CreateWatchEventEnumerator(MakeStreamReaderCreator<T, L>(responseTask), onError);
return Watcher<T>.CreateWatchEventEnumerator(MakeStreamReaderCreator<T, L>(responseTask), onError, cancellationToken);
}
}
}