Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 156 additions & 41 deletions src/AdoNet/Orleans.Clustering.AdoNet/Messaging/AdoNetClusteringTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Orleans.Runtime.MembershipService
{
public class AdoNetClusteringTable : IMembershipTable
public partial class AdoNetClusteringTable : IMembershipTable
{
private readonly string clusterId;
private readonly IServiceProvider serviceProvider;
Expand All @@ -29,66 +29,58 @@ public AdoNetClusteringTable(

public async Task InitializeMembershipTable(bool tryInitTableVersion)
{
if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace("AdoNetClusteringTable.InitializeMembershipTable called.");
LogTraceInitializeMembershipTable();

//This initializes all of Orleans operational queries from the database using a well known view
//and assumes the database with appropriate definitions exists already.
orleansQueries = await RelationalOrleansQueries.CreateInstance(
clusteringTableOptions.Invariant,
clusteringTableOptions.ConnectionString);

// even if I am not the one who created the table,
// even if I am not the one who created the table,
// try to insert an initial table version if it is not already there,
// so we always have a first table version row, before this silo starts working.
if (tryInitTableVersion)
{
var wasCreated = await InitTableAsync();
if (wasCreated)
{
logger.LogInformation("Created new table version row.");
LogInfoCreatedNewTableVersionRow();
}
}
}


public async Task<MembershipTableData> ReadRow(SiloAddress key)
{
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("AdoNetClusteringTable.ReadRow called with key: {Key}.", key);
LogTraceReadRow(key);
try
{
return await orleansQueries.MembershipReadRowAsync(this.clusterId, key);
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug(ex, "AdoNetClusteringTable.ReadRow failed");
LogDebugReadRowFailed(ex);
throw;
}
}


public async Task<MembershipTableData> ReadAll()
{
if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace("AdoNetClusteringTable.ReadAll called.");
LogTraceReadAll();
try
{
return await orleansQueries.MembershipReadAllAsync(this.clusterId);
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug(ex, "AdoNetClusteringTable.ReadAll failed");
LogDebugReadAllFailed(ex);
throw;
}
}


public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersion)
{
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace(
"AdoNetClusteringTable.InsertRow called with entry {Entry} and tableVersion {TableVersion}.",
entry,
tableVersion);
LogTraceInsertRow(entry, tableVersion);

//The "tableVersion" parameter should always exist when inserting a row as Init should
//have been called and membership version created and read. This is an optimization to
Expand All @@ -97,12 +89,12 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
//Likewise, no update can be done without membership entry.
if (entry == null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. MembershipEntry is null.");
LogDebugInsertRowAbortedNullEntry();
throw new ArgumentNullException(nameof(entry));
}
if (tableVersion is null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.InsertRow aborted due to null check. TableVersion is null ");
LogDebugInsertRowAbortedNullTableVersion();
throw new ArgumentNullException(nameof(tableVersion));
}

Expand All @@ -112,15 +104,14 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug(ex, "AdoNetClusteringTable.InsertRow failed");
LogDebugInsertRowFailed(ex);
throw;
}
}


public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion)
{
if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace("IMembershipTable.UpdateRow called with entry {Entry}, etag {ETag} and tableVersion {TableVersion}.", entry, etag, tableVersion);
LogTraceUpdateRow(entry, etag, tableVersion);

//The "tableVersion" parameter should always exist when updating a row as Init should
//have been called and membership version created and read. This is an optimization to
Expand All @@ -129,12 +120,12 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
//Likewise, no update can be done without membership entry or an etag.
if (entry == null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. MembershipEntry is null.");
LogDebugUpdateRowAbortedNullEntry();
throw new ArgumentNullException(nameof(entry));
}
if (tableVersion is null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateRow aborted due to null check. TableVersion is null");
LogDebugUpdateRowAbortedNullTableVersion();
throw new ArgumentNullException(nameof(tableVersion));
}

Expand All @@ -144,19 +135,17 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug(ex, "AdoNetClusteringTable.UpdateRow failed");
LogDebugUpdateRowFailed(ex);
throw;
}
}


public async Task UpdateIAmAlive(MembershipEntry entry)
{
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("IMembershipTable.UpdateIAmAlive called with entry {Entry}.", entry);
LogTraceUpdateIAmAlive(entry);
if (entry == null)
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("AdoNetClusteringTable.UpdateIAmAlive aborted due to null check. MembershipEntry is null.");
LogDebugUpdateIAmAliveAbortedNullEntry();
throw new ArgumentNullException(nameof(entry));
}
try
Expand All @@ -165,41 +154,35 @@ public async Task UpdateIAmAlive(MembershipEntry entry)
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug(ex, "AdoNetClusteringTable.UpdateIAmAlive failed");
LogDebugUpdateIAmAliveFailed(ex);
throw;
}
}


public async Task DeleteMembershipTableEntries(string clusterId)
{
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("IMembershipTable.DeleteMembershipTableEntries called with clusterId {ClusterId}.", clusterId);
LogTraceDeleteMembershipTableEntries(clusterId);
try
{
await orleansQueries.DeleteMembershipTableEntriesAsync(clusterId);
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug(ex, "AdoNetClusteringTable.DeleteMembershipTableEntries failed");
LogDebugDeleteMembershipTableEntriesFailed(ex);
throw;
}
}

public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
{
if (logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("IMembershipTable.CleanupDefunctSiloEntries called with beforeDate {beforeDate} and clusterId {ClusterId}.", beforeDate, clusterId);
LogTraceCleanupDefunctSiloEntries(beforeDate, clusterId);
try
{
await orleansQueries.CleanupDefunctSiloEntriesAsync(beforeDate, this.clusterId);
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug(ex, "AdoNetClusteringTable.CleanupDefunctSiloEntries failed");
LogDebugCleanupDefunctSiloEntriesFailed(ex);
throw;
}
}
Expand All @@ -212,9 +195,141 @@ private async Task<bool> InitTableAsync()
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace(ex, "Insert silo membership version failed");
LogTraceInsertSiloMembershipVersionFailed(ex);
throw;
}
}

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InitializeMembershipTable)} called."
)]
private partial void LogTraceInitializeMembershipTable();

[LoggerMessage(
Level = LogLevel.Information,
Message = "Created new table version row."
)]
private partial void LogInfoCreatedNewTableVersionRow();

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(ReadRow)} called with key: {{Key}}."
)]
private partial void LogTraceReadRow(SiloAddress key);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(ReadRow)} failed"
)]
private partial void LogDebugReadRowFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(ReadAll)} called."
)]
private partial void LogTraceReadAll();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(ReadAll)} failed"
)]
private partial void LogDebugReadAllFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InsertRow)} called with entry {{Entry}} and tableVersion {{TableVersion}}."
)]
private partial void LogTraceInsertRow(MembershipEntry entry, TableVersion tableVersion);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InsertRow)} aborted due to null check. MembershipEntry is null."
)]
private partial void LogDebugInsertRowAbortedNullEntry();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InsertRow)} aborted due to null check. TableVersion is null "
)]
private partial void LogDebugInsertRowAbortedNullTableVersion();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(InsertRow)} failed"
)]
private partial void LogDebugInsertRowFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(IMembershipTable)}.{nameof(UpdateRow)} called with entry {{Entry}}, etag {{ETag}} and tableVersion {{TableVersion}}."
)]
private partial void LogTraceUpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(UpdateRow)} aborted due to null check. MembershipEntry is null."
)]
private partial void LogDebugUpdateRowAbortedNullEntry();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(UpdateRow)} aborted due to null check. TableVersion is null"
)]
private partial void LogDebugUpdateRowAbortedNullTableVersion();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(UpdateRow)} failed"
)]
private partial void LogDebugUpdateRowFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(IMembershipTable)}.{nameof(UpdateIAmAlive)} called with entry {{Entry}}."
)]
private partial void LogTraceUpdateIAmAlive(MembershipEntry entry);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(UpdateIAmAlive)} aborted due to null check. MembershipEntry is null."
)]
private partial void LogDebugUpdateIAmAliveAbortedNullEntry();

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(UpdateIAmAlive)} failed"
)]
private partial void LogDebugUpdateIAmAliveFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(IMembershipTable)}.{nameof(DeleteMembershipTableEntries)} called with clusterId {{ClusterId}}."
)]
private partial void LogTraceDeleteMembershipTableEntries(string clusterId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(DeleteMembershipTableEntries)} failed"
)]
private partial void LogDebugDeleteMembershipTableEntriesFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = $"{nameof(IMembershipTable)}.{nameof(CleanupDefunctSiloEntries)} called with beforeDate {{beforeDate}} and clusterId {{ClusterId}}."
)]
private partial void LogTraceCleanupDefunctSiloEntries(DateTimeOffset beforeDate, string clusterId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = $"{nameof(AdoNetClusteringTable)}.{nameof(CleanupDefunctSiloEntries)} failed"
)]
private partial void LogDebugCleanupDefunctSiloEntriesFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Insert silo membership version failed"
)]
private partial void LogTraceInsertSiloMembershipVersionFailed(Exception exception);
}
}
Loading