Skip to content

Commit

Permalink
implement ResiliencePipeline when using `IsLanguageForgeProjectDataLo…
Browse files Browse the repository at this point in the history
…ader` (#1097)

in case of a failure, the server will wait 60 seconds before trying again. Reduced connection timeouts to 5 seconds from 30.
  • Loading branch information
hahn-kev authored Oct 7, 2024
1 parent 6fd51ce commit 4a029e7
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,101 @@
using LfClassicData;
using MongoDB.Driver;
using MongoDB.Driver.Linq;
using Polly;
using Polly.CircuitBreaker;
using Polly.Fallback;
using Polly.Registry;

namespace LexBoxApi.GraphQL.CustomTypes;

public class IsLanguageForgeProjectDataLoader : BatchDataLoader<string, bool>, IIsLanguageForgeProjectDataLoader
{
public const string ResiliencePolicyName = "IsLanguageForgeProjectDataLoader";
private readonly SystemDbContext _systemDbContext;
private readonly ResiliencePipeline<IReadOnlyDictionary<string, bool>> _resiliencePipeline;

public IsLanguageForgeProjectDataLoader(
SystemDbContext systemDbContext,
IBatchScheduler batchScheduler,
[FromKeyedServices(ResiliencePolicyName)]
ResiliencePipeline<IReadOnlyDictionary<string, bool>> resiliencePipeline,
DataLoaderOptions? options = null)
: base(batchScheduler, options)
{
_resiliencePipeline = resiliencePipeline;
_systemDbContext = systemDbContext;
}

protected override async Task<IReadOnlyDictionary<string, bool>> LoadBatchAsync(
IReadOnlyList<string> projectCodes,
CancellationToken cancellationToken)
{
return await MongoExtensions.ToAsyncEnumerable(_systemDbContext.Projects.AsQueryable()
.Select(p => p.ProjectCode)
.Where(projectCode => projectCodes.Contains(projectCode)))
.ToDictionaryAsync(projectCode => projectCode, _ => true, cancellationToken);
return await FaultTolerantLoadBatch(projectCodes, cancellationToken);
}

private async ValueTask<IReadOnlyDictionary<string, bool>> FaultTolerantLoadBatch(
IReadOnlyList<string> projectCodes,
CancellationToken cancellationToken)
{
ResilienceContext context = ResilienceContextPool.Shared.Get(cancellationToken);
context.Properties.Set(ProjectCodesKey, projectCodes);
try
{
return await _resiliencePipeline.ExecuteAsync(
static async (context, state) => await LoadBatch(state,
context.Properties.GetValue(ProjectCodesKey, []),
context.CancellationToken),
context,
this);
}
finally
{
ResilienceContextPool.Shared.Return(context);
}
}

private static async Task<Dictionary<string, bool>> LoadBatch(IsLanguageForgeProjectDataLoader loader,
IReadOnlyList<string> list,
CancellationToken token)
{
return await MongoExtensions.ToAsyncEnumerable(loader._systemDbContext.Projects.AsQueryable()
.Select(p => p.ProjectCode)
.Where(projectCode => list.Contains(projectCode)))

Check warning on line 64 in backend/LexBoxApi/GraphQL/CustomTypes/IsLanguageForgeProjectDataLoader.cs

View workflow job for this annotation

GitHub Actions / Integration tests (ubuntu-latest, 6) / Dotnet tests on ubuntu-latest for Mercurial 6 on staging

Method referencing lambda parameter is not supported LINQ expression. (https://www.mongodb.com/docs/mongodb-analyzer/current/rules/#MALinq2001)

Check warning on line 64 in backend/LexBoxApi/GraphQL/CustomTypes/IsLanguageForgeProjectDataLoader.cs

View workflow job for this annotation

GitHub Actions / Integration tests (ubuntu-latest, 3) / Dotnet tests on ubuntu-latest for Mercurial 3 on staging

Method referencing lambda parameter is not supported LINQ expression. (https://www.mongodb.com/docs/mongodb-analyzer/current/rules/#MALinq2001)

Check warning on line 64 in backend/LexBoxApi/GraphQL/CustomTypes/IsLanguageForgeProjectDataLoader.cs

View workflow job for this annotation

GitHub Actions / Integration tests (windows-latest, 6) / Dotnet tests on windows-latest for Mercurial 6 on staging

Method referencing lambda parameter is not supported LINQ expression. (https://www.mongodb.com/docs/mongodb-analyzer/current/rules/#MALinq2001)

Check warning on line 64 in backend/LexBoxApi/GraphQL/CustomTypes/IsLanguageForgeProjectDataLoader.cs

View workflow job for this annotation

GitHub Actions / Integration tests (windows-latest, 3) / Dotnet tests on windows-latest for Mercurial 3 on staging

Method referencing lambda parameter is not supported LINQ expression. (https://www.mongodb.com/docs/mongodb-analyzer/current/rules/#MALinq2001)
.ToDictionaryAsync(projectCode => projectCode, _ => true, token);
}


public static readonly ResiliencePropertyKey<IReadOnlyList<string>> ProjectCodesKey = new("project-codes");

public static ResiliencePipelineBuilder<IReadOnlyDictionary<string, bool>> ConfigureResiliencePipeline(
ResiliencePipelineBuilder<IReadOnlyDictionary<string, bool>> builder, TimeSpan circuitBreakerDuration)
{
var circuitBreakerStrategyOptions = new CircuitBreakerStrategyOptions<IReadOnlyDictionary<string, bool>>
{
//docs https://www.pollydocs.org/strategies/circuit-breaker.html
Name = "IsLanguageForgeProjectDataLoaderCircuitBreaker",
MinimumThroughput = 2,//must be at least 2
BreakDuration = circuitBreakerDuration,
//window in which the minimum throughput can be reached.
//ff there is only 1 failure in an hour, then the circuit will not break,
//but the moment there is a second failure then it will break immediately.
SamplingDuration = TimeSpan.FromHours(1),
};
var fallbackStrategyOptions = new FallbackStrategyOptions<IReadOnlyDictionary<string, bool>>()
{
//docs https://www.pollydocs.org/strategies/fallback.html
Name = "IsLanguageForgeProjectDataLoaderFallback",
FallbackAction = arguments =>
{
IReadOnlyDictionary<string, bool> emptyResult = arguments.Context.Properties
.GetValue(ProjectCodesKey, []).ToDictionary(pc => pc, _ => false);
return Outcome.FromResultAsValueTask(emptyResult);
}
};
builder
.AddFallback(fallbackStrategyOptions)
.AddCircuitBreaker(circuitBreakerStrategyOptions)
;
return builder;
}
}
2 changes: 2 additions & 0 deletions backend/LexBoxApi/LexBoxApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
<PackageReference Include="OpenTelemetry.Instrumentation.Process" Version="0.5.0-beta.3" />
<PackageReference Include="OpenTelemetry.Instrumentation.Quartz" Version="1.0.0-beta.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.9.0" />
<PackageReference Include="Polly" Version="8.4.2" />
<PackageReference Include="Polly.Extensions" Version="8.4.2" />
<PackageReference Include="Quartz.AspNetCore" Version="3.13.0" />
<PackageReference Include="Quartz.Serialization.SystemTextJson" Version="3.13.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.7.3" />
Expand Down
8 changes: 8 additions & 0 deletions backend/LexBoxApi/LexBoxKernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
using LexCore.Config;
using LexCore.ServiceInterfaces;
using LexSyncReverseProxy;
using LfClassicData;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Options;
using Polly;
using Swashbuckle.AspNetCore.Swagger;

namespace LexBoxApi;
Expand Down Expand Up @@ -55,6 +58,11 @@ public static void AddLexBoxApi(this IServiceCollection services,
services.AddHostedService<HgService>();
services.AddTransient<HgWebHealthCheck>();
services.AddScoped<IIsLanguageForgeProjectDataLoader, IsLanguageForgeProjectDataLoader>();
services.AddResiliencePipeline<string, IReadOnlyDictionary<string, bool>>(IsLanguageForgeProjectDataLoader.ResiliencePolicyName, (builder, context) =>
{
builder.ConfigureTelemetry(context.ServiceProvider.GetRequiredService<ILoggerFactory>());
IsLanguageForgeProjectDataLoader.ConfigureResiliencePipeline(builder, context.ServiceProvider.GetRequiredService<IOptions<LfClassicConfig>>().Value.IsLfProjectConnectionRetryTimeout);
});
services.AddScoped<ILexProxyService, LexProxyService>();
services.AddSingleton<ISendReceiveService, SendReceiveService>();
services.AddSingleton<LexboxLinkGenerator>();
Expand Down
5 changes: 4 additions & 1 deletion backend/LexBoxApi/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
"ConnectionString": "mongodb://localhost:27017",
"AuthSource": "admin",
"Username": "admin",
"Password": "pass"
"Password": "pass",
"ServerSelectionTimeout": "00:00:01",
"ConnectTimeout": "00:00:01",
"IsLfProjectConnectionRetryTimeout": "00:10:00"
},
"ForwardedHeadersOptions": {
"KnownNetworks": [
Expand Down
15 changes: 8 additions & 7 deletions backend/LfClassicData/DataServiceKernel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,25 @@ public static void AddLanguageForgeClassicMiniLcm(this IServiceCollection servic
services.AddSingleton(BuildMongoClientSettings);
services.AddSingleton(provider => new MongoClient(provider.GetRequiredService<MongoClientSettings>()));
services.AddSingleton<LfClassicLexboxApiProvider>();

services.AddSingleton<SystemDbContext>();
services.AddSingleton<ProjectDbContext>();
}

public static MongoClientSettings BuildMongoClientSettings(IServiceProvider provider)
{
var config = provider.GetRequiredService<IOptions<LfClassicConfig>>();
var mongoSettings = MongoClientSettings.FromConnectionString(config.Value.ConnectionString);
if (config.Value.HasCredentials)
var config = provider.GetRequiredService<IOptions<LfClassicConfig>>().Value;
var mongoSettings = MongoClientSettings.FromConnectionString(config.ConnectionString);
if (config.HasCredentials)
{
mongoSettings.Credential = MongoCredential.CreateCredential(
databaseName: config.Value.AuthSource,
username: config.Value.Username,
password: config.Value.Password
databaseName: config.AuthSource,
username: config.Username,
password: config.Password
);
}
mongoSettings.LoggingSettings = new LoggingSettings(provider.GetRequiredService<ILoggerFactory>());
mongoSettings.ConnectTimeout = config.ConnectTimeout;
mongoSettings.ServerSelectionTimeout = config.ServerSelectionTimeout;
mongoSettings.ClusterConfigurator = cb =>
cb.Subscribe(new DiagnosticsActivityEventSubscriber(new() { CaptureCommandText = true }));
return mongoSettings;
Expand Down
6 changes: 6 additions & 0 deletions backend/LfClassicData/LfClassicConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,11 @@ public class LfClassicConfig
public string? AuthSource { get; set; }
public string? Username { get; set; }
public string? Password { get; set; }
public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(5);
public TimeSpan ServerSelectionTimeout { get; set; } = TimeSpan.FromSeconds(5);
/// <summary>
/// how long to wait before trying to determine if a project is an LF project after a failure
/// </summary>
public TimeSpan IsLfProjectConnectionRetryTimeout { get; set; } = TimeSpan.FromSeconds(60);
public bool HasCredentials => AuthSource is not null && Username is not null && Password is not null;
}
89 changes: 89 additions & 0 deletions backend/Testing/Services/IsLanguageForgeProjectDataLoaderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using LexBoxApi.GraphQL.CustomTypes;
using Microsoft.Extensions.Time.Testing;
using Polly;
using Shouldly;

namespace Testing.Services;

public class IsLanguageForgeProjectDataLoaderTests
{
private readonly FakeTimeProvider _timeProvider = new();
private readonly ResiliencePipeline<IReadOnlyDictionary<string, bool>> _pipeline;
private static readonly TimeSpan BreakDuration = TimeSpan.FromSeconds(60);

public IsLanguageForgeProjectDataLoaderTests()
{
_pipeline = IsLanguageForgeProjectDataLoader.ConfigureResiliencePipeline(new() { TimeProvider = _timeProvider }, BreakDuration)
.Build();
}

private ValueTask<Outcome<Dictionary<string, bool>>> Execute(Exception? exception = null)
{
ResilienceContext context = ResilienceContextPool.Shared.Get();
context.Properties.Set(IsLanguageForgeProjectDataLoader.ProjectCodesKey, new[] { "test" });
return _pipeline.ExecuteOutcomeAsync((context, state) =>
{
if (exception is not null)
{
return Outcome.FromExceptionAsValueTask<Dictionary<string, bool>>(exception);
}
return Outcome.FromResultAsValueTask(new Dictionary<string, bool>() { { "test", true } });
},
context,
this);
}

private void VerifyEmptyResult(Outcome<Dictionary<string, bool>> result)
{
result.Exception.ShouldBeNull();
result.Result.ShouldBe(new Dictionary<string, bool>() { { "test", false } });
}

private void VerifySuccessResult(Outcome<Dictionary<string, bool>> result)
{
result.Exception.ShouldBeNull();
result.Result.ShouldBe(new Dictionary<string, bool>() { { "test", true } });
}

[Fact]
public async Task ResiliencePipelineWorksFine()
{
var result = await Execute();
VerifySuccessResult(result);
}

[Fact]
public async Task ResiliencePipelineReturnsEmptyResultWhenExceptionIsThrown()
{
var result = await Execute(new Exception("test"));
VerifyEmptyResult(result);
}

[Fact]
public async Task CircuitBreaksAfter2Failures()
{
for (int i = 0; i < 3; i++)
{
await Execute(new Exception("test"));
_timeProvider.Advance(TimeSpan.FromSeconds(21));
}
//the circuit is open, now the fallback should be used
var result = await Execute();
VerifyEmptyResult(result);
}

[Fact]
public async Task CircuitBreaksAndReOpensAfterTimeout()
{
for (int i = 0; i < 3; i++)
{
await Execute(new Exception("test"));
_timeProvider.Advance(TimeSpan.FromSeconds(21));
}
//the circuit is open, now the fallback should be used
VerifyEmptyResult(await Execute());
_timeProvider.Advance(BreakDuration + TimeSpan.FromSeconds(1));
VerifySuccessResult(await Execute());
}
}
1 change: 1 addition & 0 deletions backend/Testing/Testing.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
</Choose>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="8.8.0" />
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" Version="8.9.1" />
<PackageReference Include="Squidex.Assets.TusClient" Version="6.6.4" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
Expand Down

0 comments on commit 4a029e7

Please sign in to comment.