[DNM] kernel data file download #2743

Closed
wants to merge 11 commits
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions api/src/handlers.rs
@@ -33,6 +33,7 @@ use self::pool_api::PoolInfoHandler;
use self::pool_api::PoolPushHandler;
use self::server_api::IndexHandler;
use self::server_api::StatusHandler;
use self::server_api::KernelDownloadHandler;
use self::transactions_api::TxHashSetHandler;
use crate::auth::{BasicAuthMiddleware, GRIN_BASIC_REALM};
use crate::chain;
@@ -135,6 +136,9 @@ pub fn build_router(
chain: Arc::downgrade(&chain),
peers: Arc::downgrade(&peers),
};
let kernel_download_handler = KernelDownloadHandler {
peers: Arc::downgrade(&peers),
};
let txhashset_handler = TxHashSetHandler {
chain: Arc::downgrade(&chain),
};
@@ -165,6 +169,7 @@ pub fn build_router(
router.add_route("/v1/chain/validate", Arc::new(chain_validation_handler))?;
router.add_route("/v1/txhashset/*", Arc::new(txhashset_handler))?;
router.add_route("/v1/status", Arc::new(status_handler))?;
router.add_route("/v1/kerneldownload", Arc::new(kernel_download_handler))?;
router.add_route("/v1/pool", Arc::new(pool_info_handler))?;
router.add_route("/v1/pool/push", Arc::new(pool_push_handler))?;
router.add_route("/v1/peers/all", Arc::new(peers_all_handler))?;
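For a quick sanity check once a node running this branch is up, the new route can be exercised with a plain POST (this assumes the default node API address 127.0.0.1:3413; adjust host, port and auth to your config):

curl -X POST http://127.0.0.1:3413/v1/kerneldownload

The handler responds with {} on success, or a 500 with an error message if no peer is available or the request to the peer fails.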
39 changes: 38 additions & 1 deletion api/src/handlers/server_api.rs
@@ -19,7 +19,7 @@ use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
use crate::types::*;
use crate::web::*;
use hyper::{Body, Request};
use hyper::{Body, Request, StatusCode};
use std::sync::Weak;

// RESTful index of available api endpoints
@@ -36,6 +36,43 @@ impl Handler for IndexHandler {
}
}

/// Kernel data download handler. Requests kernel data from the most-work peer.
/// POST /v1/kerneldownload
pub struct KernelDownloadHandler {
pub peers: Weak<p2p::Peers>,
}

impl Handler for KernelDownloadHandler {
fn post(&self, _req: Request<Body>) -> ResponseFuture {
if let Some(peer) = w_fut!(&self.peers).most_work_peer() {
match peer.send_kernel_data_request() {
Ok(_) => response(StatusCode::OK, "{}"),
Err(e) => response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed: {:?}", e),
),
}
} else {
response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed (no peers)"),
)
}

// w_fut!(&self.peers).most_work_peer().and_then(|peer| {
// peer.send_kernel_data_request().and_then(|_| {
// response(StatusCode::OK, "{}")
// })
// })?;

// match w_fut!(&self.chain).xxx() {
// Ok(_) => response(StatusCode::OK, "{}"),
// Err(e) => response(
// StatusCode::INTERNAL_SERVER_ERROR,
// format!("requesting kernel download failed: {}", e),
// ),
// }
}
}

/// Status handler. Post a summary of the server status
/// GET /v1/status
pub struct StatusHandler {
1 change: 1 addition & 0 deletions chain/Cargo.toml
@@ -19,6 +19,7 @@ croaring = "0.3"
log = "0.4"
serde = "1"
serde_derive = "1"
tempfile = "3.0.5"
chrono = "0.4.4"
lru-cache = "0.1"
lazy_static = "1"
44 changes: 44 additions & 0 deletions chain/src/chain.rs
@@ -23,6 +23,7 @@ use crate::core::core::{
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::{Readable, StreamingReader};
use crate::error::{Error, ErrorKind};
use crate::lmdb;
use crate::pipe;
@@ -38,10 +39,12 @@ use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::env;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::tempdir;

/// Orphan pool size is limited by MAX_ORPHAN_SIZE
pub const MAX_ORPHAN_SIZE: usize = 200;
@@ -672,6 +675,11 @@ impl Chain {
self.txhashset.read().roots()
}

/// Provides a consistent, file-based view of the current kernel data for reading.
pub fn kernel_data_read(&self) -> Result<File, Error> {
let txhashset = self.txhashset.read();
txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read())
}

/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
@@ -857,6 +865,42 @@ impl Chain {
txhashset::clean_header_folder(&sandbox_dir);
}

/// TODO - Pull this out into a "rebuildable kernel view" or something similar?
pub fn kernel_data_write(&self, reader: &mut dyn Read) -> Result<(), Error> {
error!("***** kernel_data_write: entered");

// write to an actual tmp file with a path?
let dir = tempdir()?;
let path = dir.path();
error!("***** tempdir: {:?}", path);

let mut txhashset =
TxHashSet::open(path.to_str().unwrap().into(), self.store.clone(), None)?;

let mut streaming_reader = StreamingReader::new(reader, Duration::from_secs(1));

let mut batch = self.store.batch()?;
txhashset::extending(&mut txhashset, &mut batch, |extension| {
while let Ok(kernel) = TxKernelEntry::read(&mut streaming_reader) {
debug!("***** applying a kernel");
extension.apply_kernel(&kernel.kernel)?;
}
Ok(())
})?;

error!(
"***** total bytes read (and applied): {}",
streaming_reader.total_bytes_read()
);

// Some kind of "rebuildable" kernel view
// Streaming (somehow) the reader here and apply_kernel each time -
// * the size file
// * the hash file

Ok(())
}

/// Writes a reading view on a txhashset state that's been provided to us.
/// If we're willing to accept that new state, the data stream will be
/// read as a zip file, unzipped and the resulting state files should be
17 changes: 17 additions & 0 deletions chain/src/txhashset/rewindable_kernel_view.rs
@@ -14,6 +14,9 @@

//! Lightweight readonly view into kernel MMR for convenience.

use std::fs::File;
use std::io;

use crate::core::core::pmmr::RewindablePMMR;
use crate::core::core::{BlockHeader, TxKernel};
use crate::error::{Error, ErrorKind};
@@ -78,4 +81,18 @@ impl<'a> RewindableKernelView<'a> {
}
Ok(())
}

/// Read the "raw" kernel backend data file (via temp file for consistent view on data).
pub fn kernel_data_read(&self) -> Result<File, Error> {
let file = self
.pmmr
.backend()
.temp_data_file()
.map_err(|_| ErrorKind::FileReadErr("failed to read kernel data file".into()))?;
Ok(file)
}

/// TODO - Write the provided kernel data into the kernel MMR backend (stub, not yet implemented).
pub fn kernel_data_write(&self, _file: &File) -> Result<(), Error> {
Ok(())
}
}
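The Backend::temp_data_file implementation is not part of this excerpt. As a rough sketch of the intent described above (a copy of the data file taken while the view holds the read lock, so the returned handle stays consistent even if the MMR is appended to afterwards) — the helper name and error handling here are assumptions, not the PR's actual code:

use std::fs::File;
use std::io::{self, Seek, SeekFrom};
use tempfile::tempfile;

// Hypothetical helper: copy the backend data file into an anonymous temp
// file and rewind it, so the caller gets a stable snapshot to stream from.
fn copy_to_temp_file(data_file: &mut File) -> Result<File, String> {
	let mut tmp = tempfile().map_err(|e| format!("tempfile failed: {}", e))?;
	data_file
		.seek(SeekFrom::Start(0))
		.map_err(|e| format!("seek failed: {}", e))?;
	io::copy(data_file, &mut tmp).map_err(|e| format!("copy failed: {}", e))?;
	tmp.seek(SeekFrom::Start(0))
		.map_err(|e| format!("rewind failed: {}", e))?;
	Ok(tmp)
}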
2 changes: 1 addition & 1 deletion chain/src/txhashset/txhashset.rs
@@ -953,7 +953,7 @@ impl<'a> Extension<'a> {
}

/// Push kernel onto MMR (hash and data files).
fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> {
pub fn apply_kernel(&mut self, kernel: &TxKernel) -> Result<(), Error> {
self.kernel_pmmr
.push(kernel)
.map_err(&ErrorKind::TxHashSetErr)?;
9 changes: 4 additions & 5 deletions core/src/core/pmmr/backend.rs
@@ -17,6 +17,8 @@ use croaring::Bitmap;
use crate::core::hash::Hash;
use crate::core::BlockHeader;
use crate::ser::PMMRable;
use std::fs::File;
use std::io::Read;
use std::path::Path;

/// Storage backend for the MMR, just needs to be indexed by order of insertion.
@@ -54,17 +56,14 @@ pub trait Backend<T: PMMRable> {
/// Iterator over current (unpruned, unremoved) leaf positions.
fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_>;

/// Returns a temp copy of the data file, providing a consistent view on the
/// data even if the backend is subsequently appended to.
fn temp_data_file(&self) -> Result<File, String>;

/// Remove Hash by insertion position. An index is also provided so the
/// underlying backend can implement some rollback of positions up to a
/// given index (practically the index is the height of a block that
/// triggered removal).
fn remove(&mut self, position: u64) -> Result<(), String>;

/// Returns the data file path.. this is a bit of a hack now that doesn't
/// sit well with the design, but TxKernels have to be summed and the
/// fastest way to to be able to allow direct access to the file
fn get_data_file_path(&self) -> &Path;

/// Release underlying datafiles and locks
fn release_files(&mut self);

5 changes: 0 additions & 5 deletions core/src/core/pmmr/pmmr.rs
@@ -328,11 +328,6 @@ where
self.last_pos
}

/// Return the path of the data file (needed to sum kernels efficiently)
pub fn data_file_path(&self) -> &Path {
self.backend.get_data_file_path()
}

/// Debugging utility to print information about the MMRs. Short version
/// only prints the last 8 nodes.
pub fn dump(&self, short: bool) {
4 changes: 4 additions & 0 deletions core/src/core/pmmr/rewindable_pmmr.rs
@@ -49,6 +49,10 @@ where
}
}

/// Reference to the underlying PMMR backend.
pub fn backend(&'a self) -> &dyn Backend<T> {
self.backend
}

/// Build a new readonly PMMR pre-initialized to
/// last_pos with the provided backend.
pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> {
4 changes: 0 additions & 4 deletions core/tests/vec_backend.rs
@@ -124,10 +124,6 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Ok(())
}

fn get_data_file_path(&self) -> &Path {
Path::new("")
}

fn release_files(&mut self) {}

fn dump_stats(&self) {}
1 change: 1 addition & 0 deletions p2p/Cargo.toml
@@ -19,6 +19,7 @@ num = "0.1"
rand = "0.5"
serde = "1"
serde_derive = "1"
tempfile = "3.0.5"
log = "0.4"
chrono = { version = "0.4.4", features = ["serde"] }

29 changes: 13 additions & 16 deletions p2p/src/conn.rs
@@ -21,7 +21,7 @@
//! stream and make sure we get the right number of bytes out.

use std::fs::File;
use std::io::{self, Read, Write};
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::net::{Shutdown, TcpStream};
use std::sync::{mpsc, Arc};
use std::{cmp, thread, time};
@@ -132,21 +132,18 @@ impl<'a> Response<'a> {
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc(msg.len() as u64);
}
if let Some(mut file) = self.attachment {
let mut buf = [0u8; 8000];
loop {
match file.read(&mut buf[..]) {
Ok(0) => break,
Ok(n) => {
write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?;
// Increase sent bytes "quietly" without incrementing the counter.
// (In a loop here for the single attachment).
let mut sent_bytes = sent_bytes.write();
sent_bytes.inc_quiet(n as u64);
}
Err(e) => return Err(From::from(e)),
}
}
if let Some(file) = self.attachment {
error!(
"***** writing attachment (reading from file): file size: {}",
file.metadata().unwrap().len()
);
let mut reader = BufReader::new(file);
let mut writer = BufWriter::new(self.stream);

let bytes_written = io::copy(&mut reader, &mut writer)?;
sent_bytes.write().inc_quiet(bytes_written as u64);

error!("***** wrote some bytes: {}", bytes_written);
}
Ok(())
}
43 changes: 43 additions & 0 deletions p2p/src/msg.rs
@@ -66,6 +66,8 @@ enum_from_primitive! {
BanReason = 18,
GetTransaction = 19,
TransactionKernel = 20,
KernelDataRequest = 21,
KernelDataResponse = 22,
}
}

@@ -98,6 +100,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::BanReason => 64,
Type::GetTransaction => 32,
Type::TransactionKernel => 32,
Type::KernelDataRequest => 0,
Type::KernelDataResponse => 9, // version (u8) + size in bytes (u64)
}
}
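The peer-side send_kernel_data_request() called from the API handler is not shown in this excerpt; presumably it just sends the (empty) KernelDataRequest message defined below. A minimal sketch, assuming a generic message-send helper on the peer (the helper name and signature are assumptions):

// Hypothetical Peer method; self.send is assumed to serialize the body
// and write a standard message header with the given Type.
pub fn send_kernel_data_request(&self) -> Result<(), Error> {
	debug!("asking {} for kernel data", self.info.addr);
	self.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
}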

@@ -612,3 +616,42 @@ impl Readable for TxHashSetArchive {
})
}
}

/// Request the kernel data file from a peer. The message body is empty.
pub struct KernelDataRequest {}

impl Writeable for KernelDataRequest {
fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), ser::Error> {
Ok(())
}
}

/// Response to a kernel data request.
pub struct KernelDataResponse {
/// Support for multiple versions of data serialization.
/// 0 - the original "fixed size" kernel serialization format (lock_height and fee required)
/// 1 - "variable size" kernel serialization
/// * lock_height only included in HeightLocked kernels
/// * fee omitted for coinbase kernels
///
/// Note: Both versions will produce identical hash values (and corresponding MMR root).
/// Only the "data" serialization differs.
///
pub version: u8,
[Review comment, Contributor]: Not sure this will be very useful. If the peer that requested the kernel data can't support v1, then it shouldn't ever receive v1 kernel data. It should be up to the requester to ask which version it needs.

[Review comment, Contributor]: Also, if we implement the protocol versioning scheme I suggested here: mimblewimble/grin-pm#102 (comment), we won't even need a version byte at all.

/// Size in bytes of the kernel data file.
pub bytes: u64,
}

impl Writeable for KernelDataResponse {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u8(self.version)?;
writer.write_u64(self.bytes)?;
Ok(())
}
}

impl Readable for KernelDataResponse {
fn read(reader: &mut dyn Reader) -> Result<KernelDataResponse, ser::Error> {
let version = reader.read_u8()?;
let bytes = reader.read_u64()?;
Ok(KernelDataResponse { version, bytes })
}
}
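To make the version field's doc comment concrete, here is an illustrative sketch (not code from this PR) of how a v1 "variable size" kernel writer could branch on kernel features; the field and variant names mirror grin's TxKernel but should be treated as assumptions:

// Hypothetical v1 serializer: fee omitted for coinbase kernels,
// lock_height only written for HeightLocked kernels.
fn write_kernel_v1<W: Writer>(kernel: &TxKernel, writer: &mut W) -> Result<(), ser::Error> {
	writer.write_u8(kernel.features as u8)?;
	match kernel.features {
		// coinbase kernels carry no fee
		KernelFeatures::Coinbase => (),
		KernelFeatures::Plain => writer.write_u64(kernel.fee)?,
		KernelFeatures::HeightLocked => {
			writer.write_u64(kernel.fee)?;
			writer.write_u64(kernel.lock_height)?;
		}
	}
	kernel.excess.write(writer)?;
	kernel.excess_sig.write(writer)?;
	Ok(())
}

A v0 writer would instead write fee and lock_height unconditionally for every kernel; both forms hash identically, as the doc comment notes, since only the data serialization differs.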