Skip to content

Commit

Permalink
refactor: WatchPodAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
josephnhtam committed Jul 28, 2024
1 parent b33b27c commit de77448
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ internal class PodWatcherService : BackgroundService
private readonly IPodLifetimeManager _lifetimeManager;
private readonly ILogger _logger;

private readonly object syncLock = new();
private Task? reconilationTask;

public PodWatcherService(IKubernetesContext context, IPodLifetimeManager lifetimeManager, ILogger<PodWatcherService> logger)
{
_context = context;
Expand All @@ -36,18 +39,34 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

private async Task WatchPod(CancellationToken stoppingToken)
{
var watcher = _context.WatchPodAsync(stoppingToken);
await _context.WatchPodAsync(
(eventType, pod) =>
{
if (eventType != WatchEventType.Added && eventType != WatchEventType.Modified)
return;
await foreach (var (eventType, pod) in watcher)
{
if (eventType != WatchEventType.Added && eventType != WatchEventType.Modified)
continue;
var labels = pod.Labels() ?? new Dictionary<string, string>();
var annotataions = pod.Annotations() ?? new Dictionary<string, string>();
var labels = pod.Labels() ?? new Dictionary<string, string>();
var annotataions = pod.Annotations() ?? new Dictionary<string, string>();
lock (syncLock)
{
if (reconilationTask == null || reconilationTask.IsCompleted)
{
reconilationTask = _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly(), stoppingToken).AsTask();
}
else
{
reconilationTask = reconilationTask.ContinueWith(async _ =>
await _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly(), stoppingToken)
);
}
}
},
stoppingToken: stoppingToken
);

await _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly());
}
if (reconilationTask != null && !reconilationTask.IsCompleted)
await reconilationTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using Microsoft.Extensions.Logging;
using k8s;
using Microsoft.Extensions.Logging;

namespace LiveStreamingServerNet.KubernetesPod.Internal.Logging
{
internal static partial class LoggerExtensions
{
[LoggerMessage(LogLevel.Information, "Starting the pod watcher")]
public static partial void StartingWatcher(this ILogger logger);
public static partial void StartingPodWatcher(this ILogger logger);

[LoggerMessage(LogLevel.Information, "Pod event received | EventType: {EventType}")]
public static partial void PodEventReceived(this ILogger logger, WatchEventType eventType);

[LoggerMessage(LogLevel.Error, "An error occurred when watching the pod")]
public static partial void WatchingPodError(this ILogger logger, Exception exception);
Expand All @@ -19,8 +23,8 @@ internal static partial class LoggerExtensions
[LoggerMessage(LogLevel.Error, "An error occurred when patching the pod | JsonPatch: {JsonPatch}")]
public static partial void PatchingPodError(this ILogger logger, string jsonPatch, Exception exception);

[LoggerMessage(LogLevel.Information, "Restarting watcher as no event is received since {LastEventTime}")]
public static partial void RestartingWatcher(this ILogger logger, DateTimeOffset lastEventTime);
[LoggerMessage(LogLevel.Information, "Restarting the pod watcher as no event is received since {LastEventTime}")]
public static partial void RestartingPodWatcher(this ILogger logger, DateTimeOffset lastEventTime);

[LoggerMessage(LogLevel.Information, "Ignoring error occurred when watching the pod")]
public static partial void IgnoringWatchingPodError(this ILogger logger, Exception exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ internal interface IKubernetesContext
string PodName { get; }
Task<V1Pod> GetPodAsync(CancellationToken cancellationToken);
Task PatchPodAsync(Action<IPodPatcherBuilder> configureBuilder);
IAsyncEnumerable<(WatchEventType, V1Pod)> WatchPodAsync(CancellationToken cancellationToken = default, TimeSpan? reconnectCheck = null, TimeSpan? retryDelay = null);
Task WatchPodAsync(Action<WatchEventType, V1Pod> onPodEvent, WatchPodOptions? options = null, CancellationToken stoppingToken = default);
}

public record WatchPodOptions(TimeSpan? ReconnectCheck = null, TimeSpan? RetryDelay = null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace LiveStreamingServerNet.KubernetesPod.Internal.Services.Contracts
{
internal interface IPodLifetimeManager : IPodStatus
{
ValueTask ReconcileAsync(IReadOnlyDictionary<string, string> labels, IReadOnlyDictionary<string, string> annotations);
ValueTask ReconcileAsync(IReadOnlyDictionary<string, string> labels, IReadOnlyDictionary<string, string> annotations, CancellationToken cancellation);

ValueTask OnClientDisposedAsync(uint clientId);
ValueTask OnStreamPublishedAsync(uint clientId, string streamIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using LiveStreamingServerNet.KubernetesPod.Utilities;
using LiveStreamingServerNet.KubernetesPod.Utilities.Contracts;
using Microsoft.Extensions.Logging;
using System.Runtime.CompilerServices;
using System.Text.Json;

namespace LiveStreamingServerNet.KubernetesPod.Internal.Services
Expand Down Expand Up @@ -55,118 +54,91 @@ public async Task<V1Pod> GetPodAsync(CancellationToken cancellationToken)
throw new InvalidOperationException($"Failed to get the pod '{PodName} in {PodNamespace}");
}

public async IAsyncEnumerable<(WatchEventType, V1Pod)> WatchPodAsync(
[EnumeratorCancellation] CancellationToken stoppingToken = default,
TimeSpan? reconnectCheck = null,
TimeSpan? retryDelay = null)
public async Task WatchPodAsync(Action<WatchEventType, V1Pod> onPodEvent, WatchPodOptions? options = null, CancellationToken stoppingToken = default)
{
reconnectCheck ??= TimeSpan.FromMinutes(5);
retryDelay ??= TimeSpan.FromSeconds(5);
var reconnectCheck = options?.ReconnectCheck ?? TimeSpan.FromMinutes(5);
var retryDelay = options?.ReconnectCheck ?? TimeSpan.FromSeconds(5);

while (true)
{
stoppingToken.ThrowIfCancellationRequested();

var lastEventReceivedTime = DateTime.UtcNow;
try
{
var lastEventReceivedTime = DateTime.UtcNow;

using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var cancellationToken = cts.Token;
using var watcherCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
var watcherCancellation = watcherCts.Token;

using var timer = new Timer(_ =>
{
if (cts.IsCancellationRequested)
return;
var watcherCompletionSource = new TaskCompletionSource();

var timeSinceLastEvent = DateTime.UtcNow - lastEventReceivedTime;
if (timeSinceLastEvent > reconnectCheck.Value)
{
_logger.RestartingWatcher(lastEventReceivedTime);
cts.Cancel();
}
},
state: null,
dueTime: reconnectCheck.Value / 2,
period: reconnectCheck.Value / 2);
_logger.StartingPodWatcher();

IAsyncEnumerator<(WatchEventType, V1Pod)> watcher;
using var watcher = CreatePodWatcher(
(eventType, pod) =>
{
_logger.PodEventReceived(eventType);
try
{
_logger.StartingWatcher();
watcher = WatchPodAsyncCore(cancellationToken);
lastEventReceivedTime = DateTime.UtcNow;
onPodEvent(eventType, pod);
},
watcherCompletionSource.SetException,
watcherCompletionSource.SetResult,
watcherCancellation);

using var timer = new Timer(_ =>
{
if (watcherCts.IsCancellationRequested)
return;
var timeSinceLastEvent = DateTime.UtcNow - lastEventReceivedTime;
if (timeSinceLastEvent > reconnectCheck)
{
_logger.RestartingPodWatcher(lastEventReceivedTime);
watcherCts.Cancel();
watcher.Dispose();
}
},
state: null,
dueTime: reconnectCheck / 2,
period: reconnectCheck / 2);

await watcherCompletionSource.Task;
}
catch (OperationCanceledException)
{
continue;
}
catch (Exception ex)
{
await OnPodWatchErrorAsync(ex, retryDelay.Value, cancellationToken);
continue;
_logger.WatchingPodError(ex, retryDelay);
await Task.Delay(retryDelay, stoppingToken);
}

while (true)
{
try
{
if (!await watcher.MoveNextAsync())
break;
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
await OnPodWatchErrorAsync(ex, retryDelay.Value, cancellationToken);
break;
}

lastEventReceivedTime = DateTime.UtcNow;
yield return watcher.Current;
}
}
}

private async ValueTask OnPodWatchErrorAsync(Exception ex, TimeSpan retryDelay, CancellationToken cancellation)
{
if (cancellation.IsCancellationRequested)
return;

_logger.WatchingPodError(ex, retryDelay);

try
{
await Task.Delay(retryDelay, cancellation);
}
catch (OperationCanceledException)
{
return;
}
}

private IAsyncEnumerator<(WatchEventType, V1Pod)> WatchPodAsyncCore(CancellationToken cancellationToken)
private Watcher<V1Pod> CreatePodWatcher(Action<WatchEventType, V1Pod> onPodEvent, Action<Exception> onError, Action onClosed, CancellationToken cancellationToken)
{
var watcher = KubernetesClient.CoreV1.ListNamespacedPodWithHttpMessagesAsync(
return KubernetesClient.CoreV1.ListNamespacedPodWithHttpMessagesAsync(
namespaceParameter: PodNamespace,
fieldSelector: $"metadata.name={PodName}",
watch: true,
cancellationToken: cancellationToken
).WatchAsync<V1Pod, V1PodList>(
).Watch(
onEvent: onPodEvent,
onError: ex =>
{
if (ex is KubernetesException kubernetesError &&
string.Equals(kubernetesError.Status.Reason, "Expired", StringComparison.Ordinal))
string.Equals(kubernetesError.Status.Reason, "Expired", StringComparison.OrdinalIgnoreCase))
{
onError(ex);
throw ex;
}
_logger.IgnoringWatchingPodError(ex);
},
cancellationToken: cancellationToken
onClosed: onClosed
);

return watcher.GetAsyncEnumerator(cancellationToken);
}

public async Task PatchPodAsync(Action<IPodPatcherBuilder> configureBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public PodLifetimeManager(
_streamsLimit = int.MaxValue;
}

public async ValueTask ReconcileAsync(IReadOnlyDictionary<string, string> labels, IReadOnlyDictionary<string, string> annotations)
public async ValueTask ReconcileAsync(IReadOnlyDictionary<string, string> labels, IReadOnlyDictionary<string, string> annotations, CancellationToken cancellation)
{
if (!labels.TryGetValue(PodConstants.PendingStopLabel, out var isPendingStopStr))
{
Expand Down

0 comments on commit de77448

Please sign in to comment.