From bb21344af1563f22ef4a65f8efac813d6a528e91 Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Thu, 5 Sep 2024 18:54:24 -0700 Subject: [PATCH] Create centralized config and transaction importer. --- Cargo.lock | 20 +++ Cargo.toml | 2 + .../indexer-transaction-generator/Cargo.toml | 37 +++++ .../indexer-transaction-generator/README.md | 20 ++- .../src/config.rs | 136 ++++++++++++++++ .../indexer-transaction-generator/src/lib.rs | 5 + .../indexer-transaction-generator/src/main.rs | 13 ++ .../src/transaction_importer.rs | 146 ++++++++++++++++++ 8 files changed, 378 insertions(+), 1 deletion(-) create mode 100644 ecosystem/indexer-grpc/indexer-transaction-generator/Cargo.toml create mode 100644 ecosystem/indexer-grpc/indexer-transaction-generator/src/config.rs create mode 100644 ecosystem/indexer-grpc/indexer-transaction-generator/src/lib.rs create mode 100644 ecosystem/indexer-grpc/indexer-transaction-generator/src/main.rs create mode 100644 ecosystem/indexer-grpc/indexer-transaction-generator/src/transaction_importer.rs diff --git a/Cargo.lock b/Cargo.lock index f9ca7e852b7ce5..6e0f8ca54acc8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2289,6 +2289,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "aptos-indexer-transaction-generator" +version = "1.0.0" +dependencies = [ + "anyhow", + "aptos-indexer-grpc-utils", + "aptos-protos 1.3.1", + "clap 4.4.14", + "futures", + "itertools 0.13.0", + "serde", + "serde_json", + "serde_yaml 0.8.26", + "tempfile", + "tokio", + "tokio-stream", + "tonic 0.11.0", + "url", +] + [[package]] name = "aptos-infallible" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 8835db03e4b8ca..01669e6418f3fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ members = [ "ecosystem/indexer-grpc/indexer-grpc-table-info", "ecosystem/indexer-grpc/indexer-grpc-utils", "ecosystem/indexer-grpc/indexer-test-transactions", + "ecosystem/indexer-grpc/indexer-transaction-generator", "ecosystem/indexer-grpc/transaction-filter", "ecosystem/nft-metadata-crawler-parser", "ecosystem/node-checker", @@ -364,6 +365,7 @@ aptos-indexer-grpc-table-info = { path = "ecosystem/indexer-grpc/indexer-grpc-ta aptos-indexer-test-transactions = { path = "ecosystem/indexer-grpc/indexer-test-transactions" } aptos-indexer-grpc-utils = { path = "ecosystem/indexer-grpc/indexer-grpc-utils" } aptos-indexer-grpc-server-framework = { path = "ecosystem/indexer-grpc/indexer-grpc-server-framework" } +aptos-indexer-transaction-generator = { path = "ecosystem/indexer-grpc/indexer-transaction-generator" } aptos-infallible = { path = "crates/aptos-infallible" } aptos-inspection-service = { path = "crates/aptos-inspection-service" } aptos-jellyfish-merkle = { path = "storage/jellyfish-merkle" } diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/Cargo.toml b/ecosystem/indexer-grpc/indexer-transaction-generator/Cargo.toml new file mode 100644 index 00000000000000..749bfea4466ad1 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "aptos-indexer-transaction-generator" +description = "Indexer integration testing framework." +version = "1.0.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +# used for localnode. +# aptos = { workspace = true } +# aptos-config = { workspace = true } +aptos-indexer-grpc-utils = { workspace = true } +aptos-protos ={ workspace = true } +clap = { workspace = true } +futures = { workspace = true } +# rand = { workspace = true } +# regex = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true } +# toml = { workspace = true } +tonic = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +itertools = { workspace = true } +tempfile = { workspace = true } +tokio-stream = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/README.md b/ecosystem/indexer-grpc/indexer-transaction-generator/README.md index fa2d864e4f8046..8c0788a85ed520 100644 --- a/ecosystem/indexer-grpc/indexer-transaction-generator/README.md +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/README.md @@ -1,3 +1,21 @@ # Indexer Transaction Generator -This tool is to generate transactions for testing purpose. \ No newline at end of file +This tool is to generate transactions for testing purpose. + +## Usage + +`cargo run -- --config example.yaml --output-folder /your_path_to_store_transactions/` + +### Config + +```YAML +import_config: + testnet: + # Transaction Stream endpoint addresss. + transaction_stream_endpoint: https://grpc.testnet.aptoslabs.com:443 + # (Optional) The key to use with developers.aptoslabs.com + api_key: YOUR_KEY_HERE + # A map from versions to dump and their output names. + versions_to_import: + 123: testnet_v1.json +``` diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/src/config.rs b/ecosystem/indexer-grpc/indexer-transaction-generator/src/config.rs new file mode 100644 index 00000000000000..3a7bb5945a2f0c --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/src/config.rs @@ -0,0 +1,136 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Context; +use clap::Parser; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, +}; +use url::Url; + +const IMPORTED_TRANSACTIONS_FOLDER: &str = "imported_transactions"; + +#[derive(Parser)] +pub struct IndexerCliArgs { + /// Path to the configuration file with `TransactionGeneratorConfig`. + #[clap(long)] + pub config: PathBuf, + + /// Path to the output folder where the generated transactions will be saved. + #[clap(long)] + pub output_folder: PathBuf, +} + +impl IndexerCliArgs { + pub async fn run(&self) -> anyhow::Result<()> { + // Read the configuration file. + let config_raw = tokio::fs::read_to_string(&self.config) + .await + .with_context(|| format!("Failed to read configuration file: {:?}", self.config))?; + + // Parse the configuration. + let config: TransactionGeneratorConfig = serde_yaml::from_str(&config_raw) + .with_context(|| format!("Failed to parse configuration file: {:?}", self.config))?; + + // Run the transaction generator. + config.run(&self.output_folder).await + } +} + +/// Overall configuration for the transaction generator. +#[derive(Debug, Serialize, Deserialize)] +pub struct TransactionGeneratorConfig { + pub import_config: TransactionImporterConfig, // TODO: Add scripted transaction generation configuration. +} + +impl TransactionGeneratorConfig { + pub async fn run(&self, output_path: &Path) -> anyhow::Result<()> { + let import_config_path = output_path.join(IMPORTED_TRANSACTIONS_FOLDER); + // Check if the output folder exists. + if !import_config_path.exists() { + tokio::fs::create_dir_all(&import_config_path).await?; + } + self.import_config.run(&import_config_path).await + } +} + +/// Configuration for importing transactions from multiple networks. +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct TransactionImporterConfig { + // Config is a map from network name to the configuration for that network. + #[serde(flatten)] + pub configs: HashMap, +} + +impl TransactionImporterConfig { + pub async fn run(&self, output_path: &Path) -> anyhow::Result<()> { + // Validate the configuration. This is to make sure that no output file shares the same name. + let mut output_files = HashSet::new(); + for (_, network_config) in self.configs.iter() { + for output_file in network_config.versions_to_import.values() { + if !output_files.insert(output_file) { + return Err(anyhow::anyhow!( + "[Transaction Importer] Output file name {} is duplicated", + output_file + )); + } + } + } + // Run the transaction importer for each network. + for (network_name, network_config) in self.configs.iter() { + network_config.run(output_path).await.context(format!( + "[Transaction Importer] Failed for network: {}", + network_name + ))?; + } + Ok(()) + } +} + +/// Configuration for importing transactions from a network. +/// This includes the URL of the network, the API key, the version of the transaction to fetch, +#[derive(Debug, Serialize, Deserialize)] +pub struct TransactionImporterPerNetworkConfig { + /// The endpoint of the transaction stream. + pub transaction_stream_endpoint: Url, + /// The API key to use for the transaction stream if required. + pub api_key: Option, + /// The version of the transaction to fetch and their output file names. + pub versions_to_import: HashMap, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_duplicate_output_name() { + let transaction_generator_config = r#" + { + "import_config": { + "mainnet": { + "transaction_stream_endpoint": "http://mainnet.com", + "api_key": "mainnet_api_key", + "versions_to_import": { + 1: "mainnet_v1.json" + } + }, + "testnet": { + "transaction_stream_endpoint": "http://testnet.com", + "api_key": "testnet_api_key", + "versions_to_import": { + 1: "mainnet_v1.json" + } + } + } + } + "#; + let transaction_generator_config: TransactionGeneratorConfig = + serde_yaml::from_str(transaction_generator_config).unwrap(); + let output_path = PathBuf::from("/tmp"); + let result = transaction_generator_config.run(&output_path).await; + assert!(result.is_err()); + } +} diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/src/lib.rs b/ecosystem/indexer-grpc/indexer-transaction-generator/src/lib.rs new file mode 100644 index 00000000000000..12a64f344da919 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/src/lib.rs @@ -0,0 +1,5 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod config; +pub mod transaction_importer; diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/src/main.rs b/ecosystem/indexer-grpc/indexer-transaction-generator/src/main.rs new file mode 100644 index 00000000000000..ce68a99c9ac688 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/src/main.rs @@ -0,0 +1,13 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use aptos_indexer_transaction_generator::config::IndexerCliArgs; +use clap::Parser; + +#[tokio::main] +async fn main() -> Result<()> { + // Parse the command line arguments. + let args = IndexerCliArgs::parse(); + args.run().await +} diff --git a/ecosystem/indexer-grpc/indexer-transaction-generator/src/transaction_importer.rs b/ecosystem/indexer-grpc/indexer-transaction-generator/src/transaction_importer.rs new file mode 100644 index 00000000000000..3ac92ee95c21d6 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-transaction-generator/src/transaction_importer.rs @@ -0,0 +1,146 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::config::TransactionImporterPerNetworkConfig; +use anyhow::Context; +use aptos_indexer_grpc_utils::create_data_service_grpc_client; +use aptos_protos::indexer::v1::GetTransactionsRequest; +use std::{path::Path, time::Duration}; + +/// GRPC request metadata key for the token ID. +const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization"; +const GRPC_REQUEST_NAME_HEADER: &str = "x-request-name"; +const GRPC_REQUEST_NAME_VALUE: &str = "testing-framework"; +const TRANSACTION_STREAM_TIMEOUT_IN_SECS: u64 = 60; + +impl TransactionImporterPerNetworkConfig { + pub async fn run(&self, output_path: &Path) -> anyhow::Result<()> { + let mut client = create_data_service_grpc_client( + self.transaction_stream_endpoint.clone(), + Some(Duration::from_secs(TRANSACTION_STREAM_TIMEOUT_IN_SECS)), + ) + .await?; + + for (version, output_file) in &self.versions_to_import { + let mut request = tonic::Request::new(GetTransactionsRequest { + starting_version: Some(*version), + transactions_count: Some(1), + ..GetTransactionsRequest::default() + }); + request.metadata_mut().insert( + GRPC_REQUEST_NAME_HEADER, + GRPC_REQUEST_NAME_VALUE.parse().unwrap(), + ); + if let Some(api_key) = &self.api_key { + request.metadata_mut().insert( + GRPC_API_GATEWAY_API_KEY_HEADER, + format!("Bearer {}", api_key.clone()).parse().unwrap(), + ); + } + let mut stream = client.get_transactions(request).await?.into_inner(); + while let Some(resp) = stream.message().await.context(format!( + "[Transaction Importer] Stream ended unexpected for endpoint {:?}", + self.transaction_stream_endpoint + ))? { + let transaction = resp.transactions.first().context(format!( + "[Transaction Importer] Transaction at version {} is not in response.", + version + ))?; + let json_string = serde_json::to_string_pretty(transaction).context( + format!("[Transaction Importer] Transaction at version {} failed to serialized to json string.", version))?; + let output_path = output_path.join(output_file).with_extension("json"); + // TODO: add a diffing process here. + tokio::fs::write(output_path, json_string) + .await + .context(format!( + "[Transaction Importer] Failed to write transaction at version {} to file.", + version + ))?; + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::config::TransactionImporterPerNetworkConfig; + use aptos_protos::{ + indexer::v1::{ + raw_data_server::{RawData, RawDataServer}, + GetTransactionsRequest, TransactionsResponse, + }, + transaction::v1::Transaction, + }; + use futures::Stream; + use std::pin::Pin; + use tonic::{Request, Response, Status}; + + type ResponseStream = Pin> + Send>>; + + #[derive(Debug, Default)] + pub struct DummyServer { + pub transactions: Vec, + } + + #[tonic::async_trait] + impl RawData for DummyServer { + type GetTransactionsStream = ResponseStream; + + async fn get_transactions( + &self, + req: Request, + ) -> Result, Status> { + let version = req.into_inner().starting_version.unwrap(); + let transaction = self + .transactions + .iter() + .find(|t| t.transactions.first().unwrap().version == version) + .unwrap(); + let stream = futures::stream::iter(vec![Ok(transaction.clone())]); + Ok(Response::new(Box::pin(stream))) + } + } + + #[tokio::test] + async fn test_run() { + // Create a dummy transaction server. + let transaction = Transaction { + version: 1, + ..Transaction::default() + }; + let transactions = vec![TransactionsResponse { + transactions: vec![transaction], + ..TransactionsResponse::default() + }]; + let server = DummyServer { transactions }; + tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(RawDataServer::new(server)) + .serve("127.0.0.1:51254".parse().unwrap()) + .await + .unwrap(); + }); + // Note: do not sleep here; client connection will be retried. + + // create temp dir + let temp_dir = tempfile::tempdir().unwrap(); + + let config_json = r#" + transaction_stream_endpoint: "http://localhost:51254" + versions_to_import: + 1: "testing_transaction" + "#; + + let config = + serde_yaml::from_str::(config_json).unwrap(); + config.run(temp_dir.path()).await.unwrap(); + + // Validate the output. + let output_path = temp_dir.path().join("testing_transaction.json"); + let output = tokio::fs::read_to_string(output_path).await.unwrap(); + let transaction = serde_json::from_str::(&output).unwrap(); + assert_eq!(transaction.version, 1); + } +}