diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index c0253d2e60e..fa2cb745524 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -6,6 +6,7 @@ use graph::components::store::{EthereumCallCache, StoredDynamicDataSource}; use graph::components::subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError}; use graph::components::trigger_processor::RunnableTriggers; use graph::data::value::Word; +use graph::data_source::common::{MappingABI, UnresolvedMappingABI}; use graph::data_source::CausalityRegion; use graph::env::ENV_VARS; use graph::futures03::future::try_join; @@ -33,7 +34,7 @@ use graph::{ derive::CheapClone, prelude::{ async_trait, - ethabi::{Address, Contract, Event, Function, LogParam, ParamType, RawLog}, + ethabi::{Address, Event, Function, LogParam, ParamType, RawLog}, serde_json, warn, web3::types::{Log, Transaction, H256}, BlockNumber, CheapClone, EthereumCall, LightEthereumBlock, LightEthereumBlockExt, @@ -1436,82 +1437,6 @@ impl UnresolvedMapping { } } -#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] -pub struct UnresolvedMappingABI { - pub name: String, - pub file: Link, -} - -impl UnresolvedMappingABI { - pub async fn resolve( - self, - resolver: &Arc, - logger: &Logger, - ) -> Result { - let contract_bytes = resolver.cat(logger, &self.file).await.with_context(|| { - format!( - "failed to resolve ABI {} from {}", - self.name, self.file.link - ) - })?; - let contract = Contract::load(&*contract_bytes)?; - Ok(MappingABI { - name: self.name, - contract, - }) - } -} - -#[derive(Clone, Debug, PartialEq)] -pub struct MappingABI { - pub name: String, - pub contract: Contract, -} - -impl MappingABI { - pub fn function( - &self, - contract_name: &str, - name: &str, - signature: Option<&str>, - ) -> Result<&Function, Error> { - let contract = &self.contract; - let function = match signature { - // Behavior for apiVersion < 0.0.4: look up function by name; for overloaded - // functions this always picks the same overloaded variant, which is incorrect - // and may lead to encoding/decoding errors - None => contract.function(name).with_context(|| { - format!( - "Unknown function \"{}::{}\" called from WASM runtime", - contract_name, name - ) - })?, - - // Behavior for apiVersion >= 0.0.04: look up function by signature of - // the form `functionName(uint256,string) returns (bytes32,string)`; this - // correctly picks the correct variant of an overloaded function - Some(ref signature) => contract - .functions_by_name(name) - .with_context(|| { - format!( - "Unknown function \"{}::{}\" called from WASM runtime", - contract_name, name - ) - })? - .iter() - .find(|f| signature == &f.signature()) - .with_context(|| { - format!( - "Unknown function \"{}::{}\" with signature `{}` \ - called from WASM runtime", - contract_name, name, signature, - ) - })?, - }; - Ok(function) - } -} - #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] pub struct MappingBlockHandler { pub handler: String, diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index b83415146ac..3853ac13d31 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -19,7 +19,7 @@ pub use buffered_call_cache::BufferedCallCache; // ETHDEP: These concrete types should probably not be exposed. pub use data_source::{ - BlockHandlerFilter, DataSource, DataSourceTemplate, Mapping, MappingABI, TemplateSource, + BlockHandlerFilter, DataSource, DataSourceTemplate, Mapping, TemplateSource, }; pub mod chain; diff --git a/chain/ethereum/src/runtime/runtime_adapter.rs b/chain/ethereum/src/runtime/runtime_adapter.rs index 4147d61f5b0..06e425fa73c 100644 --- a/chain/ethereum/src/runtime/runtime_adapter.rs +++ b/chain/ethereum/src/runtime/runtime_adapter.rs @@ -1,10 +1,9 @@ use std::{sync::Arc, time::Instant}; use crate::adapter::EthereumRpcError; -use crate::data_source::MappingABI; use crate::{ capabilities::NodeCapabilities, network::EthereumNetworkAdapters, Chain, ContractCall, - ContractCallError, DataSource, EthereumAdapter, EthereumAdapterTrait, ENV_VARS, + ContractCallError, EthereumAdapter, EthereumAdapterTrait, ENV_VARS, }; use anyhow::{anyhow, Context, Error}; use blockchain::HostFn; @@ -13,6 +12,8 @@ use graph::components::subgraph::HostMetrics; use graph::data::store::ethereum::call; use graph::data::store::scalar::BigInt; use graph::data::subgraph::API_VERSION_0_0_9; +use graph::data_source; +use graph::data_source::common::MappingABI; use graph::futures03::compat::Future01CompatExt; use graph::prelude::web3::types::H160; use graph::runtime::gas::Gas; @@ -80,58 +81,93 @@ pub fn eth_call_gas(chain_identifier: &ChainIdentifier) -> Option { } impl blockchain::RuntimeAdapter for RuntimeAdapter { - fn host_fns(&self, ds: &DataSource) -> Result, Error> { - let abis = ds.mapping.abis.clone(); - let call_cache = self.call_cache.cheap_clone(); - let eth_adapters = self.eth_adapters.cheap_clone(); - let archive = ds.mapping.requires_archive()?; - let eth_call_gas = eth_call_gas(&self.chain_identifier); - - let ethereum_call = HostFn { - name: "ethereum.call", - func: Arc::new(move |ctx, wasm_ptr| { - // Ethereum calls should prioritise call-only adapters if one is available. - let eth_adapter = eth_adapters.call_or_cheapest(Some(&NodeCapabilities { - archive, - traces: false, - }))?; - ethereum_call( - ð_adapter, - call_cache.cheap_clone(), - ctx, - wasm_ptr, - &abis, - eth_call_gas, - ) - .map(|ptr| ptr.wasm_ptr()) - }), - }; - - let eth_adapters = self.eth_adapters.cheap_clone(); - let ethereum_get_balance = HostFn { - name: "ethereum.getBalance", - func: Arc::new(move |ctx, wasm_ptr| { - let eth_adapter = eth_adapters.unverified_cheapest_with(&NodeCapabilities { - archive, - traces: false, - })?; - eth_get_balance(ð_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr()) - }), - }; + fn host_fns(&self, ds: &data_source::DataSource) -> Result, Error> { + fn create_host_fns( + abis: Arc>>, // Use Arc to ensure `'static` lifetimes. + archive: bool, + call_cache: Arc, + eth_adapters: Arc, + eth_call_gas: Option, + ) -> Vec { + vec![ + HostFn { + name: "ethereum.call", + func: Arc::new({ + let eth_adapters = eth_adapters.clone(); + let call_cache = call_cache.clone(); + let abis = abis.clone(); + move |ctx, wasm_ptr| { + let eth_adapter = + eth_adapters.call_or_cheapest(Some(&NodeCapabilities { + archive, + traces: false, + }))?; + ethereum_call( + ð_adapter, + call_cache.clone(), + ctx, + wasm_ptr, + &abis, + eth_call_gas, + ) + .map(|ptr| ptr.wasm_ptr()) + } + }), + }, + HostFn { + name: "ethereum.getBalance", + func: Arc::new({ + let eth_adapters = eth_adapters.clone(); + move |ctx, wasm_ptr| { + let eth_adapter = + eth_adapters.unverified_cheapest_with(&NodeCapabilities { + archive, + traces: false, + })?; + eth_get_balance(ð_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr()) + } + }), + }, + HostFn { + name: "ethereum.hasCode", + func: Arc::new({ + let eth_adapters = eth_adapters.clone(); + move |ctx, wasm_ptr| { + let eth_adapter = + eth_adapters.unverified_cheapest_with(&NodeCapabilities { + archive, + traces: false, + })?; + eth_has_code(ð_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr()) + } + }), + }, + ] + } - let eth_adapters = self.eth_adapters.cheap_clone(); - let ethereum_get_code = HostFn { - name: "ethereum.hasCode", - func: Arc::new(move |ctx, wasm_ptr| { - let eth_adapter = eth_adapters.unverified_cheapest_with(&NodeCapabilities { - archive, - traces: false, - })?; - eth_has_code(ð_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr()) - }), + let host_fns = match ds { + data_source::DataSource::Onchain(onchain_ds) => { + let abis = Arc::new(onchain_ds.mapping.abis.clone()); + let archive = onchain_ds.mapping.requires_archive()?; + let call_cache = self.call_cache.cheap_clone(); + let eth_adapters = self.eth_adapters.cheap_clone(); + let eth_call_gas = eth_call_gas(&self.chain_identifier); + + create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas) + } + data_source::DataSource::Subgraph(subgraph_ds) => { + let abis = Arc::new(subgraph_ds.mapping.abis.clone()); + let archive = subgraph_ds.mapping.requires_archive()?; + let call_cache = self.call_cache.cheap_clone(); + let eth_adapters = self.eth_adapters.cheap_clone(); + let eth_call_gas = eth_call_gas(&self.chain_identifier); + + create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas) + } + data_source::DataSource::Offchain(_) => vec![], }; - Ok(vec![ethereum_call, ethereum_get_balance, ethereum_get_code]) + Ok(host_fns) } } diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 41fbbdef194..6f109138d06 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -6,6 +6,7 @@ use crate::{ subgraph::InstanceDSTemplateInfo, }, data::subgraph::UnifiedMappingApiVersion, + data_source, prelude::{BlockHash, DataSourceTemplateInfo, DeploymentHash}, }; use anyhow::{Error, Result}; @@ -346,7 +347,7 @@ impl TriggerFilter for MockTriggerFilter { pub struct MockRuntimeAdapter; impl RuntimeAdapter for MockRuntimeAdapter { - fn host_fns(&self, _ds: &C::DataSource) -> Result, Error> { + fn host_fns(&self, _ds: &data_source::DataSource) -> Result, Error> { todo!() } } diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index ee4d4d19857..f1c6acc4fbe 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -453,7 +453,6 @@ where } } -// TODO(krishna): Proper ordering for triggers impl Ord for Trigger where C::TriggerData: Ord, @@ -463,7 +462,7 @@ where (Trigger::Chain(data1), Trigger::Chain(data2)) => data1.cmp(data2), (Trigger::Subgraph(_), Trigger::Chain(_)) => std::cmp::Ordering::Greater, (Trigger::Chain(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Less, - (Trigger::Subgraph(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Equal, + (Trigger::Subgraph(t1), Trigger::Subgraph(t2)) => t1.entity.vid.cmp(&t2.entity.vid), } } } @@ -540,7 +539,7 @@ pub struct HostFn { } pub trait RuntimeAdapter: Send + Sync { - fn host_fns(&self, ds: &C::DataSource) -> Result, Error>; + fn host_fns(&self, ds: &data_source::DataSource) -> Result, Error>; } pub trait NodeCapabilities { diff --git a/graph/src/blockchain/noop_runtime_adapter.rs b/graph/src/blockchain/noop_runtime_adapter.rs index 2f30a30e608..0b8b9e0707c 100644 --- a/graph/src/blockchain/noop_runtime_adapter.rs +++ b/graph/src/blockchain/noop_runtime_adapter.rs @@ -1,5 +1,7 @@ use std::marker::PhantomData; +use crate::data_source; + use super::{Blockchain, HostFn, RuntimeAdapter}; /// A [`RuntimeAdapter`] that does not expose any host functions. @@ -16,7 +18,7 @@ impl RuntimeAdapter for NoopRuntimeAdapter where C: Blockchain, { - fn host_fns(&self, _ds: &C::DataSource) -> anyhow::Result> { + fn host_fns(&self, _ds: &data_source::DataSource) -> anyhow::Result> { Ok(vec![]) } } diff --git a/graph/src/data_source/common.rs b/graph/src/data_source/common.rs new file mode 100644 index 00000000000..789f04bb09c --- /dev/null +++ b/graph/src/data_source/common.rs @@ -0,0 +1,82 @@ +use crate::{components::link_resolver::LinkResolver, prelude::Link}; +use anyhow::{Context, Error}; +use ethabi::{Contract, Function}; +use serde::Deserialize; +use slog::Logger; +use std::sync::Arc; + +#[derive(Clone, Debug, PartialEq)] +pub struct MappingABI { + pub name: String, + pub contract: Contract, +} + +impl MappingABI { + pub fn function( + &self, + contract_name: &str, + name: &str, + signature: Option<&str>, + ) -> Result<&Function, Error> { + let contract = &self.contract; + let function = match signature { + // Behavior for apiVersion < 0.0.4: look up function by name; for overloaded + // functions this always picks the same overloaded variant, which is incorrect + // and may lead to encoding/decoding errors + None => contract.function(name).with_context(|| { + format!( + "Unknown function \"{}::{}\" called from WASM runtime", + contract_name, name + ) + })?, + + // Behavior for apiVersion >= 0.0.04: look up function by signature of + // the form `functionName(uint256,string) returns (bytes32,string)`; this + // correctly picks the correct variant of an overloaded function + Some(ref signature) => contract + .functions_by_name(name) + .with_context(|| { + format!( + "Unknown function \"{}::{}\" called from WASM runtime", + contract_name, name + ) + })? + .iter() + .find(|f| signature == &f.signature()) + .with_context(|| { + format!( + "Unknown function \"{}::{}\" with signature `{}` \ + called from WASM runtime", + contract_name, name, signature, + ) + })?, + }; + Ok(function) + } +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct UnresolvedMappingABI { + pub name: String, + pub file: Link, +} + +impl UnresolvedMappingABI { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + let contract_bytes = resolver.cat(logger, &self.file).await.with_context(|| { + format!( + "failed to resolve ABI {} from {}", + self.name, self.file.link + ) + })?; + let contract = Contract::load(&*contract_bytes)?; + Ok(MappingABI { + name: self.name, + contract, + }) + } +} diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index 1d255b1563c..3b600b7fbaf 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -1,4 +1,5 @@ pub mod causality_region; +pub mod common; pub mod offchain; pub mod subgraph; @@ -18,8 +19,7 @@ use crate::{ link_resolver::LinkResolver, store::{BlockNumber, StoredDynamicDataSource}, }, - data_source::offchain::OFFCHAIN_KINDS, - data_source::subgraph::SUBGRAPH_DS_KIND, + data_source::{offchain::OFFCHAIN_KINDS, subgraph::SUBGRAPH_DS_KIND}, prelude::{CheapClone as _, DataSourceContext}, schema::{EntityType, InputSchema}, }; diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index f7124f307c1..4e51191363a 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -1,16 +1,23 @@ use crate::{ blockchain::{block_stream::EntityWithType, Block, Blockchain}, components::{link_resolver::LinkResolver, store::BlockNumber}, - data::{subgraph::SPEC_VERSION_1_3_0, value::Word}, + data::{ + subgraph::{calls_host_fn, SPEC_VERSION_1_3_0}, + value::Word, + }, data_source, prelude::{DataSourceContext, DeploymentHash, Link}, }; use anyhow::{Context, Error}; +use futures03::{stream::FuturesOrdered, TryStreamExt}; use serde::Deserialize; use slog::{info, Logger}; use std::{fmt, sync::Arc}; -use super::{DataSourceTemplateInfo, TriggerWithHandler}; +use super::{ + common::{MappingABI, UnresolvedMappingABI}, + DataSourceTemplateInfo, TriggerWithHandler, +}; pub const SUBGRAPH_DS_KIND: &str = "subgraph"; @@ -122,12 +129,19 @@ impl Source { pub struct Mapping { pub language: String, pub api_version: semver::Version, + pub abis: Vec>, pub entities: Vec, pub handlers: Vec, pub runtime: Arc>, pub link: Link, } +impl Mapping { + pub fn requires_archive(&self) -> anyhow::Result { + calls_host_fn(&self.runtime, "ethereum.call") + } +} + #[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] pub struct EntityHandler { pub handler: String, @@ -158,6 +172,7 @@ pub struct UnresolvedMapping { pub language: String, pub file: Link, pub handlers: Vec, + pub abis: Vec, pub entities: Vec, } @@ -202,11 +217,28 @@ impl UnresolvedMapping { ) -> Result { info!(logger, "Resolve subgraph ds mapping"; "link" => &self.file.link); + // Resolve each ABI and collect the results + let abis = self + .abis + .into_iter() + .map(|unresolved_abi| { + let resolver = Arc::clone(resolver); + let logger = logger.clone(); + async move { + let resolved_abi = unresolved_abi.resolve(&resolver, &logger).await?; + Ok::<_, Error>(Arc::new(resolved_abi)) + } + }) + .collect::>() + .try_collect::>() + .await?; // Await the async stream collection here + Ok(Mapping { language: self.language, api_version: semver::Version::parse(&self.api_version)?, entities: self.entities, handlers: self.handlers, + abis, runtime: Arc::new(resolver.cat(logger, &self.file).await?), link: self.file, }) diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 46a17f54f22..0462b79d2a0 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -3,13 +3,12 @@ use graph::blockchain::BlockTime; use graph::components::store::DeploymentLocator; use graph::data::subgraph::*; use graph::data_source; +use graph::data_source::common::MappingABI; use graph::env::EnvVars; use graph::ipfs_client::IpfsClient; use graph::log; use graph::prelude::*; -use graph_chain_ethereum::{ - Chain, DataSource, DataSourceTemplate, Mapping, MappingABI, TemplateSource, -}; +use graph_chain_ethereum::{Chain, DataSource, DataSourceTemplate, Mapping, TemplateSource}; use graph_runtime_wasm::host_exports::DataSourceDetails; use graph_runtime_wasm::{HostExports, MappingContext}; use semver::Version; diff --git a/runtime/wasm/src/host.rs b/runtime/wasm/src/host.rs index ebf107fb3ec..bc5610a63d0 100644 --- a/runtime/wasm/src/host.rs +++ b/runtime/wasm/src/host.rs @@ -142,11 +142,7 @@ where ens_lookup, )); - let host_fns = data_source - .as_onchain() - .map(|ds| runtime_adapter.host_fns(ds)) - .transpose()? - .unwrap_or_default(); + let host_fns = runtime_adapter.host_fns(&data_source).unwrap_or_default(); Ok(RuntimeHost { host_fns: Arc::new(host_fns), diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs index aba953975a3..6605c39b51d 100644 --- a/store/test-store/tests/postgres/store.rs +++ b/store/test-store/tests/postgres/store.rs @@ -3,10 +3,11 @@ use graph::blockchain::BlockTime; use graph::data::graphql::ext::TypeDefinitionExt; use graph::data::query::QueryTarget; use graph::data::subgraph::schema::DeploymentCreate; +use graph::data_source::common::MappingABI; use graph::futures01::{future, Stream}; use graph::futures03::compat::Future01CompatExt; use graph::schema::{EntityType, InputSchema}; -use graph_chain_ethereum::{Mapping, MappingABI}; +use graph_chain_ethereum::Mapping; use hex_literal::hex; use lazy_static::lazy_static; use std::time::Duration; diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 288874b8774..fbc28e7dd55 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -25,6 +25,7 @@ use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::{Query, QueryTarget}; use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; +use graph::data_source::DataSource; use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; use graph::firehose::{FirehoseEndpoint, FirehoseEndpoints, NoopGenesisDecoder, SubgraphLimit}; @@ -892,10 +893,7 @@ struct NoopRuntimeAdapter { } impl RuntimeAdapter for NoopRuntimeAdapter { - fn host_fns( - &self, - _ds: &::DataSource, - ) -> Result, Error> { + fn host_fns(&self, _ds: &DataSource) -> Result, Error> { Ok(vec![]) } }