Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 130 additions & 39 deletions src/Azure/Orleans.Clustering.AzureStorage/AzureBasedMembershipTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace Orleans.Runtime.MembershipService
{
internal class AzureBasedMembershipTable : IMembershipTable
internal partial class AzureBasedMembershipTable : IMembershipTable
{
private readonly ILogger logger;
private readonly ILoggerFactory loggerFactory;
Expand Down Expand Up @@ -50,7 +50,7 @@ public async Task InitializeMembershipTable(bool tryInitTableVersion)
{
// ignore return value, since we don't care if I inserted it or not, as long as it is in there.
bool created = await tableManager.TryCreateTableVersionEntryAsync();
if(created) logger.LogInformation("Created new table version row.");
if (created) LogInformationCreatedNewTableVersionRow();
}
}

Expand All @@ -70,14 +70,12 @@ public async Task<MembershipTableData> ReadRow(SiloAddress key)
{
var entries = await tableManager.FindSiloEntryAndTableVersionRow(key);
MembershipTableData data = Convert(entries);
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug($"Read my entry {{SiloAddress}} Table={Environment.NewLine}{{Data}}", key.ToString(), data.ToString());
LogDebugReadMyEntry(key, data);
return data;
}
catch (Exception exc)
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_20,
exc,
"Intermediate error reading silo entry for key {SiloAddress} from the table {TableName}.", key.ToString(), tableManager.TableName);
LogWarningIntermediateErrorReadingSiloEntry(exc, key, tableManager.TableName);
throw;
}
}
Expand All @@ -88,15 +86,13 @@ public async Task<MembershipTableData> ReadAll()
{
var entries = await tableManager.FindAllSiloEntries();
MembershipTableData data = Convert(entries);
if (logger.IsEnabled(LogLevel.Trace)) logger.LogTrace($"ReadAll Table={Environment.NewLine}{{Data}}", data.ToString());
LogTraceReadAllTable(data);

return data;
}
catch (Exception exc)
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_21,
exc,
"Intermediate error reading all silo entries {TableName}.", tableManager.TableName);
LogWarningIntermediateErrorReadingAllSiloEntries(exc, tableManager.TableName);
throw;
}
}
Expand All @@ -105,23 +101,23 @@ public async Task<bool> InsertRow(MembershipEntry entry, TableVersion tableVersi
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("InsertRow entry = {Data}, table version = {TableVersion}", entry.ToString(), tableVersion);
LogDebugInsertRow(entry, tableVersion);
var tableEntry = Convert(entry, tableManager.DeploymentId);
var versionEntry = tableManager.CreateTableVersionEntry(tableVersion.Version);

bool result = await tableManager.InsertSiloEntryConditionally(
tableEntry, versionEntry, tableVersion.VersionEtag);

if (result == false)
logger.LogWarning((int)TableStorageErrorCode.AzureTable_22,
"Insert failed due to contention on the table. Will retry. Entry {Data}, table version = {TableVersion}", entry.ToString(), tableVersion);
LogWarningTableContention(entry, tableVersion);
return result;
}
catch (Exception exc)
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_23,
exc,
"Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName);
LogWarningInsertingMembershipEntry(exc,
entry,
tableVersion is null ? "null" : tableVersion.ToString(),
tableManager.TableName);
throw;
}
}
Expand All @@ -130,25 +126,21 @@ public async Task<bool> UpdateRow(MembershipEntry entry, string etag, TableVersi
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("UpdateRow entry = {Data}, etag = {ETag}, table version = {TableVersion}", entry.ToString(), etag, tableVersion);
LogDebugUpdateRow(entry, etag, tableVersion);
var siloEntry = Convert(entry, tableManager.DeploymentId);
var versionEntry = tableManager.CreateTableVersionEntry(tableVersion.Version);

bool result = await tableManager.UpdateSiloEntryConditionally(siloEntry, etag, versionEntry, tableVersion.VersionEtag);
if (result == false)
logger.LogWarning(
(int)TableStorageErrorCode.AzureTable_24,
"Update failed due to contention on the table. Will retry. Entry {Data}, eTag {ETag}, table version = {TableVersion}",
entry.ToString(),
etag,
tableVersion);
LogWarningTableContentionEtag(entry, etag, tableVersion);
return result;
}
catch (Exception exc)
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_25,
exc,
"Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}.", entry.ToString(), tableVersion is null ? "null" : tableVersion.ToString(), tableManager.TableName);
LogWarningUpdatingMembershipEntry(exc,
entry,
tableVersion is null ? "null" : tableVersion.ToString(),
tableManager.TableName);
throw;
}
}
Expand All @@ -157,15 +149,15 @@ public async Task UpdateIAmAlive(MembershipEntry entry)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Merge entry = {Data}", entry.ToString());
LogDebugMergeEntry(entry);
var siloEntry = ConvertPartial(entry, tableManager.DeploymentId);
await tableManager.MergeTableEntryAsync(siloEntry);
}
catch (Exception exc)
{
logger.LogWarning((int)TableStorageErrorCode.AzureTable_26,
exc,
"Intermediate error updating IAmAlive field for entry {Data} to the table {TableName}.", entry.ToString(), tableManager.TableName);
LogWarningUpdatingMembershipEntry(exc,
entry,
tableManager.TableName);
throw;
}
}
Expand All @@ -187,15 +179,13 @@ private MembershipTableData Convert(List<(SiloInstanceTableEntry Entity, string
{
try
{

MembershipEntry membershipEntry = Parse(tableEntry);
memEntries.Add(new Tuple<MembershipEntry, string>(membershipEntry, tuple.ETag));
}
catch (Exception exc)
{
logger.LogError((int)TableStorageErrorCode.AzureTable_61,
exc,
"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Data}. Ignoring this entry.", tableEntry);
LogErrorParsingMembershipTableDataIgnoring(exc, tableEntry);
}
}
}
Expand All @@ -204,9 +194,7 @@ private MembershipTableData Convert(List<(SiloInstanceTableEntry Entity, string
}
catch (Exception exc)
{
logger.LogError((int)TableStorageErrorCode.AzureTable_60,
exc,
"Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Data}.", Utils.EnumerableToString(entries, tuple => tuple.Entity.ToString()));
LogErrorParsingMembershipTableData(exc, new(entries));
throw;
}
}
Expand All @@ -216,7 +204,7 @@ private static MembershipEntry Parse(SiloInstanceTableEntry tableEntry)
var parse = new MembershipEntry
{
HostName = tableEntry.HostName,
Status = (SiloStatus) Enum.Parse(typeof (SiloStatus), tableEntry.Status)
Status = (SiloStatus)Enum.Parse(typeof(SiloStatus), tableEntry.Status)
};

if (!string.IsNullOrEmpty(tableEntry.ProxyPort))
Expand All @@ -236,7 +224,8 @@ private static MembershipEntry Parse(SiloInstanceTableEntry tableEntry)
if (!string.IsNullOrEmpty(tableEntry.SiloName))
{
parse.SiloName = tableEntry.SiloName;
}else if (!string.IsNullOrEmpty(tableEntry.InstanceName))
}
else if (!string.IsNullOrEmpty(tableEntry.InstanceName))
{
// this is for backward compatability: in a mixed cluster of old and new version,
// some entries will have the old InstanceName column.
Expand Down Expand Up @@ -345,5 +334,107 @@ private static SiloInstanceTableEntry ConvertPartial(MembershipEntry memEntry, s
RowKey = SiloInstanceTableEntry.ConstructRowKey(memEntry.SiloAddress)
};
}

private readonly struct UtilsEnumerableToStringLogValue(IEnumerable<(SiloInstanceTableEntry Entity, string ETag)> entries)
{
public override string ToString() => Utils.EnumerableToString(entries, tuple => tuple.Entity.ToString());
}

[LoggerMessage(
Level = LogLevel.Information,
Message = "Created new table version row."
)]
private partial void LogInformationCreatedNewTableVersionRow();

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Read my entry {SiloAddress} Table=\n{Data}"
)]
private partial void LogDebugReadMyEntry(SiloAddress siloAddress, MembershipTableData data);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error reading silo entry for key {SiloAddress} from the table {TableName}."
)]
private partial void LogWarningIntermediateErrorReadingSiloEntry(Exception exception, SiloAddress siloAddress, string tableName);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "ReadAll Table={Data}"
)]
private partial void LogTraceReadAllTable(MembershipTableData data);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Intermediate error reading all silo entries {TableName}."
)]
private partial void LogWarningIntermediateErrorReadingAllSiloEntries(Exception exception, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "InsertRow entry = {Data}, table version = {TableVersion}"
)]
private partial void LogDebugInsertRow(MembershipEntry data, TableVersion tableVersion);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_22,
Level = LogLevel.Warning,
Message = "Insert failed due to contention on the table. Will retry. Entry {Data}, table version = {TableVersion}"
)]
private partial void LogWarningTableContention(MembershipEntry data, TableVersion tableVersion);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_23,
Level = LogLevel.Warning,
Message = "Intermediate error inserting entry {Data} tableVersion {TableVersion} to the table {TableName}"
)]
private partial void LogWarningInsertingMembershipEntry(Exception ex, MembershipEntry data, string tableVersion, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "UpdateRow entry = {Data}, etag = {ETag}, table version = {TableVersion}"
)]
private partial void LogDebugUpdateRow(MembershipEntry data, string eTag, TableVersion tableVersion);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_24,
Level = LogLevel.Warning,
Message = "Update failed due to contention on the table. Will retry. Entry {Data}, eTag {ETag}, table version = {TableVersion}"
)]
private partial void LogWarningTableContentionEtag(MembershipEntry data, string eTag, TableVersion tableVersion);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_25,
Level = LogLevel.Warning,
Message = "Intermediate error updating entry {Data} tableVersion {TableVersion} to the table {TableName}"
)]
private partial void LogWarningUpdatingMembershipEntry(Exception ex, MembershipEntry data, string tableVersion, string tableName);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Merge entry = {Data}"
)]
private partial void LogDebugMergeEntry(MembershipEntry data);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_26,
Level = LogLevel.Warning,
Message = "Intermediate error updating IAmAlive field for entry {Data} to the table {TableName}."
)]
private partial void LogWarningUpdatingMembershipEntry(Exception ex, MembershipEntry data, string tableName);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_61,
Level = LogLevel.Error,
Message = "Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Data}. Ignoring this entry."
)]
private partial void LogErrorParsingMembershipTableDataIgnoring(Exception ex, SiloInstanceTableEntry data);

[LoggerMessage(
EventId = (int)TableStorageErrorCode.AzureTable_60,
Level = LogLevel.Error,
Message = "Intermediate error parsing SiloInstanceTableEntry to MembershipTableData: {Data}."
)]
private partial void LogErrorParsingMembershipTableData(Exception ex, UtilsEnumerableToStringLogValue data);
}
}
Loading
Loading