Skip to content

Commit

Permalink
Fix #464 - add lock lifetime for all
Browse files Browse the repository at this point in the history
* Add HTTP timeout
* Make adjustable through options
* Will need to delete all locks from MongoDB - otherwise will endlessly loop for startup
* Fix some ci-e2e issues
  • Loading branch information
johnml1135 committed Sep 6, 2024
1 parent 4382410 commit 1d4032d
Show file tree
Hide file tree
Showing 25 changed files with 145 additions and 62 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/ci-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ jobs:
- name: Install regctl
uses: iarekylew00t/regctl-installer@v1

- name: Getr Version of Machine.py
run: echo "MACHINE_PY_IMAGE=ghcr.io/sillsdev/machine.py:$(regctl image config ghcr.io/sillsdev/machine.py | jq -r ".config.Labels[\"org.opencontainers.image.version\"]")" >> $GITHUB_ENV
- name: Set proper version of Machine.py
run: |
export MACHINE_PY_IMAGE=ghcr.io/sillsdev/machine.py:$(regctl image config ghcr.io/sillsdev/machine.py | jq -r ".config.Labels[\"org.opencontainers.image.version\"]") && \
echo "MACHINE_PY_IMAGE=$MACHINE_PY_IMAGE" >> $GITHUB_ENV && \
echo "MACHINE_PY_CPU_IMAGE=$MACHINE_PY_IMAGE.cpu_only" >> $GITHUB_ENV
- name: Confirm proper version of Machine.py
run: |
echo $MACHINE_PY_IMAGE $MACHINE_PY_CPU_IMAGE
- name: Setup .NET
uses: actions/setup-dotnet@v3
Expand All @@ -50,6 +57,9 @@ jobs:
- name: Test
run: dotnet test --no-build --verbosity normal --filter "TestCategory!=slow&TestCategory=E2E" --collect:"Xplat Code Coverage"

- name: Debug network again
run: docker ps -a && docker logs --since 10m serval_cntr && docker logs --since 10m echo_cntr && docker logs --since 10m machine-engine-cntr && docker logs --since 10m serval-mongo-1 && docker logs --since 10m machine-job-cntr

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public class BuildJobOptions
public const string Key = "BuildJob";

public IList<ClearMLBuildQueue> ClearML { get; set; } = new List<ClearMLBuildQueue>();
public TimeSpan PostProcessLockLifetime { get; set; } = TimeSpan.FromSeconds(120);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Serval.Machine.Shared.Configuration;

public class DistributedReaderWriterLockOptions
{
public const string Key = "DistributedReaderWriterLock";

public TimeSpan DefaultLifetime { get; set; } = TimeSpan.FromSeconds(56); // must be less than DefaultHttpRequestTimeout
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC
return builder;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
Action<DistributedReaderWriterLockOptions> configureOptions
)
{
build.Services.Configure(configureOptions);
return build;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
IConfiguration config
)
{
build.Services.Configure<DistributedReaderWriterLockOptions>(config);
return build;
}

public static IMachineBuilder AddMessageOutboxOptions(
this IMachineBuilder builder,
Action<MessageOutboxOptions> configureOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(o => { });
builder.AddSmtTransferEngineOptions(o => { });
builder.AddClearMLOptions(o => { });
builder.AddDistributedReaderWriterLockOptions(o => { });
builder.AddBuildJobOptions(o => { });
builder.AddMessageOutboxOptions(o => { });
}
Expand All @@ -37,6 +38,9 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(configuration.GetSection(SharedFileOptions.Key));
builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key));
builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key));
builder.AddDistributedReaderWriterLockOptions(
configuration.GetSection(DistributedReaderWriterLockOptions.Key)
);
builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key));
builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key));
}
Expand Down
3 changes: 2 additions & 1 deletion src/Machine/src/Serval.Machine.Shared/Models/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public record Lock
{
public required string Id { get; init; }
public DateTime? ExpiresAt { get; init; }

public DateTime ExpiresAt { get; init; }
public required string HostId { get; init; }
}
7 changes: 3 additions & 4 deletions src/Machine/src/Serval.Machine.Shared/Models/RWLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ public record RWLock : IEntity
public bool IsAvailableForReading()
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& WriterQueue.Count == 0;
return (WriterLock is null || WriterLock.ExpiresAt <= now) && WriterQueue.Count == 0;
}

public bool IsAvailableForWriting(string? lockId = null)
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt is null || l.ExpiresAt > now)
return (WriterLock is null || WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt > now)
&& (lockId is null || WriterQueue.Count > 0 && WriterQueue[0].Id == lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<ItemGroup>
<ProjectReference Include="..\..\..\DataAccess\src\SIL.DataAccess\SIL.DataAccess.csproj" />
<ProjectReference Include="..\..\..\Serval\src\Serval.Grpc\Serval.Grpc.csproj" />
<ProjectReference Include="..\..\..\Serval\src\Serval.Shared\Serval.Shared.csproj" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
namespace Serval.Machine.Shared.Services;

public class DistributedReaderWriterLock(string hostId, IRepository<RWLock> locks, IIdGenerator idGenerator, string id)
: IDistributedReaderWriterLock
public class DistributedReaderWriterLock(
string hostId,
IRepository<RWLock> locks,
IIdGenerator idGenerator,
string id,
DistributedReaderWriterLockOptions lockOptions
) : IDistributedReaderWriterLock
{
private readonly string _hostId = hostId;
private readonly IRepository<RWLock> _locks = locks;
private readonly IIdGenerator _idGenerator = idGenerator;
private readonly string _id = id;
private readonly DistributedReaderWriterLockOptions _lockOptions = lockOptions;

public async Task<IAsyncDisposable> ReaderLockAsync(
TimeSpan? lifetime = default,
CancellationToken cancellationToken = default
)
{
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken))
TimeSpan resolvedLifetime = lifetime ?? _lockOptions.DefaultLifetime;
if (!await TryAcquireReaderLock(lockId, resolvedLifetime, cancellationToken))
{
using ISubscription<RWLock> sub = await _locks.SubscribeAsync(rwl => rwl.Id == _id, cancellationToken);
do
Expand All @@ -32,7 +39,7 @@ public async Task<IAsyncDisposable> ReaderLockAsync(
if (timeout != TimeSpan.Zero)
await sub.WaitForChangeAsync(timeout, cancellationToken);
}
} while (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken));
} while (!await TryAcquireReaderLock(lockId, resolvedLifetime, cancellationToken));
}
return new ReaderLockReleaser(this, lockId);
}
Expand All @@ -43,11 +50,12 @@ public async Task<IAsyncDisposable> WriterLockAsync(
)
{
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken))
TimeSpan resolvedLifetime = lifetime ?? _lockOptions.DefaultLifetime;
if (!await TryAcquireWriterLock(lockId, resolvedLifetime, cancellationToken))
{
await _locks.UpdateAsync(
_id,
u => u.Add(rwl => rwl.WriterQueue, new Lock { Id = lockId, HostId = _hostId }),
u => u.Add(rwl => rwl.WriterQueue, new Lock { Id = lockId, HostId = _hostId, }),
cancellationToken: cancellationToken
);
try
Expand All @@ -58,12 +66,9 @@ await _locks.UpdateAsync(
RWLock? rwLock = sub.Change.Entity;
if (rwLock is not null && !rwLock.IsAvailableForWriting(lockId))
{
var dateTimes = rwLock
.ReaderLocks.Where(l => l.ExpiresAt.HasValue)
.Select(l => l.ExpiresAt.GetValueOrDefault())
.ToList();
var dateTimes = rwLock.ReaderLocks.Select(l => l.ExpiresAt).ToList();
if (rwLock.WriterLock?.ExpiresAt is not null)
dateTimes.Add(rwLock.WriterLock.ExpiresAt.Value);
dateTimes.Add(rwLock.WriterLock.ExpiresAt);
TimeSpan? timeout = default;
if (dateTimes.Count > 0)
{
Expand All @@ -74,7 +79,7 @@ await _locks.UpdateAsync(
if (timeout != TimeSpan.Zero)
await sub.WaitForChangeAsync(timeout, cancellationToken);
}
} while (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken));
} while (!await TryAcquireWriterLock(lockId, resolvedLifetime, cancellationToken));
}
catch
{
Expand All @@ -89,17 +94,13 @@ await _locks.UpdateAsync(
return new WriterLockReleaser(this, lockId);
}

private async Task<bool> TryAcquireWriterLock(
string lockId,
TimeSpan? lifetime,
CancellationToken cancellationToken
)
private async Task<bool> TryAcquireWriterLock(string lockId, TimeSpan lifetime, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now)
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt > now)
&& (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId);
void Update(IUpdateBuilder<RWLock> u)
{
Expand All @@ -108,7 +109,7 @@ void Update(IUpdateBuilder<RWLock> u)
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
ExpiresAt = now + lifetime,
HostId = _hostId
}
);
Expand All @@ -118,25 +119,19 @@ void Update(IUpdateBuilder<RWLock> u)
return rwLock is not null;
}

private async Task<bool> TryAcquireReaderLock(
string lockId,
TimeSpan? lifetime,
CancellationToken cancellationToken
)
private async Task<bool> TryAcquireReaderLock(string lockId, TimeSpan lifetime, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.WriterQueue.Any();
rwl.Id == _id && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt <= now) && !rwl.WriterQueue.Any();
void Update(IUpdateBuilder<RWLock> u)
{
u.Add(
rwl => rwl.ReaderLocks,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
ExpiresAt = now + lifetime,
HostId = _hostId
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

public class DistributedReaderWriterLockFactory(
IOptions<ServiceOptions> serviceOptions,
IOptions<DistributedReaderWriterLockOptions> lockOptions,
IRepository<RWLock> locks,
IIdGenerator idGenerator
) : IDistributedReaderWriterLockFactory
{
private readonly ServiceOptions _serviceOptions = serviceOptions.Value;
private readonly DistributedReaderWriterLockOptions _lockOptions = lockOptions.Value;
private readonly IIdGenerator _idGenerator = idGenerator;
private readonly IRepository<RWLock> _locks = locks;

Expand Down Expand Up @@ -39,7 +41,7 @@ await _locks.InsertAsync(
// the lock is already made - no new one needs to be made
// This is done instead of checking if it exists first to prevent race conditions.
}
return new DistributedReaderWriterLock(_serviceOptions.ServiceId, _locks, _idGenerator, id);
return new DistributedReaderWriterLock(_serviceOptions.ServiceId, _locks, _idGenerator, id, _lockOptions);
}

public async Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ public class PostprocessBuildJob(
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<PostprocessBuildJob> logger,
ISharedFileService sharedFileService
ISharedFileService sharedFileService,
IOptionsMonitor<BuildJobOptions> buildJobOptions
) : HangfireBuildJob<(int, double)>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
protected ISharedFileService SharedFileService { get; } = sharedFileService;
private readonly BuildJobOptions _buildJobOptions = buildJobOptions.CurrentValue;

protected override async Task DoWorkAsync(
string engineId,
Expand All @@ -33,7 +35,12 @@ CancellationToken cancellationToken
await PlatformService.InsertPretranslationsAsync(engineId, pretranslationsStream, cancellationToken);
}

await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
await using (
await @lock.WriterLockAsync(
lifetime: _buildJobOptions.PostProcessLockLifetime,
cancellationToken: CancellationToken.None
)
)
{
await DataAccessContext.WithTransactionAsync(
async (ct) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public class SmtTransferBuildJob(
IRepository<TrainSegmentPair> trainSegmentPairs,
ITruecaserFactory truecaserFactory,
ISmtModelFactory smtModelFactory,
ICorpusService corpusService
ICorpusService corpusService,
IOptions<BuildJobOptions> buildJobOptions
)
: HangfireBuildJob<IReadOnlyList<Corpus>>(
platformService,
Expand All @@ -25,6 +26,7 @@ ICorpusService corpusService
private readonly ITruecaserFactory _truecaserFactory = truecaserFactory;
private readonly ISmtModelFactory _smtModelFactory = smtModelFactory;
private readonly ICorpusService _corpusService = corpusService;
private readonly BuildJobOptions _buildJobOptions = buildJobOptions.Value;

protected override Task InitializeAsync(
string engineId,
Expand Down Expand Up @@ -110,7 +112,12 @@ CancellationToken cancellationToken
if (engine is null)
throw new OperationCanceledException();

await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
await using (
await @lock.WriterLockAsync(
lifetime: _buildJobOptions.PostProcessLockLifetime,
cancellationToken: cancellationToken
)
)
{
cancellationToken.ThrowIfCancellationRequested();
await smtModelTrainer.SaveAsync(CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ public class SmtTransferPostprocessBuildJob(
IBuildJobService buildJobService,
ILogger<SmtTransferPostprocessBuildJob> logger,
ISharedFileService sharedFileService,
IOptionsMonitor<BuildJobOptions> buildJobOptions,
IRepository<TrainSegmentPair> trainSegmentPairs,
ISmtModelFactory smtModelFactory,
ITruecaserFactory truecaserFactory,
IOptionsMonitor<SmtTransferEngineOptions> options
IOptionsMonitor<SmtTransferEngineOptions> engineOptions
)
: PostprocessBuildJob(
platformService,
Expand All @@ -20,13 +21,14 @@ IOptionsMonitor<SmtTransferEngineOptions> options
dataAccessContext,
buildJobService,
logger,
sharedFileService
sharedFileService,
buildJobOptions
)
{
private readonly ISmtModelFactory _smtModelFactory = smtModelFactory;
private readonly ITruecaserFactory _truecaserFactory = truecaserFactory;
private readonly IRepository<TrainSegmentPair> _trainSegmentPairs = trainSegmentPairs;
private readonly IOptionsMonitor<SmtTransferEngineOptions> _options = options;
private readonly IOptionsMonitor<SmtTransferEngineOptions> _engineOptions = engineOptions;

protected override async Task<int> SaveModelAsync(string engineId, string buildId)
{
Expand All @@ -38,7 +40,7 @@ protected override async Task<int> SaveModelAsync(string engineId, string buildI
)
{
await _smtModelFactory.UpdateEngineFromAsync(
Path.Combine(_options.CurrentValue.EnginesDir, engineId),
Path.Combine(_engineOptions.CurrentValue.EnginesDir, engineId),
engineStream,
CancellationToken.None
);
Expand All @@ -54,7 +56,7 @@ private async Task<int> TrainOnNewSegmentPairsAsync(string engineId)
if (segmentPairs.Count == 0)
return segmentPairs.Count;

string engineDir = Path.Combine(_options.CurrentValue.EnginesDir, engineId);
string engineDir = Path.Combine(_engineOptions.CurrentValue.EnginesDir, engineId);
var tokenizer = new LatinWordTokenizer();
var detokenizer = new LatinWordDetokenizer();
ITruecaser truecaser = await _truecaserFactory.CreateAsync(engineDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public TestEnvironment()
ServiceOptions serviceOptions = new() { ServiceId = "this_service" };
Factory = new DistributedReaderWriterLockFactory(
new OptionsWrapper<ServiceOptions>(serviceOptions),
new OptionsWrapper<DistributedReaderWriterLockOptions>(new DistributedReaderWriterLockOptions()),
Locks,
new ObjectIdGenerator()
);
Expand Down
Loading

0 comments on commit 1d4032d

Please sign in to comment.