Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion docs/features/kubernetes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ The example here shows a typical configuration:
}
}

Service deployment in ``Dev`` namespace, and discovery provider type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` type.
Service deployment in ``Dev`` namespace, and discovery provider type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` or :ref:`k8s-watchkube-provider` type.

**Note 1**: ``Scheme``, ``Host``, ``Port``, and ``Token`` are not used if ``usePodServiceAccount`` is true when `KubeClient`_ is created from a pod service account.
Please refer to the :ref:`k8s-install` section for technical details.
Expand Down Expand Up @@ -236,6 +236,22 @@ The polling interval is in milliseconds and tells Ocelot how often to call Kuber
We doubt it will matter for most people and polling may give a tiny performance improvement over calling Kubernetes per request.
There is no way for Ocelot to work these out for you, except perhaps through a `discussion <https://github.com/ThreeMammals/Ocelot/discussions>`_.

.. _k8s-watchkube-provider:

WatchKube provider
^^^^^^^^^^^^^^^^^^

This option utilizes Kubernetes API `watch requests <https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes>`_ for fetching service configuration.
Essentially it means that there will be one streamed http connection with kube-api per downstream service.
Changes streamed by this connection will be used for updating available endpoints list.

.. code-block:: json

"ServiceDiscoveryProvider": {
"Namespace": "dev",
"Type": "WatchKube"
}

Global vs Route levels
^^^^^^^^^^^^^^^^^^^^^^

Expand Down
32 changes: 27 additions & 5 deletions src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,49 @@ namespace Ocelot.Provider.Kubernetes;

public class EndPointClientV1 : KubeResourceClient, IEndPointClient
{
private static readonly HttpRequest Collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");
private static readonly HttpRequest EndpointsRequest =
KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}");

private static readonly HttpRequest EndpointsWatchRequest =
KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}");

public EndPointClientV1(IKubeApiClient client) : base(client)
{
}

public Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default)
public Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

var request = Collection
var request = EndpointsRequest
.WithTemplateParameters(new
{
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
ServiceName = serviceName,
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, ServiceName = serviceName,
});

return Http.GetAsync(request, cancellationToken)
.ReadContentAsObjectV1Async<EndpointsV1>(operationDescription: $"get {nameof(EndpointsV1)}");
}

public IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(serviceName))
{
throw new ArgumentNullException(nameof(serviceName));
}

return ObserveEvents<EndpointsV1>(
EndpointsWatchRequest.WithTemplateParameters(new
{
ServiceName = serviceName,
Namespace = kubeNamespace ?? KubeClient.DefaultNamespace,
}),
"watch v1/Endpoints '" + serviceName + "' in namespace " +
(kubeNamespace ?? KubeClient.DefaultNamespace));
}
}
2 changes: 2 additions & 0 deletions src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces;
public interface IEndPointClient : IKubeResourceClient
{
Task<EndpointsV1> GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);

IObservable<IResourceEventV1<EndpointsV1>> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/Ocelot.Provider.Kubernetes/Kube.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private async Task<EndpointsV1> GetEndpoint()
try
{
return await _kubeApi
.ResourceClient<IEndPointClient>(client => new EndPointClientV1(client))
.EndpointsV1()
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
}
catch (KubeApiException ex)
Expand Down
9 changes: 9 additions & 0 deletions src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Ocelot.Provider.Kubernetes.Interfaces;

namespace Ocelot.Provider.Kubernetes;

public static class KubeApiClientExtensions
{
public static IEndPointClient EndpointsV1(this IKubeApiClient client)
=> client.ResourceClient<IEndPointClient>(x => new EndPointClientV1(x));
}
13 changes: 11 additions & 2 deletions src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Ocelot.Configuration;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using System.Reactive.Concurrency;

namespace Ocelot.Provider.Kubernetes;

Expand All @@ -10,9 +11,12 @@ public static class KubernetesProviderFactory // TODO : IServiceDiscoveryProvide
/// <summary>String constant used for provider type definition.</summary>
public const string PollKube = nameof(Kubernetes.PollKube);

public const string WatchKube = nameof(Kubernetes.WatchKube);

public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider;

private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route)
private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider,
ServiceProviderConfiguration config, DownstreamRoute route)
{
var factory = provider.GetService<IOcelotLoggerFactory>();
var kubeClient = provider.GetService<IKubeApiClient>();
Expand All @@ -21,10 +25,15 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
var configuration = new KubeRegistryConfiguration
{
KeyOfServiceInK8s = route.ServiceName,
KubeNamespace = string.IsNullOrEmpty(route.ServiceNamespace) ? config.Namespace : route.ServiceNamespace,
KubeNamespace = string.IsNullOrEmpty(route.ServiceNamespace)
? config.Namespace
: route.ServiceNamespace,
Scheme = route.DownstreamScheme,
};

if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase))
return new WatchKube(configuration, factory, kubeClient, serviceBuilder, Scheduler.Default);

var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);

return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
Expand Down
26 changes: 26 additions & 0 deletions src/Ocelot.Provider.Kubernetes/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Ocelot.Provider.Kubernetes;

public static class ObservableExtensions
{
public static IObservable<TSource> RetryAfter<TSource>(this IObservable<TSource> source,
TimeSpan dueTime,
IScheduler scheduler)
{
return RepeatInfinite(source, dueTime, scheduler).Catch();
}

private static IEnumerable<IObservable<TSource>> RepeatInfinite<TSource>(IObservable<TSource> source,
TimeSpan dueTime,
IScheduler scheduler)
{
yield return source;

while (true)
{
yield return source.DelaySubscription(dueTime, scheduler);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@
<ItemGroup>
<ProjectReference Include="..\Ocelot\Ocelot.csproj" />
</ItemGroup>
</Project>
<ItemGroup>
<InternalsVisibleTo Include="Ocelot.UnitTests" />
</ItemGroup>
</Project>
88 changes: 88 additions & 0 deletions src/Ocelot.Provider.Kubernetes/WatchKube.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using KubeClient.Models;
using Ocelot.Logging;
using Ocelot.Provider.Kubernetes.Interfaces;
using Ocelot.Values;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace Ocelot.Provider.Kubernetes;

public class WatchKube : IServiceDiscoveryProvider, IDisposable
{
internal const int FailedSubscriptionRetrySeconds = 5;
internal const int FirstResultsFetchingTimeoutSeconds = 3;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 5? Why 3? Why internal?
How will the user be able to configure this?

Copy link
Contributor Author

@kick2nick kick2nick Jun 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made internal for reference from Tests (with InternalsVisibleTo attribute). Numbers are made up =) Didn't want to make configuration complicated. We can move it to configuration, but I think FailedSubscriptionRetrySeconds should stay as const, because KubeClient already uses retries. Our retry was added for really unexpected errors.

Copy link
Member

@raman-m raman-m Jun 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no objection to the Retry logic However, my concern lies with the unconfigurable numbers. I believe that a configuration design can be deferred for now and introduced at a later stage, in a separate pull request.
An acceptable compromise would be to enable configuration directly in C# code.

Next step 👇

Therefore, I propose converting the internal constants into public static properties, allowing developers to set custom values for advanced user scenarios directly in C# code, given that configuration properties are not implemented now to be defined in ocelot.json, aka global ServiceDiscoveryProvider section.

The Q ❔

What I consider inefficient and worth discussing further is the Seconds dimension. Why not Milliseconds? As you may know, some deployed environments operate at the millisecond level — 10, 100, or 500 ms — clearly less than one second. For now, I propose decreasing the default values to one second as the minimum for definitions, but I believe that for highly rapid systems operating in milliseconds, one second won't significantly impact response time.
Ultimately, I would suggest that waiting 5 seconds between retries is excessive 😉
Do you have any use cases from your project, Kubernetes with the Ocelot environment in production, based on data from monitoring or traced logs?

2nd Q

The 2nd question: Do both properties depend on each other? It seems the second property of fetching the first result is merely a delay in obtaining the initial result from the Observable layer, correct? This delay does not appear to be related to the retry logic, is that right?
In case they are both dependent, we have to think carefully about their values ​​in seconds, avoiding any disruptions.

Copy link
Member

@raman-m raman-m Jun 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, my proposal is this commit → 061f018
However, I am uncertain about the following:

  • Returning an int type that represents the value of the TimeSpan.FromSeconds(double) argument: it has the double parameter
  • The default values of 5 and 3 seconds for the backing fields should be reduced, for instance, they could be set to 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, I propose converting the internal constants into public static properties, allowing developers to set custom values for advanced user scenarios directly in C# code

reasonable

What I consider inefficient and worth discussing further is the Seconds dimension. Why not Milliseconds?

10 ms for retries could create high pressure on k8s =) 10 ms = 100 rps. 100 rps * downstream services count * ocelot instances.
First result timeout is time that client can wait before first results fetching, and it can vary from couple milliseconds and up to defined timeout. So it's good candidate for configuration. Will try to find some numbers from prod env.

Do you have any use cases from your project, Kubernetes with the Ocelot environment in production, based on data from monitoring or traced logs

No, as I said KubeClient already retries subcription in case of network failures. You can think about retry logic introduced in this PR like "try -> catch -> retry all nested stuff inside KubeClient whatever it was under the hood". This 'whatever' could be bug or anything else in KubeClient logic.

This delay does not appear to be related to the retry logic, is that right?

Right, there is no dependency between properties. First results is important part, as far as discovery provider instantiated during first call.

So, my proposal is this commit.

Agree with your proposal.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I have assigned def values (both to 1 second) in commit 28a3afe 💡

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 ms for retries could create high pressure on k8s =) 10 ms = 100 rps. 100 rps * downstream services count * ocelot instances.
First result timeout is time that client can wait before first results fetching, and it can vary from couple milliseconds and up to defined timeout. So it's good candidate for configuration. Will try to find some numbers from prod env.

It appears this analysis is taking some time, and I am unable to wait any longer: the PR is ready to merge.
If there are further proposals for improvement, we will incorporate them into subsequent PRs, OK?


private readonly KubeRegistryConfiguration _configuration;
private readonly IOcelotLogger _logger;
private readonly IKubeApiClient _kubeApi;
private readonly IKubeServiceBuilder _serviceBuilder;
private readonly IScheduler _scheduler;

private readonly IDisposable _subscription;
private readonly TaskCompletionSource _firstResultsCompletionSource;

private List<Service> _services = new();

public WatchKube(
KubeRegistryConfiguration configuration,
IOcelotLoggerFactory factory,
IKubeApiClient kubeApi,
IKubeServiceBuilder serviceBuilder,
IScheduler scheduler)
{
_configuration = configuration;
_logger = factory.CreateLogger<WatchKube>();
_kubeApi = kubeApi;
_serviceBuilder = serviceBuilder;
_scheduler = scheduler;

_firstResultsCompletionSource = new TaskCompletionSource();
SetFirstResultsCompletedAfterDelay();
_subscription = CreateSubscription();
}

public virtual async Task<List<Service>> GetAsync()
{
// wait for first results fetching
await _firstResultsCompletionSource.Task;

if (_services is not { Count: > 0 })
{
_logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!"));
}

return _services;
}

private void SetFirstResultsCompletedAfterDelay() => Observable
.Timer(TimeSpan.FromSeconds(FirstResultsFetchingTimeoutSeconds), _scheduler)
.Subscribe(_ => _firstResultsCompletionSource.TrySetResult());

private IDisposable CreateSubscription() =>
_kubeApi
.EndpointsV1()
.Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace)
.Do(_ => { }, ex => _logger.LogError(() => GetMessage("Endpoints subscription error occured."), ex))
.RetryAfter(TimeSpan.FromSeconds(FailedSubscriptionRetrySeconds), _scheduler)
.Subscribe(
onNext: endpointEvent =>
{
_services = endpointEvent.EventType switch
{
ResourceEventType.Deleted or ResourceEventType.Error => new(),
_ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(),
_ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(),
};
_firstResultsCompletionSource.TrySetResult();
},
onCompleted: () =>
{
// called only when subscription canceled in Dispose
_logger.LogInformation(() => GetMessage("Subscription to service endpoints completed"));
});

private string GetMessage(string message)
=> $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";

public void Dispose() => _subscription.Dispose();
}
13 changes: 12 additions & 1 deletion test/Ocelot.AcceptanceTests/ConcurrentSteps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,19 @@ private async Task GetParallelResponse(string url, int threadIndex)

public void ThenAllStatusCodesShouldBe(HttpStatusCode expected)
=> _responses.ShouldAllBe(response => response.Value.StatusCode == expected);

public void ThenAllResponseBodiesShouldBe(string expectedBody)
=> _responses.ShouldAllBe(response => response.Value.Content.ReadAsStringAsync().Result == expectedBody);
{
foreach (var r in _responses)
{
var content = r.Value.Content.ReadAsStringAsync().Result;
content = content?.Contains(':') == true
? content.Split(':')[1] // remove counter for body comparison
: "0";

content.ShouldBe(expectedBody);
}
}

protected string CalledTimesMessage()
=> $"All values are [{string.Join(',', _counters)}]";
Expand Down
Loading
Loading