diff --git a/indexer/indexer.go b/indexer/indexer.go index f2c7ee996b426..9647e93b959d8 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -183,7 +183,6 @@ func NewIndexer(cfg Config, gitVersion string) (*Indexer, error) { ConfDepth: cfg.ConfDepth, MaxHeaderBatchSize: cfg.MaxHeaderBatchSize, StartBlockNumber: uint64(0), - StartBlockHash: cfg.L2GenesisBlockHash, }) if err != nil { return nil, err @@ -211,7 +210,6 @@ func (b *Indexer) Serve() error { b.router.HandleFunc("/v1/l1/status", b.l1IndexingService.GetIndexerStatus).Methods("GET") b.router.HandleFunc("/v1/l2/status", b.l2IndexingService.GetIndexerStatus).Methods("GET") b.router.HandleFunc("/v1/deposits/0x{address:[a-fA-F0-9]{40}}", b.l1IndexingService.GetDeposits).Methods("GET") - b.router.HandleFunc("/v1/withdrawal/0x{hash:[a-fA-F0-9]{64}}", b.l2IndexingService.GetWithdrawalStatus).Methods("GET") b.router.HandleFunc("/v1/withdrawals/0x{address:[a-fA-F0-9]{40}}", b.l2IndexingService.GetWithdrawals).Methods("GET") b.router.HandleFunc("/v1/airdrops/0x{address:[a-fA-F0-9]{40}}", b.airdropService.GetAirdrop) b.router.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { diff --git a/indexer/services/l2/bridge/bridge.go b/indexer/services/l2/bridge/bridge.go index 49e4c243d8101..a672fc2ff8553 100644 --- a/indexer/services/l2/bridge/bridge.go +++ b/indexer/services/l2/bridge/bridge.go @@ -7,75 +7,76 @@ import ( "github.com/ethereum-optimism/optimism/indexer/db" "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum-optimism/optimism/op-bindings/predeploys" + "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" ) -type DepositsMap map[common.Hash][]db.Deposit // Finalizations type WithdrawalsMap map[common.Hash][]db.Withdrawal type Bridge interface { Address() common.Address - GetDepositsByBlockRange(uint64, uint64) (DepositsMap, error) - GetWithdrawalsByBlockRange(uint64, uint64) (WithdrawalsMap, error) + GetWithdrawalsByBlockRange(context.Context, uint64, uint64) (WithdrawalsMap, error) String() string } type implConfig struct { name string impl string - addr string + addr common.Address } -var defaultBridgeCfgs = map[uint64][]*implConfig{ - // Devnet - 901: { - {"Standard", "StandardBridge", L2StandardBridgeAddr}, - }, - // Goerli Alpha Testnet - 28528: { - {"Standard", "StandardBridge", L2StandardBridgeAddr}, - }, +var defaultBridgeCfgs = []*implConfig{ + {"Standard", "StandardBridge", predeploys.L2StandardBridgeAddr}, } var customBridgeCfgs = map[uint64][]*implConfig{ // Mainnet 10: { - {"BitBTC", StandardBridgeImpl, "0x158F513096923fF2d3aab2BcF4478536de6725e2"}, + {"BitBTC", StandardBridgeImpl, common.HexToAddress("0x158F513096923fF2d3aab2BcF4478536de6725e2")}, //{"DAI", "DAIBridge", "0x467194771dAe2967Aef3ECbEDD3Bf9a310C76C65"}, + {"wstETH", StandardBridgeImpl, common.HexToAddress("0x8E01013243a96601a86eb3153F0d9Fa4fbFb6957")}, }, // Kovan 69: { - {"BitBTC", StandardBridgeImpl, "0x0CFb46528a7002a7D8877a5F7a69b9AaF1A9058e"}, - {"USX", StandardBridgeImpl, "0xB4d37826b14Cd3CB7257A2A5094507d701fe715f"}, + {"BitBTC", StandardBridgeImpl, common.HexToAddress("0x0CFb46528a7002a7D8877a5F7a69b9AaF1A9058e")}, + {"USX", StandardBridgeImpl, common.HexToAddress("0xB4d37826b14Cd3CB7257A2A5094507d701fe715f")}, + {"wstETH", StandardBridgeImpl, common.HexToAddress("0x2E34e7d705AfaC3C4665b6feF31Aa394A1c81c92")}, //{"DAI", " DAIBridge", "0x467194771dAe2967Aef3ECbEDD3Bf9a310C76C65"}, }, } -func BridgesByChainID(chainID *big.Int, client bind.ContractFilterer, ctx context.Context) (map[string]Bridge, error) { +func BridgesByChainID(chainID *big.Int, client *ethclient.Client, isBedrock bool) (map[string]Bridge, error) { allCfgs := make([]*implConfig, 0) - allCfgs = append(allCfgs, defaultBridgeCfgs[chainID.Uint64()]...) + allCfgs = append(allCfgs, defaultBridgeCfgs...) allCfgs = append(allCfgs, customBridgeCfgs[chainID.Uint64()]...) + var l2L1MP *bindings.L2ToL1MessagePasser + var err error + if isBedrock { + l2L1MP, err = bindings.NewL2ToL1MessagePasser(predeploys.L2ToL1MessagePasserAddr, client) + if err != nil { + return nil, err + } + } + bridges := make(map[string]Bridge) for _, bridge := range allCfgs { switch bridge.impl { case "StandardBridge": - l2StandardBridgeAddress := common.HexToAddress(bridge.addr) - l2StandardBridgeFilter, err := bindings.NewL2StandardBridgeFilterer(l2StandardBridgeAddress, client) + l2SB, err := bindings.NewL2StandardBridge(bridge.addr, client) if err != nil { return nil, err } - - standardBridge := &StandardBridge{ - name: bridge.name, - ctx: ctx, - address: l2StandardBridgeAddress, - client: client, - filterer: l2StandardBridgeFilter, + bridges[bridge.name] = &StandardBridge{ + name: bridge.name, + address: bridge.addr, + client: client, + l2SB: l2SB, + l2L1MP: l2L1MP, + isBedrock: isBedrock, } - bridges[bridge.name] = standardBridge default: return nil, errors.New("unsupported bridge") } diff --git a/indexer/services/l2/bridge/filter.go b/indexer/services/l2/bridge/filter.go deleted file mode 100644 index 4ab39493adb14..0000000000000 --- a/indexer/services/l2/bridge/filter.go +++ /dev/null @@ -1,45 +0,0 @@ -package bridge - -import ( - "context" - "time" - - "github.com/ethereum-optimism/optimism/op-bindings/bindings" - "github.com/ethereum/go-ethereum/accounts/abi/bind" -) - -// clientRetryInterval is the interval to wait between retrying client API -// calls. -var clientRetryInterval = 5 * time.Second - -// FilterWithdrawalInitiatedWithRetry retries the given func until it succeeds, -// waiting for clientRetryInterval duration after every call. -func FilterWithdrawalInitiatedWithRetry(ctx context.Context, filterer *bindings.L2StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L2StandardBridgeWithdrawalInitiatedIterator, error) { - for { - ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout) - opts.Context = ctxt - res, err := filterer.FilterWithdrawalInitiated(opts, nil, nil, nil) - cancel() - if err == nil { - return res, nil - } - logger.Error("Error fetching filter", "err", err) - time.Sleep(clientRetryInterval) - } -} - -// FilterDepositFinalizedWithRetry retries the given func until it succeeds, -// waiting for clientRetryInterval duration after every call. -func FilterDepositFinalizedWithRetry(ctx context.Context, filterer *bindings.L2StandardBridgeFilterer, opts *bind.FilterOpts) (*bindings.L2StandardBridgeDepositFinalizedIterator, error) { - for { - ctxt, cancel := context.WithTimeout(ctx, DefaultConnectionTimeout) - opts.Context = ctxt - res, err := filterer.FilterDepositFinalized(opts, nil, nil, nil) - cancel() - if err == nil { - return res, nil - } - logger.Error("Error fetching filter", "err", err) - time.Sleep(clientRetryInterval) - } -} diff --git a/indexer/services/l2/bridge/standard_bridge.go b/indexer/services/l2/bridge/standard_bridge.go index fdbed6863b61f..51f3ae83a0b6d 100644 --- a/indexer/services/l2/bridge/standard_bridge.go +++ b/indexer/services/l2/bridge/standard_bridge.go @@ -5,83 +5,112 @@ import ( "github.com/ethereum-optimism/optimism/indexer/db" "github.com/ethereum-optimism/optimism/op-bindings/bindings" + "github.com/ethereum-optimism/optimism/op-node/withdrawals" + "github.com/ethereum-optimism/optimism/op-service/backoff" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" ) type StandardBridge struct { - name string - ctx context.Context - address common.Address - client bind.ContractFilterer - filterer *bindings.L2StandardBridgeFilterer + name string + address common.Address + client *ethclient.Client + l2SB *bindings.L2StandardBridge + l2L1MP *bindings.L2ToL1MessagePasser + isBedrock bool } func (s *StandardBridge) Address() common.Address { return s.address } -func (s *StandardBridge) GetDepositsByBlockRange(start, end uint64) (DepositsMap, error) { - depositsByBlockhash := make(DepositsMap) +func (s *StandardBridge) GetWithdrawalsByBlockRange(ctx context.Context, start, end uint64) (WithdrawalsMap, error) { + withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal) + opts := &bind.FilterOpts{ + Context: ctx, + Start: start, + End: &end, + } - iter, err := FilterDepositFinalizedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{ - Start: start, - End: &end, + var iter *bindings.L2StandardBridgeWithdrawalInitiatedIterator + err := backoff.Do(3, backoff.Exponential(), func() error { + var err error + iter, err = s.l2SB.FilterWithdrawalInitiated(opts, nil, nil, nil) + return err }) if err != nil { - logger.Error("Error fetching filter", "err", err) + return nil, err } + receipts := make(map[common.Hash]*types.Receipt) + defer iter.Close() for iter.Next() { - depositsByBlockhash[iter.Event.Raw.BlockHash] = append( - depositsByBlockhash[iter.Event.Raw.BlockHash], db.Deposit{ - TxHash: iter.Event.Raw.TxHash, - L1Token: iter.Event.L1Token, - L2Token: iter.Event.L2Token, - FromAddress: iter.Event.From, - ToAddress: iter.Event.To, - Amount: iter.Event.Amount, - Data: iter.Event.ExtraData, - LogIndex: iter.Event.Raw.Index, - }) - } - if err := iter.Error(); err != nil { - return nil, err - } + ev := iter.Event + if s.isBedrock { + receipt := receipts[ev.Raw.TxHash] + if receipt == nil { + receipt, err = s.client.TransactionReceipt(ctx, ev.Raw.TxHash) + if err != nil { + return nil, err + } + receipts[ev.Raw.TxHash] = receipt + } - return depositsByBlockhash, nil -} + var withdrawalInitiated *bindings.L2ToL1MessagePasserMessagePassed + for _, eLog := range receipt.Logs { + if len(eLog.Topics) == 0 || eLog.Topics[0] != withdrawals.MessagePassedTopic { + continue + } -func (s *StandardBridge) GetWithdrawalsByBlockRange(start, end uint64) (WithdrawalsMap, error) { - withdrawalsByBlockhash := make(map[common.Hash][]db.Withdrawal) + if withdrawalInitiated != nil { + logger.Warn("detected multiple withdrawal initiated events! ignoring", "tx_hash", ev.Raw.TxHash) + continue + } - iter, err := FilterWithdrawalInitiatedWithRetry(s.ctx, s.filterer, &bind.FilterOpts{ - Start: start, - End: &end, - }) - if err != nil { - logger.Error("Error fetching filter", "err", err) - } + withdrawalInitiated, err = s.l2L1MP.ParseMessagePassed(*eLog) + if err != nil { + return nil, err + } + } - for iter.Next() { - withdrawalsByBlockhash[iter.Event.Raw.BlockHash] = append( - withdrawalsByBlockhash[iter.Event.Raw.BlockHash], db.Withdrawal{ - TxHash: iter.Event.Raw.TxHash, - L1Token: iter.Event.L1Token, - L2Token: iter.Event.L2Token, - FromAddress: iter.Event.From, - ToAddress: iter.Event.To, - Amount: iter.Event.Amount, - Data: iter.Event.ExtraData, - LogIndex: iter.Event.Raw.Index, - }) - } - if err := iter.Error(); err != nil { - return nil, err + hash, err := withdrawals.WithdrawalHash(withdrawalInitiated) + if err != nil { + return nil, err + } + + withdrawalsByBlockhash[ev.Raw.BlockHash] = append( + withdrawalsByBlockhash[ev.Raw.BlockHash], db.Withdrawal{ + TxHash: ev.Raw.TxHash, + L1Token: ev.L1Token, + L2Token: ev.L2Token, + FromAddress: ev.From, + ToAddress: ev.To, + Amount: ev.Amount, + Data: ev.ExtraData, + LogIndex: ev.Raw.Index, + BedrockHash: &hash, + }, + ) + } else { + withdrawalsByBlockhash[ev.Raw.BlockHash] = append( + withdrawalsByBlockhash[ev.Raw.BlockHash], db.Withdrawal{ + TxHash: ev.Raw.TxHash, + L1Token: ev.L1Token, + L2Token: ev.L2Token, + FromAddress: ev.From, + ToAddress: ev.To, + Amount: ev.Amount, + Data: ev.ExtraData, + LogIndex: ev.Raw.Index, + }, + ) + } } - return withdrawalsByBlockhash, nil + return withdrawalsByBlockhash, iter.Error() } func (s *StandardBridge) String() string { diff --git a/indexer/services/l2/confirmed_headers.go b/indexer/services/l2/confirmed_headers.go index 4e6811341701f..72b9e75c3eafd 100644 --- a/indexer/services/l2/confirmed_headers.go +++ b/indexer/services/l2/confirmed_headers.go @@ -147,8 +147,7 @@ func (f *ConfirmedHeaderSelector) NewHead( return headers, nil } -func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector, - error) { +func NewConfirmedHeaderSelector(cfg HeaderSelectorConfig) (*ConfirmedHeaderSelector, error) { if cfg.ConfDepth == 0 { return nil, errors.New("ConfDepth must be greater than zero") } diff --git a/indexer/services/l2/query.go b/indexer/services/l2/query.go deleted file mode 100644 index 4e5d65f1f799b..0000000000000 --- a/indexer/services/l2/query.go +++ /dev/null @@ -1,38 +0,0 @@ -package l2 - -import ( - "github.com/ethereum-optimism/optimism/indexer/db" - - "github.com/ethereum-optimism/optimism/op-bindings/bindings" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" -) - -func QueryERC20(address common.Address, client *ethclient.Client) (*db.Token, error) { - contract, err := bindings.NewERC20(address, client) - if err != nil { - return nil, err - } - - name, err := contract.Name(&bind.CallOpts{}) - if err != nil { - return nil, err - } - - symbol, err := contract.Symbol(&bind.CallOpts{}) - if err != nil { - return nil, err - } - - decimals, err := contract.Decimals(&bind.CallOpts{}) - if err != nil { - return nil, err - } - - return &db.Token{ - Name: name, - Symbol: symbol, - Decimals: decimals, - }, nil -} diff --git a/indexer/services/l2/service.go b/indexer/services/l2/service.go index 2d96444d02028..ae003eab67219 100644 --- a/indexer/services/l2/service.go +++ b/indexer/services/l2/service.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum-optimism/optimism/indexer/metrics" "github.com/ethereum-optimism/optimism/indexer/server" + "github.com/ethereum-optimism/optimism/indexer/services/query" "github.com/prometheus/client_golang/prometheus" "github.com/ethereum-optimism/optimism/indexer/db" @@ -38,37 +39,18 @@ var errWrongChainID = errors.New("wrong chain id provided") var errNoNewBlocks = errors.New("no new blocks") -// clientRetryInterval is the interval to wait between retrying client API -// calls. -var clientRetryInterval = 5 * time.Second - -// HeaderByNumberWithRetry retries the given func until it succeeds, waiting -// for clientRetryInterval duration after every call. -func HeaderByNumberWithRetry(ctx context.Context, - client *ethclient.Client) (*types.Header, error) { - for { - res, err := client.HeaderByNumber(ctx, nil) - switch err { - case nil: - return res, err - default: - log.Error("Error fetching header", "err", err) - } - time.Sleep(clientRetryInterval) - } -} - type ServiceConfig struct { - Context context.Context - Metrics *metrics.Metrics - L2RPC *rpc.Client - L2Client *ethclient.Client - ChainID *big.Int + Context context.Context + Metrics *metrics.Metrics + L2RPC *rpc.Client + L2Client *ethclient.Client + ChainID *big.Int + ConfDepth uint64 MaxHeaderBatchSize uint64 StartBlockNumber uint64 - StartBlockHash string DB *db.Database + Bedrock bool } type Service struct { @@ -113,7 +95,7 @@ func NewService(cfg ServiceConfig) (*Service, error) { cfg.ChainID = chainID } - bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L2Client, ctx) + bridges, err := bridge.BridgesByChainID(cfg.ChainID, cfg.L2Client, cfg.Bedrock) if err != nil { cancel() return nil, err @@ -131,7 +113,7 @@ func NewService(cfg ServiceConfig) (*Service, error) { return nil, err } - return &Service{ + service := &Service{ cfg: cfg, ctx: ctx, cancel: cancel, @@ -141,12 +123,16 @@ func NewService(cfg ServiceConfig) (*Service, error) { tokenCache: map[common.Address]*db.Token{ db.ETHL2Address: db.ETHL1Token, }, - }, nil + } + service.wg.Add(1) + return service, nil } -func (s *Service) Loop(ctx context.Context) { +func (s *Service) loop() { + defer s.wg.Done() + for { - err := s.catchUp(ctx) + err := s.catchUp() if err == nil { break } @@ -160,10 +146,18 @@ func (s *Service) Loop(ctx context.Context) { } newHeads := make(chan *types.Header, 1000) - go s.subscribeNewHeads(ctx, newHeads) + tick := time.NewTicker(5 * time.Second) + defer tick.Stop() for { select { + case <-tick.C: + header, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L2Client) + if err != nil { + logger.Error("error fetching header by number", "err", err) + continue + } + newHeads <- header case header := <-newHeads: logger.Info("Received new header", "header", header.Hash) for { @@ -176,6 +170,7 @@ func (s *Service) Loop(ctx context.Context) { } } case <-s.ctx.Done(): + logger.Info("service stopped") return } } @@ -184,7 +179,6 @@ func (s *Service) Loop(ctx context.Context) { func (s *Service) Update(newHeader *types.Header) error { var lowest = db.BlockLocator{ Number: s.cfg.StartBlockNumber, - Hash: common.HexToHash(s.cfg.StartBlockHash), } highestConfirmed, err := s.cfg.DB.GetHighestL2Block() if err != nil { @@ -209,7 +203,7 @@ func (s *Service) Update(newHeader *types.Header) error { return nil } - if lowest.Hash != headers[0].ParentHash { + if lowest.Number > 0 && lowest.Hash != headers[0].ParentHash { logger.Error("Parent hash does not connect to ", "block", headers[0].Number.Uint64(), "hash", headers[0].Hash(), "lowest_block", lowest.Number, "hash", lowest.Hash) @@ -218,7 +212,6 @@ func (s *Service) Update(newHeader *types.Header) error { startHeight := headers[0].Number.Uint64() endHeight := headers[len(headers)-1].Number.Uint64() - depositsByBlockHash := make(map[common.Hash][]db.Deposit) withdrawalsByBlockHash := make(map[common.Hash][]db.Withdrawal) start := prometheus.NewTimer(s.metrics.UpdateDuration.WithLabelValues("l2")) @@ -227,21 +220,12 @@ func (s *Service) Update(newHeader *types.Header) error { logger.Info("updated index", "start_height", startHeight, "end_height", endHeight, "duration", dur) }() - bridgeDepositsCh := make(chan bridge.DepositsMap, len(s.bridges)) bridgeWdsCh := make(chan bridge.WithdrawalsMap) errCh := make(chan error, len(s.bridges)) for _, bridgeImpl := range s.bridges { go func(b bridge.Bridge) { - deposits, err := b.GetDepositsByBlockRange(startHeight, endHeight) - if err != nil { - errCh <- err - return - } - bridgeDepositsCh <- deposits - }(bridgeImpl) - go func(b bridge.Bridge) { - wds, err := b.GetWithdrawalsByBlockRange(startHeight, endHeight) + wds, err := b.GetWithdrawalsByBlockRange(s.ctx, startHeight, endHeight) if err != nil { errCh <- err return @@ -256,29 +240,19 @@ func (s *Service) Update(newHeader *types.Header) error { case bridgeWds := <-bridgeWdsCh: for blockHash, withdrawals := range bridgeWds { for _, wd := range withdrawals { - if err := s.cacheToken(wd.L2Token); err != nil { + if err := s.cacheToken(wd); err != nil { logger.Warn("error caching token", "err", err) } } withdrawalsByBlockHash[blockHash] = append(withdrawalsByBlockHash[blockHash], withdrawals...) } - case bridgeDeposits := <-bridgeDepositsCh: - for blockHash, deposits := range bridgeDeposits { - for _, deposit := range deposits { - if err := s.cacheToken(deposit.L2Token); err != nil { - logger.Warn("error caching token", "err", err) - } - } - - depositsByBlockHash[blockHash] = append(depositsByBlockHash[blockHash], deposits...) - } case err := <-errCh: return err } receives++ - if receives == 2*len(s.bridges) { + if receives == len(s.bridges) { break } } @@ -356,8 +330,21 @@ func (s *Service) GetIndexerStatus(w http.ResponseWriter, r *http.Request) { server.RespondWithJSON(w, http.StatusOK, status) } -func (s *Service) GetWithdrawalStatus(w http.ResponseWriter, r *http.Request) { - // Temporary stub until rest of indexer is landed +func (s *Service) GetWithdrawalBatch(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + hash := vars["hash"] + if hash == "" { + server.RespondWithError(w, http.StatusBadRequest, "must specify a hash") + return + } + + batch, err := s.cfg.DB.GetWithdrawalBatch(common.HexToHash(vars["hash"])) + if err != nil { + server.RespondWithError(w, http.StatusInternalServerError, err.Error()) + return + } + + server.RespondWithJSON(w, http.StatusOK, batch) } func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) { @@ -380,12 +367,14 @@ func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) { return } + finalizationState := db.ParseFinalizationState(r.URL.Query().Get("finalized")) + page := db.PaginationParam{ - Limit: uint64(limit), - Offset: uint64(offset), + Limit: limit, + Offset: offset, } - withdrawals, err := s.cfg.DB.GetWithdrawalsByAddress(common.HexToAddress(vars["address"]), page, db.FinalizationStateAny) + withdrawals, err := s.cfg.DB.GetWithdrawalsByAddress(common.HexToAddress(vars["address"]), page, finalizationState) if err != nil { server.RespondWithError(w, http.StatusInternalServerError, err.Error()) return @@ -394,25 +383,8 @@ func (s *Service) GetWithdrawals(w http.ResponseWriter, r *http.Request) { server.RespondWithJSON(w, http.StatusOK, withdrawals) } -func (s *Service) subscribeNewHeads(ctx context.Context, heads chan *types.Header) { - tick := time.NewTicker(5 * time.Second) - - for { - select { - case <-tick.C: - header, err := HeaderByNumberWithRetry(ctx, s.cfg.L2Client) - if err != nil { - logger.Error("error fetching header by number", "err", err) - } - heads <- header - case <-ctx.Done(): - return - } - } -} - -func (s *Service) catchUp(ctx context.Context) error { - realHead, err := HeaderByNumberWithRetry(ctx, s.cfg.L2Client) +func (s *Service) catchUp() error { + realHead, err := query.HeaderByNumberWithRetry(s.ctx, s.cfg.L2Client) if err != nil { return err } @@ -436,8 +408,8 @@ func (s *Service) catchUp(ctx context.Context) error { for realHeadNum-s.cfg.ConfDepth > currHeadNum+s.cfg.MaxHeaderBatchSize { select { - case <-ctx.Done(): - return context.Canceled + case <-s.ctx.Done(): + return s.ctx.Err() default: if err := s.Update(realHead); err != nil && err != errNoNewBlocks { return err @@ -455,32 +427,32 @@ func (s *Service) catchUp(ctx context.Context) error { return nil } -func (s *Service) cacheToken(address common.Address) error { - if s.tokenCache[address] != nil { +func (s *Service) cacheToken(withdrawal db.Withdrawal) error { + if s.tokenCache[withdrawal.L2Token] != nil { return nil } - token, err := s.cfg.DB.GetL2TokenByAddress(address.String()) + token, err := s.cfg.DB.GetL2TokenByAddress(withdrawal.L2Token.String()) if err != nil { return err } if token != nil { s.metrics.IncL2CachedTokensCount() - s.tokenCache[address] = token + s.tokenCache[withdrawal.L2Token] = token return nil } - token, err = QueryERC20(address, s.cfg.L2Client) + token, err = query.NewERC20(withdrawal.L2Token, s.cfg.L2Client) if err != nil { logger.Error("Error querying ERC20 token details", - "l2_token", address.String(), "err", err) + "l2_token", withdrawal.L2Token.String(), "err", err) token = &db.Token{ - Address: address.String(), + Address: withdrawal.L2Token.String(), } } - if err := s.cfg.DB.AddL2Token(address.String(), token); err != nil { + if err := s.cfg.DB.AddL2Token(withdrawal.L2Token.String(), token); err != nil { return err } - s.tokenCache[address] = token + s.tokenCache[withdrawal.L2Token] = token s.metrics.IncL2CachedTokensCount() return nil } @@ -489,16 +461,11 @@ func (s *Service) Start() error { if s.cfg.ChainID == nil { return errNoChainID } - s.wg.Add(1) - go s.Loop(s.ctx) + go s.loop() return nil } func (s *Service) Stop() { s.cancel() s.wg.Wait() - err := s.cfg.DB.Close() - if err != nil { - logger.Error("Error closing db", "err", err) - } }