Skip to content

Commit

Permalink
initial support for kernel data download (#2765)
Browse files Browse the repository at this point in the history
* initial support for kernel data download

* fix vec backend for tests

* cleanup after rebase
  • Loading branch information
antiochp authored May 14, 2019
1 parent ff1c551 commit e56cd55
Show file tree
Hide file tree
Showing 19 changed files with 280 additions and 10 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use self::pool_api::PoolInfoHandler;
use self::pool_api::PoolPushHandler;
use self::server_api::IndexHandler;
use self::server_api::StatusHandler;
use self::server_api::KernelDownloadHandler;
use self::transactions_api::TxHashSetHandler;
use crate::auth::{BasicAuthMiddleware, GRIN_BASIC_REALM};
use crate::chain;
Expand Down Expand Up @@ -135,6 +136,9 @@ pub fn build_router(
chain: Arc::downgrade(&chain),
peers: Arc::downgrade(&peers),
};
let kernel_download_handler = KernelDownloadHandler {
peers: Arc::downgrade(&peers),
};
let txhashset_handler = TxHashSetHandler {
chain: Arc::downgrade(&chain),
};
Expand Down Expand Up @@ -165,6 +169,7 @@ pub fn build_router(
router.add_route("/v1/chain/validate", Arc::new(chain_validation_handler))?;
router.add_route("/v1/txhashset/*", Arc::new(txhashset_handler))?;
router.add_route("/v1/status", Arc::new(status_handler))?;
router.add_route("/v1/kerneldownload", Arc::new(kernel_download_handler))?;
router.add_route("/v1/pool", Arc::new(pool_info_handler))?;
router.add_route("/v1/pool/push", Arc::new(pool_push_handler))?;
router.add_route("/v1/peers/all", Arc::new(peers_all_handler))?;
Expand Down
25 changes: 24 additions & 1 deletion api/src/handlers/server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
use crate::types::*;
use crate::web::*;
use hyper::{Body, Request};
use hyper::{Body, Request, StatusCode};
use std::sync::Weak;

// RESTful index of available api endpoints
Expand All @@ -36,6 +36,29 @@ impl Handler for IndexHandler {
}
}

pub struct KernelDownloadHandler {
pub peers: Weak<p2p::Peers>,
}

impl Handler for KernelDownloadHandler {
fn post(&self, _req: Request<Body>) -> ResponseFuture {
if let Some(peer) = w_fut!(&self.peers).most_work_peer() {
match peer.send_kernel_data_request() {
Ok(_) => response(StatusCode::OK, "{}"),
Err(e) => response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed: {:?}", e),
),
}
} else {
response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("requesting kernel data from peer failed (no peers)"),
)
}
}
}

/// Status handler. Post a summary of the server status
/// GET /v1/status
pub struct StatusHandler {
Expand Down
24 changes: 24 additions & 0 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::core::core::{
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::{Readable, StreamingReader};
use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
Expand All @@ -36,6 +37,7 @@ use crate::util::{Mutex, RwLock, StopState};
use grin_store::Error::NotFoundErr;
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::Read;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -659,6 +661,28 @@ impl Chain {
self.txhashset.read().roots()
}

/// Provides a reading view into the current kernel state.
pub fn kernel_data_read(&self) -> Result<File, Error> {
let txhashset = self.txhashset.read();
txhashset::rewindable_kernel_view(&txhashset, |view| view.kernel_data_read())
}

/// Writes kernels provided to us (via a kernel data download).
/// Currently does not write these to disk and simply deserializes
/// the provided data.
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
let mut count = 0;
let mut stream = StreamingReader::new(reader, Duration::from_secs(1));
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
count += 1;
}

debug!("kernel_data_write: read {} kernels", count);

Ok(())
}

/// Provides a reading view into the current txhashset state as well as
/// the required indexes for a consumer to rewind to a consistent state
/// at the provided block hash.
Expand Down
12 changes: 12 additions & 0 deletions chain/src/txhashset/rewindable_kernel_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Lightweight readonly view into kernel MMR for convenience.
use std::fs::File;

use crate::core::core::pmmr::RewindablePMMR;
use crate::core::core::{BlockHeader, TxKernel};
use crate::error::{Error, ErrorKind};
Expand Down Expand Up @@ -78,4 +80,14 @@ impl<'a> RewindableKernelView<'a> {
}
Ok(())
}

/// Read the "raw" kernel backend data file (via temp file for consistent view on data).
pub fn kernel_data_read(&self) -> Result<File, Error> {
let file = self
.pmmr
.backend()
.data_as_temp_file()
.map_err(|_| ErrorKind::FileReadErr("Data file woes".into()))?;
Ok(file)
}
}
7 changes: 7 additions & 0 deletions core/src/core/pmmr/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs::File;

use croaring::Bitmap;

use crate::core::hash::Hash;
Expand Down Expand Up @@ -59,6 +61,11 @@ pub trait Backend<T: PMMRable> {
/// triggered removal).
fn remove(&mut self, position: u64) -> Result<(), String>;

/// Creates a temp file containing the contents of the underlying data file
/// from the backend storage. This allows a caller to see a consistent view
/// of the data without needing to lock the backend storage.
fn data_as_temp_file(&self) -> Result<File, String>;

/// Release underlying datafiles and locks
fn release_files(&mut self);

Expand Down
5 changes: 5 additions & 0 deletions core/src/core/pmmr/rewindable_pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ where
}
}

/// Reference to the underlying storage backend.
pub fn backend(&'a self) -> &Backend<T> {
self.backend
}

/// Build a new readonly PMMR pre-initialized to
/// last_pos with the provided backend.
pub fn at(backend: &'a B, last_pos: u64) -> RewindablePMMR<'_, T, B> {
Expand Down
6 changes: 6 additions & 0 deletions core/tests/vec_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs::File;

use self::core::core::hash::{DefaultHashable, Hash};
use self::core::core::pmmr::{self, Backend};
use self::core::core::BlockHeader;
Expand Down Expand Up @@ -103,6 +105,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Some(data.as_elmt())
}

fn data_as_temp_file(&self) -> Result<File, String> {
unimplemented!()
}

fn leaf_pos_iter(&self) -> Box<Iterator<Item = u64> + '_> {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ num = "0.1"
rand = "0.6"
serde = "1"
serde_derive = "1"
tempfile = "3.0.5"
log = "0.4"
chrono = { version = "0.4.4", features = ["serde"] }

Expand Down
31 changes: 31 additions & 0 deletions p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ enum_from_primitive! {
BanReason = 18,
GetTransaction = 19,
TransactionKernel = 20,
KernelDataRequest = 21,
KernelDataResponse = 22,
}
}

Expand Down Expand Up @@ -111,6 +113,8 @@ fn max_msg_size(msg_type: Type) -> u64 {
Type::BanReason => 64,
Type::GetTransaction => 32,
Type::TransactionKernel => 32,
Type::KernelDataRequest => 0,
Type::KernelDataResponse => 8,
}
}

Expand Down Expand Up @@ -714,3 +718,30 @@ impl Readable for TxHashSetArchive {
})
}
}

pub struct KernelDataRequest {}

impl Writeable for KernelDataRequest {
fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), ser::Error> {
Ok(())
}
}

pub struct KernelDataResponse {
/// Size in bytes of the attached kernel data file.
pub bytes: u64,
}

impl Writeable for KernelDataResponse {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u64(self.bytes)?;
Ok(())
}
}

impl Readable for KernelDataResponse {
fn read(reader: &mut dyn Reader) -> Result<KernelDataResponse, ser::Error> {
let bytes = reader.read_u64()?;
Ok(KernelDataResponse { bytes })
}
}
20 changes: 19 additions & 1 deletion p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -26,7 +27,9 @@ use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type};
use crate::msg::{
self, BanReason, GetPeerAddrs, KernelDataRequest, Locator, Ping, TxHashSetRequest, Type,
};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
Expand Down Expand Up @@ -379,6 +382,13 @@ impl Peer {
)
}

pub fn send_kernel_data_request(&self) -> Result<(), Error> {
debug!("Asking {} for kernel data.", self.info.addr);
self.connection
.lock()
.send(&KernelDataRequest {}, msg::Type::KernelDataRequest)
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
self.connection.lock().close();
Expand Down Expand Up @@ -521,6 +531,14 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_block(h)
}

fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.adapter.kernel_data_read()
}

fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error> {
self.adapter.kernel_data_write(reader)
}

fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}
Expand Down
9 changes: 9 additions & 0 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::util::RwLock;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;

Expand Down Expand Up @@ -594,6 +595,14 @@ impl ChainAdapter for Peers {
self.adapter.get_block(h)
}

fn kernel_data_read(&self) -> Result<File, chain::Error> {
self.adapter.kernel_data_read()
}

fn kernel_data_write(&self, reader: &mut Read) -> Result<bool, chain::Error> {
self.adapter.kernel_data_write(reader)
}

fn txhashset_read(&self, h: Hash) -> Option<TxHashSetRead> {
self.adapter.txhashset_read(h)
}
Expand Down
Loading

0 comments on commit e56cd55

Please sign in to comment.