diff --git a/core/core.go b/core/core.go index 129249729..9a646f766 100644 --- a/core/core.go +++ b/core/core.go @@ -89,7 +89,7 @@ func New(config Config) (*App, error) { } // Initialize block watcher (but don't start it yet). - blockWatcherClient, err := blockwatch.NewRpcClient(ethClient, ethereumRPCRequestTimeout) + blockWatcherClient, err := blockwatch.NewRpcClient(config.EthereumRPCURL, ethereumRPCRequestTimeout) if err != nil { return nil, err } @@ -104,6 +104,16 @@ func New(config Config) (*App, error) { Client: blockWatcherClient, } blockWatcher := blockwatch.New(blockWatcherConfig) + go func() { + for { + err, isOpen := <-blockWatcher.Errors + if isOpen { + log.WithField("error", err).Error("BlockWatcher error encountered") + } else { + return // Exit when the error channel is closed + } + } + }() // Initialize order watcher (but don't start it yet). orderWatcher, err := orderwatch.New(db, blockWatcher, ethClient, config.EthereumNetworkID, config.OrderExpirationBuffer) @@ -257,6 +267,6 @@ func (app *App) Close() { if err := app.orderWatcher.Stop(); err != nil { log.WithField("error", err.Error()).Error("error while closing orderWatcher") } - app.blockWatcher.StopPolling() + app.blockWatcher.Stop() app.db.Close() } diff --git a/ethereum/blockwatch/block_watcher.go b/ethereum/blockwatch/block_watcher.go index 3e4f5eb64..46d730495 100644 --- a/ethereum/blockwatch/block_watcher.go +++ b/ethereum/blockwatch/block_watcher.go @@ -118,8 +118,8 @@ func (w *Watcher) startPollingLoop() { } } -// StopPolling stops the block poller -func (w *Watcher) StopPolling() { +// stopPolling stops the block poller +func (w *Watcher) stopPolling() { w.mu.Lock() defer w.mu.Unlock() w.isWatching = false @@ -129,6 +129,14 @@ func (w *Watcher) StopPolling() { w.ticker = nil } +// Stop stops the BlockWatcher +func (w *Watcher) Stop() { + if w.isWatching { + w.stopPolling() + } + close(w.Errors) +} + // Subscribe allows one to subscribe to the block events emitted by the Watcher. // To unsubscribe, simply call `Unsubscribe` on the returned subscription. // The sink channel should have ample buffer space to avoid blocking other subscribers. diff --git a/ethereum/blockwatch/block_watcher_test.go b/ethereum/blockwatch/block_watcher_test.go index 32c0935dc..fd663c3ff 100644 --- a/ethereum/blockwatch/block_watcher_test.go +++ b/ethereum/blockwatch/block_watcher_test.go @@ -85,7 +85,7 @@ func TestWatcherStartStop(t *testing.T) { } watcher := New(config) require.NoError(t, watcher.StartPolling()) - watcher.StopPolling() + watcher.stopPolling() require.NoError(t, watcher.StartPolling()) - watcher.StopPolling() + watcher.Stop() } diff --git a/ethereum/blockwatch/client.go b/ethereum/blockwatch/client.go index 3f2962a5b..3e3aca446 100644 --- a/ethereum/blockwatch/client.go +++ b/ethereum/blockwatch/client.go @@ -2,14 +2,18 @@ package blockwatch import ( "context" + "errors" + "fmt" "math/big" "time" "github.com/0xProject/0x-mesh/meshdb" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/rpc" ) // Client defines the methods needed to satisfy the client expected when @@ -22,14 +26,29 @@ type Client interface { // RpcClient is a Client for fetching Ethereum blocks from a specific JSON-RPC endpoint. type RpcClient struct { + rpcClient *rpc.Client client *ethclient.Client requestTimeout time.Duration } // NewRpcClient returns a new Client for fetching Ethereum blocks using the given // ethclient.Client. -func NewRpcClient(ethClient *ethclient.Client, requestTimeout time.Duration) (*RpcClient, error) { - return &RpcClient{ethClient, requestTimeout}, nil +func NewRpcClient(rpcURL string, requestTimeout time.Duration) (*RpcClient, error) { + ethClient, err := ethclient.Dial(rpcURL) + if err != nil { + return nil, err + } + rpcClient, err := rpc.Dial(rpcURL) + if err != nil { + return nil, err + } + return &RpcClient{rpcClient: rpcClient, client: ethClient, requestTimeout: requestTimeout}, nil +} + +type GetBlockByNumberResponse struct { + Hash common.Hash `json:"hash"` + ParentHash common.Hash `json:"parentHash"` + Number string `json:"number"` } // HeaderByNumber fetches a block header by its number. If no `number` is supplied, it will return the latest @@ -37,14 +56,34 @@ func NewRpcClient(ethClient *ethclient.Client, requestTimeout time.Duration) (*R func (rc *RpcClient) HeaderByNumber(number *big.Int) (*meshdb.MiniHeader, error) { ctx, cancel := context.WithTimeout(context.Background(), rc.requestTimeout) defer cancel() - header, err := rc.client.HeaderByNumber(ctx, number) + + var blockParam string + if number == nil { + blockParam = "latest" + } else { + blockParam = fmt.Sprintf("0x%s", common.Bytes2Hex(number.Bytes())) + } + shouldIncludeTransactions := false + + // Note(fabio): We use a raw RPC call here instead of `EthClient`'s `BlockByNumber()` method because block + // hashes are computed differently on Kovan vs. mainnet, resulting in the wrong block hash being returned by + // `BlockByNumber` when using Kovan. By doing a raw RPC call, we can simply use the blockHash returned in the + // RPC response rather than re-compute it from the block header. + // Source: https://github.com/ethereum/go-ethereum/pull/18166 + var header GetBlockByNumberResponse + err := rc.rpcClient.CallContext(ctx, &header, "eth_getBlockByNumber", blockParam, shouldIncludeTransactions) if err != nil { return nil, err } + + blockNum, ok := math.ParseBig256(header.Number) + if !ok { + return nil, errors.New("Failed to parse big.Int value from hex-encoded block number returned from eth_getBlockByNumber") + } miniHeader := &meshdb.MiniHeader{ - Hash: header.Hash(), + Hash: header.Hash, Parent: header.ParentHash, - Number: header.Number, + Number: blockNum, } return miniHeader, nil }