Skip to content

Commit

Permalink
Merge pull request #261 from LATOKEN/validator-ddos-attack-fix
Browse files Browse the repository at this point in the history
Validator ddos attack fix
  • Loading branch information
sgladkov authored Sep 2, 2022
2 parents cc26152 + 53085f2 commit ca660b6
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 39 deletions.
13 changes: 11 additions & 2 deletions src/Lachain.Consensus/AbstractProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public void Terminate()
if (Terminated) return;
Logger.LogTrace($"{Id}: protocol is terminated");
Terminated = true;
// We can empty the _queue because the messages will no longer be processed
// This will free some memory in case of spam messages
_queue.Clear();
Monitor.Pulse(_queueLock);
}
}
Expand Down Expand Up @@ -148,8 +151,14 @@ public void ReceiveMessage(MessageEnvelope message)
{
lock (_queueLock)
{
if (Terminated){}
// Logger.LogTrace($"{Id}: got message after termination");
if (Terminated)
{
// we should return here instead of enqueueing messages
// because once terminated, the messages are not being processed anymore
// so the queue will just get large unnecessarily
Monitor.Pulse(_queueLock);
return;
}
_queue.Enqueue(message);
Monitor.Pulse(_queueLock);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Lachain.Consensus/BinaryAgreement/BinaryAgreement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ private void TryProgressEpoch()

_currentValues = _binaryBroadcastsResults[_currentEpoch - 1];
var coinId = new CoinId(_agreementId.Era, _agreementId.AssociatedValidatorId, _currentEpoch);
if ((_currentEpoch / 2) % 3 == 2)
if (CoinToss.CreateCoinId(_currentEpoch))
{
Broadcaster.InternalRequest(new ProtocolRequest<CoinId, object?>(Id, coinId, null));
}
else
{
_coins[_currentEpoch] = ((_currentEpoch / 2) % 3) != 0;
_coins[_currentEpoch] = CoinToss.TossCoin(_currentEpoch) != 0;
}

_currentEpoch += 1;
Expand Down
34 changes: 34 additions & 0 deletions src/Lachain.Consensus/CommonCoin/CoinToss.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
namespace Lachain.Consensus.CommonCoin
{
public static class CoinToss
{
/*
RootProtocol requests for one CommonCoin protocol
BinaryAgreement requests many CommonCoin protocols sequentially
For each odd epoch BinaryAgreements takes false, true, result of common coin
So for epoch 1 => false, epoch 3 => true, epoch 5 => result of common coin
and the cycle continues
*/
public static long TossCoin(long epoch)
{
return (epoch / 2) % 3;
}

// epoch will be odd and TossCoin value will be 2
public static bool CreateCoinId(long epoch)
{
return (epoch > 0) && ((epoch & 1) != 0) && TossCoin(epoch) == 2;
}

public static long NextCoinCreationEpoch(long epoch)
{
return epoch + 6;
}

public static long TotalValidCommonCoin(long lowEpoch, long highEpoch)
{
if (!CreateCoinId(lowEpoch) || !CreateCoinId(highEpoch)) return -1;
return (highEpoch - lowEpoch) / 6 + 1;
}
}
}
4 changes: 4 additions & 0 deletions src/Lachain.Consensus/HoneyBadger/HoneyBadger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ private void CheckResult()
new ProtocolResult<HoneyBadgerId, ISet<IRawShare>>(_honeyBadgerId, _result));
}

// TODO: (investigate) the share is comming from ReliableBroadcast and the shareId of the share should match the senderId
// of that ReliableBroadcast, otherwise we might replace one share with another in _receivedShares array
private void HandleCommonSubset(ProtocolResult<CommonSubsetId, ISet<EncryptedShare>> result)
{
Logger.LogTrace($"Common subset finished {result.From}");
Expand Down Expand Up @@ -175,6 +177,8 @@ protected virtual ConsensusMessage CreateDecryptedMessage(PartiallyDecryptedShar
return message;
}

// DecryptorId of the message should match the senderId, otherwise the message should be discarded
// because HoneyBadger does not accept two messages for same DecryptorId and shareId
// We need to handle this message carefully like how about decoding a random message with random length
// and the value of 'share.ShareId' needs to be checked. If it is out of range, it can throw exception
private void HandleDecryptedMessage(TPKEPartiallyDecryptedShareMessage msg, int senderId)
Expand Down
26 changes: 22 additions & 4 deletions src/Lachain.Consensus/RootProtocol/RootProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class RootProtocol : AbstractProtocol
private MultiSig? _multiSig;
private ulong _cycleDuration;
private bool _useNewChainId;
private bool[] _receivedHeader;

private readonly List<Tuple<BlockHeader, MultiSig.Types.SignatureByValidator>> _signatures =
new List<Tuple<BlockHeader, MultiSig.Types.SignatureByValidator>>();
Expand All @@ -43,6 +44,11 @@ public RootProtocol(RootProtocolId id, IPublicConsensusKeySet wallet, ECDSAPriva
_validatorAttendanceRepository = validatorAttendanceRepository;
_cycleDuration = cycleDuration;
_useNewChainId = useNewChainId;
// _receivedHeader is used to check if a validator has sent valid SignedHeaderMessage.
// We should receive at most one valid SignedHeaderMessage from a validator
_receivedHeader = new bool[N];
for (int i = 0; i < N; i++)
_receivedHeader[i] = false;
}

public override void ProcessMessage(MessageEnvelope envelope)
Expand Down Expand Up @@ -78,7 +84,7 @@ public override void ProcessMessage(MessageEnvelope envelope)
Logger.LogWarning($"Header we have {_header}");
Logger.LogWarning($"Header we received {signedHeaderMessage.Header}");
}

// Random message can raise exception like recover id in signature verification can be out of range or
// public key cannot be serialized
var verified = false;
Expand Down Expand Up @@ -106,8 +112,10 @@ public override void ProcessMessage(MessageEnvelope envelope)
$"Incorrect signature of header {signedHeaderMessage.Header.Keccak().ToHex()} from validator {idx}"
);
}
else
else if (!_receivedHeader[idx])
{
// received verified header from validator
_receivedHeader[idx] = true;
Logger.LogTrace("Add signatures");
_lastMessage = "Add signatures";
_signatures.Add(new Tuple<BlockHeader, MultiSig.Types.SignatureByValidator>(
Expand All @@ -124,6 +132,13 @@ public override void ProcessMessage(MessageEnvelope envelope)
message.SignedHeaderMessage.Header.Index / _cycleDuration);
_validatorAttendanceRepository.SaveState(validatorAttendance.ToBytes());
}
else
{
// discard this header because already received a valid header from this validator
var pubKey = Broadcaster.GetPublicKeyById(idx)!.ToHex();
Logger.LogWarning($"Already received verified header from {idx} ({pubKey}). Validator {idx} " +
$"({pubKey}) tried to send SignedHeaderMessage more than once");
}

CheckSignatures();
}
Expand Down Expand Up @@ -206,7 +221,7 @@ private void TrySignHeader()
Logger.LogTrace("TrySignHeader");
if (_receipts is null || _nonce is null || _blockProducer is null)
{
Logger.LogTrace($"Not ready yet: _hasges {_receipts is null}, _nonce {_nonce is null}, _blockProducer {_blockProducer is null}");
Logger.LogTrace($"Not ready yet: _hashes {_receipts is null}, _nonce {_nonce is null}, _blockProducer {_blockProducer is null}");
return;
}

Expand Down Expand Up @@ -260,7 +275,10 @@ private void CheckSignatures()
.Aggregate((x, y) => x.Value > y.Value ? x : y);
Logger.LogTrace($"bestHeader.Value {bestHeader.Value} Wallet.N {Wallet.N} Wallet.F {Wallet.F}");
if (bestHeader.Value < Wallet.N - Wallet.F) return;
Logger.LogTrace($"Received {bestHeader.Value} signatures for block header");
Logger.LogTrace(
$"Received {bestHeader.Value} signatures for block header: height: {bestHeader.Key.Index}, hash: " +
$"{bestHeader.Key.Keccak().ToHex()}. My block header height: {_header.Index}, hash: {_header.Keccak().ToHex()}." +
$" My header hash matches with bestheader hash: {_header.Keccak().Equals(bestHeader.Key.Keccak())}");
_multiSig = new MultiSig {Quorum = (uint) (Wallet.N - Wallet.F)};
_multiSig.Validators.AddRange(Wallet.EcdsaPublicKeySet);
foreach (var (header, signature) in _signatures)
Expand Down
32 changes: 30 additions & 2 deletions src/Lachain.Core/Consensus/ConsensusManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,17 @@ public void Dispatch(ConsensusMessage message, ECDSAPublicKey from)
if (fromIndex == -1)
{
Logger.LogWarning(
$"Skipped message for era {era} since we it came from {from.ToHex()} who is not validator for this era");
$"Skipped message for era {era} since it came from {from.ToHex()} who is not validator for this era");
return;
}

broadcaster.Dispatch(message, fromIndex);
}
else
else if (IsEraNearFuture(era, CurrentEra))
{
lock (_postponedMessages)
{
// This queue can get very long and may cause node shut down due to insufficient memory
_postponedMessages
.PutIfAbsent(era, new List<(ConsensusMessage message, ECDSAPublicKey from)>())
.Add((message, from));
Expand All @@ -140,6 +141,14 @@ public void Dispatch(ConsensusMessage message, ECDSAPublicKey from)
}
}

private bool IsEraNearFuture(long era, long currentEra)
{
long allowedEra = 5;
if (era < currentEra) return false;
if (era - currentEra <= allowedEra) return true;
return false;
}

public void Start(ulong startingEra)
{
_networkManager.AdvanceEra(startingEra);
Expand All @@ -150,6 +159,14 @@ private void FinishEra()
{
lock (_erasLock)
{
// Sometimes _postponedMessages messages are not cleared if the current node is not a validator
// but messages maybe inserted in case the nodes does not have validators set yet and someone sent
// consensus message to this node. So it needs to be cleared for every era
lock (_postponedMessages)
{
_postponedMessages.Remove(CurrentEra);
}

var broadcaster = _eras[CurrentEra];
lock (broadcaster)
{
Expand Down Expand Up @@ -290,6 +307,17 @@ private void Run(ulong startingEra)
foreach (var (message, from) in savedMessages)
{
var fromIndex = validators.GetValidatorIndex(from);
// If a validator from some previous era sends message for some future era, the
// message will come here, but it could be that the validator is not a validator
// anymore, in that case the fromIndex will be -1 and may halt some process.
// So we need to check this and discard such messages
if (fromIndex == -1)
{
Logger.LogWarning(
$"Skipped message for era {CurrentEra} since it came from "
+ $"{from.ToHex()} who is not validator for this era");
continue;
}
Logger.LogTrace(
$"Handling postponed message: {message.PrettyTypeString()}");
broadcaster.Dispatch(message, fromIndex);
Expand Down
Loading

0 comments on commit ca660b6

Please sign in to comment.