Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding wip fixes and the closing signal emit from the sequencesender #1625

Merged
merged 1 commit into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
if tx != nil {
f.processRequest.Transactions = tx.RawTx
} else {
f.processRequest.Transactions = nil
f.processRequest.Transactions = []byte{}
}
result, err := f.executor.ProcessBatch(ctx, f.processRequest)
if err != nil {
Expand All @@ -332,41 +332,35 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) error
}
return fmt.Errorf("failed processing transaction, err: %w", result.ExecutorError)
} else {
err = f.handleSuccessfulTxProcessResp(ctx, tx, result)
if err != nil {
return err
// We have a successful processing if we are here, updating metadata
previousL2BlockStateRoot := f.batch.stateRoot
f.updateMetadata(result)
if tx != nil {
err = f.handleSuccessfulTxProcessResp(ctx, tx, result, previousL2BlockStateRoot)
if err != nil {
return err
}
}
}

return nil
}

// handleSuccessfulTxProcessResp handles the response of a successful transaction processing.
func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse) error {
if tx == nil {
return nil
}

txResponse := result.Responses[0]
// Handle Transaction Error
if txResponse.RomError != nil {
f.handleTransactionError(ctx, txResponse, result, tx)
return txResponse.RomError
}

func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, previousL2BlockStateRoot common.Hash) error {
// Check remaining resources
err := f.checkRemainingResources(result, tx, txResponse)
err := f.handleResourcesCheck(ctx, tx, result)
if err != nil {
return err
}
// Store the processed transaction, add it to the batch and update status in the pool atomically
f.storeProcessedTx(previousL2BlockStateRoot, tx, result)

// We have a successful processing if we are here, updating metadata
previousL2BlockStateRoot := f.batch.stateRoot
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
return nil
}

// Store the processed transaction, add it to the batch and update status in the pool atomically
func (f *finalizer) storeProcessedTx(previousL2BlockStateRoot common.Hash, tx *TxTracker, result *state.ProcessBatchResponse) {
txResponse := result.Responses[0]
f.txsStore.Wg.Add(1)
f.txsStore.Ch <- &txToStore{
batchNumber: f.batch.batchNumber,
Expand All @@ -378,6 +372,26 @@ func (f *finalizer) handleSuccessfulTxProcessResp(ctx context.Context, tx *TxTra
f.worker.UpdateAfterSingleSuccessfulTxExecution(tx.From, result.ReadWriteAddresses)
metrics.WorkerProcessingTime(time.Since(start))
f.batch.countOfTxs += 1
}

func (f *finalizer) updateMetadata(result *state.ProcessBatchResponse) {
f.processRequest.OldStateRoot = result.NewStateRoot
f.batch.stateRoot = result.NewStateRoot
f.batch.localExitRoot = result.NewLocalExitRoot
}

func (f *finalizer) handleResourcesCheck(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse) error {
txResponse := result.Responses[0]
// Handle Transaction Error
if txResponse.RomError != nil {
f.handleTransactionError(ctx, txResponse, result, tx)
return txResponse.RomError
}

err := f.checkRemainingResources(result, tx, txResponse)
if err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -566,7 +580,7 @@ func (f *finalizer) closeBatch(ctx context.Context) error {
receipt := ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
LocalExitRoot: f.batch.localExitRoot,
Txs: transactions,
}
return f.dbManager.CloseBatch(ctx, receipt)
Expand Down
242 changes: 119 additions & 123 deletions sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,129 +100,125 @@ func TestNewFinalizer(t *testing.T) {
assert.Equal(t, f.batchConstraints, bc)
}

func TestFinalizer_newWIPBatch(t *testing.T) {
// arrange
f = setupFinalizer(true)
now = testNow
defer func() {
now = time.Now
}()

txs := make([]types.Transaction, 0)
batchNum := f.batch.batchNumber + 1
f.processRequest.GlobalExitRoot = hash
f.processRequest.OldStateRoot = hash
expectedWipBatch := &WipBatch{
batchNumber: batchNum,
coinbase: f.sequencerAddress,
initialStateRoot: hash2,
stateRoot: hash2,
timestamp: uint64(now().Unix()),
remainingResources: getMaxRemainingResources(f.batchConstraints),
}
batches := []*state.Batch{
{
BatchNumber: 0,
StateRoot: hash,
},
}
testCases := []struct {
name string
batches []*state.Batch
closeBatchErr error
closeBatchParams ClosingBatchParameters
openBatchErr error
expectedWip *WipBatch
expectedErr error
}{
{
name: "Success",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
},
{
name: "Close Batch Error",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
closeBatchErr: testErr,
expectedErr: fmt.Errorf("failed to close batch, err: %w", testErr),
},
{
name: "Open Batch Error",
expectedWip: expectedWipBatch,
closeBatchParams: ClosingBatchParameters{
BatchNumber: f.batch.batchNumber,
StateRoot: f.batch.stateRoot,
LocalExitRoot: f.processRequest.GlobalExitRoot,
Txs: txs,
},
batches: batches,
openBatchErr: testErr,
expectedErr: fmt.Errorf("failed to open new batch, err: %w", testErr),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// arrange
dbManagerMock.On("CloseBatch", ctx, tc.closeBatchParams).Return(tc.closeBatchErr).Once()
executorMock.On("ProcessBatch", ctx, f.processRequest).Return(&state.ProcessBatchResponse{
IsBatchProcessed: true,
}, nilErr).Once()

if tc.expectedErr == nil {
dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Once()
}

if tc.closeBatchErr == nil {
dbManagerMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
dbManagerMock.On("OpenBatch", ctx, mock.Anything, dbTxMock).Return(tc.openBatchErr).Once()

// Async Calls from reprocessBatch
dbManagerMock.On("GetLastNBatches", ctx, uint(2)).Return(tc.batches, nilErr).Maybe()
dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Maybe()
processRequest := f.processRequest
processRequest.Caller = state.DiscardCallerLabel
processRequest.Timestamp = f.batch.timestamp
executorMock.On("ProcessBatch", ctx, processRequest).Return(&state.ProcessBatchResponse{
IsBatchProcessed: true,
}, nilErr).Maybe()

if tc.openBatchErr == nil {
dbTxMock.On("Commit", ctx).Return(nilErr).Once()
} else {
dbTxMock.On("Rollback", ctx).Return(nilErr).Once()
}
}

// act
wipBatch, err := f.newWIPBatch(ctx)

// assert
if tc.expectedErr != nil {
assert.Error(t, err)
assert.EqualError(t, err, tc.expectedErr.Error())
assert.Nil(t, wipBatch)
} else {
assert.NoError(t, err)
assert.Equal(t, tc.expectedWip, wipBatch)
}
dbManagerMock.AssertExpectations(t)
dbTxMock.AssertExpectations(t)
})
}
}
//
//func TestFinalizer_newWIPBatch(t *testing.T) {
// // arrange
// f = setupFinalizer(true)
// now = testNow
// defer func() {
// now = time.Now
// }()
//
// txs := make([]types.Transaction, 0)
// batchNum := f.batch.batchNumber + 1
// f.processRequest.GlobalExitRoot = hash
// f.processRequest.OldStateRoot = hash
// f.processRequest.Transactions = []byte{}
// expectedWipBatch := &WipBatch{
// batchNumber: batchNum,
// coinbase: f.sequencerAddress,
// initialStateRoot: hash2,
// stateRoot: hash2,
// timestamp: uint64(now().Unix()),
// remainingResources: getMaxRemainingResources(f.batchConstraints),
// }
// closeBatchParams := ClosingBatchParameters{
// BatchNumber: f.batch.batchNumber,
// StateRoot: f.batch.stateRoot,
// LocalExitRoot: f.batch.localExitRoot,
// Txs: txs,
// }
// batches := []*state.Batch{
// {
// BatchNumber: 0,
// StateRoot: hash,
// },
// }
// testCases := []struct {
// name string
// batches []*state.Batch
// closeBatchErr error
// closeBatchParams ClosingBatchParameters
// openBatchErr error
// expectedWip *WipBatch
// expectedErr error
// }{
// {
// name: "Success",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// },
// {
// name: "Close Batch Error",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// closeBatchErr: testErr,
// expectedErr: fmt.Errorf("failed to close batch, err: %w", testErr),
// },
// {
// name: "Open Batch Error",
// expectedWip: expectedWipBatch,
// closeBatchParams: closeBatchParams,
// batches: batches,
// openBatchErr: testErr,
// expectedErr: fmt.Errorf("failed to open new batch, err: %w", testErr),
// },
// }
// for _, tc := range testCases {
// t.Run(tc.name, func(t *testing.T) {
// // arrange
// dbManagerMock.On("CloseBatch", ctx, tc.closeBatchParams).Return(tc.closeBatchErr).Once()
// executorMock.On("ProcessBatch", ctx, f.processRequest).Return(&state.ProcessBatchResponse{
// IsBatchProcessed: true,
// }, nilErr).Once()
//
// if tc.expectedErr == nil {
// dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Once()
// }
//
// if tc.closeBatchErr == nil {
// dbManagerMock.On("BeginStateTransaction", ctx).Return(dbTxMock, nilErr).Once()
// dbManagerMock.On("OpenBatch", ctx, mock.Anything, dbTxMock).Return(tc.openBatchErr).Once()
//
// // Async Calls from reprocessBatch
// dbManagerMock.On("GetLastNBatches", ctx, uint(2)).Return(tc.batches, nilErr).Maybe()
// dbManagerMock.On("GetTransactionsByBatchNumber", ctx, f.batch.batchNumber).Return(txs, nilErr).Maybe()
// processRequest := f.processRequest
// processRequest.Caller = state.DiscardCallerLabel
// processRequest.Timestamp = f.batch.timestamp
// executorMock.On("ProcessBatch", ctx, processRequest).Return(&state.ProcessBatchResponse{
// NewStateRoot: f.batch.stateRoot,
// NewLocalExitRoot: f.batch.localExitRoot,
//
// IsBatchProcessed: true,
// }, nilErr).Maybe()
//
// if tc.openBatchErr == nil {
// dbTxMock.On("Commit", ctx).Return(nilErr).Once()
// } else {
// dbTxMock.On("Rollback", ctx).Return(nilErr).Once()
// }
// }
//
// // act
// wipBatch, err := f.newWIPBatch(ctx)
//
// // assert
// if tc.expectedErr != nil {
// assert.Error(t, err)
// assert.EqualError(t, err, tc.expectedErr.Error())
// assert.Nil(t, wipBatch)
// } else {
// assert.NoError(t, err)
// assert.Equal(t, tc.expectedWip, wipBatch)
// }
// dbManagerMock.AssertExpectations(t)
// dbTxMock.AssertExpectations(t)
// })
// }
//}

func TestFinalizer_handleTransactionError(t *testing.T) {
// arrange
Expand Down
9 changes: 4 additions & 5 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,11 @@ func (s *State) ProcessBatch(ctx context.Context, request ProcessRequest) (*Proc
}
var result *ProcessBatchResponse

if len(request.Transactions) > 0 {
result, err = convertToProcessBatchResponse(res)
if err != nil {
return nil, err
}
result, err = convertToProcessBatchResponse(res)
if err != nil {
return nil, err
}

log.Debugf("ProcessBatch end")
log.Debugf("*******************************************")

Expand Down