Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct AccountsDbPluginPostgresConfig {
pub threads: Option<usize>,
pub batch_size: Option<usize>,
pub panic_on_db_errors: Option<bool>,
/// Indicates if to store historical data for accounts
pub store_account_historical_data: Option<bool>,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -74,7 +76,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Accounts either satisyfing the accounts condition or owners condition will be selected.
/// When only owners is specified,
/// all accounts belonging to the owners will be streamed.
/// The accounts field support wildcard to select all accounts:
/// The accounts field supports wildcard to select all accounts:
/// "accounts_selector" : {
/// "accounts" : \["*"\],
/// }
Expand All @@ -85,6 +87,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
/// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
/// `host` and `user` must be given.
/// "store_account_historical_data", optional, set it to 'true', to store historical account data to account_audit
/// table.
/// * "threads" optional, specifies the number of worker threads for the plugin. A thread
/// maintains a PostgreSQL connection to the server. The default is '10'.
/// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
Expand Down
86 changes: 83 additions & 3 deletions accountsdb-plugin-postgres/src/postgres_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const DEFAULT_THREADS_COUNT: usize = 100;
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
const ACCOUNT_COLUMN_COUNT: usize = 9;
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;

struct PostgresSqlClientWrapper {
client: Client,
Expand All @@ -48,6 +49,7 @@ struct PostgresSqlClientWrapper {
update_slot_without_parent_stmt: Statement,
update_transaction_log_stmt: Statement,
update_block_metadata_stmt: Statement,
insert_account_audit_stmt: Option<Statement>,
}

pub struct SimplePostgresClient {
Expand Down Expand Up @@ -324,6 +326,28 @@ impl SimplePostgresClient {
}
}

fn build_account_audit_insert_statement(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";

let stmt = client.prepare(stmt);

match stmt {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
err, config.host, config.user, config
),
})));
}
Ok(stmt) => Ok(stmt),
}
}

fn build_slot_upsert_statement_with_parent(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
Expand Down Expand Up @@ -370,16 +394,52 @@ impl SimplePostgresClient {
}
}

/// Internal function for inserting an account into account_audit table.
fn insert_account_audit(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
) -> Result<(), AccountsDbPluginError> {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.execute(
statement,
&[
&account.pubkey(),
&account.slot,
&account.owner(),
&lamports,
&account.executable(),
&rent_epoch,
&account.data(),
&account.write_version(),
&updated_on,
],
);

if let Err(err) = result {
let msg = format!(
"Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
err
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
Ok(())
}

/// Internal function for updating or inserting a single account
fn upsert_account_internal(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
insert_account_audit_stmt: &Option<Statement>,
) -> Result<(), AccountsDbPluginError> {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.query(
let result = client.execute(
statement,
&[
&account.pubkey(),
Expand All @@ -401,6 +461,11 @@ impl SimplePostgresClient {
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
} else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
// If no records modified (inserted or updated), it is because the account is updated
// at an older slot, insert the record directly into the account_audit table.
let statement = insert_account_audit_stmt.as_ref().unwrap();
Self::insert_account_audit(account, statement, client)?;
}

Ok(())
Expand All @@ -409,9 +474,10 @@ impl SimplePostgresClient {
/// Update or insert a single account
fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;
Self::upsert_account_internal(account, statement, client)
Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt)
}

/// Insert accounts in batch to reduce network overhead
Expand Down Expand Up @@ -487,11 +553,12 @@ impl SimplePostgresClient {
}

let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;

for account in self.pending_account_updates.drain(..) {
Self::upsert_account_internal(&account, statement, client)?;
Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?;
}

Ok(())
Expand All @@ -516,6 +583,18 @@ impl SimplePostgresClient {
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);

let store_account_historical_data = config
.store_account_historical_data
.unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);

let insert_account_audit_stmt = if store_account_historical_data {
let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
Some(stmt)
} else {
None
};

info!("Created SimplePostgresClient.");
Ok(Self {
batch_size,
Expand All @@ -528,6 +607,7 @@ impl SimplePostgresClient {
update_slot_without_parent_stmt,
update_transaction_log_stmt,
update_block_metadata_stmt,
insert_account_audit_stmt,
}),
})
}
Expand Down
11 changes: 5 additions & 6 deletions docs/src/developing/plugins/accountsdb_plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,11 @@ psql -U solana -p 5433 -h 10.138.0.9 -w -d solana -f drop_schema.sql

### Capture Historical Account Data

The account historical data is captured using a database trigger as shown in
`create_schema.sql`,
To capture account historical data, in the configuration file, turn
`store_account_historical_data` to true.

And ensure the database trigger is created to save data in the `audit_table` when
records in `account` are updated, as shown in `create_schema.sql`,

```
CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$
Expand All @@ -399,19 +402,15 @@ CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account
FOR EACH ROW EXECUTE PROCEDURE audit_account_update();
```

The historical data is stored in the account_audit table.

The trigger can be dropped to disable this feature, for example,


```
DROP TRIGGER account_update_trigger ON account;
```

Over time, the account_audit can accumulate large amount of data. You may choose to
limit that by deleting older historical data.


For example, the following SQL statement can be used to keep up to 1000 of the most
recent records for an account:

Expand Down