diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0aeeb8f0e3fa..5b58cd0d8749 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,8 @@ name: CI on: + pull_request: + types: [opened, synchronize, reopened] push: branches: - '*' @@ -84,10 +86,13 @@ jobs: run: echo ${{ secrets.DOCKER_PASSWORD }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin - name: Build and Push Docker images run: | + git_hash=$(git rev-parse --short "$GITHUB_SHA") docker pull xinfinorg/devnet:latest docker tag xinfinorg/devnet:latest xinfinorg/devnet:previous docker rmi xinfinorg/devnet:latest docker build -t xinfinorg/devnet:latest -f cicd/Dockerfile . + docker tag xinfinorg/devnet:latest xinfinorg/devnet:dev-upgrade-${git_hash} + docker push xinfinorg/devnet:dev-upgrade-${git_hash} docker push xinfinorg/devnet:latest docker push xinfinorg/devnet:previous @@ -103,9 +108,10 @@ jobs: - uses: actions/checkout@v4 - name: Terraform Apply run: | + git_hash=$(git rev-parse --short "$GITHUB_SHA") cd cicd/devnet/terraform terraform init ${{ env.tf_init_cli_options }} - terraform apply ${{ env.tf_apply_cli_options }} + terraform apply -var "docker_tag=dev-upgrade-${git_hash}" ${{ env.tf_apply_cli_options }} sleep 5 source .env for ((i=$us_east_2_start;i<$us_east_2_end;i++)); do @@ -120,7 +126,6 @@ jobs: echo "Force deploy xdc-$i" aws ecs update-service --region ap-southeast-2 --cluster devnet-xdcnode-cluster --service ecs-service-xdc$i --force-new-deployment --no-cli-pager | head -n 10; done - aws ecs update-service --region ap-southeast-1 --cluster devnet-xdcnode-cluster --service ecs-service-rpc1 --force-new-deployment --no-cli-pager | head -n 10; rpcnode_terraform_apply: runs-on: ubuntu-latest @@ -137,6 +142,20 @@ jobs: terraform init ${{ env.tf_init_cli_options }} terraform apply ${{ env.tf_apply_cli_options }} + devnet_dev-upgrade_node: + runs-on: ubuntu-latest + if: github.ref == 'refs/heads/dev-upgrade' && !startsWith(github.ref, 'refs/tags/') + needs: rpcnode_terraform_apply + environment: devnet + env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + steps: + - uses: actions/checkout@v4 + - name: ECS Update + run: | + aws ecs update-service --region ap-southeast-1 --cluster devnet-xdcnode-cluster --service ecs-service-rpc1 --force-new-deployment --no-cli-pager | head -n 10; + testnet_dev-upgrade_node: runs-on: ubuntu-latest if: github.ref == 'refs/heads/dev-upgrade' && !startsWith(github.ref, 'refs/tags/') diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 16cde0b1929f..204fe1634768 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -415,18 +415,24 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa // // TODO(karalabe): Deprecate when the subscription one can return past data too. func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.FilterQuery) ([]types.Log, error) { - // Initialize unset filter boundaried to run from genesis to chain head - from := int64(0) - if query.FromBlock != nil { - from = query.FromBlock.Int64() - } - to := int64(-1) - if query.ToBlock != nil { - to = query.ToBlock.Int64() + var filter *filters.Filter + if query.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics) + } else { + // Initialize unset filter boundaried to run from genesis to chain head + from := int64(0) + if query.FromBlock != nil { + from = query.FromBlock.Int64() + } + to := int64(-1) + if query.ToBlock != nil { + to = query.ToBlock.Int64() + } + // Construct the range filter + filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics) } - // Construct and execute the filter - filter := filters.New(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics) - + // Run the filter and return all the logs logs, err := filter.Logs(ctx) if err != nil { return nil, err @@ -527,6 +533,10 @@ func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumb return fb.bc.GetHeaderByNumber(uint64(block.Int64())), nil } +func (fb *filterBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return fb.bc.GetHeaderByHash(hash), nil +} + func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil } diff --git a/cicd/devnet/terraform/.env b/cicd/devnet/terraform/.env index 4eb6ca5a95c0..682e47178e0d 100644 --- a/cicd/devnet/terraform/.env +++ b/cicd/devnet/terraform/.env @@ -1,4 +1,4 @@ -log_level=3 +log_level=2 # Ohio us_east_2_start=0 diff --git a/cicd/devnet/terraform/main.tf b/cicd/devnet/terraform/main.tf index 285b9fc01030..e7723398bf44 100644 --- a/cicd/devnet/terraform/main.tf +++ b/cicd/devnet/terraform/main.tf @@ -23,6 +23,7 @@ module "us-east-2" { devnetNodeKeys = local.devnetNodeKeys["us-east-2"] logLevel = local.logLevel devnet_xdc_ecs_tasks_execution_role_arn = aws_iam_role.devnet_xdc_ecs_tasks_execution_role.arn + docker_tag = var.docker_tag providers = { aws = aws.us-east-2 } @@ -39,6 +40,7 @@ module "eu-west-1" { devnetNodeKeys = local.devnetNodeKeys["eu-west-1"] logLevel = local.logLevel devnet_xdc_ecs_tasks_execution_role_arn = aws_iam_role.devnet_xdc_ecs_tasks_execution_role.arn + docker_tag = var.docker_tag providers = { aws = aws.eu-west-1 } @@ -55,27 +57,8 @@ module "ap-southeast-2" { devnetNodeKeys = local.devnetNodeKeys["ap-southeast-2"] logLevel = local.logLevel devnet_xdc_ecs_tasks_execution_role_arn = aws_iam_role.devnet_xdc_ecs_tasks_execution_role.arn + docker_tag = var.docker_tag providers = { aws = aws.ap-southeast-2 } } - -# WARNING: APSE-1 will only be used to host rpc node -# Workaround to avoid conflicts with existing ecs cluster in existing regions -provider "aws" { - alias = "ap-southeast-1" - region = "ap-southeast-1" -} - -module "ap-southeast-1-rpc" { - source = "./module/region" - region = "ap-southeast-1" - devnetNodeKeys = local.rpcNodeKeys - enableFixedIp = true - logLevel = local.logLevel - devnet_xdc_ecs_tasks_execution_role_arn = aws_iam_role.devnet_xdc_ecs_tasks_execution_role.arn - - providers = { - aws = aws.ap-southeast-1 - } -} diff --git a/cicd/devnet/terraform/module/region/ecs.tf b/cicd/devnet/terraform/module/region/ecs.tf index 8cfc43427e68..bd7a88b17aea 100644 --- a/cicd/devnet/terraform/module/region/ecs.tf +++ b/cicd/devnet/terraform/module/region/ecs.tf @@ -4,7 +4,7 @@ data template_file devnet_container_definition { vars = { image_environment = "${lookup(each.value, "imageEnvironment", "devnet")}" - image_tag = "${lookup(each.value, "imageTag", "latest")}" + image_tag = "${lookup(each.value, "imageTag", var.docker_tag)}" node_name = "${each.key}" private_key = "${each.value.pk}" cloudwatch_group = "tf-${each.key}" diff --git a/cicd/devnet/terraform/module/region/variables.tf b/cicd/devnet/terraform/module/region/variables.tf index fc7afca84bb6..00dcd2277cf4 100644 --- a/cicd/devnet/terraform/module/region/variables.tf +++ b/cicd/devnet/terraform/module/region/variables.tf @@ -22,4 +22,10 @@ variable "enableFixedIp" { description = "a flag to indicate whether fixed ip should be associated to the nodes. This is used for RPC node" type = bool default = false -} \ No newline at end of file +} + +variable docker_tag { + type = string + default = "latest" + description = "description" +} diff --git a/cicd/devnet/terraform/s3.tf b/cicd/devnet/terraform/s3.tf index c7aba085aca1..f04eeeb7d890 100644 --- a/cicd/devnet/terraform/s3.tf +++ b/cicd/devnet/terraform/s3.tf @@ -1,14 +1,14 @@ # Bucket need to be created first. If first time run terraform init, need to comment out the below section terraform { backend "s3" { - bucket = "tf-devnet-bucket" // This name need to be updated to be the same as local.s3BucketName. We can't use variable here. - key = "tf/terraform_new.tfstate" + bucket = "tf-xinfin-bucket" // This name need to be updated to be the same as local.s3BucketName. We can't use variable here. + key = "tf/terraform_devnet.tfstate" region = "us-east-1" encrypt = true } } data "aws_s3_object" "devnet_xdc_node_config" { - bucket = local.s3BucketName + bucket = "tf-xinfin-bucket" key = "node-config.json" } diff --git a/cicd/devnet/terraform/variables.tf b/cicd/devnet/terraform/variables.tf index b4c4b14fafaa..0d9b3bb125b0 100644 --- a/cicd/devnet/terraform/variables.tf +++ b/cicd/devnet/terraform/variables.tf @@ -1,3 +1,9 @@ +variable docker_tag { + type = string + default = "latest" + description = "description" +} + locals { /** Load the nodes data from s3 @@ -39,8 +45,4 @@ locals { for r in local.regions : r.name => { for i in local.keyNames[r.name]: i => local.predefinedNodesConfig[i] } } - - rpcNodeKeys = { "rpc1": local.predefinedNodesConfig["rpc1"]} // we hardcode the rpc to a single node for now - - s3BucketName = "tf-devnet-bucket" } diff --git a/cicd/terraform/main.tf b/cicd/terraform/main.tf index 5df86c7c2bd1..ccb6ce690e00 100644 --- a/cicd/terraform/main.tf +++ b/cicd/terraform/main.tf @@ -19,6 +19,25 @@ provider "aws" { region = "ap-southeast-1" } +module "devnet-rpc" { + source = "./module/region" + region = "ap-southeast-1" + nodeKeys = local.rpcDevnetNodeKeys + enableFixedIp = true + logLevel = local.logLevel + xdc_ecs_tasks_execution_role_arn = aws_iam_role.xdc_ecs_tasks_execution_role.arn + + cpu = 1024 + memory = 4096 + + network = "devnet" + vpc_cidr = "10.0.0.0/16" + subnet_cidr = "10.0.0.0/20" + providers = { + aws = aws.ap-southeast-1 + } +} + module "testnet-rpc" { source = "./module/region" region = "ap-southeast-1" diff --git a/cicd/terraform/s3.tf b/cicd/terraform/s3.tf index 4968c852c33f..5c1fc4911508 100644 --- a/cicd/terraform/s3.tf +++ b/cicd/terraform/s3.tf @@ -2,7 +2,7 @@ terraform { backend "s3" { bucket = "tf-xinfin-bucket" - key = "tf/terraform.tfstate" + key = "tf/terraform_rpc.tfstate" region = "us-east-1" encrypt = true } diff --git a/cicd/terraform/variables.tf b/cicd/terraform/variables.tf index d952bc258cf4..89d6945e6178 100644 --- a/cicd/terraform/variables.tf +++ b/cicd/terraform/variables.tf @@ -30,6 +30,7 @@ locals { # r.name => { for i in local.keyNames[r.name]: i => local.predefinedNodesConfig[i] } # } + rpcDevnetNodeKeys = { "devnet-rpc1": local.predefinedNodesConfig["devnet-rpc1"]} // we hardcode the rpc to a single node for now rpcTestnetNodeKeys = { "testnet-rpc1": local.predefinedNodesConfig["testnet-rpc1"]} // we hardcode the rpc to a single node for now rpcMainnetNodeKeys = { "mainnet-rpc1": local.predefinedNodesConfig["mainnet-rpc1"]} // we hardcode the rpc to a single node for now } diff --git a/consensus/XDPoS/engines/engine_v2/verifyHeader.go b/consensus/XDPoS/engines/engine_v2/verifyHeader.go index a759d7455fad..7ba672bccbaa 100644 --- a/consensus/XDPoS/engines/engine_v2/verifyHeader.go +++ b/consensus/XDPoS/engines/engine_v2/verifyHeader.go @@ -36,7 +36,8 @@ func (x *XDPoS_v2) verifyHeader(chain consensus.ChainReader, header *types.Heade } if len(header.Validator) == 0 { - return consensus.ErrNoValidatorSignature + // This should never happen, if it does, then it means the peer is sending us invalid data. + return consensus.ErrNoValidatorSignatureV2 } if fullVerify { diff --git a/consensus/errors.go b/consensus/errors.go index 0747516f8754..c8ed578c2c5f 100644 --- a/consensus/errors.go +++ b/consensus/errors.go @@ -39,6 +39,8 @@ var ( ErrNoValidatorSignature = errors.New("no validator in header") + ErrNoValidatorSignatureV2 = errors.New("no validator in v2 header") + ErrNotReadyToPropose = errors.New("not ready to propose, QC is not ready") ErrNotReadyToMine = errors.New("Not ready to mine, it's not your turn") diff --git a/consensus/tests/engine_v2_tests/verify_header_test.go b/consensus/tests/engine_v2_tests/verify_header_test.go index a8b2eaec30e2..ab3e47fbf48a 100644 --- a/consensus/tests/engine_v2_tests/verify_header_test.go +++ b/consensus/tests/engine_v2_tests/verify_header_test.go @@ -50,7 +50,7 @@ func TestShouldVerifyBlock(t *testing.T) { noValidatorBlock := blockchain.GetBlockByNumber(902).Header() noValidatorBlock.Validator = []byte{} err = adaptor.VerifyHeader(blockchain, noValidatorBlock, true) - assert.Equal(t, consensus.ErrNoValidatorSignature, err) + assert.Equal(t, consensus.ErrNoValidatorSignatureV2, err) blockFromFuture := blockchain.GetBlockByNumber(902).Header() blockFromFuture.Time = big.NewInt(time.Now().Unix() + 10000) diff --git a/core/blockchain.go b/core/blockchain.go index 4dd96a00e0ac..63e401ad906e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2459,6 +2459,11 @@ func (bc *BlockChain) HasHeader(hash common.Hash, number uint64) bool { return bc.hc.HasHeader(hash, number) } +// GetCanonicalHash returns the canonical hash for a given block number +func (bc *BlockChain) GetCanonicalHash(number uint64) common.Hash { + return bc.hc.GetCanonicalHash(number) +} + // GetBlockHashesFromHash retrieves a number of block hashes starting at a given // hash, fetching towards the genesis block. func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash { diff --git a/core/headerchain.go b/core/headerchain.go index cd61f76dbdff..0dbc47e1a845 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -32,7 +32,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/params" - "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru" ) const ( @@ -382,6 +382,11 @@ func (hc *HeaderChain) GetHeaderByNumber(number uint64) *types.Header { return hc.GetHeader(hash, number) } +func (hc *HeaderChain) GetCanonicalHash(number uint64) common.Hash { + // TODO: return rawdb.ReadCanonicalHash(hc.chainDb, number) + return GetCanonicalHash(hc.chainDb, number) +} + // CurrentHeader retrieves the current head header of the canonical chain. The // header is retrieved from the HeaderChain's internal cache. func (hc *HeaderChain) CurrentHeader() *types.Header { diff --git a/eth/api_backend.go b/eth/api_backend.go index 8d6e1d766be5..f0afde38f17d 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -99,7 +99,32 @@ func (b *EthApiBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNum return nil, errors.New("PoS V1 does not support confirmed block lookup") } } - return b.eth.blockchain.GetHeaderByNumber(uint64(blockNr)), nil + header := b.eth.blockchain.GetHeaderByNumber(uint64(blockNr)) + if header == nil { + return nil, errors.New("header for number not found") + } + return header, nil +} + +func (b *EthApiBackend) HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.HeaderByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + header := b.eth.blockchain.GetHeaderByHash(hash) + if header == nil { + return nil, errors.New("header for hash not found") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { + return nil, errors.New("hash is not currently canonical") + } + return header, nil + } + return nil, errors.New("invalid arguments; neither block nor hash specified") +} + +func (b *EthApiBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return b.eth.blockchain.GetHeaderByHash(hash), nil } func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { @@ -130,6 +155,31 @@ func (b *EthApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumb return b.eth.blockchain.GetBlockByNumber(uint64(blockNr)), nil } +func (b *EthApiBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + return b.eth.blockchain.GetBlockByHash(hash), nil +} + +func (b *EthApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.BlockByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + header := b.eth.blockchain.GetHeaderByHash(hash) + if header == nil { + return nil, errors.New("header for hash not found") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { + return nil, errors.New("hash is not currently canonical") + } + block := b.eth.blockchain.GetBlock(hash, header.Number.Uint64()) + if block == nil { + return nil, errors.New("header found, but block body is missing") + } + return block, nil + } + return nil, errors.New("invalid arguments; neither block nor hash specified") +} + func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) { // Pending state is only known by the miner if blockNr == rpc.PendingBlockNumber { @@ -141,9 +191,36 @@ func (b *EthApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc. return nil, nil, err } stateDb, err := b.eth.BlockChain().StateAt(header.Root) + if err != nil { + return nil, nil, err + } return stateDb, header, err } +func (b *EthApiBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.StateAndHeaderByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + header, err := b.HeaderByHash(ctx, hash) + if err != nil { + return nil, nil, err + } + if header == nil { + return nil, nil, errors.New("header for hash not found") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { + return nil, nil, errors.New("hash is not currently canonical") + } + stateDb, err := b.eth.BlockChain().StateAt(header.Root) + if err != nil { + return nil, nil, err + } + return stateDb, header, nil + } + return nil, nil, errors.New("invalid arguments; neither block nor hash specified") +} + func (b *EthApiBackend) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) { return b.eth.blockchain.GetBlockByHash(blockHash), nil } diff --git a/eth/bloombits.go b/eth/bloombits.go index 69e2528a54c9..097b4fb94d5b 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -17,13 +17,13 @@ package eth import ( - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "time" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/common/bitutil" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/bloombits" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/params" @@ -44,7 +44,7 @@ const ( // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests // to accumulate request an entire batch (avoiding hysteresis). - bloomRetrievalWait = time.Duration(0) + bloomRetrievalWait = time.Microsecond * 100 ) // startBloomHandlers starts a batch of goroutines to accept bloom bit database diff --git a/eth/filters/api.go b/eth/filters/api.go index aef9adba103f..ed36c3a6476a 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -35,6 +35,13 @@ import ( "github.com/XinFinOrg/XDPoSChain/rpc" ) +var ( + errExceedMaxTopics = errors.New("exceed max topics") +) + +// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 +const maxTopics = 4 + var ( deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) @@ -99,7 +106,7 @@ func (api *PublicFilterAPI) timeoutLoop() { // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // -// It is part of the filter package because this filter can be used throug the +// It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter @@ -269,14 +276,8 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc } // FilterCriteria represents a request to create a new filter. -// -// TODO(karalabe): Kill this in favor of ethereum.FilterQuery. -type FilterCriteria struct { - FromBlock *big.Int - ToBlock *big.Int - Addresses []common.Address - Topics [][]common.Hash -} +// Same as ethereum.FilterQuery but with UnmarshalJSON() method. +type FilterCriteria ethereum.FilterQuery // NewFilter creates a new filter and returns the filter id. It can be // used to retrieve logs when the state changes. This method cannot be @@ -327,16 +328,28 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { - // Convert the RPC block numbers into internal representations - if crit.FromBlock == nil { - crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } - if crit.ToBlock == nil { - crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + if len(crit.Topics) > maxTopics { + return nil, errExceedMaxTopics } - // Create and run the filter to get all the logs - filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics) + var filter *Filter + if crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if crit.FromBlock != nil { + begin = crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if crit.ToBlock != nil { + end = crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics) + } + // Run the filter and return all the logs logs, err := filter.Logs(ctx) if err != nil { return nil, err @@ -374,17 +387,24 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*ty return nil, fmt.Errorf("filter not found") } - begin := rpc.LatestBlockNumber.Int64() - if f.crit.FromBlock != nil { - begin = f.crit.FromBlock.Int64() - } - end := rpc.LatestBlockNumber.Int64() - if f.crit.ToBlock != nil { - end = f.crit.ToBlock.Int64() + var filter *Filter + if f.crit.BlockHash != nil { + // Block filter requested, construct a single-shot filter + filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics) + } else { + // Convert the RPC block numbers into internal representations + begin := rpc.LatestBlockNumber.Int64() + if f.crit.FromBlock != nil { + begin = f.crit.FromBlock.Int64() + } + end := rpc.LatestBlockNumber.Int64() + if f.crit.ToBlock != nil { + end = f.crit.ToBlock.Int64() + } + // Construct the range filter + filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) } - // Create and run the filter to get all the logs - filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics) - + // Run the filter and return all the logs logs, err := filter.Logs(ctx) if err != nil { return nil, err @@ -451,7 +471,8 @@ func returnLogs(logs []*types.Log) []*types.Log { // UnmarshalJSON sets *args fields with given data. func (args *FilterCriteria) UnmarshalJSON(data []byte) error { type input struct { - From *rpc.BlockNumber `json:"fromBlock"` + BlockHash *common.Hash `json:"blockHash"` + FromBlock *rpc.BlockNumber `json:"fromBlock"` ToBlock *rpc.BlockNumber `json:"toBlock"` Addresses interface{} `json:"address"` Topics []interface{} `json:"topics"` @@ -462,12 +483,20 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { return err } - if raw.From != nil { - args.FromBlock = big.NewInt(raw.From.Int64()) - } + if raw.BlockHash != nil { + if raw.FromBlock != nil || raw.ToBlock != nil { + // BlockHash is mutually exclusive with FromBlock/ToBlock criteria + return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other") + } + args.BlockHash = raw.BlockHash + } else { + if raw.FromBlock != nil { + args.FromBlock = big.NewInt(raw.FromBlock.Int64()) + } - if raw.ToBlock != nil { - args.ToBlock = big.NewInt(raw.ToBlock.Int64()) + if raw.ToBlock != nil { + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) + } } args.Addresses = []common.Address{} @@ -550,7 +579,7 @@ func decodeAddress(s string) (common.Address, error) { } b, err := hexutil.Decode(s) if err == nil && len(b) != common.AddressLength { - err = fmt.Errorf("hex has invalid length %d after decoding", len(b)) + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength) } return common.BytesToAddress(b), err } @@ -558,7 +587,7 @@ func decodeAddress(s string) (common.Address, error) { func decodeTopic(s string) (common.Hash, error) { b, err := hexutil.Decode(s) if err == nil && len(b) != common.HashLength { - err = fmt.Errorf("hex has invalid length %d after decoding", len(b)) + err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength) } return common.BytesToHash(b), err } diff --git a/eth/filters/bench_test.go b/eth/filters/bench_test.go index 799344a53c83..98635b9de7b9 100644 --- a/eth/filters/bench_test.go +++ b/eth/filters/bench_test.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "testing" "time" @@ -28,6 +27,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common/bitutil" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/bloombits" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" @@ -66,7 +66,7 @@ const benchFilterCnt = 2000 func benchmarkBloomBits(b *testing.B, sectionSize uint64) { benchDataDir := node.DefaultDataDir() + "/geth/chaindata" - fmt.Println("Running bloombits benchmark section size:", sectionSize) + b.Log("Running bloombits benchmark section size:", sectionSize) db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") if err != nil { @@ -78,7 +78,7 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { } clearBloomBits(db) - fmt.Println("Generating bloombits data...") + b.Log("Generating bloombits data...") headNum := core.GetBlockNumber(db, head) if headNum < sectionSize+512 { b.Fatalf("not enough blocks for running a benchmark") @@ -113,16 +113,16 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { core.WriteBloomBits(db, uint(i), sectionIdx, sectionHead, comp) } //if sectionIdx%50 == 0 { - // fmt.Println(" section", sectionIdx, "/", cnt) + // b.Log(" section", sectionIdx, "/", cnt) //} } d := time.Since(start) - fmt.Println("Finished generating bloombits data") - fmt.Println(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block") - fmt.Println(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize)) + b.Log("Finished generating bloombits data") + b.Log(" ", d, "total ", d/time.Duration(cnt*sectionSize), "per block") + b.Log(" data size:", dataSize, " compressed size:", compSize, " compression ratio:", float64(compSize)/float64(dataSize)) - fmt.Println("Running filter benchmarks...") + b.Log("Running filter benchmarks...") start = time.Now() mux := new(event.TypeMux) var backend *testBackend @@ -136,14 +136,14 @@ func benchmarkBloomBits(b *testing.B, sectionSize uint64) { var addr common.Address addr[0] = byte(i) addr[1] = byte(i / 256) - filter := New(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) + filter := NewRangeFilter(backend, 0, int64(cnt*sectionSize-1), []common.Address{addr}, nil) if _, err := filter.Logs(context.Background()); err != nil { b.Error("filter.Find error:", err) } } d = time.Since(start) - fmt.Println("Finished running filter benchmarks") - fmt.Println(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") + b.Log("Finished running filter benchmarks") + b.Log(" ", d, "total ", d/time.Duration(benchFilterCnt), "per address", d*time.Duration(1000000)/time.Duration(benchFilterCnt*cnt*sectionSize), "per million blocks") db.Close() } @@ -175,7 +175,7 @@ func clearBloomBits(db ethdb.Database) { func BenchmarkNoBloomBits(b *testing.B) { benchDataDir := node.DefaultDataDir() + "/geth/chaindata" - fmt.Println("Running benchmark without bloombits") + b.Log("Running benchmark without bloombits") db, err := rawdb.NewLevelDBDatabase(benchDataDir, 128, 1024, "") if err != nil { b.Fatalf("error opening database at %v: %v", benchDataDir, err) @@ -188,14 +188,14 @@ func BenchmarkNoBloomBits(b *testing.B) { clearBloomBits(db) - fmt.Println("Running filter benchmarks...") + b.Log("Running filter benchmarks...") start := time.Now() mux := new(event.TypeMux) backend := &testBackend{mux, db, 0, new(event.Feed), new(event.Feed), new(event.Feed), new(event.Feed)} - filter := New(backend, 0, int64(headNum), []common.Address{{}}, nil) + filter := NewRangeFilter(backend, 0, int64(headNum), []common.Address{{}}, nil) filter.Logs(context.Background()) d := time.Since(start) - fmt.Println("Finished running filter benchmarks") - fmt.Println(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks") + b.Log("Finished running filter benchmarks") + b.Log(" ", d, "total ", d*time.Duration(1000000)/time.Duration(headNum+1), "per million blocks") db.Close() } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0137e44cfa58..dcd872fc4c48 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -18,6 +18,7 @@ package filters import ( "context" + "errors" "math/big" "github.com/XinFinOrg/XDPoSChain/common" @@ -33,6 +34,7 @@ type Backend interface { ChainDb() ethdb.Database EventMux() *event.TypeMux HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error) @@ -49,17 +51,19 @@ type Backend interface { type Filter struct { backend Backend - db ethdb.Database - begin, end int64 - addresses []common.Address - topics [][]common.Hash + db ethdb.Database + addresses []common.Address + topics [][]common.Hash + + block common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks matcher *bloombits.Matcher } -// New creates a new filter which uses a bloom filter on blocks to figure out whether -// a particular block is interesting or not. -func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { +// NewRangeFilter creates a new filter which uses a bloom filter on blocks to +// figure out whether a particular block is interesting or not. +func NewRangeFilter(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { // Flatten the address and topic filter clauses into a single bloombits filter // system. Since the bloombits are not positional, nil topics are permitted, // which get flattened into a nil byte slice. @@ -78,23 +82,52 @@ func New(backend Backend, begin, end int64, addresses []common.Address, topics [ } filters = append(filters, filter) } - // Assemble and return the filter size, _ := backend.BloomStatus() + // Create a generic filter and convert it into a range filter + filter := newFilter(backend, addresses, topics) + + filter.matcher = bloombits.NewMatcher(size, filters) + filter.begin = begin + filter.end = end + + return filter +} + +// NewBlockFilter creates a new filter which directly inspects the contents of +// a block to figure out whether it is interesting or not. +func NewBlockFilter(backend Backend, block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { + // Create a generic filter and convert it into a block filter + filter := newFilter(backend, addresses, topics) + filter.block = block + return filter +} + +// newFilter creates a generic filter that can either filter based on a block hash, +// or based on range queries. The search criteria needs to be explicitly set. +func newFilter(backend Backend, addresses []common.Address, topics [][]common.Hash) *Filter { return &Filter{ backend: backend, - begin: begin, - end: end, addresses: addresses, topics: topics, db: backend.ChainDb(), - matcher: bloombits.NewMatcher(size, filters), } } // Logs searches the blockchain for matching log entries, returning all from the // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { + // If we're doing singleton block filtering, execute and return + if f.block != (common.Hash{}) { + header, err := f.backend.HeaderByHash(ctx, f.block) + if err != nil { + return nil, err + } + if header == nil { + return nil, errors.New("unknown block") + } + return f.blockLogs(ctx, header) + } // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { @@ -187,13 +220,23 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e if header == nil || err != nil { return logs, err } - if bloomFilter(header.Bloom, f.addresses, f.topics) { - found, err := f.checkMatches(ctx, header) - if err != nil { - return logs, err - } - logs = append(logs, found...) + found, err := f.blockLogs(ctx, header) + if err != nil { + return logs, err + } + logs = append(logs, found...) + } + return logs, nil +} + +// blockLogs returns the logs matching the filter criteria within a single block. +func (f *Filter) blockLogs(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { + if bloomFilter(header.Bloom, f.addresses, f.topics) { + found, err := f.checkMatches(ctx, header) + if err != nil { + return logs, err } + logs = append(logs, found...) } return logs, nil } @@ -258,9 +301,9 @@ Logs: if len(topics) > len(log.Topics) { continue Logs } - for i, topics := range topics { - match := len(topics) == 0 // empty rule set == wildcard - for _, topic := range topics { + for i, sub := range topics { + match := len(sub) == 0 // empty rule set == wildcard + for _, topic := range sub { if log.Topics[i] == topic { match = true break diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index b3ee0c72fefd..6dafd9610421 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,6 +30,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/rpc" ) @@ -91,8 +92,21 @@ type EventSystem struct { backend Backend lightMode bool lastHead *types.Header - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification + + // Subscriptions + txSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event + pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event + + // Channels + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txCh chan core.TxPreEvent // Channel to receive new transaction event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -108,6 +122,24 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS lightMode: lightMode, install: make(chan *subscription), uninstall: make(chan *subscription), + txCh: make(chan core.TxPreEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + } + + // Subscribe events + m.txSub = m.backend.SubscribeTxPreEvent(m.txCh) + m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) + m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) + m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) + // TODO(rjl493456442): use feed to subscribe pending log event + m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{}) + + // Make sure none of the subscriptions are empty + if m.txSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || + m.pendingLogSub.Closed() { + log.Crit("Subscribe for event system failed") } go m.eventLoop() @@ -306,8 +338,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) { } } case *event.TypeMuxEvent: - switch muxe := e.Data.(type) { - case core.PendingLogsEvent: + if muxe, ok := e.Data.(core.PendingLogsEvent); ok { for _, f := range filters[PendingLogsSubscription] { if e.Time.After(f.created) { if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { @@ -411,50 +442,35 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. // eventLoop (un)installs filters and processes mux events. func (es *EventSystem) eventLoop() { - var ( - index = make(filterIndex) - sub = es.mux.Subscribe(core.PendingLogsEvent{}) - // Subscribe TxPreEvent form txpool - txCh = make(chan core.TxPreEvent, txChanSize) - txSub = es.backend.SubscribeTxPreEvent(txCh) - // Subscribe RemovedLogsEvent - rmLogsCh = make(chan core.RemovedLogsEvent, rmLogsChanSize) - rmLogsSub = es.backend.SubscribeRemovedLogsEvent(rmLogsCh) - // Subscribe []*types.Log - logsCh = make(chan []*types.Log, logsChanSize) - logsSub = es.backend.SubscribeLogsEvent(logsCh) - // Subscribe ChainEvent - chainEvCh = make(chan core.ChainEvent, chainEvChanSize) - chainEvSub = es.backend.SubscribeChainEvent(chainEvCh) - ) - - // Unsubscribe all events - defer sub.Unsubscribe() - defer txSub.Unsubscribe() - defer rmLogsSub.Unsubscribe() - defer logsSub.Unsubscribe() - defer chainEvSub.Unsubscribe() - + // Ensure all subscriptions get cleaned up + defer func() { + es.pendingLogSub.Unsubscribe() + es.txSub.Unsubscribe() + es.logsSub.Unsubscribe() + es.rmLogsSub.Unsubscribe() + es.chainSub.Unsubscribe() + }() + + index := make(filterIndex) for i := UnknownSubscription; i < LastIndexSubscription; i++ { index[i] = make(map[rpc.ID]*subscription) } for { select { - case ev, active := <-sub.Chan(): - if !active { // system stopped - return - } - es.broadcast(index, ev) - // Handle subscribed events - case ev := <-txCh: + case ev := <-es.txCh: es.broadcast(index, ev) - case ev := <-rmLogsCh: + case ev := <-es.logsCh: es.broadcast(index, ev) - case ev := <-logsCh: + case ev := <-es.rmLogsCh: es.broadcast(index, ev) - case ev := <-chainEvCh: + case ev := <-es.chainCh: + es.broadcast(index, ev) + case ev, active := <-es.pendingLogSub.Chan(): + if !active { // system stopped + return + } es.broadcast(index, ev) case f := <-es.install: @@ -466,6 +482,7 @@ func (es *EventSystem) eventLoop() { index[f.typ][f.id] = f } close(f.installed) + case f := <-es.uninstall: if f.typ == MinedAndPendingLogsSubscription { // the type are logs and pending logs subscriptions @@ -477,13 +494,13 @@ func (es *EventSystem) eventLoop() { close(f.err) // System stopped - case <-txSub.Err(): + case <-es.txSub.Err(): return - case <-rmLogsSub.Err(): + case <-es.logsSub.Err(): return - case <-logsSub.Err(): + case <-es.rmLogsSub.Err(): return - case <-chainEvSub.Err(): + case <-es.chainSub.Err(): return } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index edcba2e90ab8..6d7129a1d1ae 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -19,7 +19,6 @@ package filters import ( "context" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "math/rand" "reflect" @@ -31,6 +30,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/bloombits" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" @@ -69,6 +69,11 @@ func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumbe return core.GetHeader(b.db, hash, num), nil } +func (b *testBackend) HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error) { + num := core.GetBlockNumber(b.db, blockHash) + return core.GetHeader(b.db, blockHash, num), nil +} + func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) { number := core.GetBlockNumber(b.db, blockHash) return core.GetBlockReceipts(b.db, blockHash, number), nil @@ -335,6 +340,33 @@ func TestInvalidLogFilterCreation(t *testing.T) { } } +func TestInvalidGetLogsRequest(t *testing.T) { + var ( + mux = new(event.TypeMux) + db = rawdb.NewMemoryDatabase() + txFeed = new(event.Feed) + rmLogsFeed = new(event.Feed) + logsFeed = new(event.Feed) + chainFeed = new(event.Feed) + backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed} + api = NewPublicFilterAPI(backend, false) + blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + ) + + // Reason: Cannot specify both BlockHash and FromBlock/ToBlock) + testCases := []FilterCriteria{ + 0: {BlockHash: &blockHash, FromBlock: big.NewInt(100)}, + 1: {BlockHash: &blockHash, ToBlock: big.NewInt(500)}, + 2: {BlockHash: &blockHash, FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, + } + + for i, test := range testCases { + if _, err := api.GetLogs(context.Background(), test); err == nil { + t.Errorf("Expected Logs for case #%d to fail", i) + } + } +} + // TestLogFilter tests whether log filters match the correct logs that are posted to the event feed. func TestLogFilter(t *testing.T) { t.Parallel() diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b75012dcb015..1b74d21df7e0 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -97,7 +97,7 @@ func BenchmarkFilters(b *testing.B) { } b.ResetTimer() - filter := New(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := NewRangeFilter(backend, 0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { logs, _ := filter.Logs(context.Background()) @@ -186,14 +186,14 @@ func TestFilters(t *testing.T) { } } - filter := New(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) + filter := NewRangeFilter(backend, 0, -1, []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}}) logs, _ := filter.Logs(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = New(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = NewRangeFilter(backend, 900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -202,7 +202,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) + filter = NewRangeFilter(backend, 990, -1, []common.Address{addr}, [][]common.Hash{{hash3}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) @@ -211,7 +211,7 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) + filter = NewRangeFilter(backend, 1, 10, nil, [][]common.Hash{{hash1, hash2}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 2 { @@ -219,7 +219,7 @@ func TestFilters(t *testing.T) { } failHash := common.BytesToHash([]byte("fail")) - filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}}) + filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { @@ -227,14 +227,14 @@ func TestFilters(t *testing.T) { } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = New(backend, 0, -1, []common.Address{failAddr}, nil) + filter = NewRangeFilter(backend, 0, -1, []common.Address{failAddr}, nil) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = New(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) + filter = NewRangeFilter(backend, 0, -1, nil, [][]common.Hash{{failHash}, {hash1}}) logs, _ = filter.Logs(context.Background()) if len(logs) != 0 { diff --git a/event/event.go b/event/event.go index 20d20d1f57aa..423278731483 100644 --- a/event/event.go +++ b/event/event.go @@ -180,6 +180,12 @@ func (s *TypeMuxSubscription) Unsubscribe() { s.closewait() } +func (s *TypeMuxSubscription) Closed() bool { + s.closeMu.Lock() + defer s.closeMu.Unlock() + return s.closed +} + func (s *TypeMuxSubscription) closewait() { s.closeMu.Lock() defer s.closeMu.Unlock() diff --git a/interfaces.go b/interfaces.go index 665a74d2325e..88cf2fcb0c37 100644 --- a/interfaces.go +++ b/interfaces.go @@ -132,6 +132,7 @@ type ContractCaller interface { // FilterQuery contains options for contract log filtering. type FilterQuery struct { + BlockHash *common.Hash // used by eth_getLogs, return logs only from block with this hash FromBlock *big.Int // beginning of the queried range, nil means genesis block ToBlock *big.Int // end of the range, nil means latest block Addresses []common.Address // restricts matches to events created by specific contracts diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 519d4490ab24..d0b1f15e4315 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -525,8 +525,8 @@ func (s *PublicBlockChainAPI) GetRewardByHash(hash common.Hash) map[string]map[s // GetBalance returns the amount of wei for the given address in the state of the // given block number. The rpc.LatestBlockNumber and rpc.PendingBlockNumber meta // block numbers are also allowed. -func (s *PublicBlockChainAPI) GetBalance(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Big, error) { - state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) +func (s *PublicBlockChainAPI) GetBalance(ctx context.Context, address common.Address, blockNrOrHash rpc.BlockNumberOrHash) (*hexutil.Big, error) { + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if state == nil || err != nil { return nil, err } @@ -614,8 +614,8 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc } // GetCode returns the code stored at the given address in the state for the given block number. -func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (hexutil.Bytes, error) { - state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) +func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNrOrHash rpc.BlockNumberOrHash) (hexutil.Bytes, error) { + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if state == nil || err != nil { return nil, err } @@ -624,8 +624,8 @@ func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Addres } // GetAccountInfo returns the information at the given address in the state for the given block number. -func (s *PublicBlockChainAPI) GetAccountInfo(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (map[string]interface{}, error) { - state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) +func (s *PublicBlockChainAPI) GetAccountInfo(ctx context.Context, address common.Address, blockNrOrHash rpc.BlockNumberOrHash) (map[string]interface{}, error) { + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if state == nil || err != nil { return nil, err } @@ -644,8 +644,8 @@ func (s *PublicBlockChainAPI) GetAccountInfo(ctx context.Context, address common // GetStorageAt returns the storage from the state at the given address, key and // block number. The rpc.LatestBlockNumber and rpc.PendingBlockNumber meta block // numbers are also allowed. -func (s *PublicBlockChainAPI) GetStorageAt(ctx context.Context, address common.Address, key string, blockNr rpc.BlockNumber) (hexutil.Bytes, error) { - state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) +func (s *PublicBlockChainAPI) GetStorageAt(ctx context.Context, address common.Address, key string, blockNrOrHash rpc.BlockNumberOrHash) (hexutil.Bytes, error) { + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if state == nil || err != nil { return nil, err } @@ -1113,10 +1113,10 @@ type CallArgs struct { Data hexutil.Bytes `json:"data"` } -func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber, vmCfg vm.Config, timeout time.Duration) ([]byte, uint64, bool, error, error) { +func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNrOrHash rpc.BlockNumberOrHash, vmCfg vm.Config, timeout time.Duration) ([]byte, uint64, bool, error, error) { defer func(start time.Time) { log.Debug("Executing EVM call finished", "runtime", time.Since(start)) }(time.Now()) - statedb, header, err := s.b.StateAndHeaderByNumber(ctx, blockNr) + statedb, header, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if statedb == nil || err != nil { return nil, 0, false, err, nil } @@ -1154,7 +1154,7 @@ func (s *PublicBlockChainAPI) doCall(ctx context.Context, args CallArgs, blockNr // this makes sure resources are cleaned up. defer cancel() - block, err := s.b.BlockByNumber(ctx, blockNr) + block, err := s.b.BlockByNumberOrHash(ctx, blockNrOrHash) if err != nil { return nil, 0, false, err, nil } @@ -1229,8 +1229,12 @@ func (e *revertError) ErrorData() interface{} { // Call executes the given transaction on the state for the given block number. // It doesn't make and changes in the state/blockchain and is useful to execute and retrieve values. -func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr rpc.BlockNumber) (hexutil.Bytes, error) { - result, _, failed, err, vmErr := s.doCall(ctx, args, blockNr, vm.Config{}, 5*time.Second) +func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Bytes, error) { + if blockNrOrHash == nil { + latest := rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber) + blockNrOrHash = &latest + } + result, _, failed, err, vmErr := s.doCall(ctx, args, *blockNrOrHash, vm.Config{}, 5*time.Second) if err != nil { return nil, err } @@ -1242,9 +1246,13 @@ func (s *PublicBlockChainAPI) Call(ctx context.Context, args CallArgs, blockNr r return (hexutil.Bytes)(result), vmErr } -// EstimateGas returns an estimate of the amount of gas needed to execute the -// given transaction against the current pending block. -func (s *PublicBlockChainAPI) EstimateGas(ctx context.Context, args CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error) { +func (s *PublicBlockChainAPI) doEstimateGas(ctx context.Context, args CallArgs, blockNrOrHash rpc.BlockNumberOrHash) (hexutil.Uint64, error) { + // Retrieve the base state and mutate it with any overrides + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) + if state == nil || err != nil { + return 0, err + } + // Binary search the gas requirement, as it may be higher than the amount used var ( lo uint64 = params.TxGas - 1 @@ -1267,7 +1275,7 @@ func (s *PublicBlockChainAPI) EstimateGas(ctx context.Context, args CallArgs, bl executable := func(gas uint64) (bool, []byte, error, error) { args.Gas = hexutil.Uint64(gas) - res, _, failed, err, vmErr := s.doCall(ctx, args, rpc.LatestBlockNumber, vm.Config{}, 0) + res, _, failed, err, vmErr := s.doCall(ctx, args, blockNrOrHash, vm.Config{}, 0) if err != nil { if errors.Is(err, vm.ErrOutOfGas) || errors.Is(err, core.ErrIntrinsicGas) { return false, nil, nil, nil // Special case, raise gas limit @@ -1340,6 +1348,16 @@ func (s *PublicBlockChainAPI) EstimateGas(ctx context.Context, args CallArgs, bl return hexutil.Uint64(hi), nil } +// EstimateGas returns an estimate of the amount of gas needed to execute the +// given transaction against the current pending block. +func (s *PublicBlockChainAPI) EstimateGas(ctx context.Context, args CallArgs, blockNrOrHash *rpc.BlockNumberOrHash) (hexutil.Uint64, error) { + bNrOrHash := rpc.BlockNumberOrHashWithNumber(rpc.LatestBlockNumber) + if blockNrOrHash != nil { + bNrOrHash = *blockNrOrHash + } + return s.doEstimateGas(ctx, args, bNrOrHash) +} + // ExecutionResult groups all structured logs emitted by the EVM // while replaying a transaction in debug mode as well as transaction // execution status, the amount of gas used and the return value @@ -1765,8 +1783,17 @@ func (s *PublicTransactionPoolAPI) GetRawTransactionByBlockHashAndIndex(ctx cont } // GetTransactionCount returns the number of transactions the given address has sent for the given block number -func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (*hexutil.Uint64, error) { - state, _, err := s.b.StateAndHeaderByNumber(ctx, blockNr) +func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, address common.Address, blockNrOrHash rpc.BlockNumberOrHash) (*hexutil.Uint64, error) { + // Ask transaction pool for the nonce which includes pending transactions + if blockNr, ok := blockNrOrHash.Number(); ok && blockNr == rpc.PendingBlockNumber { + nonce, err := s.b.GetPoolNonce(ctx, address) + if err != nil { + return nil, err + } + return (*hexutil.Uint64)(&nonce), nil + } + // Resolve block number and use its state to ask for the nonce + state, _, err := s.b.StateAndHeaderByNumberOrHash(ctx, blockNrOrHash) if state == nil || err != nil { return nil, err } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 34b41b71c314..2304490787cc 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -57,8 +57,13 @@ type Backend interface { // BlockChain API SetHead(number uint64) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) + HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) + BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) + BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) + StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetTd(blockHash common.Hash) *big.Int diff --git a/les/api_backend.go b/les/api_backend.go index 29e0abdf0506..152db5e488c0 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -74,6 +74,30 @@ func (b *LesApiBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNum return b.eth.blockchain.GetHeaderByNumberOdr(ctx, uint64(blockNr)) } +func (b *LesApiBackend) HeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Header, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.HeaderByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + header, err := b.HeaderByHash(ctx, hash) + if err != nil { + return nil, err + } + if header == nil { + return nil, errors.New("header for hash not found") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { + return nil, errors.New("hash is not currently canonical") + } + return header, nil + } + return nil, errors.New("invalid arguments; neither block nor hash specified") +} + +func (b *LesApiBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return b.eth.blockchain.GetHeaderByHash(hash), nil +} + func (b *LesApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { header, err := b.HeaderByNumber(ctx, blockNr) if header == nil || err != nil { @@ -82,6 +106,30 @@ func (b *LesApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumb return b.GetBlock(ctx, header.Hash()) } +func (b *LesApiBackend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + return b.eth.blockchain.GetBlockByHash(ctx, hash) +} + +func (b *LesApiBackend) BlockByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*types.Block, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.BlockByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + block, err := b.BlockByHash(ctx, hash) + if err != nil { + return nil, err + } + if block == nil { + return nil, errors.New("header found, but block body is missing") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(block.NumberU64()) != hash { + return nil, errors.New("hash is not currently canonical") + } + return block, nil + } + return nil, errors.New("invalid arguments; neither block nor hash specified") +} + func (b *LesApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) { header, err := b.HeaderByNumber(ctx, blockNr) if header == nil || err != nil { @@ -90,6 +138,23 @@ func (b *LesApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc. return light.NewState(ctx, header, b.eth.odr), header, nil } +func (b *LesApiBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) { + if blockNr, ok := blockNrOrHash.Number(); ok { + return b.StateAndHeaderByNumber(ctx, blockNr) + } + if hash, ok := blockNrOrHash.Hash(); ok { + header := b.eth.blockchain.GetHeaderByHash(hash) + if header == nil { + return nil, nil, errors.New("header for hash not found") + } + if blockNrOrHash.RequireCanonical && b.eth.blockchain.GetCanonicalHash(header.Number.Uint64()) != hash { + return nil, nil, errors.New("hash is not currently canonical") + } + return light.NewState(ctx, header, b.eth.odr), header, nil + } + return nil, nil, errors.New("invalid arguments; neither block nor hash specified") +} + func (b *LesApiBackend) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) { return b.eth.blockchain.GetBlockByHash(ctx, blockHash) } diff --git a/light/lightchain.go b/light/lightchain.go index a6acd7b447c6..72873a57c2fd 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -420,6 +420,11 @@ func (bc *LightChain) HasHeader(hash common.Hash, number uint64) bool { return bc.hc.HasHeader(hash, number) } +// GetCanonicalHash returns the canonical hash for a given block number +func (bc *LightChain) GetCanonicalHash(number uint64) common.Hash { + return bc.hc.GetCanonicalHash(number) +} + // GetBlockHashesFromHash retrieves a number of block hashes starting at a given // hash, fetching towards the genesis block. func (self *LightChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash { diff --git a/miner/worker.go b/miner/worker.go index 6967ae5207eb..6f7ee10fce79 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -563,7 +563,7 @@ func (self *worker) commitNewWork() { tstamp = parent.Time().Int64() + 1 } // this will ensure we're not going off too far in the future - if now := time.Now().Unix(); tstamp > now+1 { + if now := time.Now().Unix(); tstamp > now { wait := time.Duration(tstamp-now) * time.Second log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait)) time.Sleep(wait) diff --git a/params/version.go b/params/version.go index f458365c4801..41efb96e34df 100644 --- a/params/version.go +++ b/params/version.go @@ -22,7 +22,7 @@ import ( const ( VersionMajor = 2 // Major version component of the current release - VersionMinor = 1 // Minor version component of the current release + VersionMinor = 2 // Minor version component of the current release VersionPatch = 0 // Patch version component of the current release VersionMeta = "beta1" // Version metadata to append to the version string )