Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Commit

Permalink
rpc: namespaces refractor (#170)
Browse files Browse the repository at this point in the history
* refactor: rpc namespace refactor - txpool

* refactor: rpc namespace refactor - net

* refactor: rpc namespace refactor - web3

* refactor: rpc namespace refactor - eth

* refactor: rpc namespace refactor - personal

* fix: api to uppercase

* fix: fix import cycle

* fix: fix import cycle
  • Loading branch information
crypto-facs authored Jun 23, 2021
1 parent 1f96204 commit 1c06553
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 256 deletions.
21 changes: 14 additions & 7 deletions ethereum/rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ package rpc
import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/ethereum/go-ethereum/rpc"
"github.com/tharsis/ethermint/ethereum/rpc/backend"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/eth/filters"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/net"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/personal"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/txpool"
"github.com/tharsis/ethermint/ethereum/rpc/namespaces/web3"
"github.com/tharsis/ethermint/ethereum/rpc/types"

rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
Expand All @@ -24,14 +31,14 @@ const (
// GetRPCAPIs returns the list of all APIs
func GetRPCAPIs(clientCtx client.Context, tmWSClient *rpcclient.WSClient) []rpc.API {
nonceLock := new(types.AddrLocker)
backend := NewEVMBackend(clientCtx)
ethAPI := NewPublicEthAPI(clientCtx, backend, nonceLock)
backend := backend.NewEVMBackend(clientCtx)
ethAPI := eth.NewPublicAPI(clientCtx, backend, nonceLock)

return []rpc.API{
{
Namespace: Web3Namespace,
Version: apiVersion,
Service: NewPublicWeb3API(),
Service: web3.NewPublicAPI(),
Public: true,
},
{
Expand All @@ -43,25 +50,25 @@ func GetRPCAPIs(clientCtx client.Context, tmWSClient *rpcclient.WSClient) []rpc.
{
Namespace: EthNamespace,
Version: apiVersion,
Service: NewPublicFilterAPI(tmWSClient, backend),
Service: filters.NewPublicAPI(tmWSClient, backend),
Public: true,
},
{
Namespace: NetNamespace,
Version: apiVersion,
Service: NewPublicNetAPI(clientCtx),
Service: net.NewPublicAPI(clientCtx),
Public: true,
},
{
Namespace: PersonalNamespace,
Version: apiVersion,
Service: NewPersonalAPI(ethAPI),
Service: personal.NewAPI(ethAPI),
Public: true,
},
{
Namespace: TxPoolNamespace,
Version: apiVersion,
Service: NewPublicTxPoolAPI(),
Service: txpool.NewPublicAPI(),
Public: true,
},
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rpc
package backend

import (
"context"
Expand Down
91 changes: 46 additions & 45 deletions ethereum/rpc/eth_api.go → ethereum/rpc/namespaces/eth/api.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rpc
package filters

import (
"context"
Expand Down Expand Up @@ -57,8 +57,8 @@ type PublicFilterAPI struct {
filters map[rpc.ID]*filter
}

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(tmWSClient *rpcclient.WSClient, backend FiltersBackend) *PublicFilterAPI {
// NewPublicAPI returns a new PublicFilterAPI instance.
func NewPublicAPI(tmWSClient *rpcclient.WSClient, backend FiltersBackend) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
filters: make(map[rpc.ID]*filter),
Expand Down Expand Up @@ -371,7 +371,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit filters.FilterCriteri
return
}

logs := filterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)
logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), crit.FromBlock, crit.ToBlock, crit.Addresses, crit.Topics)

for _, log := range logs {
err = notifier.Notify(rpcSub.ID, log)
Expand Down Expand Up @@ -448,7 +448,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID,
return
}

logs := filterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics)
logs := FilterLogs(evmtypes.LogsToEthereum(txResponse.Logs), criteria.FromBlock, criteria.ToBlock, criteria.Addresses, criteria.Topics)

api.filtersMu.Lock()
if f, found := api.filters[filterID]; found {
Expand Down Expand Up @@ -595,21 +595,3 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
return nil, fmt.Errorf("invalid filter %s type %d", id, f.typ)
}
}

// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}

// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []*ethtypes.Log) []*ethtypes.Log {
if logs == nil {
return []*ethtypes.Log{}
}
return logs
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rpc
package filters

import (
"context"
Expand Down Expand Up @@ -297,49 +297,3 @@ func (es *EventSystem) consumeEvents() {
time.Sleep(time.Second)
}
}

// Subscription defines a wrapper for the private subscription
type Subscription struct {
id rpc.ID
typ filters.Type
event string
created time.Time
logsCrit filters.FilterCriteria
logs chan []*ethtypes.Log
hashes chan []common.Hash
headers chan *ethtypes.Header
installed chan struct{} // closed when the filter is installed
eventCh <-chan coretypes.ResultEvent
err chan error
}

// ID returns the underlying subscription RPC identifier.
func (s Subscription) ID() rpc.ID {
return s.id
}

// Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the
// subscription error channel if unsubscription fails.
func (s *Subscription) Unsubscribe(es *EventSystem) {
go func() {
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case es.uninstall <- s:
break uninstallLoop
case <-s.logs:
case <-s.hashes:
case <-s.headers:
}
}
}()
}

// Err returns the error channel
func (s *Subscription) Err() <-chan error {
return s.err
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rpc
package filters

import (
"context"
Expand Down Expand Up @@ -198,7 +198,7 @@ func (f *Filter) blockLogs(header *ethtypes.Header) ([]*ethtypes.Log, error) {
unfiltered = append(unfiltered, logs...)
}

logs := filterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics)
logs := FilterLogs(unfiltered, nil, nil, f.criteria.Addresses, f.criteria.Topics)
if len(logs) == 0 {
return []*ethtypes.Log{}, nil
}
Expand All @@ -222,81 +222,5 @@ func (f *Filter) checkMatches(transactions []common.Hash) []*ethtypes.Log {
unfiltered = append(unfiltered, logs...)
}

return filterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
}

// filterLogs creates a slice of logs matching the given criteria.
// [] -> anything
// [A] -> A in first position of log topics, anything after
// [null, B] -> anything in first position, B in second position
// [A, B] -> A in first position and B in second position
// [[A, B], [A, B]] -> A or B in first position, A or B in second position
func filterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log {
var ret []*ethtypes.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue
}
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if log.Topics[i] == topic {
match = true
break
}
}
if !match {
continue Logs
}
}
ret = append(ret, log)
}
return ret
}

func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
return true
}
}

return false
}

func bloomFilter(bloom ethtypes.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
var included bool
if len(addresses) > 0 {
for _, addr := range addresses {
if ethtypes.BloomLookup(bloom, addr) {
included = true
break
}
}
if !included {
return false
}
}

for _, sub := range topics {
included = len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if ethtypes.BloomLookup(bloom, topic) {
included = true
break
}
}
}
return included
return FilterLogs(unfiltered, f.criteria.FromBlock, f.criteria.ToBlock, f.criteria.Addresses, f.criteria.Topics)
}
62 changes: 62 additions & 0 deletions ethereum/rpc/namespaces/eth/filters/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package filters

import (
"time"

"github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rpc"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)

// Subscription defines a wrapper for the private subscription
type Subscription struct {
id rpc.ID
typ filters.Type
event string
created time.Time
logsCrit filters.FilterCriteria
logs chan []*ethtypes.Log
hashes chan []common.Hash
headers chan *ethtypes.Header
installed chan struct{} // closed when the filter is installed
eventCh <-chan coretypes.ResultEvent
err chan error
}

// ID returns the underlying subscription RPC identifier.
func (s Subscription) ID() rpc.ID {
return s.id
}

// Unsubscribe from the current subscription to Tendermint Websocket. It sends an error to the
// subscription error channel if unsubscription fails.
func (s *Subscription) Unsubscribe(es *EventSystem) {
go func() {
uninstallLoop:
for {
// write uninstall request and consume logs/hashes. This prevents
// the eventLoop broadcast method to deadlock when writing to the
// filter event channel while the subscription loop is waiting for
// this method to return (and thus not reading these events).
select {
case es.uninstall <- s:
break uninstallLoop
case <-s.logs:
case <-s.hashes:
case <-s.headers:
}
}
}()
}

// Err returns the error channel
func (s *Subscription) Err() <-chan error {
return s.err
}

// Event returns the tendermint result event channel
func (s *Subscription) Event() <-chan coretypes.ResultEvent {
return s.eventCh
}
Loading

0 comments on commit 1c06553

Please sign in to comment.