Skip to content

Commit

Permalink
Merge branch 'main' into keyao/instance-state
Browse files Browse the repository at this point in the history
  • Loading branch information
shenkeyao committed Jan 30, 2024
2 parents 3d0d229 + 11d2e9a commit 60b0711
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 76 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ impl<TYPES: NodeType> Inner<TYPES> {
};

while self.running.load(Ordering::Relaxed) {
async_sleep(additional_wait).await;

let endpoint = match message_purpose {
MessagePurpose::Proposal => config::get_proposal_route(view_number),
MessagePurpose::LatestQuorumProposal => config::get_latest_quorum_proposal_route(),
Expand Down Expand Up @@ -437,7 +439,6 @@ impl<TYPES: NodeType> Inner<TYPES> {
async_sleep(self.wait_between_polls).await;
}
}
async_sleep(additional_wait).await;
}
let maybe_event = receiver.try_recv();
match maybe_event {
Expand Down
1 change: 0 additions & 1 deletion crates/hotshot/src/traits/storage/memory_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ mod test {
header,
Some(payload),
dummy_leaf_commit,
Vec::new(),
<<TestTypes as NodeType>::SignatureKey as SignatureKey>::genesis_proposer_pk(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/libp2p-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ libp2p-swarm-derive = { workspace = true }
libp2p-identity = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = "1.0.112"
serde_json = "1.0.113"
snafu = { workspace = true }
tide = { version = "0.16", optional = true, default-features = false, features = [
"h1-server",
Expand Down
5 changes: 0 additions & 5 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
parent_commitment,
block_header: proposal.block_header.clone(),
block_payload: None,
rejected: Vec::new(),
proposer_id: self.quorum_membership.get_leader(view),
};
let Ok(vote) = QuorumVote::<TYPES>::create_signed_vote(
Expand Down Expand Up @@ -305,7 +304,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
parent_commitment,
block_header: proposal.block_header.clone(),
block_payload: None,
rejected: Vec::new(),
proposer_id: self.quorum_membership.get_leader(view),
};

Expand Down Expand Up @@ -538,7 +536,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
parent_commitment: justify_qc.get_data().leaf_commit,
block_header: proposal.data.block_header.clone(),
block_payload: None,
rejected: Vec::new(),
proposer_id: sender,
};
let state = <TYPES::ValidatedState as ValidatedState>::from_header(
Expand Down Expand Up @@ -611,7 +608,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
parent_commitment,
block_header: proposal.data.block_header.clone(),
block_payload: None,
rejected: Vec::new(),
proposer_id: sender.clone(),
};
let leaf_commitment = leaf.commit();
Expand Down Expand Up @@ -1234,7 +1230,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
parent_commitment: parent_leaf.commit(),
block_header: block_header.clone(),
block_payload: None,
rejected: vec![],
proposer_id: self.api.public_key().clone(),
};

Expand Down
52 changes: 23 additions & 29 deletions crates/task-impls/src/view_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use hotshot_types::{
},
};
use snafu::Snafu;
use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use std::{collections::BTreeMap, collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
#[cfg(async_executor_impl = "tokio")]
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
Expand All @@ -61,9 +61,9 @@ pub enum ViewSyncPhase {
/// Stub of a view sync error
pub struct ViewSyncTaskError {}

/// Type alias for a map from View Number to Vote Task
/// Type alias for a map from View Number to Relay to Vote Task
type RelayMap<TYPES, VOTE, CERT> =
HashMap<<TYPES as NodeType>::Time, VoteCollectionTaskState<TYPES, VOTE, CERT>>;
HashMap<<TYPES as NodeType>::Time, BTreeMap<u64, VoteCollectionTaskState<TYPES, VOTE, CERT>>>;

/// Main view sync task state
pub struct ViewSyncTaskState<
Expand Down Expand Up @@ -197,12 +197,13 @@ impl<
) {
// This certificate is old, we can throw it away
// If next view = cert round, then that means we should already have a task running for it
let mut task_map = self.replica_task_map.write().await;
if self.current_view > view {
debug!("Already in a higher view than the view sync message");
return;
}

let mut task_map = self.replica_task_map.write().await;

if let Some(replica_task) = task_map.remove(&view) {
// Forward event then return
debug!("Forwarding message");
Expand Down Expand Up @@ -276,25 +277,22 @@ impl<
HotShotEvent::ViewSyncPreCommitVoteRecv(ref vote) => {
let mut map = self.pre_commit_relay_map.write().await;
let vote_view = vote.get_view_number();
if let Some(relay_task) = map.remove(&vote_view) {
let relay = vote.get_data().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.remove(&relay) {
debug!("Forwarding message");
let result = relay_task.handle_event(event.clone()).await;

if result.0 == Some(HotShotTaskCompleted::ShutDown) {
// The protocol has finished
return;
}

map.insert(vote_view, result.1);
relay_map.insert(relay, result.1);
return;
}

// We do not have a relay task already running, so start one
if self
.membership
.get_leader(vote_view + vote.get_data().relay)
!= self.public_key
{
if self.membership.get_leader(vote_view + relay) != self.public_key {
// TODO ED This will occur because everyone is pulling down votes for now. Will be fixed in `https://github.com/EspressoSystems/HotShot/issues/1471`
debug!("View sync vote sent to wrong leader");
return;
Expand All @@ -310,14 +308,16 @@ impl<
};
let vote_collector = create_vote_accumulator(&info, vote.clone(), event).await;
if let Some(vote_task) = vote_collector {
map.insert(vote_view, vote_task);
relay_map.insert(relay, vote_task);
}
}

HotShotEvent::ViewSyncCommitVoteRecv(ref vote) => {
let mut map = self.commit_relay_map.write().await;
let vote_view = vote.get_view_number();
if let Some(relay_task) = map.remove(&vote_view) {
let relay = vote.get_data().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.remove(&relay) {
debug!("Forwarding message");
let result = relay_task.handle_event(event.clone()).await;

Expand All @@ -326,16 +326,12 @@ impl<
return;
}

map.insert(vote_view, result.1);
relay_map.insert(relay, result.1);
return;
}

// We do not have a relay task already running, so start one
if self
.membership
.get_leader(vote_view + vote.get_data().relay)
!= self.public_key
{
if self.membership.get_leader(vote_view + relay) != self.public_key {
// TODO ED This will occur because everyone is pulling down votes for now. Will be fixed in `https://github.com/EspressoSystems/HotShot/issues/1471`
debug!("View sync vote sent to wrong leader");
return;
Expand All @@ -351,14 +347,16 @@ impl<
};
let vote_collector = create_vote_accumulator(&info, vote.clone(), event).await;
if let Some(vote_task) = vote_collector {
map.insert(vote_view, vote_task);
relay_map.insert(relay, vote_task);
}
}

HotShotEvent::ViewSyncFinalizeVoteRecv(ref vote) => {
let mut map = self.finalize_relay_map.write().await;
let vote_view = vote.get_view_number();
if let Some(relay_task) = map.remove(&vote_view) {
let relay = vote.get_data().relay;
let relay_map = map.entry(vote_view).or_insert(BTreeMap::new());
if let Some(relay_task) = relay_map.remove(&relay) {
debug!("Forwarding message");
let result = relay_task.handle_event(event.clone()).await;

Expand All @@ -367,16 +365,12 @@ impl<
return;
}

map.insert(vote_view, result.1);
relay_map.insert(relay, result.1);
return;
}

// We do not have a relay task already running, so start one
if self
.membership
.get_leader(vote_view + vote.get_data().relay)
!= self.public_key
{
if self.membership.get_leader(vote_view + relay) != self.public_key {
// TODO ED This will occur because everyone is pulling down votes for now. Will be fixed in `https://github.com/EspressoSystems/HotShot/issues/1471`
debug!("View sync vote sent to wrong leader");
return;
Expand All @@ -392,7 +386,7 @@ impl<
};
let vote_collector = create_vote_accumulator(&info, vote.clone(), event).await;
if let Some(vote_task) = vote_collector {
map.insert(vote_view, vote_task);
relay_map.insert(relay, vote_task);
}
}

Expand Down
4 changes: 1 addition & 3 deletions crates/task-impls/src/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc};

use crate::events::HotShotEvent;
use async_trait::async_trait;
use bitvec::prelude::*;
use either::Either::{self, Left, Right};
use hotshot_task::{
event_stream::{ChannelStream, EventStream},
Expand Down Expand Up @@ -202,8 +201,7 @@ where
}
let new_accumulator = VoteAccumulator {
vote_outcomes: HashMap::new(),
sig_lists: Vec::new(),
signers: bitvec![0;info. membership.total_nodes()],
signers: HashMap::new(),
phantom: PhantomData,
};

Expand Down
2 changes: 0 additions & 2 deletions crates/testing/src/task_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,6 @@ async fn build_quorum_proposal_and_signature(
parent_commitment: parent_leaf.commit(),
block_header: block_header.clone(),
block_payload: None,
rejected: vec![],
proposer_id: *api.public_key(),
};

Expand Down Expand Up @@ -316,7 +315,6 @@ async fn build_quorum_proposal_and_signature(
parent_commitment: parent_leaf.commit(),
block_header: block_header.clone(),
block_payload: None,
rejected: vec![],
proposer_id: quorum_membership.get_leader(ViewNumber::new(cur_view)),
};
let signature_new_view =
Expand Down
1 change: 0 additions & 1 deletion crates/testing/tests/consensus_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ async fn build_vote(
parent_commitment,
block_header: proposal.block_header,
block_payload: None,
rejected: Vec::new(),
proposer_id: membership.get_leader(view),
};
let vote = QuorumVote::<TestTypes>::create_signed_vote(
Expand Down
2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ tracing = { workspace = true }
typenum = { workspace = true }

[dev-dependencies]
serde_json = "1.0.112"
serde_json = "1.0.113"

[target.'cfg(all(async_executor_impl = "async-std"))'.dependencies]
async-std = { workspace = true }
Expand Down
14 changes: 2 additions & 12 deletions crates/types/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ pub struct Leaf<TYPES: NodeType> {
/// It may be empty for nodes not in the DA committee.
pub block_payload: Option<TYPES::BlockPayload>,

/// Transactions that were marked for rejection while collecting the block.
pub rejected: Vec<<TYPES::BlockPayload as BlockPayload>::Transaction>,

/// the proposer id of the leaf
pub proposer_id: TYPES::SignatureKey,
}
Expand All @@ -339,7 +336,6 @@ impl<TYPES: NodeType> PartialEq for Leaf<TYPES> {
&& self.justify_qc == other.justify_qc
&& self.parent_commitment == other.parent_commitment
&& self.block_header == other.block_header
&& self.rejected == other.rejected
}
}

Expand All @@ -349,7 +345,6 @@ impl<TYPES: NodeType> Hash for Leaf<TYPES> {
self.justify_qc.hash(state);
self.parent_commitment.hash(state);
self.block_header.hash(state);
self.rejected.hash(state);
}
}

Expand All @@ -376,7 +371,6 @@ impl<TYPES: NodeType> Leaf<TYPES> {
parent_commitment: fake_commitment(),
block_header: block_header.clone(),
block_payload: Some(block_payload),
rejected: Vec::new(),
proposer_id: <<TYPES as NodeType>::SignatureKey as SignatureKey>::genesis_proposer_pk(),
}
}
Expand Down Expand Up @@ -443,14 +437,12 @@ impl<TYPES: NodeType> Leaf<TYPES> {
pub fn get_payload_commitment(&self) -> VidCommitment {
self.get_block_header().payload_commitment()
}
/// Transactions rejected or invalidated by the application of this leaf.
pub fn get_rejected(&self) -> Vec<<TYPES::BlockPayload as BlockPayload>::Transaction> {
self.rejected.clone()
}

/// Identity of the network participant who proposed this leaf.
pub fn get_proposer_id(&self) -> TYPES::SignatureKey {
self.proposer_id.clone()
}

/// Create a leaf from information stored about a view.
pub fn from_stored_view(stored_view: StoredView<TYPES>) -> Self {
Self {
Expand All @@ -459,7 +451,6 @@ impl<TYPES: NodeType> Leaf<TYPES> {
parent_commitment: stored_view.parent,
block_header: stored_view.block_header,
block_payload: stored_view.block_payload,
rejected: stored_view.rejected,
proposer_id: stored_view.proposer_id,
}
}
Expand Down Expand Up @@ -559,7 +550,6 @@ where
justify_qc: leaf.get_justify_qc(),
block_header: leaf.get_block_header().clone(),
block_payload: leaf.get_block_payload(),
rejected: leaf.get_rejected(),
proposer_id: leaf.get_proposer_id(),
}
}
Expand Down
8 changes: 1 addition & 7 deletions crates/types/src/traits/storage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
//! Abstraction over on-disk storage of node state

use super::node_implementation::NodeType;
use crate::{
data::Leaf, simple_certificate::QuorumCertificate, traits::BlockPayload, vote::HasViewNumber,
};
use crate::{data::Leaf, simple_certificate::QuorumCertificate, vote::HasViewNumber};
use async_trait::async_trait;
use commit::Commitment;
use derivative::Derivative;
Expand Down Expand Up @@ -130,8 +128,6 @@ pub struct StoredView<TYPES: NodeType> {
///
/// It may be empty for nodes not in the DA committee.
pub block_payload: Option<TYPES::BlockPayload>,
/// transactions rejected in this view
pub rejected: Vec<TYPES::Transaction>,
/// the proposer id
#[derivative(PartialEq = "ignore")]
pub proposer_id: TYPES::SignatureKey,
Expand All @@ -150,7 +146,6 @@ where
block_header: TYPES::BlockHeader,
block_payload: Option<TYPES::BlockPayload>,
parent_commitment: Commitment<Leaf<TYPES>>,
rejected: Vec<<TYPES::BlockPayload as BlockPayload>::Transaction>,
proposer_id: TYPES::SignatureKey,
) -> Self {
Self {
Expand All @@ -159,7 +154,6 @@ where
justify_qc: qc,
block_header,
block_payload,
rejected,
proposer_id,
}
}
Expand Down
Loading

0 comments on commit 60b0711

Please sign in to comment.