Skip to content

Commit

Permalink
wip #4
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Nedkov <[email protected]>
  • Loading branch information
Psykepro committed May 4, 2023
1 parent 5256c40 commit 5fa33b4
Show file tree
Hide file tree
Showing 16 changed files with 194 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func start(cliCtx *cli.Context) error {
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st, eventLog)
go seq.Start(ctx)
go seq.Start(ctx, time.Now)
case RPC:
ev.Component = event.Component_RPC
ev.Description = "Running JSON-RPC server"
Expand Down
25 changes: 8 additions & 17 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type finalizer struct {
nextForcedBatchesMux *sync.RWMutex
handlingL2Reorg bool
eventLog *event.EventLog
nowFuncForBatches func() time.Time
}

// WipBatch represents a work-in-progress batch.
Expand All @@ -72,18 +73,7 @@ func (w *WipBatch) isEmpty() bool {
}

// newFinalizer returns a new instance of Finalizer.
func newFinalizer(
cfg FinalizerCfg,
worker workerInterface,
dbManager dbManagerInterface,
executor stateInterface,
sequencerAddr common.Address,
isSynced func(ctx context.Context) bool,
closingSignalCh ClosingSignalCh,
txsStore TxsStore,
batchConstraints batchConstraints,
eventLog *event.EventLog,
) *finalizer {
func newFinalizer(cfg FinalizerCfg, worker workerInterface, dbManager dbManagerInterface, executor stateInterface, sequencerAddr common.Address, isSynced func(ctx context.Context) bool, closingSignalCh ClosingSignalCh, txsStore TxsStore, batchConstraints batchConstraints, eventLog *event.EventLog, nowFuncForBatches func() time.Time) *finalizer {
return &finalizer{
cfg: cfg,
txsStore: txsStore,
Expand All @@ -106,6 +96,7 @@ func newFinalizer(
nextForcedBatchDeadline: 0,
nextForcedBatchesMux: new(sync.RWMutex),
eventLog: eventLog,
nowFuncForBatches: nowFuncForBatches,
}
}

Expand Down Expand Up @@ -189,13 +180,13 @@ func (f *finalizer) listenForClosingSignals(ctx context.Context) {
func (f *finalizer) finalizeBatches(ctx context.Context) {
for {
start := now()
log.Debug("finalizer init loop")
//log.Debug("finalizer init loop")
tx := f.worker.GetBestFittingTx(f.batch.remainingResources)
metrics.WorkerProcessingTime(time.Since(start))
if tx != nil {
// Timestamp resolution
if f.batch.isEmpty() {
f.batch.timestamp = now()
f.batch.timestamp = f.nowFuncForBatches()
}

f.sharedResourcesMux.Lock()
Expand All @@ -207,7 +198,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
f.sharedResourcesMux.Unlock()
} else {
// wait for new txs
log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDurationInMs.Duration)
//log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDurationInMs.Duration)
if f.cfg.SleepDurationInMs.Duration > 0 {
time.Sleep(f.cfg.SleepDurationInMs.Duration)
}
Expand Down Expand Up @@ -630,7 +621,7 @@ func (f *finalizer) processForcedBatch(lastBatchNumberInState uint64, stateRoot
GlobalExitRoot: forcedBatch.GlobalExitRoot,
Transactions: forcedBatch.RawTxsData,
Coinbase: f.sequencerAddress,
Timestamp: now(),
Timestamp: f.nowFuncForBatches(),
Caller: stateMetrics.SequencerCallerLabel,
}
response, err := f.dbManager.ProcessForcedBatch(forcedBatch.ForcedBatchNumber, request)
Expand Down Expand Up @@ -718,7 +709,7 @@ func (f *finalizer) openBatch(ctx context.Context, num uint64, ger common.Hash,
processingCtx := state.ProcessingContext{
BatchNumber: num,
Coinbase: f.sequencerAddress,
Timestamp: now(),
Timestamp: f.nowFuncForBatches(),
GlobalExitRoot: ger,
}
err := f.dbManager.OpenBatch(ctx, processingCtx, dbTx)
Expand Down
10 changes: 5 additions & 5 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func TestNewFinalizer(t *testing.T) {
//func TestFinalizer_newWIPBatch(t *testing.T) {
// // arrange
// f = setupFinalizer(true)
// now = testNow
// nowFuncForBatches = testNow
// defer func() {
// now = time.Now
// nowFuncForBatches = time.Now
// }()
//
// txs := make([]types.Transaction, 0)
Expand All @@ -132,7 +132,7 @@ func TestNewFinalizer(t *testing.T) {
// coinbase: f.sequencerAddress,
// initialStateRoot: newHash,
// stateRoot: newHash,
// timestamp: uint64(now().Unix()),
// timestamp: uint64(nowFuncForBatches().Unix()),
// remainingResources: getMaxRemainingResources(f.batchConstraints),
// }
// closeBatchParams := ClosingBatchParameters{
Expand Down Expand Up @@ -687,9 +687,9 @@ func TestFinalizer_openBatch(t *testing.T) {
// TestFinalizer_reprocessBatch is a test for reprocessBatch which tests all possible cases of reprocessBatch
func TestFinalizer_reprocessBatch(t *testing.T) {
// arrange
now = testNow
nowFuncForBatches = testNow
defer func() {
now = time.Now
nowFuncForBatches = time.Now
}()
f = setupFinalizer(true)
Expand Down
4 changes: 2 additions & 2 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func New(cfg Config, txPool txPool, state stateInterface, etherman etherman, man
}

// Start starts the sequencer
func (s *Sequencer) Start(ctx context.Context) {
func (s *Sequencer) Start(ctx context.Context, nowFuncForBatches func() time.Time) {
for !s.isSynced(ctx) {
log.Infof("waiting for synchronizer to sync...")
time.Sleep(s.cfg.WaitPeriodPoolIsEmpty.Duration)
Expand Down Expand Up @@ -154,7 +154,7 @@ func (s *Sequencer) Start(ctx context.Context) {
dbManager := newDBManager(ctx, s.cfg.DBManager, s.pool, s.state, worker, closingSignalCh, txsStore, batchConstraints)
go dbManager.Start()

finalizer := newFinalizer(s.cfg.Finalizer, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, txsStore, batchConstraints, s.eventLog)
finalizer := newFinalizer(s.cfg.Finalizer, worker, dbManager, s.state, s.address, s.isSynced, closingSignalCh, txsStore, batchConstraints, s.eventLog, nowFuncForBatches)
currBatch, processingReq := s.bootstrap(ctx, dbManager, finalizer)
go finalizer.Start(ctx, currBatch, processingReq)

Expand Down
12 changes: 8 additions & 4 deletions state/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func (s *State) ProcessBatch(ctx context.Context, request ProcessRequest, update
}

forkID := GetForkIDByBatchNumber(s.cfg.ForkIDIntervals, request.BatchNumber)

// Create Batch
processBatchRequest := &pb.ProcessBatchRequest{
var processBatchRequest = &pb.ProcessBatchRequest{
OldBatchNum: request.BatchNumber - 1,
Coinbase: request.Coinbase.String(),
BatchL2Data: request.Transactions,
Expand Down Expand Up @@ -264,6 +265,10 @@ func (s *State) ExecuteBatch(ctx context.Context, batch Batch, updateMerkleTree
return processBatchResponse, err
}

func uint32ToBool(value uint32) bool {
return value != 0
}

func (s *State) processBatch(ctx context.Context, batchNumber uint64, batchL2Data []byte, caller metrics.CallerLabel, dbTx pgx.Tx) (*pb.ProcessBatchResponse, error) {
if dbTx == nil {
return nil, ErrDBTxNil
Expand Down Expand Up @@ -299,6 +304,7 @@ func (s *State) processBatch(ctx context.Context, batchNumber uint64, batchL2Dat
return nil, ErrInvalidBatchNumber
}
forkID := GetForkIDByBatchNumber(s.cfg.ForkIDIntervals, lastBatch.BatchNumber)

// Create Batch
processBatchRequest := &pb.ProcessBatchRequest{
OldBatchNum: lastBatch.BatchNumber - 1,
Expand All @@ -313,9 +319,7 @@ func (s *State) processBatch(ctx context.Context, batchNumber uint64, batchL2Dat
ForkId: forkID,
}

res, err := s.sendBatchRequestToExecutor(ctx, processBatchRequest, caller)

return res, err
return s.sendBatchRequestToExecutor(ctx, processBatchRequest, caller)
}

func (s *State) sendBatchRequestToExecutor(ctx context.Context, processBatchRequest *pb.ProcessBatchRequest, caller metrics.CallerLabel) (*pb.ProcessBatchResponse, error) {
Expand Down
4 changes: 2 additions & 2 deletions test/config/test.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ TxLifetimeCheckTimeout = "10m"
MaxTxLifetime = "3h"
MaxTxSizeForL1 = 131072
[Sequencer.Finalizer]
GERDeadlineTimeoutInSec = "2s"
GERDeadlineTimeoutInSec = "1000s"
ForcedBatchDeadlineTimeoutInSec = "1s"
SleepDurationInMs = "100ms"
ResourcePercentageToCloseBatch = 10
GERFinalityNumberOfBlocks = 0
ClosingSignalsManagerWaitForCheckingL1Timeout = "10s"
ClosingSignalsManagerWaitForCheckingGER = "10s"
ClosingSignalsManagerWaitForCheckingGER = "10000s"
ClosingSignalsManagerWaitForCheckingForcedBatches = "10s"
ForcedBatchesFinalityNumberOfBlocks = 0
TimestampResolution = "15s"
Expand Down
129 changes: 88 additions & 41 deletions test/e2e/forcedbatches_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package e2e

import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
Expand All @@ -27,7 +26,7 @@ var (
forcedBatchSignatureHash = crypto.Keccak256Hash([]byte("ForceBatch(uint64,bytes32,address,bytes)"))
)

func TestForcedBatchesVector(t *testing.T) {
func TestForcedBatchesVectorFiles(t *testing.T) {

if testing.Short() {
t.Skip()
Expand All @@ -39,43 +38,91 @@ func TestForcedBatchesVector(t *testing.T) {
return err
}
if !info.IsDir() && !strings.HasSuffix(info.Name(), "list.json") {
//defer func() {
// require.NoError(t, operations.Teardown())
//}()

// Load test vectors
fmt.Println(path)
testCase, err := vectors.LoadStateTransitionTestCaseV2(path)
require.NoError(t, err)

opsCfg := operations.GetDefaultOperationsConfig()
opsCfg.State.MaxCumulativeGasUsed = 80000000000
opsman, err := operations.NewManager(ctx, opsCfg)
require.NoError(t, err)

// Setting Genesis
genesisActions := vectors.GenerateGenesisActions(testCase.Genesis)
require.NoError(t, opsman.SetGenesis(genesisActions))
require.NoError(t, opsman.Setup())

// Check initial root
actualOldStateRoot, err := opsman.State().GetLastStateRoot(ctx, nil)
require.NoError(t, err)
require.Equal(t, testCase.ExpectedOldStateRoot, actualOldStateRoot.Hex())
b, err := hex.DecodeHex(testCase.BatchL2Data)
require.NoError(t, err)
txs, txsBytes, err := state.DecodeTxs(b)
require.NoError(t, err)
fmt.Println(txs[0].ChainId())

_, err = sendForcedBatchForVector(t, txsBytes, opsman)
require.NoError(t, err)

// Check new root
actualNewStateRoot, err := opsman.State().GetLastStateRoot(ctx, nil)

require.NoError(t, err)
require.Equal(t, testCase.ExpectedNewStateRoot, actualNewStateRoot.Hex())
t.Run(info.Name(), func(t *testing.T) {

defer func() {
require.NoError(t, operations.Teardown())
}()

// Load test vectors
log.Info("=====================================================================")
log.Info(path)
log.Info("=====================================================================")
testCase, err := vectors.LoadStateTransitionTestCaseV2(path)
require.NoError(t, err)

// TODO: To be used to set the timestamp func of sequencer
//func() time.Time {
// unixTimestamp := int64(1944498031)
// return time.Unix(unixTimestamp, 0)0
//}

opsCfg := operations.GetDefaultOperationsConfig()
// TODO: uncomment when ready to start sequencer from here
//opsCfg.WithoutSequencer = true
opsCfg.State.MaxCumulativeGasUsed = 80000000000
opsman, err := operations.NewManager(ctx, opsCfg)
require.NoError(t, err)

// Setting Genesis
log.Info("[Setting Genesis]")
genesisActions := vectors.GenerateGenesisActions(testCase.Genesis)
require.NoError(t, opsman.SetGenesis(genesisActions))
require.NoError(t, opsman.Setup())
merkleTree := opsman.State().GetTree()

// Check initial root
log.Info("[Checking initial root]")
actualOldStateRoot, err := opsman.State().GetLastStateRoot(ctx, nil)
require.NoError(t, err)
require.Equal(t, testCase.ExpectedOldStateRoot, actualOldStateRoot.Hex())
b, err := hex.DecodeHex(testCase.BatchL2Data)
require.NoError(t, err)
_, txsBytes, err := state.DecodeTxs(b)
require.NoError(t, err)

forcedBatch, err := sendForcedBatchForVector(t, txsBytes, opsman)
require.NoError(t, err)
isClosed, err := opsman.State().IsBatchClosed(ctx, forcedBatch.BatchNumber, nil)
require.NoError(t, err)
// wait until is closed
for !isClosed {
time.Sleep(1 * time.Second)
isClosed, err = opsman.State().IsBatchClosed(ctx, forcedBatch.BatchNumber, nil)
require.NoError(t, err)
if isClosed {
forcedBatch, err = sendForcedBatchForVector(t, txsBytes, opsman)
require.NoError(t, err)
}
}

// Check new root
log.Info("[Checking new root]")
actualNewStateRoot := forcedBatch.StateRoot
require.NoError(t, err)

require.Equal(t, testCase.ExpectedNewStateRoot, actualNewStateRoot.Hex())
log.Info("[Checking new leafs]")
for _, expectedNewLeaf := range testCase.ExpectedNewLeafs {
if expectedNewLeaf.IsSmartContract {
log.Info("Smart Contract Address: ", expectedNewLeaf.Address)
} else {
log.Info("Account Address: ", expectedNewLeaf.Address)
}
actualBalance, err := merkleTree.GetBalance(ctx, common.HexToAddress(expectedNewLeaf.Address), actualNewStateRoot.Bytes())
require.NoError(t, err)
require.Equal(t, expectedNewLeaf.Balance.String(), actualBalance.String())

actualNonce, err := merkleTree.GetNonce(ctx, common.HexToAddress(expectedNewLeaf.Address), actualNewStateRoot.Bytes())
require.NoError(t, err)
require.Equal(t, expectedNewLeaf.Nonce, actualNonce.String())
if expectedNewLeaf.IsSmartContract {
// TODO: Implement leaf properties for smart contracts
}
}
return
})

return nil
}
Expand Down Expand Up @@ -128,7 +175,7 @@ func sendForcedBatchForVector(t *testing.T, txs []byte, opsman *operations.Manag
tx, err := zkEvm.ForceBatch(auth, txs, tip)
require.NoError(t, err)

log.Info("TxHash: ", tx.Hash())
log.Info("Forced Batch Submit to L1 TxHash: ", tx.Hash())
time.Sleep(1 * time.Second)

err = operations.WaitTxToBeMined(ctx, ethClient, tx, operations.DefaultTimeoutTxToBeMined)
Expand Down Expand Up @@ -170,8 +217,8 @@ func sendForcedBatchForVector(t *testing.T, txs []byte, opsman *operations.Manag
require.NoError(t, err)
require.NotNil(t, forcedBatch)

err = operations.WaitBatchToBeVirtualized(forcedBatch.BatchNumber, 4*time.Minute, st)
require.NoError(t, err)
//err = operations.WaitBatchToBeVirtualized(forcedBatch.BatchNumber, 4*time.Minute, st)
//require.NoError(t, err)
}

return forcedBatch, nil
Expand Down
Loading

0 comments on commit 5fa33b4

Please sign in to comment.