Skip to content

Commit

Permalink
Fix #464 - add lock lifetime for all
Browse files Browse the repository at this point in the history
* Add HTTP timeout
* Make adjustable through options
* Will need to delete all locks from MongoDB - otherwise will endlessly loop for startup
  • Loading branch information
johnml1135 committed Sep 6, 2024
1 parent 4382410 commit b643a06
Show file tree
Hide file tree
Showing 24 changed files with 133 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ public class BuildJobOptions
public const string Key = "BuildJob";

public IList<ClearMLBuildQueue> ClearML { get; set; } = new List<ClearMLBuildQueue>();
public TimeSpan PostProcessLockLifetime { get; set; } = TimeSpan.FromSeconds(120);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Serval.Machine.Shared.Configuration;

public class DistributedReaderWriterLockOptions
{
public const string Key = "DistributedReaderWriterLock";

public TimeSpan DefaultLifetime { get; set; } = TimeSpan.FromSeconds(56); // must be less than DefaultHttpRequestTimeout
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ public static IMachineBuilder AddClearMLOptions(this IMachineBuilder builder, IC
return builder;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
Action<DistributedReaderWriterLockOptions> configureOptions
)
{
build.Services.Configure(configureOptions);
return build;
}

public static IMachineBuilder AddDistributedReaderWriterLockOptions(
this IMachineBuilder build,
IConfiguration config
)
{
build.Services.Configure<DistributedReaderWriterLockOptions>(config);
return build;
}

public static IMachineBuilder AddMessageOutboxOptions(
this IMachineBuilder builder,
Action<MessageOutboxOptions> configureOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(o => { });
builder.AddSmtTransferEngineOptions(o => { });
builder.AddClearMLOptions(o => { });
builder.AddDistributedReaderWriterLockOptions(o => { });
builder.AddBuildJobOptions(o => { });
builder.AddMessageOutboxOptions(o => { });
}
Expand All @@ -37,6 +38,9 @@ public static IMachineBuilder AddMachine(this IServiceCollection services, IConf
builder.AddSharedFileOptions(configuration.GetSection(SharedFileOptions.Key));
builder.AddSmtTransferEngineOptions(configuration.GetSection(SmtTransferEngineOptions.Key));
builder.AddClearMLOptions(configuration.GetSection(ClearMLOptions.Key));
builder.AddDistributedReaderWriterLockOptions(
configuration.GetSection(DistributedReaderWriterLockOptions.Key)
);
builder.AddBuildJobOptions(configuration.GetSection(BuildJobOptions.Key));
builder.AddMessageOutboxOptions(configuration.GetSection(MessageOutboxOptions.Key));
}
Expand Down
3 changes: 2 additions & 1 deletion src/Machine/src/Serval.Machine.Shared/Models/Lock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
public record Lock
{
public required string Id { get; init; }
public DateTime? ExpiresAt { get; init; }

public DateTime ExpiresAt { get; init; }
public required string HostId { get; init; }
}
7 changes: 3 additions & 4 deletions src/Machine/src/Serval.Machine.Shared/Models/RWLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ public record RWLock : IEntity
public bool IsAvailableForReading()
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& WriterQueue.Count == 0;
return (WriterLock is null || WriterLock.ExpiresAt <= now) && WriterQueue.Count == 0;
}

public bool IsAvailableForWriting(string? lockId = null)
{
var now = DateTime.UtcNow;
return (WriterLock is null || WriterLock.ExpiresAt is not null && WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt is null || l.ExpiresAt > now)
return (WriterLock is null || WriterLock.ExpiresAt <= now)
&& !ReaderLocks.Any(l => l.ExpiresAt > now)
&& (lockId is null || WriterQueue.Count > 0 && WriterQueue[0].Id == lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<ItemGroup>
<ProjectReference Include="..\..\..\DataAccess\src\SIL.DataAccess\SIL.DataAccess.csproj" />
<ProjectReference Include="..\..\..\Serval\src\Serval.Grpc\Serval.Grpc.csproj" />
<ProjectReference Include="..\..\..\Serval\src\Serval.Shared\Serval.Shared.csproj" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine\SIL.Machine.csproj')" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine.Morphology.HermitCrab\SIL.Machine.Morphology.HermitCrab.csproj')" />
<ProjectReference Include="..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj" Condition="Exists('..\..\..\..\..\machine\src\SIL.Machine.Translation.Thot\SIL.Machine.Translation.Thot.csproj')" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
namespace Serval.Machine.Shared.Services;

public class DistributedReaderWriterLock(string hostId, IRepository<RWLock> locks, IIdGenerator idGenerator, string id)
: IDistributedReaderWriterLock
public class DistributedReaderWriterLock(
string hostId,
IRepository<RWLock> locks,
IIdGenerator idGenerator,
string id,
DistributedReaderWriterLockOptions lockOptions
) : IDistributedReaderWriterLock
{
private readonly string _hostId = hostId;
private readonly IRepository<RWLock> _locks = locks;
private readonly IIdGenerator _idGenerator = idGenerator;
private readonly string _id = id;
private readonly DistributedReaderWriterLockOptions _lockOptions = lockOptions;

public async Task<IAsyncDisposable> ReaderLockAsync(
TimeSpan? lifetime = default,
CancellationToken cancellationToken = default
)
{
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken))
TimeSpan resolvedLifetime = lifetime ?? _lockOptions.DefaultLifetime;
if (!await TryAcquireReaderLock(lockId, resolvedLifetime, cancellationToken))
{
using ISubscription<RWLock> sub = await _locks.SubscribeAsync(rwl => rwl.Id == _id, cancellationToken);
do
Expand All @@ -32,7 +39,7 @@ public async Task<IAsyncDisposable> ReaderLockAsync(
if (timeout != TimeSpan.Zero)
await sub.WaitForChangeAsync(timeout, cancellationToken);
}
} while (!await TryAcquireReaderLock(lockId, lifetime, cancellationToken));
} while (!await TryAcquireReaderLock(lockId, resolvedLifetime, cancellationToken));
}
return new ReaderLockReleaser(this, lockId);
}
Expand All @@ -43,11 +50,12 @@ public async Task<IAsyncDisposable> WriterLockAsync(
)
{
string lockId = _idGenerator.GenerateId();
if (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken))
TimeSpan resolvedLifetime = lifetime ?? _lockOptions.DefaultLifetime;
if (!await TryAcquireWriterLock(lockId, resolvedLifetime, cancellationToken))
{
await _locks.UpdateAsync(
_id,
u => u.Add(rwl => rwl.WriterQueue, new Lock { Id = lockId, HostId = _hostId }),
u => u.Add(rwl => rwl.WriterQueue, new Lock { Id = lockId, HostId = _hostId, }),
cancellationToken: cancellationToken
);
try
Expand All @@ -58,12 +66,9 @@ await _locks.UpdateAsync(
RWLock? rwLock = sub.Change.Entity;
if (rwLock is not null && !rwLock.IsAvailableForWriting(lockId))
{
var dateTimes = rwLock
.ReaderLocks.Where(l => l.ExpiresAt.HasValue)
.Select(l => l.ExpiresAt.GetValueOrDefault())
.ToList();
var dateTimes = rwLock.ReaderLocks.Select(l => l.ExpiresAt).ToList();
if (rwLock.WriterLock?.ExpiresAt is not null)
dateTimes.Add(rwLock.WriterLock.ExpiresAt.Value);
dateTimes.Add(rwLock.WriterLock.ExpiresAt);
TimeSpan? timeout = default;
if (dateTimes.Count > 0)
{
Expand All @@ -74,7 +79,7 @@ await _locks.UpdateAsync(
if (timeout != TimeSpan.Zero)
await sub.WaitForChangeAsync(timeout, cancellationToken);
}
} while (!await TryAcquireWriterLock(lockId, lifetime, cancellationToken));
} while (!await TryAcquireWriterLock(lockId, resolvedLifetime, cancellationToken));
}
catch
{
Expand All @@ -89,17 +94,13 @@ await _locks.UpdateAsync(
return new WriterLockReleaser(this, lockId);
}

private async Task<bool> TryAcquireWriterLock(
string lockId,
TimeSpan? lifetime,
CancellationToken cancellationToken
)
private async Task<bool> TryAcquireWriterLock(string lockId, TimeSpan lifetime, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt == null || l.ExpiresAt > now)
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt <= now)
&& !rwl.ReaderLocks.Any(l => l.ExpiresAt > now)
&& (!rwl.WriterQueue.Any() || rwl.WriterQueue[0].Id == lockId);
void Update(IUpdateBuilder<RWLock> u)
{
Expand All @@ -108,7 +109,7 @@ void Update(IUpdateBuilder<RWLock> u)
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
ExpiresAt = now + lifetime,
HostId = _hostId
}
);
Expand All @@ -118,25 +119,19 @@ void Update(IUpdateBuilder<RWLock> u)
return rwLock is not null;
}

private async Task<bool> TryAcquireReaderLock(
string lockId,
TimeSpan? lifetime,
CancellationToken cancellationToken
)
private async Task<bool> TryAcquireReaderLock(string lockId, TimeSpan lifetime, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
Expression<Func<RWLock, bool>> filter = rwl =>
rwl.Id == _id
&& (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt != null && rwl.WriterLock.ExpiresAt <= now)
&& !rwl.WriterQueue.Any();
rwl.Id == _id && (rwl.WriterLock == null || rwl.WriterLock.ExpiresAt <= now) && !rwl.WriterQueue.Any();
void Update(IUpdateBuilder<RWLock> u)
{
u.Add(
rwl => rwl.ReaderLocks,
new Lock
{
Id = lockId,
ExpiresAt = lifetime is null ? null : now + lifetime,
ExpiresAt = now + lifetime,
HostId = _hostId
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

public class DistributedReaderWriterLockFactory(
IOptions<ServiceOptions> serviceOptions,
IOptions<DistributedReaderWriterLockOptions> lockOptions,
IRepository<RWLock> locks,
IIdGenerator idGenerator
) : IDistributedReaderWriterLockFactory
{
private readonly ServiceOptions _serviceOptions = serviceOptions.Value;
private readonly DistributedReaderWriterLockOptions _lockOptions = lockOptions.Value;
private readonly IIdGenerator _idGenerator = idGenerator;
private readonly IRepository<RWLock> _locks = locks;

Expand Down Expand Up @@ -39,7 +41,7 @@ await _locks.InsertAsync(
// the lock is already made - no new one needs to be made
// This is done instead of checking if it exists first to prevent race conditions.
}
return new DistributedReaderWriterLock(_serviceOptions.ServiceId, _locks, _idGenerator, id);
return new DistributedReaderWriterLock(_serviceOptions.ServiceId, _locks, _idGenerator, id, _lockOptions);
}

public async Task<bool> DeleteAsync(string id, CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ public class PostprocessBuildJob(
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<PostprocessBuildJob> logger,
ISharedFileService sharedFileService
ISharedFileService sharedFileService,
IOptionsMonitor<BuildJobOptions> buildJobOptions
) : HangfireBuildJob<(int, double)>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
protected ISharedFileService SharedFileService { get; } = sharedFileService;
private readonly BuildJobOptions _buildJobOptions = buildJobOptions.CurrentValue;

protected override async Task DoWorkAsync(
string engineId,
Expand All @@ -33,7 +35,12 @@ CancellationToken cancellationToken
await PlatformService.InsertPretranslationsAsync(engineId, pretranslationsStream, cancellationToken);
}

await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
await using (
await @lock.WriterLockAsync(
lifetime: _buildJobOptions.PostProcessLockLifetime,
cancellationToken: CancellationToken.None
)
)
{
await DataAccessContext.WithTransactionAsync(
async (ct) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public class SmtTransferBuildJob(
IRepository<TrainSegmentPair> trainSegmentPairs,
ITruecaserFactory truecaserFactory,
ISmtModelFactory smtModelFactory,
ICorpusService corpusService
ICorpusService corpusService,
IOptions<BuildJobOptions> buildJobOptions
)
: HangfireBuildJob<IReadOnlyList<Corpus>>(
platformService,
Expand All @@ -25,6 +26,7 @@ ICorpusService corpusService
private readonly ITruecaserFactory _truecaserFactory = truecaserFactory;
private readonly ISmtModelFactory _smtModelFactory = smtModelFactory;
private readonly ICorpusService _corpusService = corpusService;
private readonly BuildJobOptions _buildJobOptions = buildJobOptions.Value;

protected override Task InitializeAsync(
string engineId,
Expand Down Expand Up @@ -110,7 +112,12 @@ CancellationToken cancellationToken
if (engine is null)
throw new OperationCanceledException();

await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
await using (
await @lock.WriterLockAsync(
lifetime: _buildJobOptions.PostProcessLockLifetime,
cancellationToken: cancellationToken
)
)
{
cancellationToken.ThrowIfCancellationRequested();
await smtModelTrainer.SaveAsync(CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ public class SmtTransferPostprocessBuildJob(
IBuildJobService buildJobService,
ILogger<SmtTransferPostprocessBuildJob> logger,
ISharedFileService sharedFileService,
IOptionsMonitor<BuildJobOptions> buildJobOptions,
IRepository<TrainSegmentPair> trainSegmentPairs,
ISmtModelFactory smtModelFactory,
ITruecaserFactory truecaserFactory,
IOptionsMonitor<SmtTransferEngineOptions> options
IOptionsMonitor<SmtTransferEngineOptions> engineOptions
)
: PostprocessBuildJob(
platformService,
Expand All @@ -20,13 +21,14 @@ IOptionsMonitor<SmtTransferEngineOptions> options
dataAccessContext,
buildJobService,
logger,
sharedFileService
sharedFileService,
buildJobOptions
)
{
private readonly ISmtModelFactory _smtModelFactory = smtModelFactory;
private readonly ITruecaserFactory _truecaserFactory = truecaserFactory;
private readonly IRepository<TrainSegmentPair> _trainSegmentPairs = trainSegmentPairs;
private readonly IOptionsMonitor<SmtTransferEngineOptions> _options = options;
private readonly IOptionsMonitor<SmtTransferEngineOptions> _engineOptions = engineOptions;

protected override async Task<int> SaveModelAsync(string engineId, string buildId)
{
Expand All @@ -38,7 +40,7 @@ protected override async Task<int> SaveModelAsync(string engineId, string buildI
)
{
await _smtModelFactory.UpdateEngineFromAsync(
Path.Combine(_options.CurrentValue.EnginesDir, engineId),
Path.Combine(_engineOptions.CurrentValue.EnginesDir, engineId),
engineStream,
CancellationToken.None
);
Expand All @@ -54,7 +56,7 @@ private async Task<int> TrainOnNewSegmentPairsAsync(string engineId)
if (segmentPairs.Count == 0)
return segmentPairs.Count;

string engineDir = Path.Combine(_options.CurrentValue.EnginesDir, engineId);
string engineDir = Path.Combine(_engineOptions.CurrentValue.EnginesDir, engineId);
var tokenizer = new LatinWordTokenizer();
var detokenizer = new LatinWordDetokenizer();
ITruecaser truecaser = await _truecaserFactory.CreateAsync(engineDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public TestEnvironment()
ServiceOptions serviceOptions = new() { ServiceId = "this_service" };
Factory = new DistributedReaderWriterLockFactory(
new OptionsWrapper<ServiceOptions>(serviceOptions),
new OptionsWrapper<DistributedReaderWriterLockOptions>(new DistributedReaderWriterLockOptions()),
Locks,
new ObjectIdGenerator()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,11 @@ public TestEnvironment()
{
Locks = new MemoryRepository<RWLock>();
var idGenerator = new ObjectIdGenerator();
var options = Substitute.For<IOptions<ServiceOptions>>();
options.Value.Returns(new ServiceOptions { ServiceId = "host" });
Factory = new DistributedReaderWriterLockFactory(options, Locks, idGenerator);
var serviceOptions = Substitute.For<IOptions<ServiceOptions>>();
serviceOptions.Value.Returns(new ServiceOptions { ServiceId = "host" });
var lockOptions = Substitute.For<IOptions<DistributedReaderWriterLockOptions>>();
lockOptions.Value.Returns(new DistributedReaderWriterLockOptions());
Factory = new DistributedReaderWriterLockFactory(serviceOptions, lockOptions, Locks, idGenerator);
}

public DistributedReaderWriterLockFactory Factory { get; }
Expand Down
Loading

0 comments on commit b643a06

Please sign in to comment.