diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/HostedServices/PodWatcherService.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/HostedServices/PodWatcherService.cs index 9fc005f2..7b0b5734 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/HostedServices/PodWatcherService.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/HostedServices/PodWatcherService.cs @@ -13,6 +13,9 @@ internal class PodWatcherService : BackgroundService private readonly IPodLifetimeManager _lifetimeManager; private readonly ILogger _logger; + private readonly object syncLock = new(); + private Task? reconilationTask; + public PodWatcherService(IKubernetesContext context, IPodLifetimeManager lifetimeManager, ILogger logger) { _context = context; @@ -36,18 +39,34 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) private async Task WatchPod(CancellationToken stoppingToken) { - var watcher = _context.WatchPodAsync(stoppingToken); + await _context.WatchPodAsync( + (eventType, pod) => + { + if (eventType != WatchEventType.Added && eventType != WatchEventType.Modified) + return; - await foreach (var (eventType, pod) in watcher) - { - if (eventType != WatchEventType.Added && eventType != WatchEventType.Modified) - continue; + var labels = pod.Labels() ?? new Dictionary(); + var annotataions = pod.Annotations() ?? new Dictionary(); - var labels = pod.Labels() ?? new Dictionary(); - var annotataions = pod.Annotations() ?? new Dictionary(); + lock (syncLock) + { + if (reconilationTask == null || reconilationTask.IsCompleted) + { + reconilationTask = _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly(), stoppingToken).AsTask(); + } + else + { + reconilationTask = reconilationTask.ContinueWith(async _ => + await _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly(), stoppingToken) + ); + } + } + }, + stoppingToken: stoppingToken + ); - await _lifetimeManager.ReconcileAsync(labels.AsReadOnly(), annotataions.AsReadOnly()); - } + if (reconilationTask != null && !reconilationTask.IsCompleted) + await reconilationTask; } } } diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/Logging/LoggerExtensions.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/Logging/LoggerExtensions.cs index 6da1a4a6..61da0127 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/Logging/LoggerExtensions.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/Logging/LoggerExtensions.cs @@ -1,11 +1,15 @@ -using Microsoft.Extensions.Logging; +using k8s; +using Microsoft.Extensions.Logging; namespace LiveStreamingServerNet.KubernetesPod.Internal.Logging { internal static partial class LoggerExtensions { [LoggerMessage(LogLevel.Information, "Starting the pod watcher")] - public static partial void StartingWatcher(this ILogger logger); + public static partial void StartingPodWatcher(this ILogger logger); + + [LoggerMessage(LogLevel.Information, "Pod event received | EventType: {EventType}")] + public static partial void PodEventReceived(this ILogger logger, WatchEventType eventType); [LoggerMessage(LogLevel.Error, "An error occurred when watching the pod")] public static partial void WatchingPodError(this ILogger logger, Exception exception); @@ -19,8 +23,8 @@ internal static partial class LoggerExtensions [LoggerMessage(LogLevel.Error, "An error occurred when patching the pod | JsonPatch: {JsonPatch}")] public static partial void PatchingPodError(this ILogger logger, string jsonPatch, Exception exception); - [LoggerMessage(LogLevel.Information, "Restarting watcher as no event is received since {LastEventTime}")] - public static partial void RestartingWatcher(this ILogger logger, DateTimeOffset lastEventTime); + [LoggerMessage(LogLevel.Information, "Restarting the pod watcher as no event is received since {LastEventTime}")] + public static partial void RestartingPodWatcher(this ILogger logger, DateTimeOffset lastEventTime); [LoggerMessage(LogLevel.Information, "Ignoring error occurred when watching the pod")] public static partial void IgnoringWatchingPodError(this ILogger logger, Exception exception); diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IKubernetesContext.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IKubernetesContext.cs index 31f728f3..6b7416ff 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IKubernetesContext.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IKubernetesContext.cs @@ -11,6 +11,8 @@ internal interface IKubernetesContext string PodName { get; } Task GetPodAsync(CancellationToken cancellationToken); Task PatchPodAsync(Action configureBuilder); - IAsyncEnumerable<(WatchEventType, V1Pod)> WatchPodAsync(CancellationToken cancellationToken = default, TimeSpan? reconnectCheck = null, TimeSpan? retryDelay = null); + Task WatchPodAsync(Action onPodEvent, WatchPodOptions? options = null, CancellationToken stoppingToken = default); } + + public record WatchPodOptions(TimeSpan? ReconnectCheck = null, TimeSpan? RetryDelay = null); } diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IPodLifetimeManager.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IPodLifetimeManager.cs index 96b94602..29b0be57 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IPodLifetimeManager.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/Contracts/IPodLifetimeManager.cs @@ -4,7 +4,7 @@ namespace LiveStreamingServerNet.KubernetesPod.Internal.Services.Contracts { internal interface IPodLifetimeManager : IPodStatus { - ValueTask ReconcileAsync(IReadOnlyDictionary labels, IReadOnlyDictionary annotations); + ValueTask ReconcileAsync(IReadOnlyDictionary labels, IReadOnlyDictionary annotations, CancellationToken cancellation); ValueTask OnClientDisposedAsync(uint clientId); ValueTask OnStreamPublishedAsync(uint clientId, string streamIdentifier); diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/KubernetesContext.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/KubernetesContext.cs index 6aafc0d0..742f9345 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/KubernetesContext.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/KubernetesContext.cs @@ -5,7 +5,6 @@ using LiveStreamingServerNet.KubernetesPod.Utilities; using LiveStreamingServerNet.KubernetesPod.Utilities.Contracts; using Microsoft.Extensions.Logging; -using System.Runtime.CompilerServices; using System.Text.Json; namespace LiveStreamingServerNet.KubernetesPod.Internal.Services @@ -55,45 +54,56 @@ public async Task GetPodAsync(CancellationToken cancellationToken) throw new InvalidOperationException($"Failed to get the pod '{PodName} in {PodNamespace}"); } - public async IAsyncEnumerable<(WatchEventType, V1Pod)> WatchPodAsync( - [EnumeratorCancellation] CancellationToken stoppingToken = default, - TimeSpan? reconnectCheck = null, - TimeSpan? retryDelay = null) + public async Task WatchPodAsync(Action onPodEvent, WatchPodOptions? options = null, CancellationToken stoppingToken = default) { - reconnectCheck ??= TimeSpan.FromMinutes(5); - retryDelay ??= TimeSpan.FromSeconds(5); + var reconnectCheck = options?.ReconnectCheck ?? TimeSpan.FromMinutes(5); + var retryDelay = options?.ReconnectCheck ?? TimeSpan.FromSeconds(5); while (true) { stoppingToken.ThrowIfCancellationRequested(); - var lastEventReceivedTime = DateTime.UtcNow; + try + { + var lastEventReceivedTime = DateTime.UtcNow; - using var cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); - var cancellationToken = cts.Token; + using var watcherCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + var watcherCancellation = watcherCts.Token; - using var timer = new Timer(_ => - { - if (cts.IsCancellationRequested) - return; + var watcherCompletionSource = new TaskCompletionSource(); - var timeSinceLastEvent = DateTime.UtcNow - lastEventReceivedTime; - if (timeSinceLastEvent > reconnectCheck.Value) - { - _logger.RestartingWatcher(lastEventReceivedTime); - cts.Cancel(); - } - }, - state: null, - dueTime: reconnectCheck.Value / 2, - period: reconnectCheck.Value / 2); + _logger.StartingPodWatcher(); - IAsyncEnumerator<(WatchEventType, V1Pod)> watcher; + using var watcher = CreatePodWatcher( + (eventType, pod) => + { + _logger.PodEventReceived(eventType); - try - { - _logger.StartingWatcher(); - watcher = WatchPodAsyncCore(cancellationToken); + lastEventReceivedTime = DateTime.UtcNow; + onPodEvent(eventType, pod); + }, + watcherCompletionSource.SetException, + watcherCompletionSource.SetResult, + watcherCancellation); + + using var timer = new Timer(_ => + { + if (watcherCts.IsCancellationRequested) + return; + + var timeSinceLastEvent = DateTime.UtcNow - lastEventReceivedTime; + if (timeSinceLastEvent > reconnectCheck) + { + _logger.RestartingPodWatcher(lastEventReceivedTime); + watcherCts.Cancel(); + watcher.Dispose(); + } + }, + state: null, + dueTime: reconnectCheck / 2, + period: reconnectCheck / 2); + + await watcherCompletionSource.Task; } catch (OperationCanceledException) { @@ -101,72 +111,34 @@ public async Task GetPodAsync(CancellationToken cancellationToken) } catch (Exception ex) { - await OnPodWatchErrorAsync(ex, retryDelay.Value, cancellationToken); - continue; + _logger.WatchingPodError(ex, retryDelay); + await Task.Delay(retryDelay, stoppingToken); } - - while (true) - { - try - { - if (!await watcher.MoveNextAsync()) - break; - } - catch (OperationCanceledException) - { - break; - } - catch (Exception ex) - { - await OnPodWatchErrorAsync(ex, retryDelay.Value, cancellationToken); - break; - } - - lastEventReceivedTime = DateTime.UtcNow; - yield return watcher.Current; - } - } - } - - private async ValueTask OnPodWatchErrorAsync(Exception ex, TimeSpan retryDelay, CancellationToken cancellation) - { - if (cancellation.IsCancellationRequested) - return; - - _logger.WatchingPodError(ex, retryDelay); - - try - { - await Task.Delay(retryDelay, cancellation); - } - catch (OperationCanceledException) - { - return; } } - private IAsyncEnumerator<(WatchEventType, V1Pod)> WatchPodAsyncCore(CancellationToken cancellationToken) + private Watcher CreatePodWatcher(Action onPodEvent, Action onError, Action onClosed, CancellationToken cancellationToken) { - var watcher = KubernetesClient.CoreV1.ListNamespacedPodWithHttpMessagesAsync( + return KubernetesClient.CoreV1.ListNamespacedPodWithHttpMessagesAsync( namespaceParameter: PodNamespace, fieldSelector: $"metadata.name={PodName}", watch: true, cancellationToken: cancellationToken - ).WatchAsync( + ).Watch( + onEvent: onPodEvent, onError: ex => { if (ex is KubernetesException kubernetesError && - string.Equals(kubernetesError.Status.Reason, "Expired", StringComparison.Ordinal)) + string.Equals(kubernetesError.Status.Reason, "Expired", StringComparison.OrdinalIgnoreCase)) { + onError(ex); throw ex; } _logger.IgnoringWatchingPodError(ex); }, - cancellationToken: cancellationToken + onClosed: onClosed ); - - return watcher.GetAsyncEnumerator(cancellationToken); } public async Task PatchPodAsync(Action configureBuilder) diff --git a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/PodLifetimeManager.cs b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/PodLifetimeManager.cs index efc56453..5c6179ce 100644 --- a/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/PodLifetimeManager.cs +++ b/src/LiveStreamingServerNet.KubernetesPod/Internal/Services/PodLifetimeManager.cs @@ -33,7 +33,7 @@ public PodLifetimeManager( _streamsLimit = int.MaxValue; } - public async ValueTask ReconcileAsync(IReadOnlyDictionary labels, IReadOnlyDictionary annotations) + public async ValueTask ReconcileAsync(IReadOnlyDictionary labels, IReadOnlyDictionary annotations, CancellationToken cancellation) { if (!labels.TryGetValue(PodConstants.PendingStopLabel, out var isPendingStopStr)) {