Skip to content

Commit f82d655

Browse files
fixes(buehler#739): No retry is performed when the ResourceWatcher fail to watch a resource
1 parent 65d328b commit f82d655

File tree

10 files changed

+208
-121
lines changed

10 files changed

+208
-121
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,6 @@ coverage.info
3333

3434
# Docs
3535
_site
36+
37+
.idea/
38+
global.json
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Text.RegularExpressions;
2+
3+
namespace KubeOps.KubernetesClient;
4+
5+
/// <summary>
6+
/// Method extensions for the <see cref="Exception"/> class.
7+
/// </summary>
8+
public static class ExceptionExtensions
9+
{
10+
/// <summary>
11+
/// Walk through all collected Exceptions (base exception and all inner exceptions) LINQ style.
12+
/// </summary>
13+
public static IEnumerable<Exception> All(this Exception self)
14+
{
15+
if (self == null)
16+
{
17+
throw new ArgumentNullException(nameof(self));
18+
}
19+
20+
var cause = self;
21+
do
22+
{
23+
yield return cause;
24+
cause = ReferenceEquals(cause, cause.InnerException) ? null : cause.InnerException;
25+
}
26+
while (cause != null && !ReferenceEquals(cause, self));
27+
}
28+
}

src/KubeOps.KubernetesClient/IKubernetesClient.cs

+30-4
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ Task<string> GetCurrentNamespaceAsync(
8787
/// <param name="labelSelector">A string, representing an optional label selector for filtering fetched objects.</param>
8888
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
8989
/// <returns>A list of Kubernetes entities.</returns>
90-
Task<IList<TEntity>> ListAsync<TEntity>(
90+
Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
9191
string? @namespace = null,
9292
string? labelSelector = null,
9393
CancellationToken cancellationToken = default)
@@ -106,7 +106,7 @@ Task<IList<TEntity>> ListAsync<TEntity>(
106106
/// </param>
107107
/// <param name="labelSelectors">A list of label-selectors to apply to the search.</param>
108108
/// <returns>A list of Kubernetes entities.</returns>
109-
Task<IList<TEntity>> ListAsync<TEntity>(
109+
Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
110110
string? @namespace = null,
111111
params LabelSelector[] labelSelectors)
112112
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -116,13 +116,13 @@ Task<IList<TEntity>> ListAsync<TEntity>(
116116
}
117117

118118
/// <inheritdoc cref="ListAsync{TEntity}(string?,string?,CancellationToken)"/>
119-
IList<TEntity> List<TEntity>(
119+
(string? Version, IList<TEntity> Items) List<TEntity>(
120120
string? @namespace = null,
121121
string? labelSelector = null)
122122
where TEntity : IKubernetesObject<V1ObjectMeta>;
123123

124124
/// <inheritdoc cref="ListAsync{TEntity}(string?,LabelSelector[])"/>
125-
IList<TEntity> List<TEntity>(
125+
(string? Version, IList<TEntity> Items) List<TEntity>(
126126
string? @namespace = null,
127127
params LabelSelector[] labelSelectors)
128128
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -489,4 +489,30 @@ Watcher<TEntity> Watch<TEntity>(
489489
string? labelSelector = null,
490490
CancellationToken cancellationToken = default)
491491
where TEntity : IKubernetesObject<V1ObjectMeta>;
492+
493+
/// <summary>
494+
/// Creates an asynchronous entity watcher on the Kubernetes API.
495+
/// Kubernetes recoverable exceptions are handled within the watcher,
496+
/// and the watch is automatically recreated in such cases.
497+
/// </summary>
498+
/// <param name="eventTask">(async) Task Action that is executed when an event occurs.</param>
499+
/// <param name="namespace">
500+
/// The namespace to watch for entities (if needed).
501+
/// If the namespace is omitted, all entities on the cluster are watched.
502+
/// </param>
503+
/// <param name="resourceVersion">
504+
/// When specified with a watch call, shows changes that occur after that particular version of a resource.
505+
/// Defaults to changes from the beginning of history.
506+
/// </param>
507+
/// <param name="labelSelector">A string, representing an optional label selector for filtering watched objects.</param>
508+
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
509+
/// <typeparam name="TEntity">The type of the Kubernetes entity.</typeparam>
510+
/// <returns>A task that completes when the watcher ends.</returns>
511+
public Task WatchSafeAsync<TEntity>(
512+
Func<WatchEventType, TEntity?, CancellationToken, Task> eventTask,
513+
string? @namespace = null,
514+
string? resourceVersion = null,
515+
string? labelSelector = null,
516+
CancellationToken cancellationToken = default)
517+
where TEntity : IKubernetesObject<V1ObjectMeta>;
492518
}

src/KubeOps.KubernetesClient/KubernetesClient.cs

+68-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System.Collections.Concurrent;
22
using System.Diagnostics;
33
using System.Net;
4+
using System.Net.Sockets;
45
using System.Runtime.CompilerServices;
6+
using System.Text.RegularExpressions;
57

68
using k8s;
79
using k8s.Autorest;
@@ -19,6 +21,22 @@ public class KubernetesClient : IKubernetesClient
1921
private const string DefaultNamespace = "default";
2022

2123
private static readonly ConcurrentDictionary<Type, EntityMetadata> MetadataCache = new();
24+
private static List<int?> ResourceFailureCodes = ((int?[])[(int)HttpStatusCode.GatewayTimeout, (int)HttpStatusCode.Gone]).ToList();
25+
26+
/// <summary>
27+
/// HACK to ge the last applicable resourceVersion from the exception.
28+
/// </summary>
29+
/// <example>
30+
/// "too old resource version: 512122628 (544688086)".
31+
/// </example>
32+
private static string? ResourceVersionFromException(Exception? ex)
33+
{
34+
if (ex?.Message is null) return null;
35+
36+
var pattern = @"^\s*too old resource version.*\(([a-zA-Z0-9_-]+)\)\s*$";
37+
var match = Regex.Match(ex.Message, pattern);
38+
return (match.Groups.Count > 1) ? match.Groups[1].Value : null;
39+
}
2240

2341
private readonly KubernetesClientConfiguration _clientConfig;
2442
private readonly IKubernetes _client;
@@ -29,18 +47,14 @@ public class KubernetesClient : IKubernetesClient
2947
/// The client will use the default configuration.
3048
/// </summary>
3149
public KubernetesClient()
32-
: this(KubernetesClientConfiguration.BuildDefaultConfig())
33-
{
34-
}
50+
: this(KubernetesClientConfiguration.BuildDefaultConfig()) { }
3551

3652
/// <summary>
3753
/// Create a new Kubernetes client for the given entity with a custom client configuration.
3854
/// </summary>
3955
/// <param name="clientConfig">The config for the underlying Kubernetes client.</param>
4056
public KubernetesClient(KubernetesClientConfiguration clientConfig)
41-
: this(clientConfig, new Kubernetes(clientConfig))
42-
{
43-
}
57+
: this(clientConfig, new Kubernetes(clientConfig)) { }
4458

4559
/// <summary>
4660
/// Create a new Kubernetes client for the given entity with a custom client configuration and client.
@@ -180,7 +194,7 @@ public string GetCurrentNamespace(string downwardApiEnvName = "POD_NAMESPACE")
180194
}
181195

182196
/// <inheritdoc />
183-
public async Task<IList<TEntity>> ListAsync<TEntity>(
197+
public async Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
184198
string? @namespace = null,
185199
string? labelSelector = null,
186200
CancellationToken cancellationToken = default)
@@ -189,7 +203,7 @@ public async Task<IList<TEntity>> ListAsync<TEntity>(
189203
ThrowIfDisposed();
190204

191205
var metadata = GetMetadata<TEntity>();
192-
return (@namespace switch
206+
var result = @namespace switch
193207
{
194208
null => await _client.CustomObjects.ListClusterCustomObjectAsync<EntityList<TEntity>>(
195209
metadata.Group ?? string.Empty,
@@ -204,17 +218,20 @@ public async Task<IList<TEntity>> ListAsync<TEntity>(
204218
metadata.PluralName,
205219
labelSelector: labelSelector,
206220
cancellationToken: cancellationToken),
207-
}).Items;
221+
};
222+
223+
return (result.Metadata.ResourceVersion, result.Items);
208224
}
209225

210226
/// <inheritdoc />
211-
public IList<TEntity> List<TEntity>(string? @namespace = null, string? labelSelector = null)
227+
public (string? Version, IList<TEntity> Items) List<TEntity>(string? @namespace = null,
228+
string? labelSelector = null)
212229
where TEntity : IKubernetesObject<V1ObjectMeta>
213230
{
214231
ThrowIfDisposed();
215232

216233
var metadata = GetMetadata<TEntity>();
217-
return (@namespace switch
234+
var result = @namespace switch
218235
{
219236
null => _client.CustomObjects.ListClusterCustomObject<EntityList<TEntity>>(
220237
metadata.Group ?? string.Empty,
@@ -227,7 +244,9 @@ public IList<TEntity> List<TEntity>(string? @namespace = null, string? labelSele
227244
@namespace,
228245
metadata.PluralName,
229246
labelSelector: labelSelector),
230-
}).Items;
247+
};
248+
249+
return (result.Metadata.ResourceVersion, result.Items);
231250
}
232251

233252
/// <inheritdoc />
@@ -339,6 +358,43 @@ public async Task DeleteAsync<TEntity>(
339358
}
340359
}
341360

361+
/// <inheritdoc />
362+
public async Task WatchSafeAsync<TEntity>(
363+
Func<WatchEventType, TEntity?, CancellationToken, Task> eventTask,
364+
string? @namespace = null,
365+
string? resourceVersion = null,
366+
string? labelSelector = null,
367+
CancellationToken cancellationToken = default)
368+
where TEntity : IKubernetesObject<V1ObjectMeta>
369+
{
370+
var currentVersion = resourceVersion;
371+
while (!cancellationToken.IsCancellationRequested)
372+
{
373+
try
374+
{
375+
await foreach (var (typ, e) in WatchAsync<TEntity>(@namespace, currentVersion, labelSelector, cancellationToken))
376+
{
377+
currentVersion = e.ResourceVersion();
378+
await eventTask(typ, e, cancellationToken);
379+
}
380+
}
381+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
382+
{
383+
// OK, end the watch
384+
}
385+
catch (KubernetesException cause) when (ResourceFailureCodes.Contains(cause.Status.Code))
386+
{
387+
currentVersion = ResourceVersionFromException(cause);
388+
if (currentVersion == null) break; // bail out of watch
389+
}
390+
catch (Exception cause) when (cause.All().Any(e => e.Message.Contains("server reset the stream")
391+
|| e is SocketException { ErrorCode: 104 }))
392+
{
393+
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
394+
}
395+
}
396+
}
397+
342398
/// <inheritdoc />
343399
public Watcher<TEntity> Watch<TEntity>(
344400
Action<WatchEventType, TEntity> onEvent,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
namespace KubeOps.Operator.Watcher;
2+
3+
/// <summary>
4+
/// Simple exponential backoff logic.
5+
/// </summary>
6+
public class BackoffPolicy(CancellationToken stoppingToken, Func<int, TimeSpan> policy)
7+
{
8+
private int _retries = 0;
9+
10+
/// <summary>
11+
/// Default exponential backoff algorithm
12+
/// </summary>
13+
public static Func<int, TimeSpan> ExponentialWithJitter(int maxExp = 5, int jitterMillis = 1000)
14+
=> retries => TimeSpan.FromSeconds(Math.Pow(2, Math.Clamp(retries, 0, maxExp)))
15+
.Add(TimeSpan.FromMilliseconds(new Random().Next(0, jitterMillis)));
16+
17+
/// <summary>
18+
/// Clear all counters.
19+
/// </summary>
20+
public void Clear()
21+
{
22+
_retries = 0;
23+
}
24+
25+
/// <summary>
26+
/// Adds a delay.
27+
/// </summary>
28+
/// <param name="ex"><see cref="Exception"/>.</param>
29+
/// <returns><see cref="Task"/>.</returns>
30+
public async Task WaitOnException(Exception ex)
31+
{
32+
try
33+
{
34+
_retries++;
35+
await Task.Delay(WaitTime(), stoppingToken);
36+
}
37+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
38+
{
39+
// Do nothing
40+
}
41+
}
42+
43+
private TimeSpan WaitTime()
44+
=> policy(_retries);
45+
}

0 commit comments

Comments
 (0)