Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Perf: Kernel Sync Performance #1987

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
34394c1
Created GetKernels Message.
Nov 14, 2018
a8a2d4d
Modified GetKernels and Kernels message to use approach suggested by …
Nov 14, 2018
9d212f7
rustfmt
Nov 14, 2018
90c2660
Fixing comment and formatting.
Nov 14, 2018
eb87f7a
Adding in MAX_KERNELS
Nov 15, 2018
6eea9ba
Implemented read_kernels to retrieve kernels from the mmr given a blo…
Nov 15, 2018
6082089
Created kernel_sync that requests 512 kernels at a time. Currently on…
Nov 18, 2018
d41774c
rustfmt and prioritizing ENHANCED_TXHASHSET_HIST peers over others. S…
Nov 18, 2018
f940a07
Implemented sync_kernels in pipe & chain.
Nov 18, 2018
6fc61a4
Took care of a few TODOs
Nov 19, 2018
e180629
Addressed a few PR comments, and added root_validate_tip to PMMRHandle.
Nov 21, 2018
f523bdb
Skipping unzip of kernel subdir in txhashset zip if kernels already s…
Nov 23, 2018
12c56ef
Missed Kernels implementation in protocol
Nov 23, 2018
fcc0323
rustfmt
Nov 23, 2018
98fa264
Was using head_header instead of header_head
Nov 23, 2018
b570a22
Was using Tip.hash() instead of Tip.last_block_h.
Nov 24, 2018
e7fcf2d
Using header_head instead of header_head-5 to see if kernel syncing i…
Nov 24, 2018
8df281c
get_header_by_height can't be used during kernel_sync, since header h…
Nov 24, 2018
202e609
Fixing build failures.
Nov 24, 2018
07aacf6
Adding debug logs to help trace down issues.
Nov 24, 2018
14a8852
Attempting to use sync_pmmr instead of header_pmmr during kernel vali…
Nov 24, 2018
79c5790
PMMR leaves are hashed with index (why?), so that was a dead end.
Nov 24, 2018
d468f59
Changing kernels message to group kernels by block hash.
Nov 26, 2018
acff2c2
rustfmt
Nov 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use core::core::hash::{Hash, Hashed, ZERO_HASH};
use core::core::merkle_proof::MerkleProof;
use core::core::verifier_cache::VerifierCache;
use core::core::{
Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernelEntry,
Block, BlockHeader, BlockSums, Output, OutputIdentifier, Transaction, TxKernel, TxKernelEntry,
};
use core::global;
use core::pow;
Expand Down Expand Up @@ -345,6 +345,23 @@ impl Chain {
Ok(())
}

/// Attempt to add new kernels to the kernel mmr.
/// This is only ever used during sync.
pub fn sync_kernels(
&self,
first_kernel_index: u64,
kernels: &Vec<TxKernel>,
opts: Options,
) -> Result<(), Error> {
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?;

pipe::sync_kernels(first_kernel_index, kernels, &mut ctx)?;

Ok(())
}

fn new_ctx<'a>(
&self,
opts: Options,
Expand Down Expand Up @@ -973,6 +990,18 @@ impl Chain {
txhashset.last_n_kernel(distance)
}

/// kernels by insertion index
pub fn get_kernels_by_insertion_index(&self, start_index: u64, max: u64) -> Vec<TxKernelEntry> {
let mut txhashset = self.txhashset.write();
txhashset.kernels_by_insertion_index(start_index, max).1
}

/// returns the number of leaves in the kernel mmr
pub fn get_num_kernels(&self) -> u64 {
let txhashset = self.txhashset.read();
txhashset.num_kernels()
}

/// outputs by insertion index
pub fn unspent_outputs_by_insertion_index(
&self,
Expand Down
41 changes: 40 additions & 1 deletion chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use core::consensus;
use core::core::hash::{Hash, Hashed};
use core::core::verifier_cache::VerifierCache;
use core::core::Committed;
use core::core::{Block, BlockHeader, BlockSums};
use core::core::{Block, BlockHeader, BlockSums, TxKernel};
use core::global;
use core::pow;
use error::{Error, ErrorKind};
Expand Down Expand Up @@ -276,6 +276,45 @@ pub fn sync_block_headers(
}
}

/// Process the kernels.
/// This is only ever used during sync.
pub fn sync_kernels(
first_kernel_index: u64,
kernels: &Vec<TxKernel>,
ctx: &mut BlockContext,
) -> Result<(), Error> {
if let Some(kernel) = kernels.first() {
debug!(
"pipe: sync_kernels: {} kernels from {} at {}",
kernels.len(),
kernel.hash(),
first_kernel_index
);
} else {
return Ok(());
}

if ctx.txhashset.num_kernels() < (first_kernel_index + 1) {
txhashset::extending(&mut ctx.txhashset, &mut ctx.batch, |extension| {
// DAVID: Rewind mmr to correct kernel index just to be safe?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would make sense.


for kernel in kernels {
// Ensure kernel is self-consistent
kernel.verify()?;

// Apply the kernel to the kernel MMR.
extension.apply_kernel(kernel)?;

// DAVID: If kernel is last in block, validate root.
// Use RewindableKernelView? Or modify validate_kernel_history?
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider pushing that whole loop down in txhashset in some apply_kernels.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also shouldn't there be some MMR reconstruction after this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, looks like apply does it. But there should be some clearer separation around validation. Here you're verifying each kernel, but the whole state validation will do that again. So we either slip the latter one or this one.

Copy link
Contributor Author

@DavidBurkett DavidBurkett Nov 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends what you mean by whole state validation. If you're talking about after the TxHashSet is fully downloaded and processed, sure. But then we'd have to go back and start the kernel validation process all over again if something's wrong with the kernel. If instead you're referring to the expected call to "validate_kernel_history" on line 309, then I disagree. It doesn't verify the signature of the kernel. It only verifies that the MMR root matches the header.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After re-reading, it appears you're talking about verify_kernel_signatures in Extension. Yea, I can probably skip that verify if it's already been done. It'd probably be easiest to store a 'validated' boolean on the kernel. If you're opposed to adding the extra byte (since kernels are all written to disk), I can look into other options.


Ok(())
})?;
}
Ok(())
}

/// Process block header as part of "header first" block propagation.
/// We validate the header but we do not store it or update header head based
/// on this. We will update these once we get the block back after requesting
Expand Down
32 changes: 29 additions & 3 deletions chain/src/txhashset/txhashset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,23 @@ impl TxHashSet {
kernel_pmmr.get_last_n_insertions(distance)
}

/// returns kernels from the given insertion (leaf) index up to the
/// specified limit. Also returns the last index actually populated
pub fn kernels_by_insertion_index(
&mut self,
start_index: u64,
max_count: u64,
) -> (u64, Vec<TxKernelEntry>) {
let kernel_pmmr: PMMR<TxKernelEntry, _> =
PMMR::at(&mut self.kernel_pmmr_h.backend, self.kernel_pmmr_h.last_pos);
kernel_pmmr.elements_from_insertion_index(start_index, max_count)
}

/// returns the number of kernels (leaves) in the kernel_pmmr
pub fn num_kernels(&self) -> u64 {
pmmr::n_leaves(self.kernel_pmmr_h.last_pos)
}

/// returns outputs from the given insertion (leaf) index up to the
/// specified limit. Also returns the last index actually populated
pub fn outputs_by_insertion_index(
Expand Down Expand Up @@ -863,8 +880,17 @@ impl<'a> Extension<'a> {
self.apply_input(input)?;
}

for kernel in b.kernels() {
self.apply_kernel(kernel)?;
// Because of kernel sync, it's possible we already have the kernels for this block.
let kernel_size = pmmr::n_leaves(self.kernel_pmmr.last_pos);
if b.header.kernel_mmr_size > kernel_size {
let num_kernels_behind = b.header.kernel_mmr_size - kernel_size;
let mut num_kernels_added = 0;
for kernel in b.kernels() {
if num_kernels_added < num_kernels_behind {
num_kernels_added += 1;
self.apply_kernel(kernel)?;
}
}
}

// Update the header on the extension to reflect the block we just applied.
Expand Down Expand Up @@ -951,7 +977,7 @@ impl<'a> Extension<'a> {
}

/// Push kernel onto MMR (hash and data files).
fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> {
pub fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> {
self.kernel_pmmr
.push(TxKernelEntry::from(kernel.clone()))
.map_err(&ErrorKind::TxHashSetErr)?;
Expand Down
2 changes: 1 addition & 1 deletion core/src/core/pmmr/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ pub fn bintree_rightmost(num: u64) -> u64 {
num - bintree_postorder_height(num)
}

/// Gets the position of the rightmost node (i.e. leaf) beneath the provided subtree root.
/// Gets the position of the leftmost node (i.e. leaf) beneath the provided subtree root.
pub fn bintree_leftmost(num: u64) -> u64 {
let height = bintree_postorder_height(num);
num + 2 - (2 << height)
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ pub use peers::Peers;
pub use serv::{DummyAdapter, Server};
pub use store::{PeerData, State};
pub use types::{
Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan, Seeding,
TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS,
Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerInfo, ReasonForBan, Seeding,
TxHashSetRead, MAX_BLOCK_HEADERS, MAX_KERNELS, MAX_LOCATORS, MAX_PEER_ADDRS,
};
87 changes: 86 additions & 1 deletion p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ use std::{thread, time};
use core::consensus;
use core::core::hash::Hash;
use core::core::BlockHeader;
use core::core::TxKernel;
use core::pow::Difficulty;
use core::ser::{self, Readable, Reader, Writeable, Writer};

use types::{Capabilities, Error, ReasonForBan, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS};
use types::{
Capabilities, Error, ReasonForBan, MAX_BLOCK_HEADERS, MAX_KERNELS, MAX_LOCATORS, MAX_PEER_ADDRS,
};

/// Current latest version of the protocol
pub const PROTOCOL_VERSION: u32 = 1;
Expand Down Expand Up @@ -70,6 +73,8 @@ enum_from_primitive! {
BanReason = 18,
GetTransaction = 19,
TransactionKernel = 20,
GetKernels = 21,
Kernels = 22,
}
}

Expand Down Expand Up @@ -97,6 +102,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::BanReason => 64,
Type::GetTransaction => 32,
Type::TransactionKernel => 32,
Type::GetKernels => 48,
Type::Kernels => 52 + 114 * MAX_KERNELS as u64,
}
}

Expand Down Expand Up @@ -764,3 +771,81 @@ impl Readable for TxHashSetArchive {
})
}
}

/// Request to get the kernels(leaves) from the kernel MMR up to the given block.
/// This will return the kernels from the specified leaf index onward, up to MAX_KERNELS at a time.
pub struct GetKernels {
/// Hash of the last block for which the kernels should be provided.
/// Also used to identify the chain/fork the sender is requesting kernels for.
pub last_hash: Hash,
/// Height of the corresponding block.
pub last_height: u64,
/// The (leaf) index of the first kernel being requested.
pub first_kernel_index: u64,
}

impl Writeable for GetKernels {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.last_hash.write(writer)?;
writer.write_u64(self.last_height)?;
writer.write_u64(self.first_kernel_index)?;
Ok(())
}
}

impl Readable for GetKernels {
fn read(reader: &mut Reader) -> Result<GetKernels, ser::Error> {
Ok(GetKernels {
last_hash: Hash::read(reader)?,
last_height: reader.read_u64()?,
first_kernel_index: reader.read_u64()?,
})
}
}

/// Response to a Get kernels request containing the requested kernels.
pub struct Kernels {
/// Hash of the block identifying the chain/fork the kernels belong to.
pub last_hash: Hash,
/// Height of the corresponding block.
pub last_height: u64,
/// The (leaf) index of the first kernel returned.
pub first_kernel_index: u64,
/// The requested kernels in the order they appear in the Kernel MMR leafset.
pub kernels: Vec<TxKernel>,
}

impl Writeable for Kernels {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.last_hash.write(writer)?;
ser_multiwrite!(
writer,
[write_u64, self.last_height],
[write_u64, self.first_kernel_index],
[write_u32, self.kernels.len() as u32]
);
for kernel in &self.kernels {
kernel.write(writer)?
}
Ok(())
}
}

impl Readable for Kernels {
fn read(reader: &mut Reader) -> Result<Kernels, ser::Error> {
let last_hash = Hash::read(reader)?;
let (last_height, first_kernel_index, num_kernels) =
ser_multiread!(reader, read_u64, read_u64, read_u32);

let mut kernels = Vec::with_capacity(num_kernels as usize);
for _ in 0..num_kernels {
kernels.push(TxKernel::read(reader)?);
}
Ok(Kernels {
last_hash,
last_height,
first_kernel_index,
kernels,
})
}
}
55 changes: 54 additions & 1 deletion p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use core::core::hash::{Hash, Hashed};
use core::pow::Difficulty;
use core::{core, global};
use handshake::Handshake;
use msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest};
use msg::{self, BanReason, GetKernels, GetPeerAddrs, Locator, Ping, TxHashSetRequest};
use protocol::Protocol;
use types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerInfo, ReasonForBan, TxHashSetRead,
Expand Down Expand Up @@ -415,6 +415,44 @@ impl Peer {
)
}

/// Sends a request for the kernels up to the given block height and hash.
/// NOTE: Only sends the request if remote peer has ENHANCED_TXHASHSET_HIST capability.
pub fn send_kernel_request(
&self,
last_hash: Hash,
last_height: u64,
first_kernel_index: u64,
) -> Result<bool, Error> {
if self
.info
.capabilities
.contains(Capabilities::ENHANCED_TXHASHSET_HIST)
{
trace!(
"Asking {} for kernels up to block {} {} starting with kernel {}.",
self.info.addr,
last_hash,
last_height,
first_kernel_index
);
self.connection.as_ref().unwrap().lock().send(
&GetKernels {
last_hash,
last_height,
first_kernel_index,
},
msg::Type::GetKernels,
);
Ok(true)
} else {
trace!(
"Not requesting kernels from {} (peer not capable)",
self.info.addr
);
Ok(false)
}
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
stop_with_connection(&self.connection.as_ref().unwrap().lock());
Expand Down Expand Up @@ -579,6 +617,21 @@ impl ChainAdapter for TrackingAdapter {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}

fn read_kernels(&self, last_hash: Hash, first_kernel_index: u64) -> Vec<core::TxKernel> {
self.adapter.read_kernels(last_hash, first_kernel_index)
}

fn kernels_received(
&self,
last_hash: Hash,
first_kernel_index: u64,
kernels: Vec<core::TxKernel>,
peer_addr: SocketAddr,
) -> bool {
self.adapter
.kernels_received(last_hash, first_kernel_index, kernels, peer_addr)
}
}

impl NetAdapter for TrackingAdapter {
Expand Down
15 changes: 15 additions & 0 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,21 @@ impl ChainAdapter for Peers {
self.adapter
.txhashset_download_update(start_time, downloaded_size, total_size)
}

fn read_kernels(&self, last_hash: Hash, first_kernel_index: u64) -> Vec<core::TxKernel> {
self.adapter.read_kernels(last_hash, first_kernel_index)
}

fn kernels_received(
&self,
last_hash: Hash,
first_kernel_index: u64,
kernels: Vec<core::TxKernel>,
peer_addr: SocketAddr,
) -> bool {
self.adapter
.kernels_received(last_hash, first_kernel_index, kernels, peer_addr)
}
}

impl NetAdapter for Peers {
Expand Down
Loading