Skip to content

Commit

Permalink
Remove InternalTrigger, unused genesis fn, DeltasType and unused pub fns
Browse files Browse the repository at this point in the history
  • Loading branch information
bfish713 committed Feb 1, 2024
1 parent fb2437b commit 1248f94
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 243 deletions.
110 changes: 4 additions & 106 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ use crate::{
traits::{NodeImplementation, Storage},
types::{Event, SystemContextHandle},
};
use async_compatibility_layer::{
art::{async_spawn, async_spawn_local},
channel::UnboundedSender,
};
use async_compatibility_layer::art::async_spawn;
use async_lock::RwLock;
use async_trait::async_trait;
use commit::Committable;
Expand All @@ -42,13 +39,11 @@ use hotshot_types::{
data::Leaf,
error::StorageSnafu,
event::EventType,
message::{
DataMessage, InternalTrigger, Message, MessageKind, ProcessedGeneralConsensusMessage,
},
message::{DataMessage, Message, MessageKind},
simple_certificate::QuorumCertificate,
traits::{
consensus_api::ConsensusApi,
network::{CommunicationChannel, NetworkError},
network::CommunicationChannel,
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
states::ValidatedState,
Expand All @@ -66,7 +61,7 @@ use std::{
time::Duration,
};
use tasks::add_vid_task;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, info, instrument, trace};

// -- Rexports
// External
Expand Down Expand Up @@ -270,37 +265,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
.await;
}

/// Marks a given view number as timed out. This should be called a fixed period after a round is started.
///
/// If the round has already ended then this function will essentially be a no-op. Otherwise `run_round` will return shortly after this function is called.
/// # Panics
/// Panics if the current view is not in the channel map
#[instrument(
skip_all,
fields(id = self.inner.id, view = *current_view),
name = "Timeout consensus tasks",
level = "warn"
)]
pub async fn timeout_view(
&self,
current_view: TYPES::Time,
send_replica: UnboundedSender<ProcessedGeneralConsensusMessage<TYPES>>,
send_next_leader: Option<UnboundedSender<ProcessedGeneralConsensusMessage<TYPES>>>,
) {
let msg = ProcessedGeneralConsensusMessage::<TYPES>::InternalTrigger(
InternalTrigger::Timeout(current_view),
);
if let Some(chan) = send_next_leader {
if chan.send(msg.clone()).await.is_err() {
debug!("Error timing out next leader task");
}
};
// NOTE this should always exist
if send_replica.send(msg).await.is_err() {
debug!("Error timing out replica task");
};
}

/// Emit an external event
// A copypasta of `ConsensusApi::send_event`
// TODO: remove with https://github.com/EspressoSystems/HotShot/issues/2407
Expand Down Expand Up @@ -447,72 +411,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
Ok((handle, internal_event_stream))
}

/// Send a broadcast message.
///
/// This is an alias for `hotshot.inner.networking.broadcast_message(msg.into())`.
///
/// # Errors
///
/// Will return any errors that the underlying `broadcast_message` can return.
// this clippy lint is silly. This is async by requirement of the trait.
#[allow(clippy::unused_async)]
pub async fn send_broadcast_message(
&self,
kind: impl Into<MessageKind<TYPES>>,
) -> std::result::Result<(), NetworkError> {
let inner = self.inner.clone();
let pk = self.inner.public_key.clone();
let kind = kind.into();

async_spawn_local(async move {
if inner
.networks
.quorum_network
.broadcast_message(
Message {
version: PROGRAM_PROTOCOL_VERSION,
sender: pk,
kind,
},
// TODO this is morally wrong
&inner.memberships.quorum_membership.clone(),
)
.await
.is_err()
{
warn!("Failed to broadcast message");
};
});
Ok(())
}

/// Send a direct message to a given recipient.
///
/// This is an alias for `hotshot.inner.networking.message_node(msg.into(), recipient)`.
///
/// # Errors
///
/// Will return any errors that the underlying `message_node` can return.
pub async fn send_direct_message(
&self,
kind: impl Into<MessageKind<TYPES>>,
recipient: TYPES::SignatureKey,
) -> std::result::Result<(), NetworkError> {
self.inner
.networks
.quorum_network
.direct_message(
Message {
version: PROGRAM_PROTOCOL_VERSION,
sender: self.inner.public_key.clone(),
kind: kind.into(),
},
recipient,
)
.await?;
Ok(())
}

/// return the timeout for a view for `self`
#[must_use]
pub fn get_next_view_timeout(&self) -> u64 {
Expand Down
73 changes: 2 additions & 71 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::{traits::NodeImplementation, types::Event, SystemContext};
use async_compatibility_layer::channel::UnboundedStream;
use async_lock::RwLock;
use commit::Committable;
use futures::Stream;
use hotshot_task::{
boxed_sync,
Expand All @@ -14,25 +13,12 @@ use hotshot_task::{
};
use hotshot_task_impls::events::HotShotEvent;
#[cfg(feature = "hotshot-testing")]
use hotshot_types::{
message::{MessageKind, SequencingMessage},
traits::election::Membership,
};
use hotshot_types::traits::election::Membership;

use hotshot_types::simple_vote::QuorumData;
use hotshot_types::{
consensus::Consensus,
data::Leaf,
error::HotShotError,
event::EventType,
simple_certificate::QuorumCertificate,
traits::{
node_implementation::{ConsensusTime, NodeType},
storage::Storage,
},
consensus::Consensus, data::Leaf, error::HotShotError, traits::node_implementation::NodeType,
};
use std::sync::Arc;
use tracing::error;

/// Event streaming handle for a [`SystemContext`] instance running in the background
///
Expand Down Expand Up @@ -140,39 +126,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> SystemContextHandl
self.hotshot.publish_transaction_async(tx).await
}

/// performs the genesis initializaiton
pub async fn maybe_do_genesis_init(&self) {
let _anchor = self.storage();
if let Ok(anchor_leaf) = self.storage().get_anchored_view().await {
if anchor_leaf.view_number == TYPES::Time::genesis() {
let leaf = Leaf::from_stored_view(anchor_leaf);
let mut qc = QuorumCertificate::<TYPES>::genesis();
qc.data = QuorumData {
leaf_commit: leaf.commit(),
};
let event = Event {
view_number: TYPES::Time::genesis(),
event: EventType::Decide {
leaf_chain: Arc::new(vec![leaf]),
qc: Arc::new(qc),
block_size: None,
},
};
self.output_event_stream.publish(event).await;
}
} else {
// TODO (justin) this seems bad. I think we should hard error in this case??
error!("Hotshot storage has no anchor leaf!");
}
}

/// begin consensus by sending a genesis event
/// Use `start_consensus` on `SystemContext` instead
#[deprecated]
pub async fn start_consensus_deprecated(&self) {
self.maybe_do_genesis_init().await;
}

/// Provides a reference to the underlying storage for this [`SystemContext`], allowing access to
/// historical data
pub fn storage(&self) -> &I::Storage {
Expand Down Expand Up @@ -233,26 +186,4 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> SystemContextHandl
pub async fn get_current_view(&self) -> TYPES::Time {
self.hotshot.inner.consensus.read().await.cur_view
}

/// Wrapper around `HotShotConsensusApi`'s `send_broadcast_consensus_message` function
#[cfg(feature = "hotshot-testing")]
pub async fn send_broadcast_consensus_message(&self, msg: SequencingMessage<TYPES>) {
let _result = self
.hotshot
.send_broadcast_message(MessageKind::from_consensus_message(msg))
.await;
}

/// Wrapper around `HotShotConsensusApi`'s `send_direct_consensus_message` function
#[cfg(feature = "hotshot-testing")]
pub async fn send_direct_consensus_message(
&self,
msg: SequencingMessage<TYPES>,
recipient: TYPES::SignatureKey,
) {
let _result = self
.hotshot
.send_direct_message(MessageKind::from_consensus_message(msg), recipient)
.await;
}
}
4 changes: 0 additions & 4 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
GeneralConsensusMessage::UpgradeVote(message) => {
HotShotEvent::UpgradeVoteRecv(message)
}
GeneralConsensusMessage::InternalTrigger(_) => {
error!("Got unexpected message type in network task!");
return;
}
},
Either::Right(committee_message) => match committee_message {
CommitteeConsensusMessage::DAProposal(proposal) => {
Expand Down
35 changes: 0 additions & 35 deletions crates/types/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,41 +243,6 @@ impl<TYPES: NodeType> HasViewNumber<TYPES> for UpgradeProposal<TYPES> {
}
}

/// A state change encoded in a leaf.
///
/// [`DeltasType`] represents a [block](NodeType::BlockPayload), but it may not contain the block in
/// full. It is guaranteed to contain, at least, a cryptographic commitment to the block, and it
/// provides an interface for resolving the commitment to a full block if the full block is
/// available.
pub trait DeltasType<PAYLOAD: BlockPayload>:
Clone + Debug + for<'a> Deserialize<'a> + PartialEq + Eq + std::hash::Hash + Send + Serialize + Sync
{
/// Errors reported by this type.
type Error: std::error::Error;

/// Get a cryptographic commitment to the block represented by this delta.
fn payload_commitment(&self) -> VidCommitment;

/// Get the full block if it is available, otherwise return this object unchanged.
///
/// # Errors
///
/// Returns the original [`DeltasType`], unchanged, in an [`Err`] variant in the case where the
/// full block is not currently available.
fn try_resolve(self) -> Result<PAYLOAD, Self>;

/// Fill this [`DeltasType`] by providing a complete block.
///
/// After this function succeeds, [`try_resolve`](Self::try_resolve) is guaranteed to return
/// `Ok(block)`.
///
/// # Errors
///
/// Fails if `block` does not match `self.payload_commitment()`, or if the block is not able to be
/// stored for some implementation-defined reason.
fn fill(&mut self, block: PAYLOAD) -> Result<(), Self::Error>;
}

/// The error type for block and its transactions.
#[derive(Snafu, Debug)]
pub enum BlockError {
Expand Down
27 changes: 0 additions & 27 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,6 @@ impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
}
}

/// Internal triggers sent by consensus messages.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
#[serde(bound(deserialize = ""))]
pub enum InternalTrigger<TYPES: NodeType> {
// May add other triggers if necessary.
/// Internal timeout at the specified view number.
Timeout(TYPES::Time),
}

/// A processed consensus message for both validating and sequencing consensus.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(bound(deserialize = ""))]
Expand All @@ -149,9 +140,6 @@ pub enum ProcessedGeneralConsensusMessage<TYPES: NodeType> {
Proposal(Proposal<TYPES, QuorumProposal<TYPES>>, TYPES::SignatureKey),
/// Message with a quorum vote.
Vote(QuorumVote<TYPES>, TYPES::SignatureKey),
/// Internal ONLY message indicating a view interrupt.
#[serde(skip)]
InternalTrigger(InternalTrigger<TYPES>),
}

impl<TYPES: NodeType> From<ProcessedGeneralConsensusMessage<TYPES>>
Expand All @@ -163,9 +151,6 @@ impl<TYPES: NodeType> From<ProcessedGeneralConsensusMessage<TYPES>>
GeneralConsensusMessage::Proposal(p)
}
ProcessedGeneralConsensusMessage::Vote(v, _) => GeneralConsensusMessage::Vote(v),
ProcessedGeneralConsensusMessage::InternalTrigger(a) => {
GeneralConsensusMessage::InternalTrigger(a)
}
}
}
}
Expand All @@ -180,9 +165,6 @@ impl<TYPES: NodeType> ProcessedGeneralConsensusMessage<TYPES> {
ProcessedGeneralConsensusMessage::Proposal(p, sender)
}
GeneralConsensusMessage::Vote(v) => ProcessedGeneralConsensusMessage::Vote(v, sender),
GeneralConsensusMessage::InternalTrigger(a) => {
ProcessedGeneralConsensusMessage::InternalTrigger(a)
}
// ED NOTE These are deprecated
GeneralConsensusMessage::TimeoutVote(_) => unimplemented!(),
GeneralConsensusMessage::ViewSyncPreCommitVote(_) => unimplemented!(),
Expand Down Expand Up @@ -313,10 +295,6 @@ pub enum GeneralConsensusMessage<TYPES: NodeType> {

/// Message with an upgrade vote
UpgradeVote(UpgradeVote<TYPES>),

/// Internal ONLY message indicating a view interrupt.
#[serde(skip)]
InternalTrigger(InternalTrigger<TYPES>),
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -360,10 +338,6 @@ impl<TYPES: NodeType> SequencingMessage<TYPES> {
p.data.get_view_number()
}
GeneralConsensusMessage::Vote(vote_message) => vote_message.get_view_number(),
GeneralConsensusMessage::InternalTrigger(trigger) => match trigger {
InternalTrigger::Timeout(time) => *time,
},

GeneralConsensusMessage::TimeoutVote(message) => message.get_view_number(),
GeneralConsensusMessage::ViewSyncPreCommitVote(message) => {
message.get_view_number()
Expand Down Expand Up @@ -419,7 +393,6 @@ impl<TYPES: NodeType> SequencingMessage<TYPES> {
GeneralConsensusMessage::Vote(_) | GeneralConsensusMessage::TimeoutVote(_) => {
MessagePurpose::Vote
}
GeneralConsensusMessage::InternalTrigger(_) => MessagePurpose::Internal,
GeneralConsensusMessage::ViewSyncPreCommitVote(_)
| GeneralConsensusMessage::ViewSyncCommitVote(_)
| GeneralConsensusMessage::ViewSyncFinalizeVote(_) => MessagePurpose::ViewSyncVote,
Expand Down

0 comments on commit 1248f94

Please sign in to comment.