Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
/// In addition, the dynamic library must export a "C" function _create_plugin which
/// creates the implementation of the plugin.
use {
solana_sdk::{signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::TransactionStatusMeta,
solana_sdk::{clock::UnixTimestamp, signature::Signature, transaction::SanitizedTransaction},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{any::Any, error, io},
thiserror::Error,
};
Expand Down Expand Up @@ -60,6 +60,19 @@ pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaTransactionInfo<'a>),
}

#[derive(Clone, Debug)]
pub struct ReplicaBlockInfo<'a> {
pub slot: u64,
pub blockhash: &'a str,
pub rewards: &'a [Reward],
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
}

pub enum ReplicaBlockInfoVersions<'a> {
V0_0_1(&'a ReplicaBlockInfo<'a>),
}

/// Errors returned by plugin calls
#[derive(Error, Debug)]
pub enum AccountsDbPluginError {
Expand Down Expand Up @@ -173,6 +186,12 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}

/// Called when block's metadata is updated.
#[allow(unused_variables)]
fn notify_block_metadata(&mut self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> {
Ok(())
}

/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.
Expand Down
31 changes: 23 additions & 8 deletions accountsdb-plugin-manager/src/accountsdb_plugin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
accountsdb_plugin_manager::AccountsDbPluginManager,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierLock,
slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver,
transaction_notifier::TransactionNotifierImpl,
},
Expand Down Expand Up @@ -50,6 +52,7 @@ pub struct AccountsDbPluginService {
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
}

impl AccountsDbPluginService {
Expand Down Expand Up @@ -102,24 +105,32 @@ impl AccountsDbPluginService {
None
};

let slot_status_observer =
if account_data_notifications_enabled || transaction_notifications_enabled {
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
let (slot_status_observer, block_metadata_notifier): (
Option<SlotStatusObserver>,
Option<BlockMetadataNotifierLock>,
) = if account_data_notifications_enabled || transaction_notifications_enabled {
let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone());
let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier));
(
Some(SlotStatusObserver::new(
confirmed_bank_receiver,
slot_status_notifier,
))
} else {
None
};
)),
Some(Arc::new(RwLock::new(BlockMetadataNotifierImpl::new(
plugin_manager.clone(),
)))),
)
} else {
(None, None)
};

info!("Started AccountsDbPluginService");
Ok(AccountsDbPluginService {
slot_status_observer,
plugin_manager,
accounts_update_notifier,
transaction_notifier,
block_metadata_notifier,
})
}

Expand Down Expand Up @@ -186,6 +197,10 @@ impl AccountsDbPluginService {
self.transaction_notifier.clone()
}

pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
self.block_metadata_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
if let Some(mut slot_status_observer) = self.slot_status_observer {
slot_status_observer.join()?;
Expand Down
105 changes: 105 additions & 0 deletions accountsdb-plugin-manager/src/block_metadata_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use {
crate::{
accountsdb_plugin_manager::AccountsDbPluginManager,
block_metadata_notifier_interface::BlockMetadataNotifier,
},
log::*,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
ReplicaBlockInfo, ReplicaBlockInfoVersions,
},
solana_measure::measure::Measure,
solana_metrics::*,
solana_runtime::bank::RewardInfo,
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
solana_transaction_status::{Reward, Rewards},
std::sync::{Arc, RwLock},
};

pub(crate) struct BlockMetadataNotifierImpl {
plugin_manager: Arc<RwLock<AccountsDbPluginManager>>,
}

impl BlockMetadataNotifier for BlockMetadataNotifierImpl {
/// Notify the block metadata
fn notify_block_metadata(
&self,
slot: u64,
blockhash: &str,
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
) {
let mut plugin_manager = self.plugin_manager.write().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
let rewards = Self::build_rewards(rewards);

for plugin in plugin_manager.plugins.iter_mut() {
let mut measure = Measure::start("accountsdb-plugin-update-slot");
let block_info =
Self::build_replica_block_info(slot, blockhash, &rewards, block_time, block_height);
let block_info = ReplicaBlockInfoVersions::V0_0_1(&block_info);
match plugin.notify_block_metadata(block_info) {
Err(err) => {
error!(
"Failed to update block metadata at slot {}, error: {} to plugin {}",
slot,
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully updated block metadata at slot {} to plugin {}",
slot,
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-update-block-metadata-us",
measure.as_us() as usize,
1000,
1000
);
}
}
}

impl BlockMetadataNotifierImpl {
fn build_rewards(rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>) -> Rewards {
let rewards = rewards.read().unwrap();
rewards
.iter()
.map(|(pubkey, reward)| Reward {
pubkey: pubkey.to_string(),
lamports: reward.lamports,
post_balance: reward.post_balance,
reward_type: Some(reward.reward_type),
commission: reward.commission,
})
.collect()
}

fn build_replica_block_info<'a>(
slot: u64,
blockhash: &'a str,
rewards: &'a [Reward],
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
) -> ReplicaBlockInfo<'a> {
ReplicaBlockInfo {
slot,
blockhash,
rewards,
block_time,
block_height,
}
}

pub fn new(plugin_manager: Arc<RwLock<AccountsDbPluginManager>>) -> Self {
Self { plugin_manager }
}
}
20 changes: 20 additions & 0 deletions accountsdb-plugin-manager/src/block_metadata_notifier_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use {
solana_runtime::bank::RewardInfo,
solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey},
std::sync::{Arc, RwLock},
};

/// Interface for notifying block metadata changes
pub trait BlockMetadataNotifier {
/// Notify the block metadata
fn notify_block_metadata(
&self,
slot: u64,
blockhash: &str,
rewards: &RwLock<Vec<(Pubkey, RewardInfo)>>,
block_time: Option<UnixTimestamp>,
block_height: Option<u64>,
);
}

pub type BlockMetadataNotifierLock = Arc<RwLock<dyn BlockMetadataNotifier + Sync + Send>>;
2 changes: 2 additions & 0 deletions accountsdb-plugin-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod accounts_update_notifier;
pub mod accountsdb_plugin_manager;
pub mod accountsdb_plugin_service;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod slot_status_notifier;
pub mod slot_status_observer;
pub mod transaction_notifier;
10 changes: 10 additions & 0 deletions accountsdb-plugin-postgres/scripts/create_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ CREATE TABLE transaction (
CONSTRAINT transaction_pk PRIMARY KEY (slot, signature)
);

-- The table storing block metadata
CREATE TABLE block (
slot BIGINT PRIMARY KEY,
blockhash VARCHAR(44),
rewards "Reward"[],
block_time BIGINT,
block_height BIGINT,
updated_on TIMESTAMP NOT NULL
);

/**
* The following is for keeping historical data for accounts and is not required for plugin to work.
*/
Expand Down
1 change: 1 addition & 0 deletions accountsdb-plugin-postgres/scripts/drop_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ DROP TABLE account_audit;
DROP TABLE account;
DROP TABLE slot;
DROP TABLE transaction;
DROP TABLE block;

DROP TYPE "TransactionError" CASCADE;
DROP TYPE "TransactionErrorCode" CASCADE;
Expand Down
27 changes: 26 additions & 1 deletion accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
serde_json,
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions,
ReplicaTransactionInfoVersions, Result, SlotStatus,
ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus,
},
solana_metrics::*,
std::{fs::File, io::Read},
Expand Down Expand Up @@ -334,6 +334,31 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
Ok(())
}

fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> {
match &mut self.client {
None => {
return Err(AccountsDbPluginError::Custom(Box::new(
AccountsDbPluginPostgresError::DataStoreConnectionError {
msg: "There is no connection to the PostgreSQL database.".to_string(),
},
)));
}
Some(client) => match block_info {
ReplicaBlockInfoVersions::V0_0_1(block_info) => {
let result = client.update_block_metadata(block_info);

if let Err(err) = result {
return Err(AccountsDbPluginError::SlotStatusUpdateError{
msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err)
});
}
}
},
}

Ok(())
}

/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.
Expand Down
Loading