Skip to content

Commit

Permalink
Merge pull request #304 from LATOKEN/consensus-persistence
Browse files Browse the repository at this point in the history
Consensus persistence
  • Loading branch information
dkhrustalev authored Dec 2, 2022
2 parents 75b13ff + 2a1e898 commit e0237f2
Show file tree
Hide file tree
Showing 86 changed files with 3,908 additions and 57 deletions.
57 changes: 55 additions & 2 deletions src/Lachain.Consensus/AbstractProtocol.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using System.Threading;
using Lachain.Logger;
using Lachain.Consensus.Messages;
using Lachain.Proto;
using Lachain.Utility.Utils;
using Prometheus;

Expand Down Expand Up @@ -36,6 +38,11 @@ public abstract class AbstractProtocol : IConsensusProtocol
protected string _lastMessage = "";
private ulong _startTime = 0;
private const ulong _alertTime = 60 * 1000;
public bool Started { get; private set; } = false;

// waiting for 10 block time
// TODO: calculate _requestTime properly
private const ulong _requestTime = 5 * 4 * 1000;

protected AbstractProtocol(
IPublicConsensusKeySet wallet,
Expand All @@ -44,12 +51,29 @@ IConsensusBroadcaster broadcaster
)
{
_thread = new Thread(Start) {IsBackground = true};
_thread.Start();
Broadcaster = broadcaster;
Id = id;
Wallet = wallet;
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void StartThread()
{
if (Started)
{
throw new InvalidOperationException("StartThread() already called previously");
}

_thread.Start();
Started = true;
}

public bool HasThreadStarted()
{
return Started;
}


public int GetMyId()
{
return Broadcaster.GetMyId();
Expand Down Expand Up @@ -104,19 +128,25 @@ public void Terminate()
public void Start()
{
_startTime = TimeUtils.CurrentTimeMillis();
var lastRequestTime = _startTime;
while (!Terminated)
{
MessageEnvelope msg;
lock (_queueLock)
{
while (_queue.IsEmpty && !Terminated)
{
Monitor.Wait(_queueLock, 1000);
var queueUsed = Monitor.Wait(_queueLock, 1000);
if (TimeUtils.CurrentTimeMillis() - _startTime > _alertTime)
{
Logger.LogWarning($"Protocol {Id} is waiting for _queueLock too long, last message" +
$" is [{_lastMessage}]");
}
if (!queueUsed && TimeUtils.CurrentTimeMillis() - lastRequestTime > _requestTime)
{
_protocolWaitingTooLong?.Invoke(this, Id);
lastRequestTime = TimeUtils.CurrentTimeMillis();
}
}

if (Terminated)
Expand Down Expand Up @@ -164,6 +194,29 @@ public void ReceiveMessage(MessageEnvelope message)
}
}

protected void InvokeReceivedExternalMessage(int from, ConsensusMessage msg)
{
// received a valid msg from validator
_receivedExternalMessage?.Invoke(this, (from, msg));
}

protected void InvokeMessageBroadcasted(ConsensusMessage msg)
{
// an external message is broadcasted
_messageBroadcasted?.Invoke(this, msg);
}

protected void InvokeMessageSent(int validator, ConsensusMessage msg)
{
// an external message is sent to validator
_messageSent?.Invoke(this, (validator, msg));
}

public abstract void ProcessMessage(MessageEnvelope envelope);

public event EventHandler<IProtocolIdentifier>? _protocolWaitingTooLong;
public event EventHandler<(int from, ConsensusMessage msg)>? _receivedExternalMessage;
public event EventHandler<ConsensusMessage>? _messageBroadcasted;
public event EventHandler<(int validator, ConsensusMessage msg)>? _messageSent;
}
}
22 changes: 22 additions & 0 deletions src/Lachain.Consensus/BinaryAgreement/BinaryAgreementId.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Lachain.Utility.Serialization;
using Nethereum.RLP;

namespace Lachain.Consensus.BinaryAgreement
{
Expand Down Expand Up @@ -40,5 +44,23 @@ public override string ToString()
{
return $"BA (E={Era}, A={AssociatedValidatorId})";
}

public byte[] ToByteArray()
{
var list = new List<byte[]>
{
Era.ToBytes().ToArray(),
AssociatedValidatorId.ToBytes().ToArray()
};
return RLP.EncodeList(list.Select(RLP.EncodeElement).ToArray());
}

public static BinaryAgreementId FromByteArray(byte[] bytes)
{
var decoded = (RLPCollection) RLP.Decode(bytes.ToArray());
var era = decoded[0].RLPData.AsReadOnlySpan().ToInt64();
var id = decoded[1].RLPData.AsReadOnlySpan().ToInt64();
return new BinaryAgreementId(era, id);
}
}
}
15 changes: 12 additions & 3 deletions src/Lachain.Consensus/BinaryAgreement/BinaryBroadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ private void BroadcastBVal(bool value)
{
var b = value ? 1 : 0;
_wasBvalBroadcasted[b] = true;
Broadcaster.Broadcast(CreateBValMessage(b));
var msg = CreateBValMessage(b);
Broadcaster.Broadcast(msg);
InvokeMessageBroadcasted(msg);
}

[MethodImpl(MethodImplOptions.Synchronized)]
Expand All @@ -139,6 +141,7 @@ private void HandleBValMessage(int sender, BValMessage bval)

_receivedValues[sender].Add(b == 1);
++_receivedCount[b];
InvokeReceivedExternalMessage(sender, new ConsensusMessage { Bval = bval });

if (!_wasBvalBroadcasted[b] && _receivedCount[b] >= F + 1)
{
Expand All @@ -151,7 +154,9 @@ private void HandleBValMessage(int sender, BValMessage bval)
_binValues = _binValues.Add(b == 1);
if (_binValues.Count() == 1)
{
Broadcaster.Broadcast(CreateAuxMessage(b));
var msg = CreateAuxMessage(b);
Broadcaster.Broadcast(msg);
InvokeMessageBroadcasted(msg);
}

RevisitAuxMessages();
Expand All @@ -173,6 +178,7 @@ private void HandleAuxMessage(int sender, AuxMessage aux)

_playerSentAux[sender] = true;
_receivedAux[b]++;
InvokeReceivedExternalMessage(sender, new ConsensusMessage { Aux = aux });
RevisitAuxMessages();
}

Expand All @@ -191,6 +197,7 @@ private void HandleConfMessage(int sender, ConfMessage conf)
_validatorSentConf[sender] = true;

_confReceived.Add(new BoolSet(conf.Values));
InvokeReceivedExternalMessage(sender, new ConsensusMessage { Conf = conf });
RevisitConfMessages();
}

Expand Down Expand Up @@ -233,7 +240,9 @@ private void RevisitAuxMessages()
if (_confSent) return;
if (_binValues.Values().Sum(b => _receivedAux[b ? 1 : 0]) < N - F) return;
Logger.LogTrace($"{_broadcastId}: conf message sent with set {_binValues}");
Broadcaster.Broadcast(CreateConfMessage(_binValues));
var msg = CreateConfMessage(_binValues);
Broadcaster.Broadcast(msg);
InvokeMessageBroadcasted(msg);
_confSent = true;
RevisitConfMessages();
}
Expand Down
26 changes: 25 additions & 1 deletion src/Lachain.Consensus/BinaryAgreement/BinaryBroadcastId.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
namespace Lachain.Consensus.BinaryAgreement
using System.Collections.Generic;
using System.Linq;
using Lachain.Utility.Serialization;
using Nethereum.RLP;

namespace Lachain.Consensus.BinaryAgreement
{
public class BinaryBroadcastId : IProtocolIdentifier
{
Expand Down Expand Up @@ -46,5 +51,24 @@ public override string ToString()
{
return $"BB (Er={Era}, A={Agreement}, Ep={Epoch})";
}
public byte[] ToByteArray()
{
var list = new List<byte[]>
{
Era.ToBytes().ToArray(),
Agreement.ToBytes().ToArray(),
Epoch.ToBytes().ToArray()
};
return RLP.EncodeList(list.Select(RLP.EncodeElement).ToArray());
}

public static BinaryBroadcastId FromByteArray(byte[] bytes)
{
var decoded = (RLPCollection) RLP.Decode(bytes.ToArray());
var era = decoded[0].RLPData.AsReadOnlySpan().ToInt64();
var agreement = decoded[1].RLPData.AsReadOnlySpan().ToInt64();
var epoch = decoded[2].RLPData.AsReadOnlySpan().ToInt64();
return new BinaryBroadcastId(era, agreement, epoch);
}
}
}
20 changes: 20 additions & 0 deletions src/Lachain.Consensus/CommonCoin/CoinId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using Lachain.Utility.Serialization;
using Nethereum.RLP;

namespace Lachain.Consensus.CommonCoin
{
Expand Down Expand Up @@ -50,5 +51,24 @@ public override int GetHashCode()
{
return HashCode.Combine(Era, Agreement, Epoch);
}
public byte[] ToByteArray()
{
var list = new List<byte[]>
{
Era.ToBytes().ToArray(),
Agreement.ToBytes().ToArray(),
Epoch.ToBytes().ToArray()
};
return RLP.EncodeList(list.Select(RLP.EncodeElement).ToArray());
}

public static CoinId FromByteArray(byte[] bytes)
{
var decoded = (RLPCollection) RLP.Decode(bytes.ToArray());
var era = decoded[0].RLPData.AsReadOnlySpan().ToInt64();
var agreement = decoded[1].RLPData.AsReadOnlySpan().ToInt64();
var epoch = decoded[2].RLPData.AsReadOnlySpan().ToInt64();
return new CoinId(era, agreement, epoch);
}
}
}
13 changes: 12 additions & 1 deletion src/Lachain.Consensus/CommonCoin/CoinResult.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System.Linq;
using Lachain.Utility.Serialization;
using Lachain.Utility.Utils;

namespace Lachain.Consensus.CommonCoin
{
public class CoinResult : IEquatable<CoinResult>
public class CoinResult : IEquatable<CoinResult>, IByteSerializable
{
public CoinResult(byte[] bytes)
{
Expand Down Expand Up @@ -36,5 +37,15 @@ public override int GetHashCode()
{
return RawBytes.Aggregate(0, (i, b) => i * 31 + b);
}

public byte[] ToByteArray()
{
return RawBytes.ToArray();
}

public static CoinResult FromByteArray(byte[] bytes)
{
return new CoinResult(bytes);
}
}
}
12 changes: 9 additions & 3 deletions src/Lachain.Consensus/CommonCoin/CommonCoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public override void ProcessMessage(MessageEnvelope envelope)

// To create signature from the message, some requirements need to be fulfilled, otherwise it can
// throw exception (for example maybe a fixed length of the input bytes or maybe valid array of bytes)
bool validMsg = false;
try
{
Logger.LogTrace($"Received share from {envelope.ValidatorIndex}");
Expand All @@ -85,23 +86,27 @@ public override void ProcessMessage(MessageEnvelope envelope)
$"Faulty behaviour from player {envelope.ValidatorIndex}, {message.PrettyTypeString()}, {message.Coin.SignatureShare.ToByteArray().ToHex()}: bad signature share");
return; // potential fault evidence
}
validMsg = true;

if (signature == null)
{
_lastMessage = "signature == null";
return;
}

_result = new CoinResult(signature.RawSignature.ToBytes());
else
_result = new CoinResult(signature.RawSignature.ToBytes());
}
catch (Exception exception)
{
validMsg = false;
var pubKey = Broadcaster.GetPublicKeyById(envelope.ValidatorIndex)!.ToHex();
Logger.LogWarning(
$"Exception occured while handling message from validator {envelope.ValidatorIndex} " +
$"({pubKey}). Exception: {exception}");
}

if (validMsg)
InvokeReceivedExternalMessage(envelope.ValidatorIndex, message);

CheckResult();
}
else
Expand All @@ -121,6 +126,7 @@ public override void ProcessMessage(MessageEnvelope envelope)
CheckResult();
var msg = CreateCoinMessage(signatureShare);
Broadcaster.Broadcast(msg);
InvokeMessageBroadcasted(msg);
break;
case ProtocolResult<CoinId, CoinResult> _:
_lastMessage = "ProtocolResult";
Expand Down
21 changes: 20 additions & 1 deletion src/Lachain.Consensus/CommonSubset/CommonSubsetId.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using Lachain.Consensus.HoneyBadger;
using System.Collections.Generic;
using System.Linq;
using Lachain.Consensus.HoneyBadger;
using Lachain.Utility.Serialization;
using Nethereum.RLP;

namespace Lachain.Consensus.CommonSubset
{
Expand Down Expand Up @@ -43,5 +47,20 @@ public override int GetHashCode()
{
return Era.GetHashCode();
}
public byte[] ToByteArray()
{
var list = new List<byte[]>
{
Era.ToBytes().ToArray(),
};
return RLP.EncodeList(list.Select(RLP.EncodeElement).ToArray());
}

public static CommonSubsetId FromByteArray(byte[] bytes)
{
var decoded = (RLPCollection) RLP.Decode(bytes.ToArray());
var era = decoded[0].RLPData.AsReadOnlySpan().ToInt64();
return new CommonSubsetId((int)era);
}
}
}
Loading

0 comments on commit e0237f2

Please sign in to comment.