diff --git a/src/Lachain.Consensus/AbstractProtocol.cs b/src/Lachain.Consensus/AbstractProtocol.cs index c7dd245ef..eb8d7c220 100644 --- a/src/Lachain.Consensus/AbstractProtocol.cs +++ b/src/Lachain.Consensus/AbstractProtocol.cs @@ -94,6 +94,9 @@ public void Terminate() if (Terminated) return; Logger.LogTrace($"{Id}: protocol is terminated"); Terminated = true; + // We can empty the _queue because the messages will no longer be processed + // This will free some memory in case of spam messages + _queue.Clear(); Monitor.Pulse(_queueLock); } } @@ -148,8 +151,14 @@ public void ReceiveMessage(MessageEnvelope message) { lock (_queueLock) { - if (Terminated){} - // Logger.LogTrace($"{Id}: got message after termination"); + if (Terminated) + { + // we should return here instead of enqueueing messages + // because once terminated, the messages are not being processed anymore + // so the queue will just get large unnecessarily + Monitor.Pulse(_queueLock); + return; + } _queue.Enqueue(message); Monitor.Pulse(_queueLock); } diff --git a/src/Lachain.Consensus/BinaryAgreement/BinaryAgreement.cs b/src/Lachain.Consensus/BinaryAgreement/BinaryAgreement.cs index e67fbc7d6..093cd8500 100644 --- a/src/Lachain.Consensus/BinaryAgreement/BinaryAgreement.cs +++ b/src/Lachain.Consensus/BinaryAgreement/BinaryAgreement.cs @@ -128,13 +128,13 @@ private void TryProgressEpoch() _currentValues = _binaryBroadcastsResults[_currentEpoch - 1]; var coinId = new CoinId(_agreementId.Era, _agreementId.AssociatedValidatorId, _currentEpoch); - if ((_currentEpoch / 2) % 3 == 2) + if (CoinToss.CreateCoinId(_currentEpoch)) { Broadcaster.InternalRequest(new ProtocolRequest(Id, coinId, null)); } else { - _coins[_currentEpoch] = ((_currentEpoch / 2) % 3) != 0; + _coins[_currentEpoch] = CoinToss.TossCoin(_currentEpoch) != 0; } _currentEpoch += 1; diff --git a/src/Lachain.Consensus/CommonCoin/CoinToss.cs b/src/Lachain.Consensus/CommonCoin/CoinToss.cs new file mode 100644 index 000000000..6592cc5d7 --- /dev/null +++ b/src/Lachain.Consensus/CommonCoin/CoinToss.cs @@ -0,0 +1,34 @@ +namespace Lachain.Consensus.CommonCoin +{ + public static class CoinToss + { + /* + RootProtocol requests for one CommonCoin protocol + BinaryAgreement requests many CommonCoin protocols sequentially + For each odd epoch BinaryAgreements takes false, true, result of common coin + So for epoch 1 => false, epoch 3 => true, epoch 5 => result of common coin + and the cycle continues + */ + public static long TossCoin(long epoch) + { + return (epoch / 2) % 3; + } + + // epoch will be odd and TossCoin value will be 2 + public static bool CreateCoinId(long epoch) + { + return (epoch > 0) && ((epoch & 1) != 0) && TossCoin(epoch) == 2; + } + + public static long NextCoinCreationEpoch(long epoch) + { + return epoch + 6; + } + + public static long TotalValidCommonCoin(long lowEpoch, long highEpoch) + { + if (!CreateCoinId(lowEpoch) || !CreateCoinId(highEpoch)) return -1; + return (highEpoch - lowEpoch) / 6 + 1; + } + } +} \ No newline at end of file diff --git a/src/Lachain.Consensus/HoneyBadger/HoneyBadger.cs b/src/Lachain.Consensus/HoneyBadger/HoneyBadger.cs index 1fd3a742f..5a1567b64 100644 --- a/src/Lachain.Consensus/HoneyBadger/HoneyBadger.cs +++ b/src/Lachain.Consensus/HoneyBadger/HoneyBadger.cs @@ -130,6 +130,8 @@ private void CheckResult() new ProtocolResult>(_honeyBadgerId, _result)); } + // TODO: (investigate) the share is comming from ReliableBroadcast and the shareId of the share should match the senderId + // of that ReliableBroadcast, otherwise we might replace one share with another in _receivedShares array private void HandleCommonSubset(ProtocolResult> result) { Logger.LogTrace($"Common subset finished {result.From}"); @@ -175,6 +177,8 @@ protected virtual ConsensusMessage CreateDecryptedMessage(PartiallyDecryptedShar return message; } + // DecryptorId of the message should match the senderId, otherwise the message should be discarded + // because HoneyBadger does not accept two messages for same DecryptorId and shareId // We need to handle this message carefully like how about decoding a random message with random length // and the value of 'share.ShareId' needs to be checked. If it is out of range, it can throw exception private void HandleDecryptedMessage(TPKEPartiallyDecryptedShareMessage msg, int senderId) diff --git a/src/Lachain.Consensus/RootProtocol/RootProtocol.cs b/src/Lachain.Consensus/RootProtocol/RootProtocol.cs index 69ad87022..ab81a4184 100644 --- a/src/Lachain.Consensus/RootProtocol/RootProtocol.cs +++ b/src/Lachain.Consensus/RootProtocol/RootProtocol.cs @@ -30,6 +30,7 @@ public class RootProtocol : AbstractProtocol private MultiSig? _multiSig; private ulong _cycleDuration; private bool _useNewChainId; + private bool[] _receivedHeader; private readonly List> _signatures = new List>(); @@ -43,6 +44,11 @@ public RootProtocol(RootProtocolId id, IPublicConsensusKeySet wallet, ECDSAPriva _validatorAttendanceRepository = validatorAttendanceRepository; _cycleDuration = cycleDuration; _useNewChainId = useNewChainId; + // _receivedHeader is used to check if a validator has sent valid SignedHeaderMessage. + // We should receive at most one valid SignedHeaderMessage from a validator + _receivedHeader = new bool[N]; + for (int i = 0; i < N; i++) + _receivedHeader[i] = false; } public override void ProcessMessage(MessageEnvelope envelope) @@ -78,7 +84,7 @@ public override void ProcessMessage(MessageEnvelope envelope) Logger.LogWarning($"Header we have {_header}"); Logger.LogWarning($"Header we received {signedHeaderMessage.Header}"); } - + // Random message can raise exception like recover id in signature verification can be out of range or // public key cannot be serialized var verified = false; @@ -106,8 +112,10 @@ public override void ProcessMessage(MessageEnvelope envelope) $"Incorrect signature of header {signedHeaderMessage.Header.Keccak().ToHex()} from validator {idx}" ); } - else + else if (!_receivedHeader[idx]) { + // received verified header from validator + _receivedHeader[idx] = true; Logger.LogTrace("Add signatures"); _lastMessage = "Add signatures"; _signatures.Add(new Tuple( @@ -124,6 +132,13 @@ public override void ProcessMessage(MessageEnvelope envelope) message.SignedHeaderMessage.Header.Index / _cycleDuration); _validatorAttendanceRepository.SaveState(validatorAttendance.ToBytes()); } + else + { + // discard this header because already received a valid header from this validator + var pubKey = Broadcaster.GetPublicKeyById(idx)!.ToHex(); + Logger.LogWarning($"Already received verified header from {idx} ({pubKey}). Validator {idx} " + + $"({pubKey}) tried to send SignedHeaderMessage more than once"); + } CheckSignatures(); } @@ -206,7 +221,7 @@ private void TrySignHeader() Logger.LogTrace("TrySignHeader"); if (_receipts is null || _nonce is null || _blockProducer is null) { - Logger.LogTrace($"Not ready yet: _hasges {_receipts is null}, _nonce {_nonce is null}, _blockProducer {_blockProducer is null}"); + Logger.LogTrace($"Not ready yet: _hashes {_receipts is null}, _nonce {_nonce is null}, _blockProducer {_blockProducer is null}"); return; } @@ -260,7 +275,10 @@ private void CheckSignatures() .Aggregate((x, y) => x.Value > y.Value ? x : y); Logger.LogTrace($"bestHeader.Value {bestHeader.Value} Wallet.N {Wallet.N} Wallet.F {Wallet.F}"); if (bestHeader.Value < Wallet.N - Wallet.F) return; - Logger.LogTrace($"Received {bestHeader.Value} signatures for block header"); + Logger.LogTrace( + $"Received {bestHeader.Value} signatures for block header: height: {bestHeader.Key.Index}, hash: " + + $"{bestHeader.Key.Keccak().ToHex()}. My block header height: {_header.Index}, hash: {_header.Keccak().ToHex()}." + + $" My header hash matches with bestheader hash: {_header.Keccak().Equals(bestHeader.Key.Keccak())}"); _multiSig = new MultiSig {Quorum = (uint) (Wallet.N - Wallet.F)}; _multiSig.Validators.AddRange(Wallet.EcdsaPublicKeySet); foreach (var (header, signature) in _signatures) diff --git a/src/Lachain.Core/Consensus/ConsensusManager.cs b/src/Lachain.Core/Consensus/ConsensusManager.cs index 9bdb6f698..ade61874b 100644 --- a/src/Lachain.Core/Consensus/ConsensusManager.cs +++ b/src/Lachain.Core/Consensus/ConsensusManager.cs @@ -122,16 +122,17 @@ public void Dispatch(ConsensusMessage message, ECDSAPublicKey from) if (fromIndex == -1) { Logger.LogWarning( - $"Skipped message for era {era} since we it came from {from.ToHex()} who is not validator for this era"); + $"Skipped message for era {era} since it came from {from.ToHex()} who is not validator for this era"); return; } broadcaster.Dispatch(message, fromIndex); } - else + else if (IsEraNearFuture(era, CurrentEra)) { lock (_postponedMessages) { + // This queue can get very long and may cause node shut down due to insufficient memory _postponedMessages .PutIfAbsent(era, new List<(ConsensusMessage message, ECDSAPublicKey from)>()) .Add((message, from)); @@ -140,6 +141,14 @@ public void Dispatch(ConsensusMessage message, ECDSAPublicKey from) } } + private bool IsEraNearFuture(long era, long currentEra) + { + long allowedEra = 5; + if (era < currentEra) return false; + if (era - currentEra <= allowedEra) return true; + return false; + } + public void Start(ulong startingEra) { _networkManager.AdvanceEra(startingEra); @@ -150,6 +159,14 @@ private void FinishEra() { lock (_erasLock) { + // Sometimes _postponedMessages messages are not cleared if the current node is not a validator + // but messages maybe inserted in case the nodes does not have validators set yet and someone sent + // consensus message to this node. So it needs to be cleared for every era + lock (_postponedMessages) + { + _postponedMessages.Remove(CurrentEra); + } + var broadcaster = _eras[CurrentEra]; lock (broadcaster) { @@ -290,6 +307,17 @@ private void Run(ulong startingEra) foreach (var (message, from) in savedMessages) { var fromIndex = validators.GetValidatorIndex(from); + // If a validator from some previous era sends message for some future era, the + // message will come here, but it could be that the validator is not a validator + // anymore, in that case the fromIndex will be -1 and may halt some process. + // So we need to check this and discard such messages + if (fromIndex == -1) + { + Logger.LogWarning( + $"Skipped message for era {CurrentEra} since it came from " + + $"{from.ToHex()} who is not validator for this era"); + continue; + } Logger.LogTrace( $"Handling postponed message: {message.PrettyTypeString()}"); broadcaster.Dispatch(message, fromIndex); diff --git a/src/Lachain.Core/Consensus/EraBroadcaster.cs b/src/Lachain.Core/Consensus/EraBroadcaster.cs index c83bd7eed..9f453ddef 100644 --- a/src/Lachain.Core/Consensus/EraBroadcaster.cs +++ b/src/Lachain.Core/Consensus/EraBroadcaster.cs @@ -53,6 +53,9 @@ public class EraBroadcaster : IConsensusBroadcaster private readonly IDictionary _registry = new ConcurrentDictionary(); + private readonly IDictionary> _postponedMessages = + new ConcurrentDictionary>(); + public EraBroadcaster( long era, IConsensusMessageDeliverer consensusMessageDeliverer, IPrivateWallet wallet, IValidatorAttendanceRepository validatorAttendanceRepository @@ -91,7 +94,7 @@ public void Broadcast(ConsensusMessage message) } var payload = _messageFactory.ConsensusMessage(message); - foreach (var publicKey in _validators.EcdsaPublicKeySet) + foreach (var publicKey in _validators!.EcdsaPublicKeySet) { if (publicKey.Equals(_wallet.EcdsaKeyPair.PublicKey)) { @@ -125,7 +128,7 @@ public void SendToValidator(ConsensusMessage message, int index) } var payload = _messageFactory.ConsensusMessage(message); - _consensusMessageDeliverer.SendTo(_validators.EcdsaPublicKeySet[index], payload); + _consensusMessageDeliverer.SendTo(_validators!.EcdsaPublicKeySet[index], payload); } public void Dispatch(ConsensusMessage message, int from) @@ -150,49 +153,73 @@ public void Dispatch(ConsensusMessage message, int from) return; } + IProtocolIdentifier protocolId; switch (message.PayloadCase) { case ConsensusMessage.PayloadOneofCase.Bval: - var idBval = new BinaryBroadcastId(message.Validator.Era, message.Bval.Agreement, + protocolId = new BinaryBroadcastId(message.Validator.Era, message.Bval.Agreement, message.Bval.Epoch); - EnsureProtocol(idBval)?.ReceiveMessage(new MessageEnvelope(message, from)); break; case ConsensusMessage.PayloadOneofCase.Aux: - var idAux = new BinaryBroadcastId(message.Validator.Era, message.Aux.Agreement, message.Aux.Epoch); - EnsureProtocol(idAux)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new BinaryBroadcastId(message.Validator.Era, message.Aux.Agreement, message.Aux.Epoch); break; case ConsensusMessage.PayloadOneofCase.Conf: - var idConf = new BinaryBroadcastId(message.Validator.Era, message.Conf.Agreement, + protocolId = new BinaryBroadcastId(message.Validator.Era, message.Conf.Agreement, message.Conf.Epoch); - EnsureProtocol(idConf)?.ReceiveMessage(new MessageEnvelope(message, from)); break; case ConsensusMessage.PayloadOneofCase.Coin: - var idCoin = new CoinId(message.Validator.Era, message.Coin.Agreement, message.Coin.Epoch); - EnsureProtocol(idCoin)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new CoinId(message.Validator.Era, message.Coin.Agreement, message.Coin.Epoch); break; case ConsensusMessage.PayloadOneofCase.Decrypted: - var hbbftId = new HoneyBadgerId((int) message.Validator.Era); - EnsureProtocol(hbbftId)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new HoneyBadgerId((int) message.Validator.Era); break; case ConsensusMessage.PayloadOneofCase.ValMessage: - var reliableBroadcastId = new ReliableBroadcastId(message.ValMessage.SenderId, (int) message.Validator.Era); - EnsureProtocol(reliableBroadcastId)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new ReliableBroadcastId(message.ValMessage.SenderId, (int) message.Validator.Era); break; case ConsensusMessage.PayloadOneofCase.EchoMessage: - var rbIdEchoMsg = new ReliableBroadcastId(message.EchoMessage.SenderId, (int) message.Validator.Era); - EnsureProtocol(rbIdEchoMsg)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new ReliableBroadcastId(message.EchoMessage.SenderId, (int) message.Validator.Era); break; case ConsensusMessage.PayloadOneofCase.ReadyMessage: - var rbIdReadyMsg = new ReliableBroadcastId(message.ReadyMessage.SenderId, (int) message.Validator.Era); - EnsureProtocol(rbIdReadyMsg)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new ReliableBroadcastId(message.ReadyMessage.SenderId, (int) message.Validator.Era); break; case ConsensusMessage.PayloadOneofCase.SignedHeaderMessage: - var rootId = new RootProtocolId(message.Validator.Era); - EnsureProtocol(rootId)?.ReceiveMessage(new MessageEnvelope(message, from)); + protocolId = new RootProtocolId(message.Validator.Era); break; default: throw new InvalidOperationException($"Unknown message type {message}"); } + + HandleExternalMessage(protocolId, new MessageEnvelope(message, from)); + } + + private void HandleExternalMessage(IProtocolIdentifier protocolId, MessageEnvelope message) + { + // For external message we don't create new protocols. each protocol is requested for some result + // by another protocol (maybe itself) via internal message. When protocols are created by internal message + // we deliver the external messages to that protocol, otherwise store them to deliver later + if (ValidateProtocolId(protocolId)) + { + if (message.External) + { + var protocol = GetProtocolById(protocolId); + if (protocol is null) + { + lock (_postponedMessages) + { + _postponedMessages + .PutIfAbsent(protocolId, new List()) + .Add(message); + } + } + else protocol.ReceiveMessage(message); + } + else Logger.LogWarning("Internal message should not be here"); + } + else + { + var from = message.ValidatorIndex; + Logger.LogWarning($"Invalid protocol id {protocolId} from validator {GetPublicKeyById(from)!.ToHex()} ({from})"); + } } [MethodImpl(MethodImplOptions.Synchronized)] @@ -223,6 +250,21 @@ public void InternalRequest(ProtocolRequest re if (_registry.TryGetValue(request.To, out var protocol)) protocol?.ReceiveMessage(new MessageEnvelope(request, GetMyId())); + + if (!(protocol is null)) + { + lock (_postponedMessages) + { + if (_postponedMessages.TryGetValue(request.To, out var savedMessages)) + { + foreach (var message in savedMessages) + { + protocol.ReceiveMessage(message); + } + _postponedMessages.Remove(request.To); + } + } + } } public void InternalResponse(ProtocolResult result) @@ -261,7 +303,7 @@ public int GetMyId() public int GetIdByPublicKey(ECDSAPublicKey publicKey) { - return _validators.GetValidatorIndex(publicKey); + return _validators!.GetValidatorIndex(publicKey); } public ECDSAPublicKey? GetPublicKeyById(int id) @@ -286,29 +328,44 @@ public void Terminate() _registry.Clear(); _callback.Clear(); + lock (_postponedMessages) + { + _postponedMessages.Clear(); + } } + // Each ProtocolId is created only once to prevent spamming, Protocols are mapped against ProtocolId, so each + // Protocol will also be created only once, after achieving result, Protocol terminate and no longer process any + // messages. [MethodImpl(MethodImplOptions.Synchronized)] private IConsensusProtocol? EnsureProtocol(IProtocolIdentifier id) { ValidateId(id); if (_registry.TryGetValue(id, out var existingProtocol)) return existingProtocol; - Logger.LogTrace($"Creating protocol {id} on demand"); if (_terminated) { Logger.LogTrace($"Protocol {id} not created since broadcaster is terminated"); return null; } + var protocol = CreateProtocol(id); + if (!(protocol is null)) + Logger.LogTrace($"Created protocol {id} on demand"); + return protocol; + } + + private IConsensusProtocol? CreateProtocol(IProtocolIdentifier id) + { + if (!ValidateProtocolId(id)) return null; switch (id) { case BinaryBroadcastId bbId: - var bb = new BinaryBroadcast(bbId, _validators, this); + var bb = new BinaryBroadcast(bbId, _validators!, this); RegisterProtocols(new[] {bb}); return bb; case CoinId coinId: var coin = new CommonCoin( - coinId, _validators, + coinId, _validators!, _wallet.GetThresholdSignatureKeyForBlock((ulong) _era - 1) ?? throw new InvalidOperationException($"No TS keys present for era {_era}"), this @@ -316,20 +373,20 @@ public void Terminate() RegisterProtocols(new[] {coin}); return coin; case ReliableBroadcastId rbcId: - var rbc = new ReliableBroadcast(rbcId, _validators, this); + var rbc = new ReliableBroadcast(rbcId, _validators!, this); RegisterProtocols(new[] {rbc}); return rbc; case BinaryAgreementId baId: - var ba = new BinaryAgreement(baId, _validators, this); + var ba = new BinaryAgreement(baId, _validators!, this); RegisterProtocols(new[] {ba}); return ba; case CommonSubsetId acsId: - var acs = new CommonSubset(acsId, _validators, this); + var acs = new CommonSubset(acsId, _validators!, this); RegisterProtocols(new[] {acs}); return acs; case HoneyBadgerId hbId: var hb = new HoneyBadger( - hbId, _validators, + hbId, _validators!, _wallet.GetTpkePrivateKeyForBlock((ulong) _era - 1) ?? throw new InvalidOperationException($"No TPKE keys present for era {_era}"), !HardforkHeights.IsHardfork_12Active((ulong)_era > 2 * StakingContract.CycleDuration ? (ulong)_era - 2 * StakingContract.CycleDuration : 0), @@ -338,7 +395,7 @@ public void Terminate() RegisterProtocols(new[] {hb}); return hb; case RootProtocolId rootId: - var root = new RootProtocol(rootId, _validators, _wallet.EcdsaKeyPair.PrivateKey, + var root = new RootProtocol(rootId, _validators!, _wallet.EcdsaKeyPair.PrivateKey, this, _validatorAttendanceRepository, StakingContract.CycleDuration, HardforkHeights.IsHardfork_9Active((ulong)_era)); RegisterProtocols(new[] {root}); @@ -354,6 +411,124 @@ private void ValidateId(IProtocolIdentifier id) throw new InvalidOperationException($"Era mismatched, expected {_era} got message with {id.Era}"); } + private bool ValidateProtocolId(IProtocolIdentifier id) + { + switch (id) + { + case BinaryBroadcastId bbId: + return ValidateBinaryBroadcastId(bbId); + case CoinId coinId: + return ValidateCoinId(coinId); + case ReliableBroadcastId rbcId: + // need to validate the sender id only + return ValidateSenderId((long) rbcId.SenderId); + case BinaryAgreementId baId: + // created only by internal request, external messages never reach BinaryAgreementId + // so no need to validate + return true; + case CommonSubsetId acsId: + // created only by internal request, external messages never reach BinaryAgreementId + // so no need to validate + return true; + case HoneyBadgerId hbId: + // only has era in the fields + ValidateId(hbId); + return true; + case RootProtocolId rootId: + // only has era in the fields + ValidateId(rootId); + return true; + default: + return false; + } + } + + // There are separate instance of ReliableBroadcast for each validator. + // Check if the SenderId is one of the validator's id before creating ReliableBroadcastId + // Sender id is basically validator's id, so it must be between 0 and N-1 inclusive + private bool ValidateSenderId(long senderId) + { + if (_validators is null) + { + Logger.LogWarning("We don't have validators"); + return false; + } + if (senderId < 0 || senderId >= _validators.N) + { + Logger.LogWarning($"Invalid sender id in consensus message: {senderId}. N: {_validators.N}"); + return false; + } + return true; + } + + // Check if parent protocol is terminated + // Check validity + private bool ValidateCoinId(CoinId coinId) + { + if (coinId.Agreement == -1 && coinId.Epoch == 0) + { + // This type of coinId is created from RootProtocol or via network from another validator + return true; + } + if (ValidateSenderId(coinId.Agreement)) + { + // BinaryAgreement requests such CommonCoin + // Checking if the epoch argument is valid + var createCoinId = CoinToss.CreateCoinId(coinId.Epoch); + if (!createCoinId) + { + Logger.LogInformation($"Invalid CoinId: {coinId}"); + return false; + } + + // Checking if BA is terminated + var binaryAgreementId = CreateBaId(coinId.Agreement); + if (!(binaryAgreementId is null) && + _registry.TryGetValue(binaryAgreementId, out var binaryAgreement) && + binaryAgreement.Terminated) + { + Logger.LogInformation($"BinaryAgreement {binaryAgreementId} for CoinId {coinId} is already terminated"); + return false; + } + + return true; + } + return false; + } + + // same logic as ValidateCoinId + private bool ValidateBinaryBroadcastId(BinaryBroadcastId binaryBroadcastId) + { + if (!ValidateSenderId(binaryBroadcastId.Agreement) + || binaryBroadcastId.Epoch < 0 || (binaryBroadcastId.Epoch & 1) != 0) + { + Logger.LogInformation($"Invalid BbId: {binaryBroadcastId}"); + return false; + } + if (binaryBroadcastId.Epoch > 0) // positive and even + { + var binaryAgreementId = CreateBaId(binaryBroadcastId.Agreement); + // Checking if BA is terminated + if (!(binaryAgreementId is null) && + _registry.TryGetValue(binaryAgreementId, out var binaryAgreement) && + binaryAgreement.Terminated) + { + Logger.LogInformation($"BinaryAgreement {binaryAgreementId} for BinaryBroadcastId " + + $"{binaryBroadcastId} is already terminated"); + return false; + } + + return true; + } + // 0 epoch + return true; + } + + private BinaryAgreementId? CreateBaId(long senderId) + { + return ValidateSenderId(senderId) ? new BinaryAgreementId(_era, senderId) : null; + } + public bool WaitFinish(TimeSpan timeout) { return EnsureProtocol(new RootProtocolId(_era))?.WaitFinish(timeout) ?? true;