Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Orleans.Clustering.DynamoDB
{
internal class DynamoDBMembershipTable : IMembershipTable
internal partial class DynamoDBMembershipTable : IMembershipTable
{
private static readonly TableVersion NotFoundTableVersion = new TableVersion(0, "0");

Expand Down Expand Up @@ -52,7 +52,7 @@ public async Task InitializeMembershipTable(bool tryInitTableVersion)
this.options.CreateIfNotExists,
this.options.UpdateIfExists);

logger.LogInformation((int)ErrorCode.MembershipBase, "Initializing AWS DynamoDB Membership Table");
LogInformationInitializingMembershipTable();
await storage.InitializeTable(this.options.TableName,
new List<KeySchemaElement>
{
Expand All @@ -72,7 +72,7 @@ await storage.InitializeTable(this.options.TableName,
{
// ignore return value, since we don't care if I inserted it or not, as long as it is in there.
bool created = await TryCreateTableVersionEntryAsync();
if(created) logger.LogInformation("Created new table version row.");
if (created) LogInformationCreatedNewTableVersionRow();
}
}

Expand Down Expand Up @@ -158,12 +158,7 @@ public async Task DeleteMembershipTableEntries(string clusterId)
}
catch (Exception exc)
{
this.logger.LogError(
(int)ErrorCode.MembershipBase,
exc,
"Unable to delete membership records on table {TableName} for ClusterId {ClusterId}",
this.options.TableName,
clusterId);
LogErrorUnableToDeleteMembershipRecords(exc, this.options.TableName, clusterId);
throw;
}
}
Expand All @@ -188,17 +183,12 @@ public async Task<MembershipTableData> ReadRow(SiloAddress siloAddress)
new[] {siloEntryKeys, versionEntryKeys}, fields => new SiloInstanceRecord(fields));

MembershipTableData data = Convert(entries.ToList());
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace("Read my entry {SiloAddress} Table: {TableData}", siloAddress.ToString(), data.ToString());
LogTraceReadMyEntry(siloAddress, data);
return data;
}
catch (Exception exc)
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error reading silo entry for key {SiloAddress} from the table {TableName}",
siloAddress.ToString(),
this.options.TableName);
LogWarningIntermediateErrorReadingSiloEntry(exc, siloAddress, this.options.TableName);
throw;
}
}
Expand All @@ -225,23 +215,19 @@ public async Task<MembershipTableData> ReadAll()

if (records.Exists(record => record.MembershipVersion > versionRow.MembershipVersion))
{
this.logger.LogWarning((int)ErrorCode.MembershipBase, "Found an inconsistency while reading all silo entries");
LogWarningFoundInconsistencyReadingAllSiloEntries();
//not expecting this to hit often, but if it does, should put in a limit
return await this.ReadAll();
}

MembershipTableData data = Convert(records);
if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.LogTrace("ReadAll Table {Table}", data.ToString());
LogTraceReadAllTable(data);

return data;
}
catch (Exception exc)
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error reading all silo entries {TableName}.",
options.TableName);
LogWarningIntermediateErrorReadingAllSiloEntries(exc, options.TableName);
throw;
}
}
Expand All @@ -250,16 +236,12 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
{
try
{
if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("InsertRow entry = {Entry}", entry.ToString());
LogDebugInsertRow(entry);
var tableEntry = Convert(entry, tableVersion);

if (!TryCreateTableVersionRecord(tableVersion.Version, tableVersion.VersionEtag, out var versionEntry))
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
"Insert failed. Invalid ETag value. Will retry. Entry {Entry}, eTag {ETag}",
entry.ToString(),
tableVersion.VersionEtag);
LogWarningInsertFailedInvalidETag(entry, tableVersion.VersionEtag);
return false;
}

Expand Down Expand Up @@ -298,10 +280,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
if (canceledException.Message.Contains("ConditionalCheckFailed")) //not a good way to check for this currently
{
result = false;
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
"Insert failed due to contention on the table. Will retry. Entry {Entry}",
entry.ToString());
LogWarningInsertFailedDueToContention(entry);
}
else
{
Expand All @@ -313,12 +292,7 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
}
catch (Exception exc)
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error inserting entry {Entry} to the table {TableName}.",
entry.ToString(),
this.options.TableName);
LogWarningIntermediateErrorInsertingEntry(exc, entry, this.options.TableName);
throw;
}
}
Expand All @@ -327,27 +301,19 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
{
try
{
if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("UpdateRow entry = {Entry}, etag = {}", entry.ToString(), etag);
LogDebugUpdateRow(entry, etag);
var siloEntry = Convert(entry, tableVersion);
if (!int.TryParse(etag, out var currentEtag))
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
"Update failed. Invalid ETag value. Will retry. Entry {Entry}, eTag {ETag}",
entry.ToString(),
etag);
LogWarningUpdateFailedInvalidETag(entry, etag);
return false;
}

siloEntry.ETag = currentEtag + 1;

if (!TryCreateTableVersionRecord(tableVersion.Version, tableVersion.VersionEtag, out var versionEntry))
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
"Update failed. Invalid ETag value. Will retry. Entry {Entry}, eTag {ETag}",
entry.ToString(),
tableVersion.VersionEtag);
LogWarningUpdateFailedInvalidETag(entry, tableVersion.VersionEtag);
return false;
}

Expand Down Expand Up @@ -388,12 +354,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
if (canceledException.Message.Contains("ConditionalCheckFailed")) //not a good way to check for this currently
{
result = false;
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
canceledException,
"Update failed due to contention on the table. Will retry. Entry {Entry}, eTag {ETag}",
entry.ToString(),
etag);
LogWarningUpdateFailedDueToContention(canceledException, entry, etag);
}
else
{
Expand All @@ -405,12 +366,7 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
}
catch (Exception exc)
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error updating entry {Entry} to the table {TableName}.",
entry.ToString(),
this.options.TableName);
LogWarningIntermediateErrorUpdatingEntry(exc, entry, this.options.TableName);
throw;
}
}
Expand All @@ -419,20 +375,15 @@ public async Task UpdateIAmAlive(MembershipEntry entry)
{
try
{
if (this.logger.IsEnabled(LogLevel.Debug)) this.logger.LogDebug("Merge entry = {Entry}", entry.ToString());
LogDebugMergeEntry(entry);
var siloEntry = ConvertPartial(entry);
var fields = new Dictionary<string, AttributeValue> { { SiloInstanceRecord.I_AM_ALIVE_TIME_PROPERTY_NAME, new AttributeValue(siloEntry.IAmAliveTime) } };
var expression = $"attribute_exists({SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}) AND attribute_exists({SiloInstanceRecord.SILO_IDENTITY_PROPERTY_NAME})";
await this.storage.UpsertEntryAsync(this.options.TableName, siloEntry.GetKeys(),fields, expression);
}
catch (Exception exc)
{
this.logger.LogWarning(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error updating IAmAlive field for entry {Entry} to the table {TableName}.",
entry.ToString(),
this.options.TableName);
LogWarningIntermediateErrorUpdatingIAmAlive(exc, entry, this.options.TableName);
throw;
}
}
Expand All @@ -459,11 +410,7 @@ private MembershipTableData Convert(List<SiloInstanceRecord> entries)
}
catch (Exception exc)
{
this.logger.LogError(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {TableEntry}. Ignoring this entry.",
tableEntry);
LogErrorIntermediateErrorParsingSiloInstanceTableEntry(exc, tableEntry);
}
}
}
Expand All @@ -472,11 +419,7 @@ private MembershipTableData Convert(List<SiloInstanceRecord> entries)
}
catch (Exception exc)
{
this.logger.LogError(
(int)ErrorCode.MembershipBase,
exc,
"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Entries}.",
Utils.EnumerableToString(entries));
LogErrorIntermediateErrorParsingSiloInstanceTableEntries(exc, entries);
throw;
}
}
Expand Down Expand Up @@ -611,12 +554,7 @@ public async Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
}
catch (Exception exc)
{
this.logger.LogError(
(int)ErrorCode.MembershipBase,
exc,
"Unable to clean up defunct membership records on table {TableName} for ClusterId {ClusterId}",
this.options.TableName,
this.clusterId);
LogErrorUnableToCleanUpDefunctMembershipRecords(exc, this.options.TableName, this.clusterId);
throw;
}
}
Expand All @@ -627,5 +565,131 @@ private static bool SiloIsDefunct(SiloInstanceRecord silo, DateTimeOffset before
&& iAmAliveTime < beforeDate
&& silo.Status != (int)SiloStatus.Active;
}

[LoggerMessage(
Level = LogLevel.Information,
Message = "Initializing AWS DynamoDB Membership Table"
)]
private partial void LogInformationInitializingMembershipTable();

[LoggerMessage(
Level = LogLevel.Information,
Message = "Created new table version row."
)]
private partial void LogInformationCreatedNewTableVersionRow();

[LoggerMessage(
Level = LogLevel.Error,
Message = "Unable to delete membership records on table {TableName} for ClusterId {ClusterId}"
)]
private partial void LogErrorUnableToDeleteMembershipRecords(Exception exception, string tableName, string clusterId);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Read my entry {SiloAddress} Table: {TableData}"
)]
private partial void LogTraceReadMyEntry(SiloAddress siloAddress, MembershipTableData tableData);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error reading silo entry for key {SiloAddress} from the table {TableName}"
)]
private partial void LogWarningIntermediateErrorReadingSiloEntry(Exception exception, SiloAddress siloAddress, string tableName);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Found an inconsistency while reading all silo entries"
)]
private partial void LogWarningFoundInconsistencyReadingAllSiloEntries();

[LoggerMessage(
Level = LogLevel.Trace,
Message = "ReadAll Table {Table}"
)]
private partial void LogTraceReadAllTable(MembershipTableData table);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error reading all silo entries {TableName}."
)]
private partial void LogWarningIntermediateErrorReadingAllSiloEntries(Exception exception, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "InsertRow entry = {Entry}"
)]
private partial void LogDebugInsertRow(MembershipEntry entry);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Insert failed. Invalid ETag value. Will retry. Entry {Entry}, eTag {ETag}"
)]
private partial void LogWarningInsertFailedInvalidETag(MembershipEntry entry, string etag);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Insert failed due to contention on the table. Will retry. Entry {Entry}"
)]
private partial void LogWarningInsertFailedDueToContention(MembershipEntry entry);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error inserting entry {Entry} to the table {TableName}."
)]
private partial void LogWarningIntermediateErrorInsertingEntry(Exception exception, MembershipEntry entry, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "UpdateRow entry = {Entry}, etag = {Etag}"
)]
private partial void LogDebugUpdateRow(MembershipEntry entry, string etag);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Update failed. Invalid ETag value. Will retry. Entry {Entry}, eTag {ETag}"
)]
private partial void LogWarningUpdateFailedInvalidETag(MembershipEntry entry, string etag);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Update failed due to contention on the table. Will retry. Entry {Entry}, eTag {ETag}"
)]
private partial void LogWarningUpdateFailedDueToContention(Exception exception, MembershipEntry entry, string etag);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error updating entry {Entry} to the table {TableName}."
)]
private partial void LogWarningIntermediateErrorUpdatingEntry(Exception exception, MembershipEntry entry, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Merge entry = {Entry}"
)]
private partial void LogDebugMergeEntry(MembershipEntry entry);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error updating IAmAlive field for entry {Entry} to the table {TableName}."
)]
private partial void LogWarningIntermediateErrorUpdatingIAmAlive(Exception exception, MembershipEntry entry, string tableName);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {TableEntry}. Ignoring this entry."
)]
private partial void LogErrorIntermediateErrorParsingSiloInstanceTableEntry(Exception exception, SiloInstanceRecord tableEntry);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Entries}."
)]
private partial void LogErrorIntermediateErrorParsingSiloInstanceTableEntries(Exception exception, IEnumerable<SiloInstanceRecord> entries);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Unable to clean up defunct membership records on table {TableName} for ClusterId {ClusterId}"
)]
private partial void LogErrorUnableToCleanUpDefunctMembershipRecords(Exception exception, string tableName, string clusterId);
}
}
Loading