Skip to content

Commit

Permalink
implement Persist and Persister with generic KVStorePersister trait
Browse files Browse the repository at this point in the history
  • Loading branch information
johncantrell97 committed Apr 14, 2022
1 parent 711bcef commit cb6c7c6
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 151 deletions.
62 changes: 24 additions & 38 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescr
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
Expand Down Expand Up @@ -80,22 +81,7 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
/// Persist the given [`ChannelManager`] to disk, returning an error if persistence failed
/// (which will cause the [`BackgroundProcessor`] which called this method to exit).
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error>;

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error>;
}


/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
Expand Down Expand Up @@ -133,6 +119,7 @@ impl BackgroundProcessor {
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
/// either [`join`] or [`stop`].
///
///
/// # Data Persistence
///
Expand All @@ -142,7 +129,7 @@ impl BackgroundProcessor {
/// provided implementation.
///
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_network_graph`]
/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See [`FilesystemPersister::persist_graph`]
/// for Rust-Lightning's provided implementation.
///
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
Expand All @@ -161,8 +148,8 @@ impl BackgroundProcessor {
/// [`stop`]: Self::stop
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
/// [`FilesystemPersister::persist_network_graph`]: lightning_persister::FilesystemPersister::persist_network_graph
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister#impl-Persister
/// [`FilesystemPersister::persist_graph`]: lightning_persister::FilesystemPersister#impl-Persister
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
Expand Down Expand Up @@ -365,10 +352,11 @@ mod tests {
use lightning::util::logger::Logger;
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning::util::persist::KVStorePersister;
use lightning_invoice::payment::{InvoicePayer, RetryAttempts};
use lightning_invoice::utils::DefaultRouter;
use lightning_persister::FilesystemPersister;
use std::fs;
use lightning_persister::{FilesystemPersister};
use std::fs::{self, File};
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -414,7 +402,7 @@ mod tests {
struct Persister {
data_dir: String,
graph_error: Option<(std::io::ErrorKind, &'static str)>,
manager_error: Option<(std::io::ErrorKind, &'static str)>
manager_error: Option<(std::io::ErrorKind, &'static str)>,
}

impl Persister {
Expand All @@ -431,25 +419,23 @@ mod tests {
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L:Deref> super::Persister<Signer, M, T, K, F, L> for Persister where
M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
{
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), std::io::Error> {
match self.manager_error {
None => FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager),
Some((error, message)) => Err(std::io::Error::new(error, message)),
impl KVStorePersister for Persister {
fn persist<W: Writeable>(&self, key: String, object: &W) -> std::io::Result<()> {
if key == self.get_channel_manager_key() {
return match self.manager_error {
None => Ok(()),
Some((error, message)) => Err(std::io::Error::new(error, message))
}
}
}

fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
match self.graph_error {
None => FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph),
Some((error, message)) => Err(std::io::Error::new(error, message)),
if key == self.get_network_graph_key() {
return match self.graph_error {
None => Ok(()),
Some((error, message)) => Err(std::io::Error::new(error, message))
}
}

Ok(())
}
}

Expand Down
93 changes: 11 additions & 82 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,15 @@ extern crate bitcoin;
extern crate libc;

use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::hashes::hex::{FromHex, ToHex};
use lightning::routing::network_graph::NetworkGraph;
use crate::util::DiskWriteable;
use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
use lightning::chain::chainmonitor;
use bitcoin::hashes::hex::FromHex;
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::chain::transaction::OutPoint;
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger::Logger;
use lightning::util::ser::{ReadableArgs, Writeable};
use lightning::util::persist::KVStorePersister;
use std::fs;
use std::io::{Cursor, Error};
use std::io::Cursor;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::path::{Path, PathBuf, MAIN_SEPARATOR};

/// FilesystemPersister persists channel data on disk, where each channel's
/// data is stored in a file named after its funding outpoint.
Expand All @@ -48,31 +41,6 @@ pub struct FilesystemPersister {
path_to_channel_data: String,
}

impl<Signer: Sign> DiskWriteable for ChannelMonitor<Signer> {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), Error> {
self.write(writer)
}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> DiskWriteable for ChannelManager<Signer, M, T, K, F, L>
where
M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface<Signer=Signer>,
F::Target: FeeEstimator,
L::Target: Logger,
{
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}

impl DiskWriteable for NetworkGraph {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error> {
self.write(writer)
}
}

impl FilesystemPersister {
/// Initialize a new FilesystemPersister and set the path to the individual channels'
/// files.
Expand All @@ -87,34 +55,8 @@ impl FilesystemPersister {
self.path_to_channel_data.clone()
}

pub(crate) fn path_to_monitor_data(&self) -> PathBuf {
let mut path = PathBuf::from(self.path_to_channel_data.clone());
path.push("monitors");
path
}

/// Writes the provided `ChannelManager` to the path provided at `FilesystemPersister`
/// initialization, within a file called "manager".
pub fn persist_manager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>(
data_dir: String,
manager: &ChannelManager<Signer, M, T, K, F, L>
) -> Result<(), std::io::Error>
where
M::Target: chain::Watch<Signer>,
T::Target: BroadcasterInterface,
K::Target: KeysInterface<Signer=Signer>,
F::Target: FeeEstimator,
L::Target: Logger,
{
let path = PathBuf::from(data_dir);
util::write_to_file(path, "manager".to_string(), manager)
}

/// Write the provided `NetworkGraph` to the path provided at `FilesystemPersister`
/// initialization, within a file called "network_graph"
pub fn persist_network_graph(data_dir: String, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
let path = PathBuf::from(data_dir);
util::write_to_file(path, "network_graph".to_string(), network_graph)
pub(crate) fn path_to_monitor_data(&self) -> String {
format!("{}{}monitors", self.path_to_channel_data, MAIN_SEPARATOR)
}

/// Read `ChannelMonitor`s from disk.
Expand All @@ -124,7 +66,7 @@ impl FilesystemPersister {
where K::Target: KeysInterface<Signer=Signer> + Sized,
{
let path = self.path_to_monitor_data();
if !Path::new(&path).exists() {
if !Path::new(&PathBuf::from(&path)).exists() {
return Ok(Vec::new());
}
let mut res = Vec::new();
Expand Down Expand Up @@ -180,22 +122,9 @@ impl FilesystemPersister {
}
}

impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
// down once these start returning failure.
// A PermanentFailure implies we need to shut down since we're force-closing channels without
// even broadcasting!

fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
}

fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &Option<ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
util::write_to_file(self.path_to_monitor_data(), filename, monitor)
.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
impl KVStorePersister for FilesystemPersister {
fn persist<W: Writeable>(&self, key: String, object: &W) -> std::io::Result<()> {
util::write_to_file(format!("{}{}{}", self.path_to_channel_data, MAIN_SEPARATOR, key), object)
}
}

Expand Down
63 changes: 32 additions & 31 deletions lightning-persister/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,14 @@ use std::path::{Path, PathBuf};
#[cfg(not(target_os = "windows"))]
use std::os::unix::io::AsRawFd;

use lightning::util::ser::Writeable;

#[cfg(target_os = "windows")]
use {
std::ffi::OsStr,
std::os::windows::ffi::OsStrExt
};

pub(crate) trait DiskWriteable {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), std::io::Error>;
}

pub(crate) fn get_full_filepath(mut filepath: PathBuf, filename: String) -> String {
filepath.push(filename);
filepath.to_str().unwrap().to_string()
}

#[cfg(target_os = "windows")]
macro_rules! call {
($e: expr) => (
Expand All @@ -39,21 +32,25 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::W
}

#[allow(bare_trait_objects)]
pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, data: &D) -> std::io::Result<()> {
fs::create_dir_all(path.clone())?;
pub(crate) fn write_to_file<W: Writeable>(filename_with_path: String, data: &W) -> std::io::Result<()> {
let mut tmp_filename = filename_with_path.clone();
tmp_filename.push_str(".tmp");

let full_path = PathBuf::from(&filename_with_path);
let path = full_path.parent().unwrap();
fs::create_dir_all(path)?;
// Do a crazy dance with lots of fsync()s to be overly cautious here...
// We never want to end up in a state where we've lost the old data, or end up using the
// old data on power loss after we've returned.
// The way to atomically write a file on Unix platforms is:
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
let filename_with_path = get_full_filepath(path, filename);
let tmp_filename = format!("{}.tmp", filename_with_path.clone());


{
// Note that going by rust-lang/rust@d602a6b, on MacOS it is only safe to use
// rust stdlib 1.36 or higher.
let mut f = fs::File::create(&tmp_filename)?;
data.write_to_file(&mut f)?;
data.write(&mut f)?;
f.sync_all()?;
}
// Fsync the parent directory on Unix.
Expand Down Expand Up @@ -87,15 +84,17 @@ pub(crate) fn write_to_file<D: DiskWriteable>(path: PathBuf, filename: String, d

#[cfg(test)]
mod tests {
use super::{DiskWriteable, get_full_filepath, write_to_file};
use lightning::util::ser::{Writer, Writeable};

use super::{write_to_file};
use std::fs;
use std::io;
use std::io::Write;
use std::path::PathBuf;
use std::path::{PathBuf, MAIN_SEPARATOR};

struct TestWriteable{}
impl DiskWriteable for TestWriteable {
fn write_to_file(&self, writer: &mut fs::File) -> Result<(), io::Error> {
impl Writeable for TestWriteable {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), std::io::Error> {
writer.write_all(&[42; 1])
}
}
Expand All @@ -113,7 +112,8 @@ mod tests {
let mut perms = fs::metadata(path.to_string()).unwrap().permissions();
perms.set_readonly(true);
fs::set_permissions(path.to_string(), perms).unwrap();
match write_to_file(PathBuf::from(path.to_string()), filename, &test_writeable) {

match write_to_file(format!("{}{}{}", path, MAIN_SEPARATOR, filename), &test_writeable) {
Err(e) => assert_eq!(e.kind(), io::ErrorKind::PermissionDenied),
_ => panic!("Unexpected error message")
}
Expand All @@ -131,10 +131,11 @@ mod tests {
fn test_rename_failure() {
let test_writeable = TestWriteable{};
let filename = "test_rename_failure_filename";
let path = PathBuf::from("test_rename_failure_dir");
let path = "test_rename_failure_dir";
let full_file_path = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
// Create the channel data file and make it a directory.
fs::create_dir_all(get_full_filepath(path.clone(), filename.to_string())).unwrap();
match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
fs::create_dir_all(full_file_path.clone()).unwrap();
match write_to_file(full_file_path, &test_writeable) {
Err(e) => assert_eq!(e.raw_os_error(), Some(libc::EISDIR)),
_ => panic!("Unexpected Ok(())")
}
Expand All @@ -144,16 +145,17 @@ mod tests {
#[test]
fn test_diskwriteable_failure() {
struct FailingWriteable {}
impl DiskWriteable for FailingWriteable {
fn write_to_file(&self, _writer: &mut fs::File) -> Result<(), std::io::Error> {
impl Writeable for FailingWriteable {
fn write<W: Writer>(&self, _writer: &mut W) -> Result<(), std::io::Error> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "expected failure"))
}
}

let filename = "test_diskwriteable_failure";
let path = PathBuf::from("test_diskwriteable_failure_dir");
let path = "test_diskwriteable_failure_dir";
let test_writeable = FailingWriteable{};
match write_to_file(path.clone(), filename.to_string(), &test_writeable) {
let full_path = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
match write_to_file(full_path.clone(), &test_writeable) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::Other);
assert_eq!(e.get_ref().unwrap().to_string(), "expected failure");
Expand All @@ -170,12 +172,11 @@ mod tests {
fn test_tmp_file_creation_failure() {
let test_writeable = TestWriteable{};
let filename = "test_tmp_file_creation_failure_filename".to_string();
let path = PathBuf::from("test_tmp_file_creation_failure_dir");

// Create the tmp file and make it a directory.
let tmp_path = get_full_filepath(path.clone(), format!("{}.tmp", filename.clone()));
let path = "test_tmp_file_creation_failure_dir";
let tmp_path = format!("{}{}{}.tmp", path, MAIN_SEPARATOR, filename.clone());
let full_filepath = format!("{}{}{}", path, MAIN_SEPARATOR, filename);
fs::create_dir_all(tmp_path).unwrap();
match write_to_file(path, filename, &test_writeable) {
match write_to_file(full_filepath, &test_writeable) {
Err(e) => {
#[cfg(not(target_os = "windows"))]
assert_eq!(e.raw_os_error(), Some(libc::EISDIR));
Expand Down
Loading

0 comments on commit cb6c7c6

Please sign in to comment.