Skip to content

Commit 9f26947

Browse files
fixes(buehler#739): No retry is performed when the ResourceWatcher fails to watch a resource
1 parent 65d328b commit 9f26947

File tree

10 files changed

+198
-121
lines changed

10 files changed

+198
-121
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ coverage.info
3333

3434
# Docs
3535
_site
36+
37+
.idea/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Text.RegularExpressions;
2+
3+
namespace KubeOps.Operator.Watcher;
4+
5+
/// <summary>
/// Extension methods for the <see cref="Exception"/> class.
/// </summary>
public static class ExceptionExtensions
{
    /// <summary>
    /// Walks through all collected exceptions (the exception itself followed by its chain of
    /// <see cref="Exception.InnerException"/>s), LINQ style.
    /// </summary>
    /// <param name="self">The outermost exception; it is always the first element of the sequence.</param>
    /// <returns>
    /// A lazily evaluated sequence starting with <paramref name="self"/> and continuing with each
    /// inner exception until the chain ends.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown immediately at the call site (not on first enumeration) when <paramref name="self"/> is null.
    /// </exception>
    public static IEnumerable<Exception> All(this Exception self)
    {
        // Validate eagerly: inside an iterator body the check would only run once the caller
        // starts enumerating, far away from the faulty call.
        if (self is null)
        {
            throw new ArgumentNullException(nameof(self));
        }

        return Walk(self);

        static IEnumerable<Exception> Walk(Exception root)
        {
            Exception? cause = root;
            do
            {
                yield return cause;

                // Stop if an exception references itself as its own inner exception.
                cause = ReferenceEquals(cause, cause.InnerException) ? null : cause.InnerException;
            }
            while (cause is not null && !ReferenceEquals(cause, root)); // also stop if the chain loops back to the root
        }
    }
}

src/KubeOps.KubernetesClient/IKubernetesClient.cs

+12-4
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ Task<string> GetCurrentNamespaceAsync(
8787
/// <param name="labelSelector">A string, representing an optional label selector for filtering fetched objects.</param>
8888
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
8989
/// <returns>A list of Kubernetes entities.</returns>
90-
Task<IList<TEntity>> ListAsync<TEntity>(
90+
Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
9191
string? @namespace = null,
9292
string? labelSelector = null,
9393
CancellationToken cancellationToken = default)
@@ -106,7 +106,7 @@ Task<IList<TEntity>> ListAsync<TEntity>(
106106
/// </param>
107107
/// <param name="labelSelectors">A list of label-selectors to apply to the search.</param>
108108
/// <returns>A list of Kubernetes entities.</returns>
109-
Task<IList<TEntity>> ListAsync<TEntity>(
109+
Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
110110
string? @namespace = null,
111111
params LabelSelector[] labelSelectors)
112112
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -116,13 +116,13 @@ Task<IList<TEntity>> ListAsync<TEntity>(
116116
}
117117

118118
/// <inheritdoc cref="ListAsync{TEntity}(string?,string?,CancellationToken)"/>
119-
IList<TEntity> List<TEntity>(
119+
(string? Version, IList<TEntity> Items) List<TEntity>(
120120
string? @namespace = null,
121121
string? labelSelector = null)
122122
where TEntity : IKubernetesObject<V1ObjectMeta>;
123123

124124
/// <inheritdoc cref="ListAsync{TEntity}(string?,LabelSelector[])"/>
125-
IList<TEntity> List<TEntity>(
125+
(string? Version, IList<TEntity> Items) List<TEntity>(
126126
string? @namespace = null,
127127
params LabelSelector[] labelSelectors)
128128
where TEntity : IKubernetesObject<V1ObjectMeta>
@@ -489,4 +489,12 @@ Watcher<TEntity> Watch<TEntity>(
489489
string? labelSelector = null,
490490
CancellationToken cancellationToken = default)
491491
where TEntity : IKubernetesObject<V1ObjectMeta>;
492+
493+
/// <summary>
/// Watches entities of type <typeparamref name="TEntity"/> and forwards every received event to
/// <paramref name="eventTask"/>, re-establishing the watch after recoverable failures instead of
/// surfacing them to the caller.
/// </summary>
/// <typeparam name="TEntity">The type of the Kubernetes entity to watch.</typeparam>
/// <param name="eventTask">
/// Asynchronous callback invoked for each watch event with the event type, the affected entity and
/// the cancellation token.
/// </param>
/// <param name="namespace">Optional namespace to restrict the watch to.</param>
/// <param name="resourceVersion">Optional resource version to start watching from.</param>
/// <param name="labelSelector">A string, representing an optional label selector for filtering watched objects.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that completes when the watch has ended.</returns>
Task WatchSafeAsync<TEntity>(
    Func<WatchEventType, TEntity?, CancellationToken, Task> eventTask,
    string? @namespace = null,
    string? resourceVersion = null,
    string? labelSelector = null,
    CancellationToken cancellationToken = default)
    where TEntity : IKubernetesObject<V1ObjectMeta>;
492500
}

src/KubeOps.KubernetesClient/KubernetesClient.cs

+77-12
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
using System.Collections.Concurrent;
22
using System.Diagnostics;
33
using System.Net;
4+
using System.Net.Sockets;
45
using System.Runtime.CompilerServices;
6+
using System.Text.RegularExpressions;
57

68
using k8s;
79
using k8s.Autorest;
810
using k8s.Models;
911

1012
using KubeOps.Abstractions.Entities;
13+
using KubeOps.Operator.Watcher;
1114
using KubeOps.Transpiler;
1215

1316
namespace KubeOps.KubernetesClient;
@@ -19,6 +22,22 @@ public class KubernetesClient : IKubernetesClient
1922
private const string DefaultNamespace = "default";
2023

2124
private static readonly ConcurrentDictionary<Type, EntityMetadata> MetadataCache = new();
25+
// HTTP status codes (504 Gateway Timeout, 410 Gone) for which a failed watch is restarted
// from a newer resource version. Read-only: the list is never meant to change at runtime.
private static readonly List<int?> ResourceFailureCodes =
    [(int)HttpStatusCode.GatewayTimeout, (int)HttpStatusCode.Gone];

// Built once instead of being re-parsed on every call. Capture group 1 is the value in
// parentheses at the end of a "too old resource version" message.
private static readonly Regex TooOldResourceVersionPattern = new(
    @"^\s*too old resource version.*\(([a-zA-Z0-9_-]+)\)\s*$",
    RegexOptions.Compiled);

/// <summary>
/// HACK to get the last applicable resourceVersion from the exception.
/// </summary>
/// <example>
/// "too old resource version: 512122628 (544688086)".
/// </example>
/// <param name="ex">The exception whose message is inspected; may be null.</param>
/// <returns>
/// The value found in parentheses at the end of a "too old resource version" message, or null
/// when <paramref name="ex"/> is null or its message does not have that shape.
/// </returns>
private static string? ResourceVersionFromException(Exception? ex)
{
    if (ex?.Message is not { } message)
    {
        return null;
    }

    var match = TooOldResourceVersionPattern.Match(message);

    // Match.Success states the intent directly instead of inferring it from the group count.
    return match.Success ? match.Groups[1].Value : null;
}
2241

2342
private readonly KubernetesClientConfiguration _clientConfig;
2443
private readonly IKubernetes _client;
@@ -29,18 +48,14 @@ public class KubernetesClient : IKubernetesClient
2948
/// The client will use the default configuration.
3049
/// </summary>
3150
public KubernetesClient()
32-
: this(KubernetesClientConfiguration.BuildDefaultConfig())
33-
{
34-
}
51+
: this(KubernetesClientConfiguration.BuildDefaultConfig()) { }
3552

3653
/// <summary>
3754
/// Create a new Kubernetes client for the given entity with a custom client configuration.
3855
/// </summary>
3956
/// <param name="clientConfig">The config for the underlying Kubernetes client.</param>
4057
public KubernetesClient(KubernetesClientConfiguration clientConfig)
41-
: this(clientConfig, new Kubernetes(clientConfig))
42-
{
43-
}
58+
: this(clientConfig, new Kubernetes(clientConfig)) { }
4459

4560
/// <summary>
4661
/// Create a new Kubernetes client for the given entity with a custom client configuration and client.
@@ -180,7 +195,7 @@ public string GetCurrentNamespace(string downwardApiEnvName = "POD_NAMESPACE")
180195
}
181196

182197
/// <inheritdoc />
183-
public async Task<IList<TEntity>> ListAsync<TEntity>(
198+
public async Task<(string? Version, IList<TEntity> Items)> ListAsync<TEntity>(
184199
string? @namespace = null,
185200
string? labelSelector = null,
186201
CancellationToken cancellationToken = default)
@@ -189,7 +204,7 @@ public async Task<IList<TEntity>> ListAsync<TEntity>(
189204
ThrowIfDisposed();
190205

191206
var metadata = GetMetadata<TEntity>();
192-
return (@namespace switch
207+
var result = @namespace switch
193208
{
194209
null => await _client.CustomObjects.ListClusterCustomObjectAsync<EntityList<TEntity>>(
195210
metadata.Group ?? string.Empty,
@@ -204,17 +219,20 @@ public async Task<IList<TEntity>> ListAsync<TEntity>(
204219
metadata.PluralName,
205220
labelSelector: labelSelector,
206221
cancellationToken: cancellationToken),
207-
}).Items;
222+
};
223+
224+
return (result.Metadata.ResourceVersion, result.Items);
208225
}
209226

210227
/// <inheritdoc />
211-
public IList<TEntity> List<TEntity>(string? @namespace = null, string? labelSelector = null)
228+
public (string? Version, IList<TEntity> Items) List<TEntity>(string? @namespace = null,
229+
string? labelSelector = null)
212230
where TEntity : IKubernetesObject<V1ObjectMeta>
213231
{
214232
ThrowIfDisposed();
215233

216234
var metadata = GetMetadata<TEntity>();
217-
return (@namespace switch
235+
var result = @namespace switch
218236
{
219237
null => _client.CustomObjects.ListClusterCustomObject<EntityList<TEntity>>(
220238
metadata.Group ?? string.Empty,
@@ -227,7 +245,9 @@ public IList<TEntity> List<TEntity>(string? @namespace = null, string? labelSele
227245
@namespace,
228246
metadata.PluralName,
229247
labelSelector: labelSelector),
230-
}).Items;
248+
};
249+
250+
return (result.Metadata.ResourceVersion, result.Items);
231251
}
232252

233253
/// <inheritdoc />
@@ -339,6 +359,51 @@ public async Task DeleteAsync<TEntity>(
339359
}
340360
}
341361

362+
/// <summary>
/// Watches entities of type <typeparamref name="TEntity"/> and invokes <paramref name="eventTask"/>
/// for every received event, restarting the underlying watch after recoverable failures.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item><description>
/// When the watch ends without an error it is restarted from the resource version of the last
/// received entity.
/// </description></item>
/// <item><description>
/// On a <see cref="KubernetesException"/> whose status code is one of the resource failure codes
/// (HTTP 504 / 410) the watch is restarted from the version parsed out of the exception message;
/// if no version can be parsed the watch is abandoned.
/// </description></item>
/// <item><description>
/// On a connection reset ("server reset the stream" or a connection-reset socket error anywhere in
/// the exception chain) the watch is restarted after a one second pause.
/// </description></item>
/// <item><description>Any other exception propagates to the caller.</description></item>
/// </list>
/// </remarks>
/// <param name="eventTask">Asynchronous callback invoked with the event type, the entity and the cancellation token.</param>
/// <param name="namespace">Optional namespace passed through to the underlying watch.</param>
/// <param name="resourceVersion">Optional resource version the first watch starts from.</param>
/// <param name="labelSelector">Optional label selector passed through to the underlying watch.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests; cancelling ends the watch.</param>
/// <typeparam name="TEntity">The type of the Kubernetes entity to watch.</typeparam>
/// <returns>A task that completes once the watch has been cancelled or abandoned.</returns>
public async Task WatchSafeAsync<TEntity>(
    Func<WatchEventType, TEntity?, CancellationToken, Task> eventTask,
    string? @namespace = null,
    string? resourceVersion = null,
    string? labelSelector = null,
    CancellationToken cancellationToken = default)
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    var currentVersion = resourceVersion;
    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            await foreach (var (type, entity) in WatchAsync<TEntity>(@namespace, currentVersion, labelSelector, cancellationToken))
            {
                // Remember where we are so that a restarted watch resumes from this event.
                currentVersion = entity.ResourceVersion();
                await eventTask(type, entity, cancellationToken);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // OK, end the watch: the loop condition terminates the method.
        }
        catch (KubernetesException cause) when (ResourceFailureCodes.Contains(cause.Status.Code))
        {
            currentVersion = ResourceVersionFromException(cause);
            if (currentVersion is null)
            {
                break; // no usable version to resume from: bail out of the watch
            }
        }
        catch (Exception cause) when (IsConnectionReset(cause))
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancelled while pausing: end the watch quietly, exactly like a cancellation
                // that arrives while the watch itself is running.
            }
        }
    }

    // True when any exception in the chain signals that the peer reset the connection.
    // 104 is the Linux errno for ECONNRESET; SocketErrorCode is the platform-independent form.
    static bool IsConnectionReset(Exception cause)
        => cause.All().Any(e => e.Message.Contains("server reset the stream", StringComparison.Ordinal)
                                || e is SocketException { ErrorCode: 104 }
                                    or SocketException { SocketErrorCode: SocketError.ConnectionReset });
}
406+
342407
/// <inheritdoc />
343408
public Watcher<TEntity> Watch<TEntity>(
344409
Action<WatchEventType, TEntity> onEvent,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
namespace KubeOps.Operator.Watcher;
2+
3+
/// <summary>
/// Simple backoff logic: counts consecutive failures and waits for a duration derived from that count.
/// </summary>
/// <param name="stoppingToken">Token that aborts a pending wait when cancellation is requested.</param>
/// <param name="policy">
/// Maps the number of failures seen since the last <see cref="Clear"/> (1 for the first failure)
/// to the time to wait.
/// </param>
public class BackoffPolicy(CancellationToken stoppingToken, Func<int, TimeSpan> policy)
{
    private int _retries;

    /// <summary>
    /// Default exponential backoff algorithm: 2^retries seconds plus a random jitter.
    /// </summary>
    /// <param name="maxExp">Upper bound for the exponent; the retry count is clamped to [0, maxExp].</param>
    /// <param name="jitterMillis">Exclusive upper bound, in milliseconds, of the random jitter added to the delay.</param>
    /// <returns>A function mapping a retry count to the delay to wait.</returns>
    public static Func<int, TimeSpan> ExponentialWithJitter(int maxExp = 5, int jitterMillis = 1000)
        => retries => TimeSpan.FromSeconds(Math.Pow(2, Math.Clamp(retries, 0, maxExp)))
            // Random.Shared avoids allocating a new generator for every computed delay.
            .Add(TimeSpan.FromMilliseconds(Random.Shared.Next(0, jitterMillis)));

    /// <summary>
    /// Clear all counters, so the next failure is treated as the first one again.
    /// </summary>
    public void Clear()
    {
        _retries = 0;
    }

    /// <summary>
    /// Registers one more failure and waits for the delay the policy assigns to the new failure count.
    /// Returns without throwing when the stopping token is cancelled before or during the wait.
    /// </summary>
    /// <param name="ex">The <see cref="Exception"/> that triggered the wait. It is currently not inspected.</param>
    /// <returns>A <see cref="Task"/> that completes when the delay has elapsed or was cancelled.</returns>
    public async Task WaitOnException(Exception ex)
    {
        try
        {
            _retries++;
            await Task.Delay(WaitTime(), stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested: stop waiting and let the caller observe the token.
        }
    }

    private TimeSpan WaitTime()
        => policy(_retries);
}

0 commit comments

Comments
 (0)