Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vote every number of ticks #2141

Merged
merged 4 commits into from
Dec 14, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/compute_leader_finality_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub enum FinalityError {
NoValidSupermajority,
}

pub const COMPUTE_FINALITY_MS: u64 = 1000;
pub const COMPUTE_FINALITY_MS: u64 = 100;

pub struct ComputeLeaderFinalityService {
compute_finality_thread: JoinHandle<()>,
Expand Down
55 changes: 29 additions & 26 deletions src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;

pub const BLOCK_TICK_COUNT: u64 = 8;
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ReplayStageReturnType {
LeaderRotation(u64, u64, Hash),
Expand Down Expand Up @@ -71,6 +74,9 @@ impl ReplayStage {
let mut entries = window_receiver.recv_timeout(timer)?;
while let Ok(mut more) = window_receiver.try_recv() {
entries.append(&mut more);
if entries.len() >= MAX_ENTRY_RECV_PER_ITER {
break;
}
}

submit(
Expand All @@ -94,9 +100,25 @@ impl ReplayStage {
let (current_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
let my_id = keypair.pubkey();
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
let my_id = keypair.pubkey();
if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instead of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}

if bank.tick_height() % BLOCK_TICK_COUNT == 0 {
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, vote_account_keypair, &cluster_info, sender).unwrap();
}
}

let (scheduled_leader, _) = bank
.get_current_leader()
.expect("Scheduled leader should be calculated by this point");
Expand All @@ -105,20 +127,11 @@ impl ReplayStage {
if scheduled_leader != current_leader {
cluster_info.write().unwrap().set_leader(scheduled_leader);
}

if my_id == scheduled_leader {
num_entries_to_write = i + 1;
break;
}

if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instead of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}
}

// If leader rotation happened, only write the entries up to leader rotation.
Expand All @@ -134,7 +147,6 @@ impl ReplayStage {
);

let entries_len = entries.len() as u64;
// TODO: move this to another stage?
// TODO: In line with previous behavior, this will write all the entries even if
// an error occurred processing one of the entries (causing the rest of the entries to
// not be processed).
Expand All @@ -144,9 +156,10 @@ impl ReplayStage {

*entry_height += entries_len;
res?;
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?;
}
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);

Ok(())
}
Expand All @@ -173,8 +186,6 @@ impl ReplayStage {
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit);
let now = Instant::now();
let mut next_vote_secs = 1;
let mut entry_height_ = entry_height;
let mut last_entry_id = last_entry_id;
loop {
Expand All @@ -194,21 +205,13 @@ impl ReplayStage {
));
}

// Only vote once a second.
let vote_sender = if now.elapsed().as_secs() > next_vote_secs {
next_vote_secs += 1;
Some(&vote_blob_sender)
} else {
None
};

match Self::process_entries(
&bank,
&cluster_info,
&window_receiver,
&keypair,
&vote_account_keypair,
vote_sender,
Some(&vote_blob_sender),
&ledger_entry_sender,
&mut entry_height_,
&mut last_entry_id,
Expand Down
9 changes: 5 additions & 4 deletions src/vote_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ pub fn send_validator_vote(
) -> Result<()> {
let last_id = bank.last_id();

let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?;
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;

if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed an issue to store failed votes: #2152

{
inc_new_counter_info!("validator-vote_sent", 1);
vote_blob_sender.send(vec![shared_blob])?;
}
Ok(())
}