Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion accounts/abi/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (abi *ABI) UnmarshalJSON(data []byte) error {
Type string
Name string
Constant bool
Indexed bool
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you remove this from the list of fields to be un-marshaled? I understand that it is derived from another source, but what if the field is still present in some client code?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indexed only makes sense in the context of an event field (which is inside "inputs"). In the outer ABI context there's no notion of "indexed" afaik.

Anonymous bool
Inputs []Argument
Outputs []Argument
Expand Down
27 changes: 21 additions & 6 deletions accounts/abi/bind/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ type ContractCaller interface {
CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error)
}

// DeployBackend wraps the operations needed by WaitMined and WaitDeployed.
type DeployBackend interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
}

// PendingContractCaller defines methods to perform contract calls on the pending state.
// Call will try to discover this interface when access to the pending state is requested.
// If the backend does not support the pending state, Call returns ErrNoPendingState.
Expand Down Expand Up @@ -90,8 +84,29 @@ type ContractTransactor interface {
SendTransaction(ctx context.Context, tx *types.Transaction) error
}

// ContractFilterer defines the methods needed to access log events using one-off
// queries or continuous event subscriptions.
type ContractFilterer interface {
// FilterLogs executes a log filter operation, blocking during execution and
// returning all the results in one batch.
//
// TODO(karalabe): Deprecate when the subscription one can return past data too.
FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error)

// SubscribeFilterLogs creates a background log filtering operation, returning
// a subscription immediately, which can be used to stream the found events.
SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error)
}

// DeployBackend wraps the operations needed by WaitMined and WaitDeployed.
type DeployBackend interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error)
}

// ContractBackend defines the methods needed to work with contracts on a read-write basis.
type ContractBackend interface {
ContractCaller
ContractTransactor
ContractFilterer
}
121 changes: 118 additions & 3 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ import (
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)

// This nil assignment ensures compile time that SimulatedBackend implements bind.ContractBackend.
Expand All @@ -53,6 +57,8 @@ type SimulatedBackend struct {
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on on request

events *filters.EventSystem // Event system for filtering log events live

config *params.ChainConfig
}

Expand All @@ -63,7 +69,13 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
genesis := core.Genesis{Config: params.AllEthashProtocolChanges, Alloc: alloc}
genesis.MustCommit(database)
blockchain, _ := core.NewBlockChain(database, genesis.Config, ethash.NewFaker(), vm.Config{})
backend := &SimulatedBackend{database: database, blockchain: blockchain, config: genesis.Config}

backend := &SimulatedBackend{
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
}
backend.rollback()
return backend
}
Expand Down Expand Up @@ -248,7 +260,7 @@ func (b *SimulatedBackend) EstimateGas(ctx context.Context, call ethereum.CallMs
return hi, nil
}

// callContract implemens common code between normal and pending contract calls.
// callContract implements common code between normal and pending contract calls.
// state is modified during execution, make sure to copy it if necessary.
func (b *SimulatedBackend) callContract(ctx context.Context, call ethereum.CallMsg, block *types.Block, statedb *state.StateDB) ([]byte, uint64, bool, error) {
// Ensure message is initialized properly.
Expand Down Expand Up @@ -302,7 +314,69 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
return nil
}

// JumpTimeInSeconds adds skip seconds to the clock
// FilterLogs executes a log filter operation, blocking during execution and
// returning all the results in one batch.
//
// TODO(karalabe): Deprecate when the subscription one can return past data too.
func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
if query.FromBlock != nil {
from = query.FromBlock.Int64()
}
to := int64(-1)
if query.ToBlock != nil {
to = query.ToBlock.Int64()
}
// Construct and execute the filter
filter := filters.New(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)

logs, err := filter.Logs(ctx)
if err != nil {
return nil, err
}
res := make([]types.Log, len(logs))
for i, log := range logs {
res[i] = *log
}
return res, nil
}

// SubscribeFilterLogs creates a background log filtering operation, returning a
// subscription immediately, which can be used to stream the found events.
func (b *SimulatedBackend) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
// Subscribe to contract events
sink := make(chan []*types.Log)

sub, err := b.events.SubscribeLogs(query, sink)
if err != nil {
return nil, err
}
// Since we're getting logs in batches, we need to flatten them into a plain stream
return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case logs := <-sink:
for _, log := range logs {
select {
case ch <- *log:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}

// AdjustTime adds a time shift to the simulated clock.
func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down Expand Up @@ -331,3 +405,44 @@ func (m callmsg) GasPrice() *big.Int { return m.CallMsg.GasPrice }
func (m callmsg) Gas() uint64 { return m.CallMsg.Gas }
func (m callmsg) Value() *big.Int { return m.CallMsg.Value }
func (m callmsg) Data() []byte { return m.CallMsg.Data }

// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }

func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
if block == rpc.LatestBlockNumber {
return fb.bc.CurrentHeader(), nil
}
return fb.bc.GetHeaderByNumber(uint64(block.Int64())), nil
}
func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil
}

func (fb *filterBackend) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}
func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}
func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}
121 changes: 119 additions & 2 deletions accounts/abi/bind/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
)

// SignerFn is a signer function callback when a contract requires a method to
Expand Down Expand Up @@ -55,6 +56,22 @@ type TransactOpts struct {
Context context.Context // Network context to support cancellation and timeouts (nil = no timeout)
}

// FilterOpts is the collection of options to fine tune filtering for events
// within a bound contract.
type FilterOpts struct {
Start uint64 // Start of the queried range
End *uint64 // End of the range (nil = latest)

Context context.Context // Network context to support cancellation and timeouts (nil = no timeout)
}

// WatchOpts is the collection of options to fine tune subscribing for events
// within a bound contract.
type WatchOpts struct {
Start *uint64 // Start of the queried range (nil = latest)
Context context.Context // Network context to support cancellation and timeouts (nil = no timeout)
}

// BoundContract is the base wrapper object that reflects a contract on the
// Ethereum network. It contains a collection of methods that are used by the
// higher level contract bindings to operate.
Expand All @@ -63,24 +80,26 @@ type BoundContract struct {
abi abi.ABI // Reflect based ABI to access the correct Ethereum methods
caller ContractCaller // Read interface to interact with the blockchain
transactor ContractTransactor // Write interface to interact with the blockchain
filterer ContractFilterer // Event filtering to interact with the blockchain
}

// NewBoundContract creates a low level contract interface through which calls
// and transactions may be made through.
func NewBoundContract(address common.Address, abi abi.ABI, caller ContractCaller, transactor ContractTransactor) *BoundContract {
func NewBoundContract(address common.Address, abi abi.ABI, caller ContractCaller, transactor ContractTransactor, filterer ContractFilterer) *BoundContract {
return &BoundContract{
address: address,
abi: abi,
caller: caller,
transactor: transactor,
filterer: filterer,
}
}

// DeployContract deploys a contract onto the Ethereum blockchain and binds the
// deployment address with a Go wrapper.
func DeployContract(opts *TransactOpts, abi abi.ABI, bytecode []byte, backend ContractBackend, params ...interface{}) (common.Address, *types.Transaction, *BoundContract, error) {
// Otherwise try to deploy the contract
c := NewBoundContract(common.Address{}, abi, backend, backend)
c := NewBoundContract(common.Address{}, abi, backend, backend, backend)

input, err := c.abi.Pack("", params...)
if err != nil {
Expand Down Expand Up @@ -225,6 +244,104 @@ func (c *BoundContract) transact(opts *TransactOpts, contract *common.Address, i
return signedTx, nil
}

// FilterLogs filters contract logs for past blocks, returning the necessary
// channels to construct a strongly typed bound iterator on top of them.
func (c *BoundContract) FilterLogs(opts *FilterOpts, name string, query ...[]interface{}) (chan types.Log, event.Subscription, error) {
// Don't crash on a lazy user
if opts == nil {
opts = new(FilterOpts)
}
// Append the event selector to the query parameters and construct the topic set
query = append([][]interface{}{{c.abi.Events[name].Id()}}, query...)

topics, err := makeTopics(query...)
if err != nil {
return nil, nil, err
}
// Start the background filtering
logs := make(chan types.Log, 128)

config := ethereum.FilterQuery{
Addresses: []common.Address{c.address},
Topics: topics,
FromBlock: new(big.Int).SetUint64(opts.Start),
}
if opts.End != nil {
config.ToBlock = new(big.Int).SetUint64(*opts.End)
}
/* TODO(karalabe): Replace the rest of the method below with this when supported
sub, err := c.filterer.SubscribeFilterLogs(ensureContext(opts.Context), config, logs)
*/
buff, err := c.filterer.FilterLogs(ensureContext(opts.Context), config)
if err != nil {
return nil, nil, err
}
sub, err := event.NewSubscription(func(quit <-chan struct{}) error {
for _, log := range buff {
select {
case logs <- log:
case <-quit:
return nil
}
}
return nil
}), nil

if err != nil {
return nil, nil, err
}
return logs, sub, nil
}

// WatchLogs filters subscribes to contract logs for future blocks, returning a
// subscription object that can be used to tear down the watcher.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why you want to make a distinction between past and future logs? Both of them could be handled by the same iterator: you could just make the Iter() function take a start parameter that would either say something like FUTURE_ONLY or PAST_PRESENT_AND_FUTURE events. I don't see any case when a user interested in past event would not also be interested in future events. Also, having an API not making the distinction would save the caller code the trouble of having to handle the case when new events arrive between the moment when they called FilterLogs and the time they call WatchLogs

Copy link
Copy Markdown
Member Author

@karalabe karalabe Jan 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. When I'm filtering for past events, my code will usually block waiting for the next past event to arrive, until I consumed all past events I filtered for and the subsystem tells me there are no more events coming.

When I'm filtering for future events, my code will not block, rather be event driven, waiting on multiple possible event sources for data to arrive and react to.


To be honest, I can't think of an application where I would want to handle past and future events the same way.

E.g. I can filter for past Akasha events to retrieve the stuff the user posted and serve on a restful API, but that needs to stop and return, not wait for arbitrary future events. On the other hand, I can filter for future Akasha events to detect when users post new content and cache them in my API server via IPFS immediately during posting while the user is still online.

The two use cases are wildly different and have different requirements.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I can't think of an application where I would want to handle past and future events the same way.

I was thinking that implementing something like tail -f in Unix would be very complicated with this approach. I guess that this is an unlikely use case, then.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter WatchXYZ is the one which supposed to handle that case.

func (c *BoundContract) WatchLogs(opts *WatchOpts, name string, query ...[]interface{}) (chan types.Log, event.Subscription, error) {
// Don't crash on a lazy user
if opts == nil {
opts = new(WatchOpts)
}
// Append the event selector to the query parameters and construct the topic set
query = append([][]interface{}{{c.abi.Events[name].Id()}}, query...)

topics, err := makeTopics(query...)
if err != nil {
return nil, nil, err
}
// Start the background filtering
logs := make(chan types.Log, 128)

config := ethereum.FilterQuery{
Addresses: []common.Address{c.address},
Topics: topics,
}
if opts.Start != nil {
config.FromBlock = new(big.Int).SetUint64(*opts.Start)
}
sub, err := c.filterer.SubscribeFilterLogs(ensureContext(opts.Context), config, logs)
if err != nil {
return nil, nil, err
}
return logs, sub, nil
}

// UnpackLog unpacks a retrieved log into the provided output structure.
func (c *BoundContract) UnpackLog(out interface{}, event string, log types.Log) error {
if len(log.Data) > 0 {
if err := c.abi.Unpack(out, event, log.Data); err != nil {
return err
}
}
var indexed abi.Arguments
for _, arg := range c.abi.Events[event].Inputs {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
return parseTopics(out, indexed, log.Topics[1:])
}

// ensureContext is a helper method to ensure a context is not nil, even if the
// user specified it as such.
func ensureContext(ctx context.Context) context.Context {
if ctx == nil {
return context.TODO()
Expand Down
Loading