This repository was archived by the owner on Aug 31, 2021. It is now read-only.
44 changes: 22 additions & 22 deletions Gopkg.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Gopkg.toml
@@ -39,4 +39,4 @@

[[constraint]]
name = "github.com/ethereum/go-ethereum"
version = "1.8.18"
version = "1.8.21"
52 changes: 49 additions & 3 deletions README.md
@@ -138,8 +138,11 @@ false

If you already have full Rinkeby chaindata, you can move it to the `rinkeby_vulcanizedb_geth_data` Docker volume to skip the long sync.

## omniWatcher and lightOmniWatcher
These commands require a pre-synced (full or light) vulcanizeDB (see above sections)
## omniWatcher
This command allows generic watching of any Ethereum contract, given only the contract's address plus optional additional filtering information.
Currently, the contract's ABI must be available on Etherscan or added manually.
The command indexes watched events and polled methods in Postgres tables generated automatically from the ABI.
It requires a pre-synced (full or light) vulcanizeDB (see the sections above).

To watch all events of a contract using a light synced vDB:
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address>`
@@ -170,4 +173,47 @@ To watch all types of events of the contract but only persist the ones that emit

To watch all events of the contract but only poll the specified method with specified argument values (if they are emitted from the watched events):
- Execute `./vulcanizedb omniWatcher --config <path to config.toml> --contract-address <contract address> --methods <methodName> --method-args <arg1> --method-args <arg2>`


### omniWatcher output

Watched events and methods are transformed and persisted in auto-generated Postgres schemas and tables.
A schema is created for each contract, with the naming convention `<sync-type>_<lowercase contract-address>`;
e.g. for the TrueUSD contract in lightSync mode we would generate a schema named `light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e`.
Under this schema, tables are generated for watched events as `<lowercase event name>_event` and for polled methods as `<lowercase method name>_method`;
e.g. if we watch the Transfer and Mint events of the TrueUSD contract and poll its balanceOf method with the addresses emitted from those events, we produce a schema with three tables:

`light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.transfer_event`
`light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.mint_event`
`light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.balanceof_method`
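The naming convention above can be sketched with a small helper (a hypothetical sketch for illustration; the actual name generation lives elsewhere in the repo):

```go
package main

import (
	"fmt"
	"strings"
)

// schemaName builds the Postgres schema name for a contract:
// <sync-type>_<lowercase contract-address>.
func schemaName(syncType, contractAddress string) string {
	return fmt.Sprintf("%s_%s", syncType, strings.ToLower(contractAddress))
}

// eventTable and methodTable append the '_event'/'_method' suffixes that
// keep same-named events and methods from colliding.
func eventTable(name string) string  { return strings.ToLower(name) + "_event" }
func methodTable(name string) string { return strings.ToLower(name) + "_method" }

func main() {
	schema := schemaName("light", "0x8dd5fbCe2F6a956C3022bA3663759011Dd51e73E")
	fmt.Println(schema + "." + eventTable("Transfer"))
	// → light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.transfer_event
	fmt.Println(schema + "." + methodTable("balanceOf"))
	// → light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.balanceof_method
}
```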

The 'event' and 'method' identifiers are appended to the table names to prevent collisions between events and methods of the same name.

Column names and types are also auto-generated from the event and method argument names, resulting in tables such as:

Table "light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.transfer_event"

| Column | Type | Collation | Nullable | Default | Storage | Stats target | Description |
|:----------:|:---------------------:|:---------:|:--------:|:-------------------------------------------------------------------------------------------:|:--------:|:------------:|:-----------:|
| id | integer | | not null | nextval('light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.transfer_event_id_seq'::regclass) | plain | | |
| header_id | integer | | not null | | plain | | |
| token_name | character varying(66) | | not null | | extended | | |
| raw_log | jsonb | | | | extended | | |
| log_idx | integer | | not null | | plain | | |
| tx_idx | integer | | not null | | plain | | |
| from_ | character varying(66) | | not null | | extended | | |
| to_ | character varying(66) | | not null | | extended | | |
| value_ | numeric | | not null | | main | | |

and

Table "light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.balanceof_method"

| Column | Type | Collation | Nullable | Default | Storage | Stats target | Description |
|:----------:|:---------------------:|:---------:|:--------:|:-------------------------------------------------------------------------------------------:|:--------:|:------------:|:-----------:|
| id | integer | | not null | nextval('light_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e.balanceof_method_id_seq'::regclass) | plain | | |
| token_name | character varying(66) | | not null | | extended | | |
| block | integer | | not null | | plain | | |
| who_ | character varying(66) | | not null | | extended | | |
| returned | numeric | | not null | | main | | |

The '_' appended to some column names (e.g. `from_`, `to_`, `who_`) prevents collisions with reserved Postgres words.
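That suffixing rule can be sketched as follows (a hypothetical helper; the real implementation would consult the full Postgres keyword list, not this illustrative subset):

```go
package main

import (
	"fmt"
	"strings"
)

// reserved is a tiny subset of Postgres reserved words, for illustration only.
var reserved = map[string]bool{"from": true, "to": true, "value": true, "who": true}

// columnID lowercases an argument name and appends '_' when it would
// collide with a reserved Postgres word.
func columnID(arg string) string {
	id := strings.ToLower(arg)
	if reserved[id] {
		return id + "_"
	}
	return id
}

func main() {
	fmt.Println(columnID("from"))  // from_
	fmt.Println(columnID("block")) // block
}
```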
6 changes: 5 additions & 1 deletion pkg/geth/contract.go
@@ -43,7 +43,11 @@ func (blockChain *BlockChain) FetchContractData(abiJSON string, address string,
if err != nil {
return err
}
output, err := blockChain.callContract(address, input, big.NewInt(blockNumber))
var bn *big.Int
if blockNumber > 0 {
bn = big.NewInt(blockNumber)
}
output, err := blockChain.callContract(address, input, bn)
if err != nil {
return err
}
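The change above passes a nil `*big.Int` when no positive block number is given; in go-ethereum, a nil block number makes the contract call run against the latest known block. The guard can be isolated as a sketch:

```go
package main

import (
	"fmt"
	"math/big"
)

// blockArg converts a raw block number into the *big.Int argument expected
// by go-ethereum contract calls: nil (meaning "latest block") when no
// positive block number is provided.
func blockArg(blockNumber int64) *big.Int {
	if blockNumber > 0 {
		return big.NewInt(blockNumber)
	}
	return nil
}

func main() {
	fmt.Println(blockArg(-1)) // <nil>
	fmt.Println(blockArg(42)) // 42
}
```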
46 changes: 38 additions & 8 deletions pkg/omni/light/repository/header_repository.go
@@ -53,6 +53,7 @@ func NewHeaderRepository(db *postgres.DB) *headerRepository {
}
}

// AddCheckColumn adds a checked_header column for the provided column id
func (r *headerRepository) AddCheckColumn(id string) error {
// Check cache to see if column already exists before querying pg
_, ok := r.columns.Get(id)
@@ -73,6 +74,7 @@ func (r *headerRepository) AddCheckColumn(id string) error {
return nil
}

// AddCheckColumns adds a checked_header column for each of the provided column ids
func (r *headerRepository) AddCheckColumns(ids []string) error {
var err error
baseQuery := "ALTER TABLE public.checked_headers"
@@ -96,6 +98,7 @@ func (r *headerRepository) AddCheckColumns(ids []string) error {
return err
}

// MarkHeaderChecked marks the header checked for the provided column id
func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`)
VALUES ($1, $2)
@@ -105,6 +108,7 @@ func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
return err
}

// MarkHeaderCheckedForAll marks the header checked for all of the provided column ids
func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error {
pgStr := "INSERT INTO public.checked_headers (header_id, "
for _, id := range ids {
@@ -124,6 +128,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string)
return err
}

// MarkHeadersCheckedForAll marks all of the provided headers checked for each of the provided column ids
func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error {
tx, err := r.db.Begin()
if err != nil {
@@ -154,6 +159,7 @@
return tx.Commit()
}

// MissingHeaders returns missing headers for the provided checked_headers column id
func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, id string) ([]core.Header, error) {
var result []core.Header
var query string
@@ -165,7 +171,7 @@
WHERE (header_id ISNULL OR ` + id + ` IS FALSE)
AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
@@ -174,13 +180,14 @@
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}

return result, err
return contiguousHeaders(result, startingBlockNumber), err
}

// MissingHeadersForAll returns missing headers for all of the provided checked_headers column ids
func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) {
var result []core.Header
var query string
@@ -196,21 +203,42 @@
if endingBlockNumber == -1 {
endStr := `) AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `) AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}

return result, err
return contiguousHeaders(result, startingBlockNumber), err
}

// contiguousHeaders takes an ordered sequence of headers and returns only the first contiguous segment,
// enforcing continuity with the preceding segment via startingBlockNumber
func contiguousHeaders(headers []core.Header, startingBlockNumber int64) []core.Header {
if len(headers) < 1 {
return headers
}
previousHeader := headers[0].BlockNumber
if previousHeader != startingBlockNumber {
return []core.Header{}
}
for i := 1; i < len(headers); i++ {
previousHeader++
if headers[i].BlockNumber != previousHeader {
return headers[:i]
}
}

return headers
}
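The gap detection above can be exercised in isolation (a self-contained sketch using a minimal stand-in for core.Header):

```go
package main

import "fmt"

// header is a minimal stand-in for core.Header, carrying only the block number.
type header struct{ BlockNumber int64 }

// contiguousHeaders mirrors the function above: it returns the first
// contiguous run of block numbers, anchored at startingBlockNumber.
func contiguousHeaders(headers []header, startingBlockNumber int64) []header {
	if len(headers) < 1 {
		return headers
	}
	previous := headers[0].BlockNumber
	if previous != startingBlockNumber {
		return []header{}
	}
	for i := 1; i < len(headers); i++ {
		previous++
		if headers[i].BlockNumber != previous {
			return headers[:i]
		}
	}
	return headers
}

func main() {
	// Blocks 5, 6, 7 are contiguous; 9 begins a gap, so only the first three survive.
	fmt.Println(len(contiguousHeaders([]header{{5}, {6}, {7}, {9}, {10}}, 5))) // 3
	// A result that does not begin at startingBlockNumber is discarded entirely.
	fmt.Println(len(contiguousHeaders([]header{{6}, {7}}, 5))) // 0
}
```

Returning an empty slice when the first result does not match startingBlockNumber ensures each batch resumes exactly where the previous one ended, so the LIMIT 100 pagination never skips over a missing header.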

// MissingMethodsCheckedEventsIntersection returns headers that have been checked for all of the provided event ids but not for the provided method ids
func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) {
var result []core.Header
var query string
@@ -231,25 +259,27 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
if endingBlockNumber == -1 {
endStr := `AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}

return result, err
}

// CheckCache checks the repository's column id cache for a value
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}

// MarkHeaderCheckedInTransaction marks a header checked within an external transaction, so that checks can be grouped into one commit
func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)