Skip to content
This repository was archived by the owner on Oct 11, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(config Config) (*App, error) {
}

// Initialize block watcher (but don't start it yet).
blockWatcherClient, err := blockwatch.NewRpcClient(ethClient, ethereumRPCRequestTimeout)
blockWatcherClient, err := blockwatch.NewRpcClient(config.EthereumRPCURL, ethereumRPCRequestTimeout)
if err != nil {
return nil, err
}
Expand All @@ -104,6 +104,16 @@ func New(config Config) (*App, error) {
Client: blockWatcherClient,
}
blockWatcher := blockwatch.New(blockWatcherConfig)
go func() {
for {
err, isOpen := <-blockWatcher.Errors
if isOpen {
log.WithField("error", err).Error("BlockWatcher error encountered")
} else {
return // Exit when the error channel is closed
}
}
}()

// Initialize order watcher (but don't start it yet).
orderWatcher, err := orderwatch.New(db, blockWatcher, ethClient, config.EthereumNetworkID, config.OrderExpirationBuffer)
Expand Down Expand Up @@ -257,6 +267,6 @@ func (app *App) Close() {
if err := app.orderWatcher.Stop(); err != nil {
log.WithField("error", err.Error()).Error("error while closing orderWatcher")
}
app.blockWatcher.StopPolling()
app.blockWatcher.Stop()
app.db.Close()
}
12 changes: 10 additions & 2 deletions ethereum/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (w *Watcher) startPollingLoop() {
}
}

// StopPolling stops the block poller
func (w *Watcher) StopPolling() {
// stopPolling stops the block poller
func (w *Watcher) stopPolling() {
w.mu.Lock()
defer w.mu.Unlock()
w.isWatching = false
Expand All @@ -129,6 +129,14 @@ func (w *Watcher) StopPolling() {
w.ticker = nil
}

// Stop stops the BlockWatcher
func (w *Watcher) Stop() {
Comment thread
fabioberger marked this conversation as resolved.
if w.isWatching {
w.stopPolling()
}
close(w.Errors)
}

// Subscribe allows one to subscribe to the block events emitted by the Watcher.
// To unsubscribe, simply call `Unsubscribe` on the returned subscription.
// The sink channel should have ample buffer space to avoid blocking other subscribers.
Expand Down
4 changes: 2 additions & 2 deletions ethereum/blockwatch/block_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestWatcherStartStop(t *testing.T) {
}
watcher := New(config)
require.NoError(t, watcher.StartPolling())
watcher.StopPolling()
watcher.stopPolling()
Comment thread
fabioberger marked this conversation as resolved.
require.NoError(t, watcher.StartPolling())
watcher.StopPolling()
watcher.Stop()
}
49 changes: 44 additions & 5 deletions ethereum/blockwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package blockwatch

import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/0xProject/0x-mesh/meshdb"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)

// Client defines the methods needed to satisfy the client expected when
Expand All @@ -22,29 +26,64 @@ type Client interface {

// RpcClient is a Client for fetching Ethereum blocks from a specific JSON-RPC endpoint.
type RpcClient struct {
rpcClient *rpc.Client
client *ethclient.Client
requestTimeout time.Duration
}

// NewRpcClient returns a new Client for fetching Ethereum blocks using the given
// ethclient.Client.
func NewRpcClient(ethClient *ethclient.Client, requestTimeout time.Duration) (*RpcClient, error) {
return &RpcClient{ethClient, requestTimeout}, nil
func NewRpcClient(rpcURL string, requestTimeout time.Duration) (*RpcClient, error) {
ethClient, err := ethclient.Dial(rpcURL)
if err != nil {
return nil, err
}
rpcClient, err := rpc.Dial(rpcURL)
if err != nil {
return nil, err
}
return &RpcClient{rpcClient: rpcClient, client: ethClient, requestTimeout: requestTimeout}, nil
}

type GetBlockByNumberResponse struct {
Hash common.Hash `json:"hash"`
Comment thread
fabioberger marked this conversation as resolved.
ParentHash common.Hash `json:"parentHash"`
Number string `json:"number"`
}

// HeaderByNumber fetches a block header by its number. If no `number` is supplied, it will return the latest
// block header. If no block exists with this number it will return a `ethereum.NotFound` error.
func (rc *RpcClient) HeaderByNumber(number *big.Int) (*meshdb.MiniHeader, error) {
ctx, cancel := context.WithTimeout(context.Background(), rc.requestTimeout)
defer cancel()
header, err := rc.client.HeaderByNumber(ctx, number)

var blockParam string
if number == nil {
blockParam = "latest"
} else {
blockParam = fmt.Sprintf("0x%s", common.Bytes2Hex(number.Bytes()))
}
shouldIncludeTransactions := false

// Note(fabio): We use a raw RPC call here instead of `EthClient`'s `BlockByNumber()` method because block
// hashes are computed differently on Kovan vs. mainnet, resulting in the wrong block hash being returned by
// `BlockByNumber` when using Kovan. By doing a raw RPC call, we can simply use the blockHash returned in the
// RPC response rather than re-compute it from the block header.
// Source: https://github.com/ethereum/go-ethereum/pull/18166
var header GetBlockByNumberResponse
err := rc.rpcClient.CallContext(ctx, &header, "eth_getBlockByNumber", blockParam, shouldIncludeTransactions)
if err != nil {
return nil, err
}

blockNum, ok := math.ParseBig256(header.Number)
if !ok {
return nil, errors.New("Failed to parse big.Int value from hex-encoded block number returned from eth_getBlockByNumber")
}
miniHeader := &meshdb.MiniHeader{
Hash: header.Hash(),
Hash: header.Hash,
Parent: header.ParentHash,
Number: header.Number,
Number: blockNum,
}
return miniHeader, nil
}
Expand Down