diff --git a/Cargo.lock b/Cargo.lock index bc43297..9d1259d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -251,9 +251,9 @@ dependencies = [ [[package]] name = "bitvec" -version = "0.20.4" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7774144344a4faa177370406a7ff5f1da24303817368584c6206c8303eb07848" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" dependencies = [ "funty", "radium", @@ -267,7 +267,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "block-padding", "generic-array", ] @@ -280,12 +279,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "block-padding" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" - [[package]] name = "brotli" version = "3.3.4" @@ -730,12 +723,14 @@ dependencies = [ [[package]] name = "ethabi" -version = "16.0.0" +version = "17.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4c98847055d934070b90e806e12d3936b787d0a115068981c1d8dfd5dfef5a5" +checksum = "e4966fba78396ff92db3b817ee71143eccd98acf0f876b8d600e585a670c5d1b" dependencies = [ "ethereum-types", "hex", + "once_cell", + "regex", "serde", "serde_json", "sha3", @@ -745,9 +740,9 @@ dependencies = [ [[package]] name = "ethbloom" -version = "0.11.1" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb684ac8fa8f6c5759f788862bb22ec6fe3cb392f6bfd08e3c64b603661e3f8" +checksum = "11da94e443c60508eb62cf256243a64da87304c2802ac2528847f79d750007ef" dependencies = [ "crunchy", "fixed-hash", @@ -758,9 +753,9 @@ dependencies = [ [[package]] name = "ethereum-types" -version = "0.12.1" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05136f7057fe789f06e6d41d07b34e6f70d8c86e5693b60f97aaa6553553bdaf" +checksum = "b2827b94c556145446fcce834ca86b7abf0c39a805883fe20e72c5bfdb5a0dc6" dependencies = [ "ethbloom", "fixed-hash", @@ -868,9 +863,9 @@ dependencies = [ [[package]] name = "funty" -version = "1.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" @@ -1013,7 +1008,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.3", + "tokio-util", "tracing", ] @@ -1190,9 +1185,9 @@ dependencies = [ [[package]] name = "impl-codec" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "161ebdfec3c8e3b52bf61c4f3550a1eea4f9579d10dc1b936f3171ebdcd6c443" +checksum = "ba6a270039626615617f3f36d15fc827041df3b78c439da2cadfa47455a77f2f" dependencies = [ "parity-scale-codec", ] @@ -1746,9 +1741,9 @@ checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" [[package]] name = "parity-scale-codec" -version = "2.3.1" +version = "3.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "373b1a4c1338d9cd3d1fa53b3a11bdab5ab6bd80a20f7f7becd76953ae2be909" +checksum = "9182e4a71cae089267ab03e67c99368db7cd877baf50f931e5d6d4b71e195ac0" dependencies = [ "arrayvec", "bitvec", @@ -1760,9 +1755,9 @@ dependencies = [ [[package]] name = "parity-scale-codec-derive" -version = "2.3.1" +version = "3.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1557010476e0595c9b568d16dcfb81b93cdeb157612726f5170d31aa707bed27" +checksum = "9299338969a3d2f491d65f140b00ddec470858402f888af98e8642fb5e8965cd" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1901,9 +1896,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "primitive-types" -version = "0.10.1" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05e4722c697a58a99d5d06a08c30821d7c082a4632198de1eaa5a6c22ef42373" +checksum = "e28720988bff275df1f51b171e1b2a18c30d194c4d2b61defdacecd625a5d94a" dependencies = [ "fixed-hash", "impl-codec", @@ -1976,9 +1971,9 @@ dependencies = [ [[package]] name = "radium" -version = "0.6.2" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" [[package]] name = "rand" @@ -2129,7 +2124,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", - "tokio-util 0.7.3", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -2428,14 +2423,12 @@ dependencies = [ [[package]] name = "sha3" -version = "0.9.1" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f81199417d4e5de3f04b1e871023acea7389672c4135918f05aa9cbf2f2fa809" +checksum = "eaedf34ed289ea47c2b741bb72e5357a209512d67bcd4bda44359e5bf0470f56" dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", + "digest 0.10.3", "keccak", - "opaque-debug", ] [[package]] @@ -2796,21 +2789,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-io", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.3" @@ -2819,6 +2797,7 @@ checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -3130,9 +3109,8 @@ dependencies = [ [[package]] name = "web3" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44f258e254752d210b84fe117b31f1e3cc9cbf04c0d747eb7f8cf7cf5e370f6d" +version = "0.19.0" +source = "git+https://github.com/BohuTANG/rust-web3#69b468aec7529014c507e4cc31a35f1cee4ec0de" dependencies = [ "arrayvec", "base64", @@ -3159,7 +3137,7 @@ dependencies = [ "tiny-keccak", "tokio", "tokio-stream", - "tokio-util 0.6.10", + "tokio-util", "url", "web3-async-native-tls", ] @@ -3280,9 +3258,12 @@ dependencies = [ [[package]] name = "wyz" -version = "0.2.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] [[package]] name = "xz2" diff --git a/common/eth/Cargo.toml b/common/eth/Cargo.toml index ac53ff0..e134c21 100644 --- a/common/eth/Cargo.toml +++ b/common/eth/Cargo.toml @@ -12,6 +12,6 @@ test = false anyhow = "1" hex = "0.4" -web3 = "0.18.0" +web3 = { git = "https://github.com/BohuTANG/rust-web3"} [dev-dependencies] diff --git a/ethetl/Cargo.toml b/ethetl/Cargo.toml index de35454..604f9f9 100644 --- a/ethetl/Cargo.toml +++ b/ethetl/Cargo.toml @@ -36,7 +36,7 @@ serde = { version = "1.0.137", features = ["derive"] } serde_json = "1.0.82" ticker = "0.1.0" tokio = { version = "1.19.2", features = ["full"] } -web3 = "0.18.0" +web3 = { git = "https://github.com/BohuTANG/rust-web3"} [dev-dependencies] diff --git a/ethetl/src/contexts/progress.rs b/ethetl/src/contexts/progress.rs index 2bed1e5..38aa5e1 100644 --- a/ethetl/src/contexts/progress.rs +++ b/ethetl/src/contexts/progress.rs @@ -29,6 +29,7 @@ pub struct ProgressValue { pub logs: usize, pub token_transfers: usize, pub ens: usize, + pub traces: usize, } #[derive(Debug)] @@ -40,6 +41,7 @@ pub struct Progress { logs: AtomicUsize, token_transfers: AtomicUsize, ens: AtomicUsize, + traces: AtomicUsize, } impl Progress { @@ -52,6 +54,7 @@ impl Progress { logs: AtomicUsize::new(0), token_transfers: AtomicUsize::new(0), ens: AtomicUsize::new(0), + traces: AtomicUsize::new(0), }) } @@ -83,6 +86,10 @@ impl Progress { self.ens.fetch_add(v, Ordering::Relaxed); } + pub fn incr_traces(&self, v: usize) { + self.traces.fetch_add(v, Ordering::Relaxed); + } + pub fn value(&self) -> Arc { Arc::new(ProgressValue { blocks: self.blocks.load(Ordering::Relaxed), @@ -91,6 +98,7 @@ impl Progress { logs: self.logs.load(Ordering::Relaxed), token_transfers: self.token_transfers.load(Ordering::Relaxed), ens: self.ens.load(Ordering::Relaxed), + traces: self.traces.load(Ordering::Relaxed), }) } @@ -110,7 +118,7 @@ impl Progress { if value.blocks > 0 { let percent = ((value.blocks as f32 / all as f32) * 100_f32) as usize; log::info!( - "block {:?} processed/{}, {:?} transactions processed, {:?} receipts processed, {:?} logs processed, {:?} token_transfers processed, {:?} ens processed. Progress is {:.2}", + "block {:?} processed/{}, {:?} transactions processed, {:?} receipts processed, {:?} logs processed, {:?} token_transfers processed, {:?} ens processed, {:?} traces processed. Progress is {:.2}", value.blocks, all, value.txs, @@ -118,6 +126,7 @@ impl Progress { value.logs, value.token_transfers, value.ens, + value.traces, percent.percent(), ); } diff --git a/ethetl/src/eth/mod.rs b/ethetl/src/eth/mod.rs index 5e6f5d6..5326b43 100644 --- a/ethetl/src/eth/mod.rs +++ b/ethetl/src/eth/mod.rs @@ -17,9 +17,11 @@ mod blocks; mod contracts; mod receipts; mod syncing; +mod traces; pub use block_number::BlockNumber; pub use blocks::BlockFetcher; pub use contracts::ContractFetcher; pub use receipts::ReceiptFetcher; pub use syncing::Syncing; +pub use traces::Traces; diff --git a/ethetl/src/eth/traces.rs b/ethetl/src/eth/traces.rs new file mode 100644 index 0000000..97f4c59 --- /dev/null +++ b/ethetl/src/eth/traces.rs @@ -0,0 +1,90 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use log::info; +use common_exceptions::Result; +use common_exceptions::Retryable; +use web3::types::BlockNumber; +use web3::types::Trace; +use web3::types::U64; + +use crate::contexts::ContextRef; + +pub struct Traces { + ctx: ContextRef, + numbers: Vec, +} + +impl Traces { + pub fn create(ctx: &ContextRef) -> Traces { + Self { + ctx: ctx.clone(), + numbers: vec![], + } + } + + // Push a block number. + pub fn push(&mut self, number: usize) -> Result<()> { + self.numbers.push(number); + Ok(()) + } + + // Push range of block numbers. + pub fn push_batch(&mut self, nums: Vec) -> Result<()> { + self.numbers.extend(nums); + Ok(()) + } + + pub async fn fetch(&self) -> Result> { + let notify = |e, duration| { + log::warn!( + "Fetch traces error at duration {:?}, error:{:?}", + duration, + e + ) + }; + let op = || async { + let res = self.fetch_with_no_retry().await?; + Ok(res) + }; + + op.retry_with_notify(notify).await + } + + // Get the blocks traces. + async fn fetch_with_no_retry(&self) -> Result> { + let http = web3::transports::Http::new(self.ctx.get_rpc_url())?; + let web3 = web3::Web3::new(web3::transports::Batch::new(http)); + + let mut block_traces = vec![]; + + for chunks in self.numbers.chunks(10) { + let mut callbacks = vec![]; + for num in chunks { + let block_trace = web3.trace().block(BlockNumber::Number(U64::from(*num))); + callbacks.push(block_trace); + } + let _ = web3.transport().submit_batch().await?; + + // Get the callback. + for cb in callbacks { + let r = cb.await?; + info!("traces: {:?}", r); + block_traces.extend(r); + } + } + + Ok(block_traces) + } +} diff --git a/ethetl/src/exporters/blocks.rs b/ethetl/src/exporters/blocks.rs index da9853e..75f5b3c 100644 --- a/ethetl/src/exporters/blocks.rs +++ b/ethetl/src/exporters/blocks.rs @@ -38,8 +38,10 @@ use web3::types::U64; use crate::contexts::ContextRef; use crate::eth::BlockFetcher; +use crate::eth::Traces; use crate::exporters::write_file; use crate::exporters::ReceiptExporter; +use crate::exporters::TracesExporter; use crate::exporters::TransactionExporter; pub struct BlockExporter { @@ -73,6 +75,7 @@ impl BlockExporter { self.export_blocks(&blocks).await?; self.export_txs(&blocks).await?; self.export_tx_receipts().await?; + self.export_traces().await?; } Ok(()) @@ -260,4 +263,14 @@ impl BlockExporter { } Ok(tx_hashes) } + + pub async fn export_traces(&self) -> Result<()> { + let mut traces = Traces::create(&self.ctx); + traces.push_batch(self.numbers.clone())?; + let block_traces = traces.fetch().await?; + + let exporter = + TracesExporter::create(&self.ctx, &self.output_dir, &self.range_path, &block_traces); + exporter.export().await + } } diff --git a/ethetl/src/exporters/file.rs b/ethetl/src/exporters/file.rs new file mode 100644 index 0000000..e0d47b0 --- /dev/null +++ b/ethetl/src/exporters/file.rs @@ -0,0 +1,56 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow2::array::Array; +use arrow2::chunk::Chunk; +use arrow2::datatypes::Schema; +use common_exceptions::Error; +use common_exceptions::Result; + +use crate::contexts::ContextRef; + +pub async fn write_file( + ctx: &ContextRef, + path: &str, + schema: Schema, + columns: Chunk>, + msg: &str, +) -> Result<()> { + debug_assert!( + schema + .fields + .iter() + .zip(columns.iter()) + .all(|(dt1, dt2)| &dt1.data_type == dt2.data_type()), + "schema={schema:?}\ncolumns={:?}", + columns.iter().map(|v| v.data_type()).collect::>() + ); + + match ctx.get_output_format().to_lowercase().as_str() { + "csv" => { + let path = format!("{}.csv", path); + log::info!("Write {} to {}", msg, path); + common_storages::write_csv(ctx.get_storage(), &path, schema, columns).await + } + "parquet" => { + let path = format!("{}.parquet", path); + log::info!("Write {} to {}", msg, path); + common_storages::write_parquet(ctx.get_storage(), &path, schema, columns).await + } + v => Err(Error::msg(format!( + "Unsupported format, must be one of [csv, parquet], got: {}", + v + ))), + } +} diff --git a/ethetl/src/exporters/mod.rs b/ethetl/src/exporters/mod.rs index 151ea76..1bd141d 100644 --- a/ethetl/src/exporters/mod.rs +++ b/ethetl/src/exporters/mod.rs @@ -14,50 +14,22 @@ mod blocks; mod ens; +mod file; mod logs; mod pipeline; mod receipts; mod token_transfers; +mod traces; mod transactions; mod worker; -use arrow2::array::Array; -use arrow2::chunk::Chunk; -use arrow2::datatypes::Schema; pub use blocks::BlockExporter; -use common_exceptions::Error; -use common_exceptions::Result; pub use ens::EnsExporter; +pub use file::write_file; pub use logs::LogsExporter; pub use pipeline::Pipeline; pub use receipts::ReceiptExporter; pub use token_transfers::TokenTransferExporter; +pub use traces::TracesExporter; pub use transactions::TransactionExporter; pub use worker::Worker; - -use crate::contexts::ContextRef; - -pub async fn write_file( - ctx: &ContextRef, - path: &str, - schema: Schema, - columns: Chunk>, - msg: &str, -) -> Result<()> { - match ctx.get_output_format().to_lowercase().as_str() { - "csv" => { - let path = format!("{}.csv", path); - log::info!("Write {} to {}", msg, path); - common_storages::write_csv(ctx.get_storage(), &path, schema, columns).await - } - "parquet" => { - let path = format!("{}.parquet", path); - log::info!("Write {} to {}", msg, path); - common_storages::write_parquet(ctx.get_storage(), &path, schema, columns).await - } - v => Err(Error::msg(format!( - "Unsupported format, must be one of [csv, parquet], got: {}", - v - ))), - } -} diff --git a/ethetl/src/exporters/traces.rs b/ethetl/src/exporters/traces.rs new file mode 100644 index 0000000..38bcfae --- /dev/null +++ b/ethetl/src/exporters/traces.rs @@ -0,0 +1,258 @@ +// Copyright 2022 BohuTANG. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use arrow2::array::UInt64Array; +use arrow2::array::Utf8Array; +use arrow2::chunk::Chunk; +use arrow2::datatypes::DataType; +use arrow2::datatypes::Field; +use arrow2::datatypes::Schema; +use common_eth::bytes_to_hex; +use common_eth::h160_to_hex; +use common_eth::h256_to_hex; +use common_exceptions::Result; +use web3::types::Action; +use web3::types::Address; +use web3::types::Bytes; +use web3::types::Res; +use web3::types::Trace; +use web3::types::H256; +use web3::types::U256; + +use crate::contexts::ContextRef; +use crate::exporters::write_file; + +pub struct TracesExporter { + ctx: ContextRef, + output_dir: String, + range_path: String, + traces: Vec, +} + +impl TracesExporter { + pub fn create( + ctx: &ContextRef, + output_dir: &str, + range_path: &str, + traces: &[Trace], + ) -> TracesExporter { + Self { + ctx: ctx.clone(), + output_dir: output_dir.to_string(), + range_path: range_path.to_string(), + traces: traces.to_vec(), + } + } + + fn schema(&self) -> Schema { + let block_number = Field::new("block_number", DataType::UInt64, true); + let transaction_hash = Field::new("transaction_hash", DataType::Utf8, true); + let transaction_index = Field::new("transaction_index", DataType::UInt64, true); + let from_address = Field::new("from_address", DataType::Utf8, true); + let to_address = Field::new("to_address", DataType::Utf8, true); + let value = Field::new("value", DataType::UInt64, true); + let input = Field::new("input", DataType::Utf8, true); + let output = Field::new("output", DataType::Utf8, true); + let trace_type = Field::new("trace_type", DataType::Utf8, true); + let call_type = Field::new("call_type", DataType::Utf8, true); + let reward_type = Field::new("reward_type", DataType::Utf8, true); + let gas = Field::new("gas", DataType::UInt64, true); + let gas_used = Field::new("gas_used", DataType::UInt64, true); + let subtraces = Field::new("subtraces", DataType::UInt64, true); + let trace_address = Field::new("trace_address", DataType::Utf8, true); + let error = Field::new("error", DataType::Utf8, true); + let status = Field::new("status", DataType::UInt64, true); + let trace_id = Field::new("trace_id", DataType::Utf8, true); + Schema::from(vec![ + block_number, + transaction_hash, + transaction_index, + from_address, + to_address, + value, + input, + output, + trace_type, + call_type, + reward_type, + gas, + gas_used, + subtraces, + trace_address, + error, + status, + trace_id, + ]) + } + + pub async fn export(&self) -> Result<()> { + let traces = &self.traces; + let traces_len = traces.len(); + let mut block_number_vec = Vec::with_capacity(traces_len); + let mut transaction_hash_vec = Vec::with_capacity(traces_len); + let mut transaction_index_vec = Vec::with_capacity(traces_len); + let mut from_address_vec = Vec::with_capacity(traces_len); + let mut to_address_vec = Vec::with_capacity(traces_len); + let mut value_vec = Vec::with_capacity(traces_len); + let mut input_vec = Vec::with_capacity(traces_len); + let mut output_vec = Vec::with_capacity(traces_len); + let mut trace_type_vec = Vec::with_capacity(traces_len); + let mut call_type_vec = Vec::with_capacity(traces_len); + let mut reward_type_vec = Vec::with_capacity(traces_len); + let mut gas_vec = Vec::with_capacity(traces_len); + let mut gas_used_vec = Vec::with_capacity(traces_len); + let mut subtraces_vec = Vec::with_capacity(traces_len); + let mut trace_address_vec = Vec::with_capacity(traces_len); + let mut error_vec = Vec::with_capacity(traces_len); + let mut status_vec = Vec::with_capacity(traces_len); + let mut trace_id_vec = Vec::with_capacity(traces_len); + + for trace in traces { + block_number_vec.push(trace.block_number); + transaction_hash_vec.push(h256_to_hex( + &trace.transaction_hash.unwrap_or_else(H256::zero), + )); + transaction_index_vec.push(trace.transaction_position.unwrap_or(0) as u64); + + let res = trace.result.clone().unwrap_or_default(); + let ( + call_type, + reward_type, + from_address, + to_address, + input, + output, + value, + gas, + gas_used, + ) = match &trace.action { + Action::Call(v) => { + let gas_used = match res { + Res::Call(x) => x.gas_used, + _ => U256::zero(), + }; + ( + format!("{:?}", v.call_type), + "".to_string(), + v.from, + v.to, + v.input.clone(), + Bytes::default(), + v.value, + v.gas, + gas_used, + ) + } + Action::Create(v) => { + let (to_address, gas_used, output) = match res { + Res::Create(x) => (x.address, x.gas_used, x.code), + _ => (Address::zero(), U256::zero(), Bytes::default()), + }; + ( + "".to_string(), + "".to_string(), + v.from, + to_address, + v.init.clone(), + output, + v.value, + v.gas, + gas_used, + ) + } + Action::Suicide(v) => ( + "".to_string(), + "".to_string(), + v.address, + v.refund_address, + Bytes::default(), + Bytes::default(), + v.balance, + U256::zero(), + U256::zero(), + ), + Action::Reward(v) => ( + "".to_string(), + format!("{:?}", v.reward_type), + Address::zero(), + v.author, + Bytes::default(), + Bytes::default(), + v.value, + U256::zero(), + U256::zero(), + ), + }; + + from_address_vec.push(h160_to_hex(&from_address)); + to_address_vec.push(h160_to_hex(&to_address)); + value_vec.push(value.as_u64()); + input_vec.push(bytes_to_hex(&input)); + output_vec.push(bytes_to_hex(&output)); + trace_type_vec.push(format!("{:?}", trace.action_type)); + call_type_vec.push(call_type); + reward_type_vec.push(reward_type); + gas_vec.push(gas.as_u64()); + gas_used_vec.push(gas_used.as_u64()); + subtraces_vec.push(trace.subtraces as u64); + trace_address_vec.push(format!("{:?}", trace.trace_address)); + error_vec.push(trace.error.clone().unwrap_or_default()); + status_vec.push(0u64); + trace_id_vec.push(""); + } + + let block_number_array = UInt64Array::from_slice(block_number_vec); + let transaction_hash_array = Utf8Array::::from_slice(transaction_hash_vec); + let transaction_index_array = UInt64Array::from_slice(transaction_index_vec); + let from_address_array = Utf8Array::::from_slice(from_address_vec); + let to_address_array = Utf8Array::::from_slice(to_address_vec); + let value_array = UInt64Array::from_slice(value_vec); + let input_array = Utf8Array::::from_slice(input_vec); + let output_array = Utf8Array::::from_slice(output_vec); + let trace_type_array = Utf8Array::::from_slice(trace_type_vec); + let call_type_array = Utf8Array::::from_slice(call_type_vec); + let reward_type_array = Utf8Array::::from_slice(reward_type_vec); + let gas_array = UInt64Array::from_slice(gas_vec); + let gas_used_array = UInt64Array::from_slice(gas_used_vec); + let subtraces_array = UInt64Array::from_slice(subtraces_vec); + let trace_address_array = Utf8Array::::from_slice(trace_address_vec); + let error_array = Utf8Array::::from_slice(error_vec); + let status_array = UInt64Array::from_slice(status_vec); + let trace_id_array = Utf8Array::::from_slice(trace_id_vec); + + let columns = Chunk::try_new(vec![ + block_number_array.boxed(), + transaction_hash_array.boxed(), + transaction_index_array.boxed(), + from_address_array.boxed(), + to_address_array.boxed(), + value_array.boxed(), + input_array.boxed(), + output_array.boxed(), + trace_type_array.boxed(), + call_type_array.boxed(), + reward_type_array.boxed(), + gas_array.boxed(), + gas_used_array.boxed(), + subtraces_array.boxed(), + trace_address_array.boxed(), + error_array.boxed(), + status_array.boxed(), + trace_id_array.boxed(), + ])?; + + let path = format!("{}/traces/traces_{}", self.output_dir, self.range_path); + write_file(&self.ctx, &path, self.schema(), columns, "traces").await + } +}