diff --git a/.changeset/kind-drinks-dance.md b/.changeset/kind-drinks-dance.md new file mode 100644 index 00000000000..850b2fdc620 --- /dev/null +++ b/.changeset/kind-drinks-dance.md @@ -0,0 +1,5 @@ +--- +"@eth-optimism/l2geth": minor +--- + +Implement the SyncService spec diff --git a/l2geth/cmd/geth/main.go b/l2geth/cmd/geth/main.go index ff0ad41f306..d2da7bec986 100644 --- a/l2geth/cmd/geth/main.go +++ b/l2geth/cmd/geth/main.go @@ -160,6 +160,7 @@ var ( utils.RollupEnableVerifierFlag, utils.RollupAddressManagerOwnerAddressFlag, utils.RollupTimstampRefreshFlag, + utils.RollupSyncTypeFlag, utils.RollupPollIntervalFlag, utils.RollupStateDumpPathFlag, utils.RollupDiffDbFlag, diff --git a/l2geth/cmd/geth/usage.go b/l2geth/cmd/geth/usage.go index c59d214dbf5..51b61588f2d 100644 --- a/l2geth/cmd/geth/usage.go +++ b/l2geth/cmd/geth/usage.go @@ -74,6 +74,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.RollupAddressManagerOwnerAddressFlag, utils.RollupEnableVerifierFlag, utils.RollupTimstampRefreshFlag, + utils.RollupSyncTypeFlag, utils.RollupPollIntervalFlag, utils.RollupStateDumpPathFlag, utils.RollupDiffDbFlag, diff --git a/l2geth/cmd/utils/flags.go b/l2geth/cmd/utils/flags.go index 432599ae4b8..05561ac1d9f 100644 --- a/l2geth/cmd/utils/flags.go +++ b/l2geth/cmd/utils/flags.go @@ -849,6 +849,12 @@ var ( Value: time.Minute * 15, EnvVar: "ROLLUP_TIMESTAMP_REFRESH", } + RollupSyncTypeFlag = cli.StringFlag{ + Name: "rollup.synctype", + Usage: "Transaction sync source", + Value: "batched", + EnvVar: "ROLLUP_SYNC_TYPE", + } // Flag to enable verifier mode RollupEnableVerifierFlag = cli.BoolFlag{ Name: "rollup.verifier", @@ -1161,6 +1167,14 @@ func setRollup(ctx *cli.Context, cfg *rollup.Config) { if ctx.GlobalIsSet(RollupL1GasPriceFlag.Name) { cfg.L1GasPrice = GlobalBig(ctx, RollupL1GasPriceFlag.Name) } + if ctx.GlobalIsSet(RollupSyncTypeFlag.Name) { + typ, err := rollup.NewSyncType(ctx.GlobalString(RollupSyncTypeFlag.Name)) + if err != nil { + log.Error("Configured with unknown sync type") + typ, _ = rollup.NewSyncType("batched") + } + cfg.SyncType = typ + } } // setLes configures the les server and ultra light client settings from the command line flags. diff --git a/l2geth/core/rawdb/rollup_indexes.go b/l2geth/core/rawdb/rollup_indexes.go index 4ce5532c175..4f275e6af1a 100644 --- a/l2geth/core/rawdb/rollup_indexes.go +++ b/l2geth/core/rawdb/rollup_indexes.go @@ -44,3 +44,22 @@ func WriteHeadQueueIndex(db ethdb.KeyValueWriter, index uint64) { log.Crit("Failed to store queue index", "err", err) } } + +func ReadHeadVerifiedIndex(db ethdb.KeyValueReader) *uint64 { + data, _ := db.Get(headVerifiedIndexKey) + if len(data) == 0 { + return nil + } + ret := new(big.Int).SetBytes(data).Uint64() + return &ret +} + +func WriteHeadVerifiedIndex(db ethdb.KeyValueWriter, index uint64) { + value := new(big.Int).SetUint64(index).Bytes() + if index == 0 { + value = []byte{0} + } + if err := db.Put(headVerifiedIndexKey, value); err != nil { + log.Crit("Failed to store verfied index", "err", err) + } +} diff --git a/l2geth/core/rawdb/schema.go b/l2geth/core/rawdb/schema.go index ed091fa2398..622a67831e6 100644 --- a/l2geth/core/rawdb/schema.go +++ b/l2geth/core/rawdb/schema.go @@ -58,8 +58,10 @@ var ( // headIndexKey tracks the last processed ctc index headIndexKey = []byte("LastIndex") - // headQueueIndexKey tracks th last processed queue index + // headQueueIndexKey tracks the last processed queue index headQueueIndexKey = []byte("LastQueueIndex") + // headVerifiedIndexKey tracks the latest verified index + headVerifiedIndexKey = []byte("LastVerifiedIndex") preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db diff --git a/l2geth/eth/api_backend.go b/l2geth/eth/api_backend.go index a0889ea4333..1510b7f4462 100644 --- a/l2geth/eth/api_backend.go +++ b/l2geth/eth/api_backend.go @@ -71,9 +71,10 @@ func (b *EthAPIBackend) GetEthContext() (uint64, uint64) { return bn, ts } -func (b *EthAPIBackend) GetRollupContext() (uint64, uint64) { +func (b *EthAPIBackend) GetRollupContext() (uint64, uint64, uint64) { i := uint64(0) q := uint64(0) + v := uint64(0) index := b.eth.syncService.GetLatestIndex() if index != nil { i = *index @@ -82,7 +83,11 @@ func (b *EthAPIBackend) GetRollupContext() (uint64, uint64) { if queueIndex != nil { q = *queueIndex } - return i, q + verifiedIndex := b.eth.syncService.GetLatestVerifiedIndex() + if verifiedIndex != nil { + v = *verifiedIndex + } + return i, q, v } // ChainConfig returns the active chain configuration. @@ -318,7 +323,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return fmt.Errorf("Calldata cannot be larger than %d, sent %d", b.MaxCallDataSize, len(signedTx.Data())) } } - return b.eth.syncService.ApplyTransaction(signedTx) + return b.eth.syncService.ValidateAndApplySequencerTransaction(signedTx) } // OVM Disabled return b.eth.txPool.AddLocal(signedTx) diff --git a/l2geth/go.sum b/l2geth/go.sum index 6d571f50def..5c53cca7b50 100644 --- a/l2geth/go.sum +++ b/l2geth/go.sum @@ -108,6 +108,7 @@ github.com/influxdata/influxdb v1.2.3-0.20180221223340-01288bdb0883 h1:FSeK4fZCo github.com/influxdata/influxdb v1.2.3-0.20180221223340-01288bdb0883/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 h1:6OvNmYgJyexcZ3pYbTI9jWx5tHo1Dee/tWbLMfPe2TA= github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jarcoal/httpmock v1.0.8 h1:8kI16SoO6LQKgPE7PvQuV+YuD/inwHd7fOOe2zMbo4k= github.com/jarcoal/httpmock v1.0.8/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= diff --git a/l2geth/internal/ethapi/api.go b/l2geth/internal/ethapi/api.go index 6c384955ce1..7b765cbe30b 100644 --- a/l2geth/internal/ethapi/api.go +++ b/l2geth/internal/ethapi/api.go @@ -1913,9 +1913,15 @@ type EthContext struct { BlockNumber uint64 `json:"blockNumber"` Timestamp uint64 `json:"timestamp"` } + +// RollupContext represents the height of the rollup. +// Index is the last processed CanonicalTransactionChain index +// QueueIndex is the last processed `enqueue` index +// VerifiedIndex is the last processed CTC index that was batched type RollupContext struct { - Index uint64 `json:"index"` - QueueIndex uint64 `json:"queueIndex"` + Index uint64 `json:"index"` + QueueIndex uint64 `json:"queueIndex"` + VerifiedIndex uint64 `json:"verifiedIndex"` } type rollupInfo struct { @@ -1932,7 +1938,7 @@ func (api *PublicRollupAPI) GetInfo(ctx context.Context) rollupInfo { } syncing := api.b.IsSyncing() bn, ts := api.b.GetEthContext() - index, queueIndex := api.b.GetRollupContext() + index, queueIndex, verifiedIndex := api.b.GetRollupContext() return rollupInfo{ Mode: mode, @@ -1942,8 +1948,9 @@ func (api *PublicRollupAPI) GetInfo(ctx context.Context) rollupInfo { Timestamp: ts, }, RollupContext: RollupContext{ - Index: index, - QueueIndex: queueIndex, + Index: index, + QueueIndex: queueIndex, + VerifiedIndex: verifiedIndex, }, } } diff --git a/l2geth/internal/ethapi/backend.go b/l2geth/internal/ethapi/backend.go index 734a83702fd..beef5aaf053 100644 --- a/l2geth/internal/ethapi/backend.go +++ b/l2geth/internal/ethapi/backend.go @@ -91,7 +91,7 @@ type Backend interface { IsVerifier() bool IsSyncing() bool GetEthContext() (uint64, uint64) - GetRollupContext() (uint64, uint64) + GetRollupContext() (uint64, uint64, uint64) GasLimit() uint64 GetDiff(*big.Int) (diffdb.Diff, error) SuggestDataPrice(ctx context.Context) (*big.Int, error) diff --git a/l2geth/les/api_backend.go b/l2geth/les/api_backend.go index 75e30bbbd83..1dbd466497a 100644 --- a/l2geth/les/api_backend.go +++ b/l2geth/les/api_backend.go @@ -58,8 +58,8 @@ func (b *LesApiBackend) GetEthContext() (uint64, uint64) { return 0, 0 } -func (b *LesApiBackend) GetRollupContext() (uint64, uint64) { - return 0, 0 +func (b *LesApiBackend) GetRollupContext() (uint64, uint64, uint64) { + return 0, 0, 0 } func (b *LesApiBackend) IsSyncing() bool { diff --git a/l2geth/rollup/client.go b/l2geth/rollup/client.go index d890d8e6782..0048c56cd61 100644 --- a/l2geth/rollup/client.go +++ b/l2geth/rollup/client.go @@ -13,12 +13,6 @@ import ( "github.com/go-resty/resty/v2" ) -/** - * GET /enqueue/index/{index} - * GET /transaction/index/{index} - * GET /eth/context/latest - */ - type Batch struct { Index uint64 `json:"index"` Root common.Hash `json:"root,omitempty"` @@ -90,11 +84,13 @@ type decoded struct { type RollupClient interface { GetEnqueue(index uint64) (*types.Transaction, error) GetLatestEnqueue() (*types.Transaction, error) - GetTransaction(index uint64) (*types.Transaction, error) - GetLatestTransaction() (*types.Transaction, error) - GetEthContext(index uint64) (*EthContext, error) + GetTransaction(uint64, SyncType) (*types.Transaction, error) + GetLatestTransaction(SyncType) (*types.Transaction, error) + GetEthContext(uint64) (*EthContext, error) GetLatestEthContext() (*EthContext, error) GetLastConfirmedEnqueue() (*types.Transaction, error) + GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) + GetTransactionBatch(uint64) (*Batch, []*types.Transaction, error) SyncStatus() (*SyncStatus, error) GetL1GasPrice() (*big.Int, error) } @@ -109,9 +105,15 @@ type TransactionResponse struct { Batch *Batch `json:"batch"` } +type TransactionBatchResponse struct { + Batch *Batch `json:"batch"` + Transactions []*transaction `json:"transactions"` +} + func NewClient(url string, chainID *big.Int) *Client { client := resty.New() client.SetHostURL(url) + client.SetHeader("User-Agent", "sequencer") signer := types.NewOVMSigner(chainID) return &Client{ @@ -120,7 +122,6 @@ func NewClient(url string, chainID *big.Int) *Client { } } -// This needs to return a transaction instead func (c *Client) GetEnqueue(index uint64) (*types.Transaction, error) { str := strconv.FormatUint(index, 10) response, err := c.client.R(). @@ -222,45 +223,45 @@ func (c *Client) GetLatestEnqueue() (*types.Transaction, error) { return tx, nil } -func transactionResponseToTransaction(res *TransactionResponse, signer *types.OVMSigner) (*types.Transaction, error) { +func transactionResponseToTransaction(res *transaction, signer *types.OVMSigner) (*types.Transaction, error) { // `nil` transactions are not found - if res.Transaction == nil { + if res == nil { return nil, nil } // The queue origin must be either sequencer of l1, otherwise // it is considered an unknown queue origin and will not be processed var queueOrigin types.QueueOrigin - if res.Transaction.QueueOrigin == "sequencer" { + if res.QueueOrigin == "sequencer" { queueOrigin = types.QueueOriginSequencer - } else if res.Transaction.QueueOrigin == "l1" { + } else if res.QueueOrigin == "l1" { queueOrigin = types.QueueOriginL1ToL2 } else { - return nil, fmt.Errorf("Unknown queue origin: %s", res.Transaction.QueueOrigin) + return nil, fmt.Errorf("Unknown queue origin: %s", res.QueueOrigin) } // The transaction type must be EIP155 or EthSign. Throughout this // codebase, it is referred to as "sighash type" but it could actually // be generalized to transaction type. Right now the only different // types use a different signature hashing scheme. var sighashType types.SignatureHashType - if res.Transaction.Type == "EIP155" { + if res.Type == "EIP155" { sighashType = types.SighashEIP155 - } else if res.Transaction.Type == "ETH_SIGN" { + } else if res.Type == "ETH_SIGN" { sighashType = types.SighashEthSign } else { - return nil, fmt.Errorf("Unknown transaction type: %s", res.Transaction.Type) + return nil, fmt.Errorf("Unknown transaction type: %s", res.Type) } // Transactions that have been decoded are // Queue Origin Sequencer transactions - if res.Transaction.Decoded != nil { - nonce := res.Transaction.Decoded.Nonce - to := res.Transaction.Decoded.Target + if res.Decoded != nil { + nonce := res.Decoded.Nonce + to := res.Decoded.Target value := new(big.Int) // Note: there are two gas limits, one top level and // another on the raw transaction itself. Maybe maxGasLimit // for the top level? - gasLimit := res.Transaction.Decoded.GasLimit - gasPrice := new(big.Int).SetUint64(res.Transaction.Decoded.GasPrice) - data := res.Transaction.Decoded.Data + gasLimit := res.Decoded.GasLimit + gasPrice := new(big.Int).SetUint64(res.Decoded.GasPrice) + data := res.Decoded.Data var tx *types.Transaction if to == (common.Address{}) { @@ -270,22 +271,22 @@ func transactionResponseToTransaction(res *TransactionResponse, signer *types.OV } txMeta := types.NewTransactionMeta( - new(big.Int).SetUint64(res.Transaction.BlockNumber), - res.Transaction.Timestamp, - res.Transaction.Origin, + new(big.Int).SetUint64(res.BlockNumber), + res.Timestamp, + res.Origin, sighashType, queueOrigin, - &res.Transaction.Index, - res.Transaction.QueueIndex, - res.Transaction.Data, + &res.Index, + res.QueueIndex, + res.Data, ) tx.SetTransactionMeta(txMeta) - r, s := res.Transaction.Decoded.Signature.R, res.Transaction.Decoded.Signature.S + r, s := res.Decoded.Signature.R, res.Decoded.Signature.S sig := make([]byte, crypto.SignatureLength) copy(sig[32-len(r):32], r) copy(sig[64-len(s):64], s) - sig[64] = byte(res.Transaction.Decoded.Signature.V) + sig[64] = byte(res.Decoded.Signature.V) tx, err := tx.WithSignature(signer, sig[:]) if err != nil { @@ -298,37 +299,40 @@ func transactionResponseToTransaction(res *TransactionResponse, signer *types.OV // The transaction is either an L1 to L2 transaction or it does not have a // known deserialization nonce := uint64(0) - if res.Transaction.QueueOrigin == "l1" { - if res.Transaction.QueueIndex == nil { + if res.QueueOrigin == "l1" { + if res.QueueIndex == nil { return nil, errors.New("Queue origin L1 to L2 without a queue index") } - nonce = *res.Transaction.QueueIndex + nonce = *res.QueueIndex } - target := res.Transaction.Target - gasLimit := res.Transaction.GasLimit - data := res.Transaction.Data - origin := res.Transaction.Origin + target := res.Target + gasLimit := res.GasLimit + data := res.Data + origin := res.Origin tx := types.NewTransaction(nonce, target, big.NewInt(0), gasLimit, big.NewInt(0), data) txMeta := types.NewTransactionMeta( - new(big.Int).SetUint64(res.Transaction.BlockNumber), - res.Transaction.Timestamp, + new(big.Int).SetUint64(res.BlockNumber), + res.Timestamp, origin, sighashType, queueOrigin, - &res.Transaction.Index, - res.Transaction.QueueIndex, - res.Transaction.Data, + &res.Index, + res.QueueIndex, + res.Data, ) tx.SetTransactionMeta(txMeta) return tx, nil } -func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { +func (c *Client) GetTransaction(index uint64, syncType SyncType) (*types.Transaction, error) { str := strconv.FormatUint(index, 10) response, err := c.client.R(). SetPathParams(map[string]string{ "index": str, }). + SetQueryParams(map[string]string{ + "source": syncType.String(), + }). SetResult(&TransactionResponse{}). Get("/transaction/index/{index}") @@ -339,12 +343,14 @@ func (c *Client) GetTransaction(index uint64) (*types.Transaction, error) { if !ok { return nil, fmt.Errorf("could not get tx with index %d", index) } - - return transactionResponseToTransaction(res, c.signer) + return transactionResponseToTransaction(res.Transaction, c.signer) } -func (c *Client) GetLatestTransaction() (*types.Transaction, error) { +func (c *Client) GetLatestTransaction(syncType SyncType) (*types.Transaction, error) { response, err := c.client.R(). + SetQueryParams(map[string]string{ + "source": syncType.String(), + }). SetResult(&TransactionResponse{}). Get("/transaction/latest") @@ -353,10 +359,10 @@ func (c *Client) GetLatestTransaction() (*types.Transaction, error) { } res, ok := response.Result().(*TransactionResponse) if !ok { - return nil, errors.New("") + return nil, errors.New("Cannot get latest transaction") } - return transactionResponseToTransaction(res, c.signer) + return transactionResponseToTransaction(res.Transaction, c.signer) } func (c *Client) GetEthContext(blockNumber uint64) (*EthContext, error) { @@ -376,7 +382,6 @@ func (c *Client) GetEthContext(blockNumber uint64) (*EthContext, error) { if !ok { return nil, errors.New("Cannot parse EthContext") } - return context, nil } @@ -445,6 +450,56 @@ func (c *Client) SyncStatus() (*SyncStatus, error) { return status, nil } +func (c *Client) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) { + response, err := c.client.R(). + SetResult(&TransactionBatchResponse{}). + Get("/batch/transaction/latest") + + if err != nil { + return nil, nil, errors.New("Cannot get latest transaction batch") + } + txBatch, ok := response.Result().(*TransactionBatchResponse) + if !ok { + return nil, nil, fmt.Errorf("Cannot parse transaction batch response") + } + return parseTransactionBatchResponse(txBatch, c.signer) +} + +func (c *Client) GetTransactionBatch(index uint64) (*Batch, []*types.Transaction, error) { + str := strconv.FormatUint(index, 10) + response, err := c.client.R(). + SetResult(&TransactionBatchResponse{}). + SetPathParams(map[string]string{ + "index": str, + }). + Get("/batch/transaction/index/{index}") + + if err != nil { + return nil, nil, fmt.Errorf("Cannot get transaction batch %d", index) + } + txBatch, ok := response.Result().(*TransactionBatchResponse) + if !ok { + return nil, nil, fmt.Errorf("Cannot parse transaction batch response") + } + return parseTransactionBatchResponse(txBatch, c.signer) +} + +func parseTransactionBatchResponse(txBatch *TransactionBatchResponse, signer *types.OVMSigner) (*Batch, []*types.Transaction, error) { + if txBatch == nil { + return nil, nil, nil + } + batch := txBatch.Batch + txs := make([]*types.Transaction, len(txBatch.Transactions)) + for i, tx := range txBatch.Transactions { + transaction, err := transactionResponseToTransaction(tx, signer) + if err != nil { + return nil, nil, fmt.Errorf("Cannot parse transaction batch: %w", err) + } + txs[i] = transaction + } + return batch, txs, nil +} + func (c *Client) GetL1GasPrice() (*big.Int, error) { response, err := c.client.R(). SetResult(&L1GasPrice{}). diff --git a/l2geth/rollup/config.go b/l2geth/rollup/config.go index c79ee411cb5..67b3e5826e8 100644 --- a/l2geth/rollup/config.go +++ b/l2geth/rollup/config.go @@ -35,4 +35,6 @@ type Config struct { TimestampRefreshThreshold time.Duration // The gas price to use when estimating L1 calldata publishing costs L1GasPrice *big.Int + // SyncType defines the security model of the transactions that are synced + SyncType SyncType } diff --git a/l2geth/rollup/sync_service.go b/l2geth/rollup/sync_service.go index 4a8864e5bc5..0d8d6b90d4c 100644 --- a/l2geth/rollup/sync_service.go +++ b/l2geth/rollup/sync_service.go @@ -23,15 +23,9 @@ import ( "github.com/ethereum/go-ethereum/eth/gasprice" ) -// OVMContext represents the blocknumber and timestamp -// that exist during L2 execution -type OVMContext struct { - blockNumber uint64 - timestamp uint64 -} - -// SyncService implements the verifier functionality as well as the reorg -// protection for the sequencer. +// SyncService implements the main functionality around pulling in transactions +// and executing them. It can be configured to run in both sequencer mode and in +// verifier mode. type SyncService struct { ctx context.Context cancel context.CancelFunc @@ -40,6 +34,7 @@ type SyncService struct { scope event.SubscriptionScope txFeed event.Feed txLock sync.Mutex + loopLock sync.Mutex enable bool eth1ChainId uint64 bc *core.BlockChain @@ -47,13 +42,15 @@ type SyncService struct { L1gpo *gasprice.L1Oracle client RollupClient syncing atomic.Value + chainHeadSub event.Subscription OVMContext OVMContext - confirmationDepth uint64 pollInterval time.Duration timestampRefreshThreshold time.Duration + chainHeadCh chan core.ChainHeadEvent + syncType SyncType } -// NewSyncService returns an initialized sync service +// NewSyncService returns an initialized sync service. func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *core.BlockChain, db ethdb.Database) (*SyncService, error) { if bc == nil { return nil, errors.New("Must pass BlockChain to SyncService") @@ -63,9 +60,9 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co _ = cancel // satisfy govet if cfg.IsVerifier { - log.Info("Running in verifier mode") + log.Info("Running in verifier mode", "sync-type", cfg.SyncType.String()) } else { - log.Info("Running in sequencer mode") + log.Info("Running in sequencer mode", "sync-type", cfg.SyncType.String()) } pollInterval := cfg.PollInterval @@ -75,8 +72,8 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co } timestampRefreshThreshold := cfg.TimestampRefreshThreshold if timestampRefreshThreshold == 0 { - log.Info("Sanitizing timestamp refresh threshold to 15 minutes") - timestampRefreshThreshold = time.Minute * 15 + log.Info("Sanitizing timestamp refresh threshold to 5 minutes") + timestampRefreshThreshold = time.Minute * 5 } // Layer 2 chainid @@ -86,24 +83,33 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co } // Initialize the rollup client client := NewClient(cfg.RollupClientHttp, chainID) - log.Info("Configured rollup client", "url", cfg.RollupClientHttp, "chain-id", chainID.Uint64(), "ctc-deploy-height", cfg.CanonicalTransactionChainDeployHeight) + log.Info("Configured rollup client", "url", cfg.RollupClientHttp, "chain-id", chainID.Uint64()) service := SyncService{ ctx: ctx, cancel: cancel, verifier: cfg.IsVerifier, enable: cfg.Eth1SyncServiceEnable, - confirmationDepth: cfg.Eth1ConfirmationDepth, syncing: atomic.Value{}, bc: bc, txpool: txpool, + chainHeadCh: make(chan core.ChainHeadEvent, 1), eth1ChainId: cfg.Eth1ChainId, client: client, db: db, pollInterval: pollInterval, timestampRefreshThreshold: timestampRefreshThreshold, + syncType: cfg.SyncType, } - // Initial sync service setup if it is enabled. This code depends on + // The chainHeadSub is used to synchronize the SyncService with the chain. + // As the SyncService processes transactions, it waits until the transaction + // is added to the chain. This synchronization is required for handling + // reorgs and also favors safety over liveliness. If a transaction breaks + // things downstream, it is expected that this channel will halt ingestion + // of additional transactions by the SyncService. + service.chainHeadSub = service.bc.SubscribeChainHeadEvent(service.chainHeadCh) + + // Initialize sync service setup if it is enabled. This code depends on // a remote server that indexes the layer one contracts. Place this // code behind this if statement so that this can run without the // requirement of the remote server being up. @@ -115,7 +121,7 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co return nil, fmt.Errorf("Rollup client unable to connect: %w", err) } - // Ensure that the remote is still not syncing + // Wait until the remote service is done syncing for { status, err := service.client.SyncStatus() if err != nil { @@ -132,30 +138,26 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co // Initialize the latest L1 data here to make sure that // it happens before the RPC endpoints open up // Only do it if the sync service is enabled so that this - // can be ran without needing to have a configured client. + // can be ran without needing to have a configured RollupClient err = service.initializeLatestL1(cfg.CanonicalTransactionChainDeployHeight) if err != nil { return nil, fmt.Errorf("Cannot initialize latest L1 data: %w", err) } + // Log the OVMContext information on startup bn := service.GetLatestL1BlockNumber() ts := service.GetLatestL1Timestamp() log.Info("Initialized Latest L1 Info", "blocknumber", bn, "timestamp", ts) - var i, q string index := service.GetLatestIndex() queueIndex := service.GetLatestEnqueueIndex() - if index == nil { - i = "" - } else { - i = strconv.FormatUint(*index, 10) - } - if queueIndex == nil { - q = "" - } else { - q = strconv.FormatUint(*queueIndex, 10) + verifiedIndex := service.GetLatestVerifiedIndex() + block := service.bc.CurrentBlock() + if block == nil { + block = types.NewBlock(&types.Header{}, nil, nil, nil) } - log.Info("Initialized Eth Context", "index", i, "queue-index", q) + header := block.Header() + log.Info("Initial Rollup State", "state", header.Root.Hex(), "index", stringify(index), "queue-index", stringify(queueIndex), "verified-index", verifiedIndex) // The sequencer needs to sync to the tip at start up // By setting the sync status to true, it will prevent RPC calls. @@ -164,10 +166,11 @@ func NewSyncService(ctx context.Context, cfg Config, txpool *core.TxPool, bc *co service.setSyncStatus(true) } } - return &service, nil } +// ensureClient checks to make sure that the remote transaction source is +// available. It will return an error if it cannot connect via HTTP func (s *SyncService) ensureClient() error { _, err := s.client.GetLatestEthContext() if err != nil { @@ -176,31 +179,29 @@ func (s *SyncService) ensureClient() error { return nil } -// Start initializes the service, connecting to Ethereum1 and starting the -// subservices required for the operation of the SyncService. -// txs through syncservice go to mempool.locals -// txs through rpc go to mempool.remote +// Start initializes the SyncService. func (s *SyncService) Start() error { if !s.enable { + log.Info("Sync Service not initialized") return nil } log.Info("Initializing Sync Service", "eth1-chainid", s.eth1ChainId) - // When a sequencer, be sure to sync to the tip of the ctc before allowing - // user transactions. - if !s.verifier { - err := s.syncTransactionsToTip() + if s.verifier { + go s.VerifierLoop() + } else { + // The sequencer must sync the transactions to the tip and the + // pending queue transactions on start before setting sync status + // to false and opening up the RPC to accept transactions. + err := s.syncTransactionsToTip(s.syncType) if err != nil { return fmt.Errorf("Cannot sync transactions to the tip: %w", err) } - // TODO: This should also sync the enqueue'd transactions that have not - // been synced yet + err = s.syncQueueToTip() + if err != nil { + log.Error("Sequencer cannot sync queue", "msg", err) + } s.setSyncStatus(false) - } - - if s.verifier { - go s.VerifierLoop() - } else { go s.SequencerLoop() } return nil @@ -217,6 +218,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { if ctcDeployHeight == nil { return errors.New("Must configure with canonical transaction chain deploy height") } + log.Info("Initializing initial OVM Context", "ctc-deploy-height", ctcDeployHeight.Uint64()) context, err := s.client.GetEthContext(ctcDeployHeight.Uint64()) if err != nil { return fmt.Errorf("Cannot fetch ctc deploy block at height %d: %w", ctcDeployHeight.Uint64(), err) @@ -230,7 +232,7 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { block = s.bc.CurrentBlock() idx := block.Number().Uint64() if idx > *index { - // This is recoverable with a reorg + // This is recoverable with a reorg but should never happen return fmt.Errorf("Current block height greater than index") } s.SetLatestIndex(&idx) @@ -244,22 +246,19 @@ func (s *SyncService) initializeLatestL1(ctcDeployHeight *big.Int) error { s.SetLatestL1Timestamp(tx.L1Timestamp()) s.SetLatestL1BlockNumber(tx.L1BlockNumber().Uint64()) } - // Only the sequencer cares about latest queue index - if !s.verifier { - queueIndex := s.GetLatestEnqueueIndex() - if queueIndex == nil { - enqueue, err := s.client.GetLastConfirmedEnqueue() - if err != nil { - return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) - } - // There are no enqueues yet - if enqueue == nil { - return nil - } - queueIndex = enqueue.GetMeta().QueueIndex + queueIndex := s.GetLatestEnqueueIndex() + if queueIndex == nil { + enqueue, err := s.client.GetLastConfirmedEnqueue() + if err != nil { + return fmt.Errorf("Cannot fetch last confirmed queue tx: %w", err) } - s.SetLatestEnqueueIndex(queueIndex) + // There are no enqueues yet + if enqueue == nil { + return nil + } + queueIndex = enqueue.GetMeta().QueueIndex } + s.SetLatestEnqueueIndex(queueIndex) return nil } @@ -285,6 +284,8 @@ func (s *SyncService) IsSyncing() bool { // Stop will close the open channels and cancel the goroutines // started by this service. func (s *SyncService) Stop() error { + s.chainHeadSub.Unsubscribe() + close(s.chainHeadCh) s.scope.Close() if s.cancel != nil { @@ -293,6 +294,7 @@ func (s *SyncService) Stop() error { return nil } +// VerifierLoop is the main loop for Verifier mode func (s *SyncService) VerifierLoop() { log.Info("Starting Verifier Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) for { @@ -303,371 +305,308 @@ func (s *SyncService) VerifierLoop() { } } +// verify is the main logic for the Verifier. The verifier logic is different +// depending on the SyncType. func (s *SyncService) verify() error { - // The verifier polls for ctc transactions. - // the ctc transactions are extending the chain. - latest, err := s.client.GetLatestTransaction() - if err != nil { - return err - } - - if latest == nil { - log.Debug("latest transaction not found") - return nil - } - - var start uint64 - if s.GetLatestIndex() == nil { - start = 0 - } else { - start = *s.GetLatestIndex() + 1 - } - end := *latest.GetMeta().Index - log.Info("Polling transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - tx, err := s.client.GetTransaction(i) + switch s.syncType { + case SyncTypeBatched: + err := s.syncTransactionBatchesToTip() if err != nil { - return fmt.Errorf("cannot get tx in loop: %w", err) + log.Error("Verifier cannot sync transaction batches to tip", "msg", err) } - - log.Debug("Applying transaction", "index", i) - err = s.maybeApplyTransaction(tx) + case SyncTypeSequenced: + err := s.syncTransactionsToTip(SyncTypeSequenced) if err != nil { - return fmt.Errorf("could not apply transaction: %w", err) + log.Error("Verifier cannot sync transactions with SyncTypeSequencer", "msg", err) } - s.SetLatestIndex(&i) } - return nil } +// SequencerLoop is the polling loop that runs in sequencer mode. It sequences +// transactions and then updates the EthContext. func (s *SyncService) SequencerLoop() { log.Info("Starting Sequencer Loop", "poll-interval", s.pollInterval, "timestamp-refresh-threshold", s.timestampRefreshThreshold) for { + err := s.updateGasPrice() + if err != nil { + log.Error("Cannot update L1 gas price", "msg", err) + } s.txLock.Lock() - err := s.sequence() + err = s.sequence() if err != nil { log.Error("Could not sequence", "error", err) } s.txLock.Unlock() - if s.updateContext() != nil { + if s.updateEthContext() != nil { log.Error("Could not update execution context", "error", err) } - time.Sleep(s.pollInterval) } } +// sequence is the main logic for the Sequencer. It will sync any `enqueue` +// transactions it has yet to sync and then pull in transaction batches to +// compare against the transactions it has in its local state. The sequencer +// should reorg based on the transaction batches that are posted because +// L1 is the source of truth. The sequencer concurrently accepts user +// transactions via the RPC. func (s *SyncService) sequence() error { - // Update to the latest L1 gas price - l1GasPrice, err := s.client.GetL1GasPrice() + err := s.syncQueueToTip() if err != nil { - return err + log.Error("Sequencer cannot sync queue", "msg", err) } - s.L1gpo.SetL1GasPrice(l1GasPrice) - log.Info("Adjusted L1 Gas Price", "gasprice", l1GasPrice) - - // Only the sequencer needs to poll for enqueue transactions - // and then can choose when to apply them. We choose to apply - // transactions such that it makes for efficient batch submitting. - // Place as many L1ToL2 transactions in the same context as possible - // by executing them one after another. - latest, err := s.client.GetLatestEnqueue() + err = s.syncTransactionBatchesToTip() if err != nil { - return err - } - - // This should never happen unless the backend is empty - if latest == nil { - log.Debug("No enqueue transactions found") - return nil - } - - // Compare the remote latest queue index to the local latest - // queue index. If the remote latest queue index is greater - // than the local latest queue index, be sure to ingest more - // enqueued transactions - var start uint64 - if s.GetLatestEnqueueIndex() == nil { - start = 0 - } else { - start = *s.GetLatestEnqueueIndex() + 1 - } - end := *latest.GetMeta().QueueIndex - - log.Info("Polling enqueued transactions", "start", start, "end", end) - for i := start; i <= end; i++ { - enqueue, err := s.client.GetEnqueue(i) - if err != nil { - return fmt.Errorf("Cannot get enqueue in loop %d: %w", i, err) - } - - if enqueue == nil { - log.Debug("No enqueue transaction found") - return nil - } - - // This should never happen - if enqueue.L1BlockNumber() == nil { - return fmt.Errorf("No blocknumber for enqueue idx %d, timestamp %d, blocknumber %d", i, enqueue.L1Timestamp(), enqueue.L1BlockNumber()) - } - - // Update the timestamp and blocknumber based on the enqueued - // transactions - if enqueue.L1Timestamp() > s.GetLatestL1Timestamp() { - ts := enqueue.L1Timestamp() - bn := enqueue.L1BlockNumber().Uint64() - s.SetLatestL1Timestamp(ts) - s.SetLatestL1BlockNumber(bn) - log.Info("Updated Eth Context from enqueue", "index", i, "timestamp", ts, "blocknumber", bn) - } - - log.Debug("Applying enqueue transaction", "index", i) - err = s.applyTransaction(enqueue) - if err != nil { - return fmt.Errorf("could not apply transaction: %w", err) - } - - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) - if enqueue.GetMeta().Index == nil { - latest := s.GetLatestIndex() - index := uint64(0) - if latest != nil { - index = *latest + 1 - } - s.SetLatestIndex(&index) - } else { - s.SetLatestIndex(enqueue.GetMeta().Index) - } + log.Error("Sequencer cannot sync transaction batches", "msg", err) } - return nil } -/// Update the execution context's timestamp and blocknumber -/// over time. This is only necessary for the sequencer. -func (s *SyncService) updateContext() error { - context, err := s.client.GetLatestEthContext() +// updateGasPrice will query the remote data transport layer for the current L1 +// gas price +func (s *SyncService) updateGasPrice() error { + l1GasPrice, err := s.client.GetL1GasPrice() if err != nil { - return err + return fmt.Errorf("cannot fetch L1 gas price: %w", err) } - - current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) - next := time.Unix(int64(context.Timestamp), 0) - if next.Sub(current) > s.timestampRefreshThreshold { - log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber) - s.SetLatestL1BlockNumber(context.BlockNumber) - s.SetLatestL1Timestamp(context.Timestamp) - } - + s.L1gpo.SetL1GasPrice(l1GasPrice) + log.Info("Adjusted L1 Gas Price", "gasprice", l1GasPrice) return nil } -// This function must sync all the way to the tip -// TODO: it should then sync all of the enqueue transactions -func (s *SyncService) syncTransactionsToTip() error { - // Then set up a while loop that only breaks when the latest - // transaction does not change through two runs of the loop. - // The latest transaction can change during the timeframe of - // all of the transactions being sync'd. - for { - // This function must be sure to sync all the way to the tip. - // First query the latest transaction - latest, err := s.client.GetLatestTransaction() - if err != nil { - log.Error("Cannot get latest transaction", "msg", err) - time.Sleep(time.Second * 2) - continue - } - if latest == nil { - log.Info("No transactions to sync") - return nil - } - tipHeight := latest.GetMeta().Index - index := rawdb.ReadHeadIndex(s.db) - start := uint64(0) - if index != nil { - start = *index + 1 - } - - log.Info("Syncing transactions to tip", "start", start, "end", *tipHeight) - for i := start; i <= *tipHeight; i++ { - tx, err := s.client.GetTransaction(i) - if err != nil { - log.Error("Cannot get transaction", "index", i, "msg", err) - time.Sleep(time.Second * 2) - continue - } - // The transaction does not yet exist in the ctc - if tx == nil { - index := latest.GetMeta().Index - if index == nil { - return fmt.Errorf("Unexpected nil index") - } - return fmt.Errorf("Transaction %d not found when %d is latest", i, *index) - } - err = s.maybeApplyTransaction(tx) - if err != nil { - return fmt.Errorf("Cannot apply transaction: %w", err) - } - if err != nil { - log.Error("Cannot ingest transaction", "index", i) - } - s.SetLatestIndex(tx.GetMeta().Index) - if types.QueueOrigin(tx.QueueOrigin().Uint64()) == types.QueueOriginL1ToL2 { - queueIndex := tx.GetMeta().QueueIndex - s.SetLatestEnqueueIndex(queueIndex) - } - } - // Be sure to check that no transactions came in while - // the above loop was running - post, err := s.client.GetLatestTransaction() - if err != nil { - return fmt.Errorf("Cannot get latest transaction: %w", err) - } - // These transactions should always have an index since they - // are already in the ctc. - if *latest.GetMeta().Index == *post.GetMeta().Index { - log.Info("Done syncing transactions to tip") - return nil - } - } -} - // Methods for safely accessing and storing the latest // L1 blocknumber and timestamp. These are held in memory. + +// GetLatestL1Timestamp returns the OVMContext timestamp func (s *SyncService) GetLatestL1Timestamp() uint64 { return atomic.LoadUint64(&s.OVMContext.timestamp) } +// GetLatestL1BlockNumber returns the OVMContext blocknumber func (s *SyncService) GetLatestL1BlockNumber() uint64 { return atomic.LoadUint64(&s.OVMContext.blockNumber) } +// SetLatestL1Timestamp will set the OVMContext timestamp func (s *SyncService) SetLatestL1Timestamp(ts uint64) { atomic.StoreUint64(&s.OVMContext.timestamp, ts) } +// SetLatestL1BlockNumber will set the OVMContext blocknumber func (s *SyncService) SetLatestL1BlockNumber(bn uint64) { atomic.StoreUint64(&s.OVMContext.blockNumber, bn) } +// GetLatestEnqueueIndex reads the last queue index processed func (s *SyncService) GetLatestEnqueueIndex() *uint64 { return rawdb.ReadHeadQueueIndex(s.db) } +// GetNextEnqueueIndex returns the next queue index to process +func (s *SyncService) GetNextEnqueueIndex() uint64 { + latest := s.GetLatestEnqueueIndex() + if latest == nil { + return 0 + } + return *latest + 1 +} + +// SetLatestEnqueueIndex writes the last queue index that was processed func (s *SyncService) SetLatestEnqueueIndex(index *uint64) { if index != nil { rawdb.WriteHeadQueueIndex(s.db, *index) } } +// GetLatestIndex reads the last CTC index that was processed +func (s *SyncService) GetLatestIndex() *uint64 { + return rawdb.ReadHeadIndex(s.db) +} + +// GetNextIndex reads the next CTC index to process +func (s *SyncService) GetNextIndex() uint64 { + latest := s.GetLatestIndex() + if latest == nil { + return 0 + } + return *latest + 1 +} + +// SetLatestIndex writes the last CTC index that was processed func (s *SyncService) SetLatestIndex(index *uint64) { if index != nil { rawdb.WriteHeadIndex(s.db, *index) } } -func (s *SyncService) GetLatestIndex() *uint64 { - return rawdb.ReadHeadIndex(s.db) +// GetLatestVerifiedIndex reads the last verified CTC index that was processed +// These are set by processing batches of transactions that were submitted to +// the Canonical Transaction Chain. +func (s *SyncService) GetLatestVerifiedIndex() *uint64 { + return rawdb.ReadHeadVerifiedIndex(s.db) } -// reorganize will reorganize to directly to the index passed in. -// The caller must handle the offset relative to the ctc. -func (s *SyncService) reorganize(index uint64) error { - if index == 0 { - return nil - } - err := s.bc.SetHead(index) - if err != nil { - return fmt.Errorf("Cannot reorganize in syncservice: %w", err) +// GetNextVerifiedIndex reads the next verified index +func (s *SyncService) GetNextVerifiedIndex() uint64 { + index := s.GetLatestVerifiedIndex() + if index == nil { + return 0 } + return *index + 1 +} - // TODO: make sure no off by one error here - s.SetLatestIndex(&index) - - // When in sequencer mode, be sure to roll back the latest queue - // index as well. - if !s.verifier { - enqueue, err := s.client.GetLastConfirmedEnqueue() - if err != nil { - return fmt.Errorf("cannot reorganize: %w", err) - } - s.SetLatestEnqueueIndex(enqueue.GetMeta().QueueIndex) +// SetLatestVerifiedIndex writes the last verified index that was processed +func (s *SyncService) SetLatestVerifiedIndex(index *uint64) { + if index != nil { + rawdb.WriteHeadVerifiedIndex(s.db, *index) } - log.Info("Reorganizing", "height", index) - return nil } -// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and -// starts sending event to the given channel. -func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return s.scope.Track(s.txFeed.Subscribe(ch)) +// applyTransaction is a higher level API for applying a transaction +func (s *SyncService) applyTransaction(tx *types.Transaction) error { + if tx.GetMeta().Index != nil { + return s.applyIndexedTransaction(tx) + } + return s.applyTransactionToTip(tx) } -// maybeApplyTransaction will potentially apply the transaction after first -// inspecting the local database. This is mean to prevent transactions from -// being replayed. -func (s *SyncService) maybeApplyTransaction(tx *types.Transaction) error { +// applyIndexedTransaction applys a transaction that has an index. This means +// that the source of the transaction was either a L1 batch or from the +// sequencer. +func (s *SyncService) applyIndexedTransaction(tx *types.Transaction) error { if tx == nil { - return fmt.Errorf("nil transaction passed to maybeApplyTransaction") + return errors.New("Transaction is nil in applyIndexedTransaction") } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found in applyIndexedTransaction") + } + log.Trace("Applying indexed transaction", "index", *index) + next := s.GetNextIndex() + if *index == next { + return s.applyTransactionToTip(tx) + } + if *index < next { + return s.applyHistoricalTransaction(tx) + } + return fmt.Errorf("Received tx at index %d when looking for %d", *index, next) +} - log.Debug("Maybe applying transaction", "hash", tx.Hash().Hex()) +// applyHistoricalTransaction will compare a historical transaction against what +// is locally indexed. This will trigger a reorg in the future +func (s *SyncService) applyHistoricalTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("Transaction is nil in applyHistoricalTransaction") + } index := tx.GetMeta().Index if index == nil { - return fmt.Errorf("nil index in maybeApplyTransaction") + return errors.New("No index is found in applyHistoricalTransaction") } - // Handle off by one + // Handle the off by one block := s.bc.GetBlockByNumber(*index + 1) - - // The transaction has yet to be played, so it is safe to apply if block == nil { - err := s.applyTransaction(tx) - if err != nil { - return fmt.Errorf("Maybe apply transaction failed on index %d: %w", *index, err) - } - return nil + return fmt.Errorf("Block %d is not found", *index+1) } - // There is already a transaction at that index, so check - // for its equality. txs := block.Transactions() if len(txs) != 1 { - log.Info("block", "txs", len(txs), "number", block.Number().Uint64()) - return fmt.Errorf("More than 1 transaction in block") + return fmt.Errorf("More than one transaction found in block %d", *index+1) } - if isCtcTxEqual(tx, txs[0]) { - log.Info("Matching transaction found", "index", *index) + if !isCtcTxEqual(tx, txs[0]) { + log.Error("Mismatched transaction", "index", index) } else { - log.Warn("Non matching transaction found", "index", *index) + log.Debug("Batched transaction matches", "index", index, "hash", tx.Hash().Hex()) } return nil } -// Lower level API used to apply a transaction, must only be used with -// transactions that came from L1. -func (s *SyncService) applyTransaction(tx *types.Transaction) error { +// applyTransactionToTip will do sanity checks on the transaction before +// applying it to the tip. It blocks until the transaction has been included in +// the chain. +func (s *SyncService) applyTransactionToTip(tx *types.Transaction) error { + if tx == nil { + return errors.New("nil transaction passed to applyTransactionToTip") + } + log.Trace("Applying transaction to tip") + if tx.L1Timestamp() == 0 { + ts := s.GetLatestL1Timestamp() + bn := s.GetLatestL1BlockNumber() + tx.SetL1Timestamp(ts) + tx.SetL1BlockNumber(bn) + } else if tx.L1Timestamp() > s.GetLatestL1Timestamp() { + ts := tx.L1Timestamp() + bn := tx.L1BlockNumber() + s.SetLatestL1Timestamp(ts) + s.SetLatestL1BlockNumber(bn.Uint64()) + } else if tx.L1Timestamp() < s.GetLatestL1Timestamp() { + // TODO: this should force a reorg + log.Error("Timestamp monotonicity violation", "hash", tx.Hash().Hex()) + } + + if tx.GetMeta().Index == nil { + index := s.GetLatestIndex() + if index == nil { + tx.SetIndex(0) + } else { + tx.SetIndex(*index + 1) + } + } + s.SetLatestIndex(tx.GetMeta().Index) + if tx.GetMeta().QueueIndex != nil { + s.SetLatestEnqueueIndex(tx.GetMeta().QueueIndex) + } + + // This is a temporary fix for a bug in the SequencerEntrypoint. It will + // be removed when the custom batch serialization is removed in favor of + // batching RLP encoded transactions. tx = fixType(tx) + txs := types.Transactions{tx} s.txFeed.Send(core.NewTxsEvent{Txs: txs}) + // Block until the transaction has been added to the chain + log.Debug("Waiting for transaction to be added to chain", "hash", tx.Hash().Hex()) + <-s.chainHeadCh + + return nil +} + +// applyBatchedTransaction applies transactions that were batched to layer one. +// The sequencer checks for batches over time to make sure that it does not +// deviate from the L1 state and this is the main method of transaction +// ingestion for the verifier. +func (s *SyncService) applyBatchedTransaction(tx *types.Transaction) error { + if tx == nil { + return errors.New("nil transaction passed into applyBatchedTransaction") + } + index := tx.GetMeta().Index + if index == nil { + return errors.New("No index found on transaction") + } + log.Trace("Applying batched transaction", "index", *index) + err := s.applyIndexedTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply batched transaction: %w", err) + } + s.SetLatestVerifiedIndex(index) return nil } // Higher level API for applying transactions. Should only be called for // queue origin sequencer transactions, as the contracts on L1 manage the same // validity checks that are done here. -func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { +func (s *SyncService) ValidateAndApplySequencerTransaction(tx *types.Transaction) error { + if s.verifier { + return errors.New("Verifier does not accept transactions out of band") + } if tx == nil { - return fmt.Errorf("nil transaction passed to ApplyTransaction") + return errors.New("nil transaction passed to ValidateAndApplySequencerTransaction") } - log.Debug("Sending transaction to sync service", "hash", tx.Hash().Hex()) s.txLock.Lock() defer s.txLock.Unlock() - if s.verifier { - return errors.New("Verifier does not accept transactions out of band") - } + log.Trace("Sequencer transaction validation", "hash", tx.Hash().Hex()) + qo := tx.QueueOrigin() if qo == nil { return errors.New("invalid transaction with no queue origin") @@ -680,14 +619,7 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { return fmt.Errorf("invalid transaction: %w", err) } - if tx.L1Timestamp() == 0 { - ts := s.GetLatestL1Timestamp() - bn := s.GetLatestL1BlockNumber() - tx.SetL1Timestamp(ts) - tx.SetL1BlockNumber(bn) - } - - // Set the raw transaction data in the meta + // Set the raw transaction data in the meta. txRaw, err := getRawTransaction(tx) if err != nil { return fmt.Errorf("invalid transaction: %w", err) @@ -704,10 +636,176 @@ func (s *SyncService) ApplyTransaction(tx *types.Transaction) error { txRaw, ) tx.SetTransactionMeta(newMeta) - return s.applyTransaction(tx) } +// syncTransactionsToTip will sync all of the transactions to the tip. +// The syncType determines the source of the transactions. +func (s *SyncService) syncTransactionsToTip(syncType SyncType) error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestTransaction(syncType) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + if latest == nil { + log.Info("No transactions to sync") + return nil + } + latestIndex := latest.GetMeta().Index + if latestIndex == nil { + return errors.New("Latest index is nil") + } + nextIndex := s.GetNextIndex() + log.Info("Syncing transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetTransaction(i, syncType) + if err != nil { + log.Error("Cannot get latest transaction", "msg", err) + time.Sleep(time.Second * 2) + continue + } + if tx == nil { + return fmt.Errorf("Transaction %d is nil", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + + post, err := s.client.GetLatestTransaction(syncType) + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().Index + if postLatestIndex == nil { + return errors.New("Latest index is nil") + } + if *postLatestIndex == *latestIndex { + return nil + } + } +} + +// syncTransactionBatchesToTip will sync all of the transaction batches to the +// tip +func (s *SyncService) syncTransactionBatchesToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + log.Debug("Syncing transaction batches to tip") + + for { + latest, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if latest == nil { + log.Info("No transaction batches to sync") + return nil + } + latestIndex := latest.Index + nextIndex := s.GetNextVerifiedIndex() + + for i := nextIndex; i <= latestIndex; i++ { + log.Debug("Fetching transaction batch", "index", i) + _, txs, err := s.client.GetTransactionBatch(i) + if err != nil { + return fmt.Errorf("Cannot get transaction batch: %w", err) + } + for _, tx := range txs { + s.applyBatchedTransaction(tx) + } + } + post, _, err := s.client.GetLatestTransactionBatch() + if err != nil { + return fmt.Errorf("Cannot get latest transaction batch: %w", err) + } + if post.Index == latest.Index { + return nil + } + } +} + +// syncQueueToTip will sync the `enqueue` transactions to the tip +// from the last known `enqueue` transaction +func (s *SyncService) syncQueueToTip() error { + s.loopLock.Lock() + defer s.loopLock.Unlock() + + for { + latest, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest enqueue transaction: %w", err) + } + if latest == nil { + log.Info("No enqueue transactions to sync") + return nil + } + latestIndex := latest.GetMeta().QueueIndex + if latestIndex == nil { + return errors.New("Latest queue transaction has no queue index") + } + nextIndex := s.GetNextEnqueueIndex() + log.Info("Syncing enqueue transactions to tip", "start", *latestIndex, "end", nextIndex) + + for i := nextIndex; i <= *latestIndex; i++ { + tx, err := s.client.GetEnqueue(i) + if err != nil { + return fmt.Errorf("Canot get enqueue transaction; %w", err) + } + if tx == nil { + return fmt.Errorf("Cannot get queue tx at index %d", i) + } + err = s.applyTransaction(tx) + if err != nil { + return fmt.Errorf("Cannot apply transaction: %w", err) + } + } + post, err := s.client.GetLatestEnqueue() + if err != nil { + return fmt.Errorf("Cannot get latest transaction: %w", err) + } + postLatestIndex := post.GetMeta().QueueIndex + if postLatestIndex == nil { + return errors.New("Latest queue index is nil") + } + if *latestIndex == *postLatestIndex { + return nil + } + } +} + +// updateEthContext will update the OVM execution context's +// timestamp and blocknumber if enough time has passed since +// it was last updated. This is a sequencer only function. +func (s *SyncService) updateEthContext() error { + context, err := s.client.GetLatestEthContext() + if err != nil { + return fmt.Errorf("Cannot get eth context: %w", err) + } + current := time.Unix(int64(s.GetLatestL1Timestamp()), 0) + next := time.Unix(int64(context.Timestamp), 0) + if next.Sub(current) > s.timestampRefreshThreshold { + log.Info("Updating Eth Context", "timetamp", context.Timestamp, "blocknumber", context.BlockNumber) + s.SetLatestL1BlockNumber(context.BlockNumber) + s.SetLatestL1Timestamp(context.Timestamp) + } + return nil +} + +// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and +// starts sending event to the given channel. +func (s *SyncService) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return s.scope.Track(s.txFeed.Subscribe(ch)) +} + +// getRawTransaction will return the raw serialization of the transaction. This +// function will be deprecated in the near future when the batch serialization +// is RLP encoded transactions. func getRawTransaction(tx *types.Transaction) ([]byte, error) { if tx == nil { return nil, errors.New("Cannot process nil transaction") @@ -755,6 +853,7 @@ func getRawTransaction(tx *types.Transaction) ([]byte, error) { return data.Bytes(), nil } +// fillBytes is taken from a newer version of the golang standard library func fillBytes(x *big.Int, size int) []byte { b := x.Bytes() switch { @@ -769,6 +868,8 @@ func fillBytes(x *big.Int, size int) []byte { } } +// getSignatureType is a patch to fix a bug in the contracts. Will be deprecated +// with the move to RLP encoded transactions in batches. func getSignatureType(tx *types.Transaction) uint8 { if tx.SignatureHashType() == 0 { return 0 @@ -797,3 +898,10 @@ func fixType(tx *types.Transaction) *types.Transaction { tx.SetTransactionMeta(fixed) return tx } + +func stringify(i *uint64) string { + if i == nil { + return "" + } + return strconv.FormatUint(*i, 10) +} diff --git a/l2geth/rollup/sync_service_test.go b/l2geth/rollup/sync_service_test.go index 2f3a981682b..523f0d6f47f 100644 --- a/l2geth/rollup/sync_service_test.go +++ b/l2geth/rollup/sync_service_test.go @@ -48,7 +48,7 @@ func TestSyncServiceContextUpdated(t *testing.T) { } // run the update context call once - err := service.updateContext() + err := service.updateEthContext() if err != nil { t.Fatal(err) } @@ -72,7 +72,7 @@ func TestSyncServiceContextUpdated(t *testing.T) { }) // call it again - err = service.updateContext() + err = service.updateEthContext() if err != nil { t.Fatal(err) } @@ -111,7 +111,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { // The queue index of the L1 to L2 transaction queueIndex := uint64(0) // The index in the ctc - index := uint64(5) + index := uint64(0) tx := types.NewTransaction(0, target, big.NewInt(0), gasLimit, big.NewInt(0), data) txMeta := types.NewTransactionMeta( @@ -132,11 +132,7 @@ func TestSyncServiceTransactionEnqueued(t *testing.T) { }, }) - // Run an iteration of the eloop - err = service.sequence() - if err != nil { - t.Fatal("sequencing failed", err) - } + go service.syncQueueToTip() // Wait for the tx to be confirmed into the chain and then // make sure it is the transactions that was set up with in the mockclient @@ -169,8 +165,7 @@ func TestSyncServiceL1GasPrice(t *testing.T) { t.Fatal("expected 0 gas price, got", gasBefore) } - // run 1 iteration of the eloop - service.sequence() + service.updateGasPrice() gasAfter, err := service.L1gpo.SuggestDataPrice(context.Background()) if err != nil { @@ -217,11 +212,7 @@ func TestSyncServiceSync(t *testing.T) { }, }) - err = service.verify() - if err != nil { - t.Fatal("verification failed", err) - } - + go service.syncTransactionsToTip(SyncTypeSequenced) event := <-txCh if len(event.Txs) != 1 { t.Fatal("Unexpected number of transactions") @@ -320,6 +311,7 @@ func newTestSyncService(isVerifier bool) (*SyncService, chan core.NewTxsEvent, e // Set as an empty string as this is a dummy value anyways. // The client needs to be mocked with a mockClient RollupClientHttp: "", + SyncType: SyncTypeSequenced, } service, err := NewSyncService(context.Background(), cfg, txPool, chain, db) @@ -395,7 +387,7 @@ func (m *mockClient) GetLatestEnqueue() (*types.Transaction, error) { return m.getEnqueue[len(m.getEnqueue)-1], nil } -func (m *mockClient) GetTransaction(index uint64) (*types.Transaction, error) { +func (m *mockClient) GetTransaction(index uint64, typ SyncType) (*types.Transaction, error) { if m.getTransactionCallCount < len(m.getTransaction) { tx := m.getTransaction[m.getTransactionCallCount] m.getTransactionCallCount++ @@ -404,7 +396,7 @@ func (m *mockClient) GetTransaction(index uint64) (*types.Transaction, error) { return nil, errors.New("") } -func (m *mockClient) GetLatestTransaction() (*types.Transaction, error) { +func (m *mockClient) GetLatestTransaction(typ SyncType) (*types.Transaction, error) { if len(m.getTransaction) == 0 { return nil, errors.New("") } @@ -428,6 +420,14 @@ func (m *mockClient) GetLastConfirmedEnqueue() (*types.Transaction, error) { return nil, nil } +func (m *mockClient) GetLatestTransactionBatch() (*Batch, []*types.Transaction, error) { + return nil, nil, nil +} + +func (m *mockClient) GetTransactionBatch(index uint64) (*Batch, []*types.Transaction, error) { + return nil, nil, nil +} + func (m *mockClient) SyncStatus() (*SyncStatus, error) { return &SyncStatus{ Syncing: false, diff --git a/l2geth/rollup/types.go b/l2geth/rollup/types.go index b3dbdbbce99..5fee52e502b 100644 --- a/l2geth/rollup/types.go +++ b/l2geth/rollup/types.go @@ -2,11 +2,58 @@ package rollup import ( "bytes" + "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) +// OVMContext represents the blocknumber and timestamp +// that exist during L2 execution +type OVMContext struct { + blockNumber uint64 + timestamp uint64 +} + +// SyncType represents the type of transactions that are being synced. +// The different types have different security models. +type SyncType uint + +func (s SyncType) String() string { + switch s { + case SyncTypeBatched: + return "batched" + case SyncTypeSequenced: + return "sequenced" + default: + return "" + } +} + +func NewSyncType(typ string) (SyncType, error) { + switch typ { + case "batched": + return SyncTypeBatched, nil + case "sequenced": + return SyncTypeSequenced, nil + default: + return 0, fmt.Errorf("Unknown SyncType: %s", typ) + } +} + +const ( + // Batched SyncType involves syncing transactions that have been batched to + // Layer One. Once the transactions have been batched to L1, they cannot be + // removed assuming that they are not reorganized out of the chain. + SyncTypeBatched SyncType = iota + // Sequenced SyncType involves syncing transactions from the sequencer, + // meaning that the transactions may have not been batched to Layer One yet. + // This gives higher latency access to the sequencer data but no guarantees + // around the transactions as they have not been submitted via a batch to + // L1. + SyncTypeSequenced +) + func isCtcTxEqual(a, b *types.Transaction) bool { if a.To() == nil && b.To() != nil { if !bytes.Equal(b.To().Bytes(), common.Address{}.Bytes()) {