diff --git a/op-acceptance-tests/tests/supernode/interop/activation/activation_after_genesis_test.go b/op-acceptance-tests/tests/supernode/interop/activation/activation_after_genesis_test.go index 4abc6fd1da5..305763e25c2 100644 --- a/op-acceptance-tests/tests/supernode/interop/activation/activation_after_genesis_test.go +++ b/op-acceptance-tests/tests/supernode/interop/activation/activation_after_genesis_test.go @@ -47,7 +47,7 @@ func TestSupernodeInteropActivationAfterGenesis(gt *testing.T) { // Check pre-activation timestamp preActivationResp, err = snClient.SuperRootAtTimestamp(ctx, preActivationTs) if err != nil { - t.Logger().Debug("superroot_atTimestamp error for pre-activation", "timestamp", preActivationTs, "err", err) + t.Logger().Warn("superroot_atTimestamp error for pre-activation", "timestamp", preActivationTs, "err", err) return false } preVerified := preActivationResp.Data != nil @@ -55,12 +55,12 @@ func TestSupernodeInteropActivationAfterGenesis(gt *testing.T) { // Check post-activation timestamp postActivationResp, err = snClient.SuperRootAtTimestamp(ctx, postActivationTs) if err != nil { - t.Logger().Debug("superroot_atTimestamp error for post-activation", "timestamp", postActivationTs, "err", err) + t.Logger().Warn("superroot_atTimestamp error for post-activation", "timestamp", postActivationTs, "err", err) return false } postVerified := postActivationResp.Data != nil - t.Logger().Debug("waiting for both timestamps to be verified", + t.Logger().Info("waiting for both timestamps to be verified", "pre_activation_ts", preActivationTs, "pre_verified", preVerified, "post_activation_ts", postActivationTs, diff --git a/op-acceptance-tests/tests/supernode/interop/halt/init_test.go b/op-acceptance-tests/tests/supernode/interop/halt/init_test.go new file mode 100644 index 00000000000..e93e0029d80 --- /dev/null +++ b/op-acceptance-tests/tests/supernode/interop/halt/init_test.go @@ -0,0 +1,15 @@ +package halt + +import ( + "os" + "testing" + + 
"github.com/ethereum-optimism/optimism/op-devstack/presets" +) + +// TestMain creates an isolated two-L2 setup with a shared supernode that has interop enabled. +// This package tests invalid message scenarios that would pollute other tests if run on a shared devnet. +func TestMain(m *testing.M) { + _ = os.Setenv("DEVSTACK_L2CL_KIND", "supernode") + presets.DoMain(m, presets.WithTwoL2SupernodeInterop(0)) +} diff --git a/op-acceptance-tests/tests/supernode/interop/halt/invalid_message_halt_test.go b/op-acceptance-tests/tests/supernode/interop/halt/invalid_message_halt_test.go new file mode 100644 index 00000000000..c6ae93bc1ff --- /dev/null +++ b/op-acceptance-tests/tests/supernode/interop/halt/invalid_message_halt_test.go @@ -0,0 +1,251 @@ +package halt + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum-optimism/optimism/devnet-sdk/contracts/constants" + "github.com/ethereum-optimism/optimism/op-devstack/devtest" + "github.com/ethereum-optimism/optimism/op-devstack/dsl" + "github.com/ethereum-optimism/optimism/op-devstack/presets" + "github.com/ethereum-optimism/optimism/op-service/bigs" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum-optimism/optimism/op-service/txintent" +) + +// TestSupernodeInteropInvalidMessageHalt tests that: +// WHEN: an invalid Executing Message is included in a chain +// THEN: +// - Validity Never Advances to include the Invalid Block +// - Local Safety and Unsafety for both chains continue to advance +// +// This is a TDD test that starts a cycle to implement the Interop Activity's actual algorithm. 
+func TestSupernodeInteropInvalidMessageHalt(gt *testing.T) { + t := devtest.SerialT(gt) + sys := presets.NewTwoL2SupernodeInterop(t, 0) + + ctx := t.Ctx() + snClient := sys.SuperNodeClient() + + // Create funded EOAs on both chains + alice := sys.FunderA.NewFundedEOA(eth.OneEther) + bob := sys.FunderB.NewFundedEOA(eth.OneEther) + + // Deploy event logger on chain A + eventLoggerA := alice.DeployEventLogger() + + // Sync chains + sys.L2B.CatchUpTo(sys.L2A) + sys.L2A.CatchUpTo(sys.L2B) + + rng := rand.New(rand.NewSource(12345)) + + // Send an initiating message on chain A + initTrigger := randomInitTrigger(rng, eventLoggerA, 2, 10) + initTx, initReceipt := alice.SendInitMessage(initTrigger) + + t.Logger().Info("initiating message sent on chain A", + "block", initReceipt.BlockNumber, + "hash", initReceipt.BlockHash, + ) + + // Wait for chain B to catch up + sys.L2B.WaitForBlock() + + // Record the verified timestamp before the invalid message + // We need to know what timestamp was verified before the invalid exec message + blockTime := sys.L2A.Escape().RollupConfig().BlockTime + genesisTime := sys.L2A.Escape().RollupConfig().Genesis.L2Time + + // Wait for some timestamps to be verified first + targetTimestamp := genesisTime + blockTime*2 + t.Require().Eventually(func() bool { + resp, err := snClient.SuperRootAtTimestamp(ctx, targetTimestamp) + if err != nil { + return false + } + t.Logger().Info("super root at timestamp", "timestamp", targetTimestamp, "data", resp.Data) + return resp.Data != nil + }, 60*time.Second, time.Second, "initial timestamps should be verified") + + t.Logger().Info("initial verification confirmed", "timestamp", targetTimestamp) + + // Send an INVALID executing message on chain B + // Modify the message identifier to make it invalid (wrong log index) + invalidExecReceipt := sendInvalidExecMessage(t, bob, initTx, 0) + + invalidBlockNumber := bigs.Uint64Strict(invalidExecReceipt.BlockNumber) + invalidBlock := 
sys.L2ELB.BlockRefByHash(invalidExecReceipt.BlockHash) + invalidBlockTimestamp := invalidBlock.Time + + t.Logger().Info("invalid executing message sent on chain B", + "block", invalidExecReceipt.BlockNumber, + "hash", invalidExecReceipt.BlockHash, + "timestamp", invalidBlockTimestamp, + ) + + // Record the safety status before waiting + initialStatusA := sys.L2ACL.SyncStatus() + initialStatusB := sys.L2BCL.SyncStatus() + + t.Logger().Info("initial safety status", + "chainA_local_safe", initialStatusA.LocalSafeL2.Number, + "chainA_unsafe", initialStatusA.UnsafeL2.Number, + "chainB_local_safe", initialStatusB.LocalSafeL2.Number, + "chainB_unsafe", initialStatusB.UnsafeL2.Number, + ) + + // Now we verify the key behaviors over time: + // 1. Validity should NEVER advance to include the invalid block + // 2. Local Safety and Unsafety should continue to advance for both chains + + observationDuration := 30 * time.Second + checkInterval := time.Second + + start := time.Now() + var lastVerifiedTimestamp uint64 + + for time.Since(start) < observationDuration { + time.Sleep(checkInterval) + + // Check current safety status + statusA := sys.L2ACL.SyncStatus() + statusB := sys.L2BCL.SyncStatus() + + // KEY ASSERTION 1: Validity should NOT advance past the invalid block's timestamp + // Check if the invalid block's timestamp has been verified (it should NOT be) + resp, err := snClient.SuperRootAtTimestamp(ctx, invalidBlockTimestamp) + t.Require().NoError(err, "SuperRootAtTimestamp should not error") + + if resp.Data != nil { + t.Logger().Error("UNEXPECTED: invalid block timestamp was verified!", + "timestamp", invalidBlockTimestamp, + "invalid_block", invalidBlockNumber, + ) + t.FailNow() + } + + // Track the last verified timestamp (for timestamps before the invalid block) + if invalidBlockTimestamp > blockTime { + checkTs := invalidBlockTimestamp - blockTime + checkResp, _ := snClient.SuperRootAtTimestamp(ctx, checkTs) + if checkResp.Data != nil { + lastVerifiedTimestamp = 
checkTs + } + } + + t.Logger().Info("observation tick", + "elapsed", time.Since(start).Round(time.Second), + "chainA_local_safe", statusA.LocalSafeL2.Number, + "chainA_unsafe", statusA.UnsafeL2.Number, + "chainB_local_safe", statusB.LocalSafeL2.Number, + "chainB_unsafe", statusB.UnsafeL2.Number, + "last_verified_ts", lastVerifiedTimestamp, + "invalid_block_ts", invalidBlockTimestamp, + ) + } + + // Final assertions after observation period + + finalStatusA := sys.L2ACL.SyncStatus() + finalStatusB := sys.L2BCL.SyncStatus() + + // ASSERTION: Local Safety should have advanced for both chains + t.Require().Greater(finalStatusA.LocalSafeL2.Number, initialStatusA.LocalSafeL2.Number, + "chain A local safe head should advance") + t.Require().Greater(finalStatusB.LocalSafeL2.Number, initialStatusB.LocalSafeL2.Number, + "chain B local safe head should advance") + + // ASSERTION: Unsafety should have advanced for both chains + t.Require().Greater(finalStatusA.UnsafeL2.Number, initialStatusA.UnsafeL2.Number, + "chain A unsafe head should advance") + t.Require().Greater(finalStatusB.UnsafeL2.Number, initialStatusB.UnsafeL2.Number, + "chain B unsafe head should advance") + + // ASSERTION: The invalid block's timestamp should still NOT be verified + finalResp, err := snClient.SuperRootAtTimestamp(ctx, invalidBlockTimestamp) + t.Require().NoError(err) + t.Require().Nil(finalResp.Data, + "invalid block timestamp should NEVER be verified") + + t.Logger().Info("test complete: invalid message correctly halted validity advancement", + "final_chainA_local_safe", finalStatusA.LocalSafeL2.Number, + "final_chainA_unsafe", finalStatusA.UnsafeL2.Number, + "final_chainB_local_safe", finalStatusB.LocalSafeL2.Number, + "final_chainB_unsafe", finalStatusB.UnsafeL2.Number, + "invalid_block_timestamp", invalidBlockTimestamp, + "last_verified_timestamp", lastVerifiedTimestamp, + ) +} + +// sendInvalidExecMessage sends an executing message with a modified (invalid) identifier. 
+// This makes the message invalid because it references a non-existent log index. +func sendInvalidExecMessage( + t devtest.T, + bob *dsl.EOA, + initIntent *txintent.IntentTx[*txintent.InitTrigger, *txintent.InteropOutput], + eventIdx int, +) *types.Receipt { + ctx := t.Ctx() + + // Evaluate the init result to get the message entries + result, err := initIntent.Result.Eval(ctx) + t.Require().NoError(err, "failed to evaluate init result") + t.Require().Greater(len(result.Entries), eventIdx, "event index out of range") + + // Get the message and modify it to be invalid + msg := result.Entries[eventIdx] + + // Make the message invalid by setting an impossible log index + // This creates a message that claims to reference a log that doesn't exist + msg.Identifier.LogIndex = 9999 + + // Create the exec trigger with the invalid message + execTrigger := &txintent.ExecTrigger{ + Executor: constants.CrossL2Inbox, + Msg: msg, + } + + // Create the intent with the invalid trigger + tx := txintent.NewIntent[*txintent.ExecTrigger, *txintent.InteropOutput](bob.Plan()) + tx.Content.DependOn(&initIntent.Result) + tx.Content.Fn(func(ctx context.Context) (*txintent.ExecTrigger, error) { + return execTrigger, nil + }) + + receipt, err := tx.PlannedTx.Included.Eval(ctx) + t.Require().NoError(err, "invalid exec msg receipt not found") + t.Logger().Info("invalid exec message included", "chain", bob.ChainID(), "block", receipt.BlockNumber) + + return receipt +} + +// randomInitTrigger creates a random init trigger for testing. 
+func randomInitTrigger(rng *rand.Rand, eventLoggerAddress common.Address, topicCount, dataLen int) *txintent.InitTrigger { + if topicCount > 4 { + topicCount = 4 // Max 4 topics in EVM logs + } + if topicCount < 1 { + topicCount = 1 + } + if dataLen < 1 { + dataLen = 1 + } + + topics := make([][32]byte, topicCount) + for i := range topics { + copy(topics[i][:], testutils.RandomData(rng, 32)) + } + + return &txintent.InitTrigger{ + Emitter: eventLoggerAddress, + Topics: topics, + OpaqueData: testutils.RandomData(rng, dataLen), + } +} diff --git a/op-supernode/supernode/activity/interop/algo.go b/op-supernode/supernode/activity/interop/algo.go new file mode 100644 index 00000000000..48dabc0508a --- /dev/null +++ b/op-supernode/supernode/activity/interop/algo.go @@ -0,0 +1,153 @@ +package interop + +import ( + "errors" + "fmt" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// ExpiryTime is the maximum age of an initiating message that can be executed. +// Messages older than this are considered expired and invalid. +// 7 days = 7 * 24 * 60 * 60 = 604800 seconds +const ExpiryTime = 604800 + +var ( + // ErrUnknownChain is returned when an executing message references + // a chain that is not registered with the interop activity. + ErrUnknownChain = errors.New("unknown chain") + + // ErrTimestampViolation is returned when an executing message references + // an initiating message with a timestamp >= the executing message's timestamp. + ErrTimestampViolation = errors.New("initiating message timestamp must be less than executing message timestamp") + + // ErrMessageExpired is returned when an executing message references + // an initiating message that has expired (older than ExpiryTime). + ErrMessageExpired = errors.New("initiating message has expired") +) + +// verifyInteropMessages validates all executing messages at the given timestamp. 
+// Returns a Result indicating whether all messages are valid or which chains have invalid blocks. +// +// For each chain: +// 1. Open the block from the logsDB and verify it matches blocksAtTimestamp +// 2. For each executing message in the block: +// - Verify the initiating message exists in the source chain's logsDB +// - Verify the initiating message timestamp < executing message timestamp +// - Verify the initiating message hasn't expired (within ExpiryTime) +func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) { + result := Result{ + Timestamp: ts, + L2Heads: make(map[eth.ChainID]eth.BlockID), + InvalidHeads: make(map[eth.ChainID]eth.BlockID), + } + + for chainID, expectedBlock := range blocksAtTimestamp { + db, ok := i.logsDBs[chainID] + if !ok { + // Skip chains that we don't have a logsDB for + // This can happen if blocksAtTimestamp includes chains not registered with the interop activity + continue + } + + // Get the block from the logsDB + blockRef, _, execMsgs, err := db.OpenBlock(expectedBlock.Number) + if err != nil { + // OpenBlock fails for the first block in the DB because it tries to find the parent. + // Handle this by checking if this is the first sealed block and using FirstSealedBlock instead. + if errors.Is(err, types.ErrSkipped) { + firstBlock, firstErr := db.FirstSealedBlock() + if firstErr != nil { + return Result{}, fmt.Errorf("chain %s: failed to open block %d and failed to get first block: %w", chainID, expectedBlock.Number, err) + } + if firstBlock.Number == expectedBlock.Number { + // This is the first block in the logsDB. Use FirstSealedBlock info. + // The first block has no executing messages (since we can't verify them without prior data). 
+ if firstBlock.Hash != expectedBlock.Hash { + i.log.Warn("first block hash mismatch", + "chain", chainID, + "expected", expectedBlock.Hash, + "got", firstBlock.Hash, + ) + result.InvalidHeads[chainID] = expectedBlock + } + result.L2Heads[chainID] = expectedBlock + continue + } + } + return Result{}, fmt.Errorf("chain %s: failed to open block %d: %w", chainID, expectedBlock.Number, err) + } + + // Verify the block hash matches what we expect + if blockRef.Hash != expectedBlock.Hash { + i.log.Warn("block hash mismatch", + "chain", chainID, + "expected", expectedBlock.Hash, + "got", blockRef.Hash, + ) + result.InvalidHeads[chainID] = expectedBlock + result.L2Heads[chainID] = expectedBlock + continue + } + + // Verify each executing message + blockValid := true + for logIdx, execMsg := range execMsgs { + if err := i.verifyExecutingMessage(chainID, blockRef.Time, logIdx, execMsg); err != nil { + i.log.Warn("invalid executing message", + "chain", chainID, + "block", expectedBlock.Number, + "logIdx", logIdx, + "execMsg", execMsg, + "err", err, + ) + blockValid = false + break + } + } + + result.L2Heads[chainID] = expectedBlock + if !blockValid { + result.InvalidHeads[chainID] = expectedBlock + } + } + + return result, nil +} + +// verifyExecutingMessage verifies a single executing message by checking: +// 1. The initiating message exists in the source chain's database +// 2. The initiating message's timestamp is less than the executing block's timestamp +// 3. 
The initiating message hasn't expired (timestamp + ExpiryTime >= executing timestamp) +func (i *Interop) verifyExecutingMessage(executingChain eth.ChainID, executingTimestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error { + // Get the source chain's logsDB + sourceDB, ok := i.logsDBs[execMsg.ChainID] + if !ok { + return fmt.Errorf("source chain %s not found: %w", execMsg.ChainID, ErrUnknownChain) + } + + // Verify timestamp ordering: initiating message timestamp must be < executing block timestamp + if execMsg.Timestamp >= executingTimestamp { + return fmt.Errorf("initiating timestamp %d >= executing timestamp %d: %w", + execMsg.Timestamp, executingTimestamp, ErrTimestampViolation) + } + + // Verify the message hasn't expired: initiating timestamp + ExpiryTime must be >= executing timestamp + if execMsg.Timestamp+ExpiryTime < executingTimestamp { + return fmt.Errorf("initiating timestamp %d + expiry %d < executing timestamp %d: %w", + execMsg.Timestamp, ExpiryTime, executingTimestamp, ErrMessageExpired) + } + + // Build the query for the initiating message + query := types.ContainsQuery{ + BlockNum: execMsg.BlockNum, + LogIdx: execMsg.LogIdx, + Timestamp: execMsg.Timestamp, + Checksum: execMsg.Checksum, + } + + // Check if the initiating message exists in the source chain's logsDB + _, err := sourceDB.Contains(query) + return err +} diff --git a/op-supernode/supernode/activity/interop/algo_test.go b/op-supernode/supernode/activity/interop/algo_test.go new file mode 100644 index 00000000000..fd03926b3a3 --- /dev/null +++ b/op-supernode/supernode/activity/interop/algo_test.go @@ -0,0 +1,572 @@ +package interop + +import ( + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-service/eth" + suptypes 
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// ============================================================================= +// TestVerifyInteropMessages_ValidBlocks +// ============================================================================= + +func TestVerifyInteropMessages_ValidBlocks(t *testing.T) { + t.Parallel() + + t.Run("block with no executing messages is valid", func(t *testing.T) { + t.Parallel() + + chainID := eth.ChainIDFromUInt64(10) + blockHash := common.HexToHash("0x123") + expectedBlock := eth.BlockID{Number: 100, Hash: blockHash} + + mockDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: blockHash, Number: 100, Time: 1000}, + openBlockExecMsg: nil, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + chainID: expectedBlock, + }) + + require.NoError(t, err) + require.True(t, result.IsValid()) + require.Empty(t, result.InvalidHeads) + require.Equal(t, expectedBlock, result.L2Heads[chainID]) + }) + + t.Run("valid executing message passes verification", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + destChainID := eth.ChainIDFromUInt64(8453) + + sourceBlockHash := common.HexToHash("0xSource") + destBlockHash := common.HexToHash("0xDest") + + sourceBlock := eth.BlockID{Number: 50, Hash: sourceBlockHash} + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: 500, // Source timestamp < dest timestamp (1000) + Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: sourceBlockHash, Number: 50, Time: 500}, + containsSeal: suptypes.BlockSeal{Number: 50, Timestamp: 500}, + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, 
Time: 1000}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + destChainID: destDB, + }, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + sourceChainID: sourceBlock, + destChainID: destBlock, + }) + + require.NoError(t, err) + require.True(t, result.IsValid()) + require.Empty(t, result.InvalidHeads) + }) + + t.Run("message at expiry boundary passes verification", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + destChainID := eth.ChainIDFromUInt64(8453) + + sourceBlockHash := common.HexToHash("0xSource") + destBlockHash := common.HexToHash("0xDest") + + // Message is exactly at the expiry boundary (should pass) + execTimestamp := uint64(1000000) + initTimestamp := execTimestamp - ExpiryTime // Exactly at boundary + + sourceBlock := eth.BlockID{Number: 50, Hash: sourceBlockHash} + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: initTimestamp, // Exactly at expiry boundary + Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: sourceBlockHash, Number: 50, Time: initTimestamp}, + containsSeal: suptypes.BlockSeal{Number: 50, Timestamp: initTimestamp}, + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, Time: execTimestamp}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + destChainID: destDB, + }, + } + + result, err := interop.verifyInteropMessages(execTimestamp, map[eth.ChainID]eth.BlockID{ + sourceChainID: sourceBlock, + destChainID: destBlock, + }) + + require.NoError(t, err) + 
require.True(t, result.IsValid()) + require.Empty(t, result.InvalidHeads) + }) + + t.Run("unregistered chains in blocksAtTimestamp are skipped", func(t *testing.T) { + t.Parallel() + + registeredChain := eth.ChainIDFromUInt64(10) + unregisteredChain := eth.ChainIDFromUInt64(9999) + + mockDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: common.HexToHash("0x1"), Number: 100, Time: 1000}, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{registeredChain: mockDB}, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + registeredChain: {Number: 100, Hash: common.HexToHash("0x1")}, + unregisteredChain: {Number: 200, Hash: common.HexToHash("0x2")}, + }) + + require.NoError(t, err) + require.True(t, result.IsValid()) + require.Contains(t, result.L2Heads, registeredChain) + require.NotContains(t, result.L2Heads, unregisteredChain) + }) +} + +// ============================================================================= +// TestVerifyInteropMessages_InvalidBlocks +// ============================================================================= + +func TestVerifyInteropMessages_InvalidBlocks(t *testing.T) { + t.Parallel() + + t.Run("block hash mismatch marked invalid", func(t *testing.T) { + t.Parallel() + + chainID := eth.ChainIDFromUInt64(10) + expectedBlock := eth.BlockID{Number: 100, Hash: common.HexToHash("0xExpected")} + + mockDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{ + Hash: common.HexToHash("0xActual"), // Different from expected + Number: 100, + Time: 1000, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + chainID: expectedBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + require.Contains(t, result.InvalidHeads, chainID) + require.Equal(t, expectedBlock, result.InvalidHeads[chainID]) + }) + + 
t.Run("initiating message not found marked invalid", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + destChainID := eth.ChainIDFromUInt64(8453) + + destBlockHash := common.HexToHash("0xDest") + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: 500, + Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + containsErr: suptypes.ErrConflict, // Message not found + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, Time: 1000}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + destChainID: destDB, + }, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + destChainID: destBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + require.Contains(t, result.InvalidHeads, destChainID) + }) + + t.Run("timestamp violation (init.ts >= exec.ts) marked invalid", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + destChainID := eth.ChainIDFromUInt64(8453) + + destBlockHash := common.HexToHash("0xDest") + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: 1000, // Same as dest block timestamp - INVALID! 
+ Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + containsSeal: suptypes.BlockSeal{Number: 50, Timestamp: 1000}, + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, Time: 1000}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + destChainID: destDB, + }, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + destChainID: destBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + require.Contains(t, result.InvalidHeads, destChainID) + }) + + t.Run("unknown source chain marked invalid", func(t *testing.T) { + t.Parallel() + + unknownSourceChain := eth.ChainIDFromUInt64(9999) + destChainID := eth.ChainIDFromUInt64(8453) + + destBlockHash := common.HexToHash("0xDest") + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: unknownSourceChain, // Not registered + BlockNum: 50, + LogIdx: 0, + Timestamp: 500, + Checksum: suptypes.MessageChecksum{0x01}, + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, Time: 1000}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + destChainID: destDB, + // Note: unknownSourceChain NOT in logsDBs + }, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + destChainID: destBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + require.Contains(t, result.InvalidHeads, destChainID) + }) + + t.Run("expired message marked invalid", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + destChainID := eth.ChainIDFromUInt64(8453) + + destBlockHash := 
common.HexToHash("0xDest") + // Executing block is at timestamp 1000000 (well after expiry) + execTimestamp := uint64(1000000) + // Initiating message timestamp is more than ExpiryTime (604800) before executing timestamp + initTimestamp := execTimestamp - ExpiryTime - 1 // 1 second past expiry + + destBlock := eth.BlockID{Number: 100, Hash: destBlockHash} + + execMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: initTimestamp, // Expired! + Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + containsSeal: suptypes.BlockSeal{Number: 50, Timestamp: initTimestamp}, + } + + destDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: destBlockHash, Number: 100, Time: execTimestamp}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: execMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + destChainID: destDB, + }, + } + + result, err := interop.verifyInteropMessages(execTimestamp, map[eth.ChainID]eth.BlockID{ + destChainID: destBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + require.Contains(t, result.InvalidHeads, destChainID) + }) + + t.Run("multiple chains with one invalid", func(t *testing.T) { + t.Parallel() + + sourceChainID := eth.ChainIDFromUInt64(10) + validChainID := eth.ChainIDFromUInt64(8453) + invalidChainID := eth.ChainIDFromUInt64(420) + + validBlockHash := common.HexToHash("0xValid") + invalidBlockHash := common.HexToHash("0xInvalid") + + validBlock := eth.BlockID{Number: 100, Hash: validBlockHash} + invalidBlock := eth.BlockID{Number: 200, Hash: invalidBlockHash} + + badExecMsg := &suptypes.ExecutingMessage{ + ChainID: sourceChainID, + BlockNum: 50, + LogIdx: 0, + Timestamp: 1000, // Same as block timestamp - INVALID + Checksum: suptypes.MessageChecksum{0x01}, + } + + sourceDB := &algoMockLogsDB{ + containsSeal: suptypes.BlockSeal{Number: 50, Timestamp: 
1000}, + } + + validDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: validBlockHash, Number: 100, Time: 1000}, + openBlockExecMsg: nil, // No executing messages - valid + } + + invalidDB := &algoMockLogsDB{ + openBlockRef: eth.BlockRef{Hash: invalidBlockHash, Number: 200, Time: 1000}, + openBlockExecMsg: map[uint32]*suptypes.ExecutingMessage{ + 0: badExecMsg, + }, + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{ + sourceChainID: sourceDB, + validChainID: validDB, + invalidChainID: invalidDB, + }, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + validChainID: validBlock, + invalidChainID: invalidBlock, + }) + + require.NoError(t, err) + require.False(t, result.IsValid()) + // Both chains in L2Heads + require.Contains(t, result.L2Heads, validChainID) + require.Contains(t, result.L2Heads, invalidChainID) + // Only invalid in InvalidHeads + require.NotContains(t, result.InvalidHeads, validChainID) + require.Contains(t, result.InvalidHeads, invalidChainID) + }) +} + +// ============================================================================= +// TestVerifyInteropMessages_Errors +// ============================================================================= + +func TestVerifyInteropMessages_Errors(t *testing.T) { + t.Parallel() + + t.Run("OpenBlock error propagated", func(t *testing.T) { + t.Parallel() + + chainID := eth.ChainIDFromUInt64(10) + block := eth.BlockID{Number: 100, Hash: common.HexToHash("0x123")} + + mockDB := &algoMockLogsDB{ + openBlockErr: errors.New("database error"), + } + + interop := &Interop{ + log: gethlog.New(), + logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, + } + + result, err := interop.verifyInteropMessages(1000, map[eth.ChainID]eth.BlockID{ + chainID: block, + }) + + require.Error(t, err) + require.Contains(t, err.Error(), "database error") + require.True(t, result.IsEmpty()) + }) +} + +// 
============================================================================= +// Mock Types for Algorithm Tests +// ============================================================================= + +// algoMockLogsDB is a mock LogsDB for algorithm tests +type algoMockLogsDB struct { + openBlockRef eth.BlockRef + openBlockLogCnt uint32 + openBlockExecMsg map[uint32]*suptypes.ExecutingMessage + openBlockErr error + + firstSealedBlock suptypes.BlockSeal + firstSealedBlockErr error + + containsSeal suptypes.BlockSeal + containsErr error +} + +func (m *algoMockLogsDB) LatestSealedBlock() (eth.BlockID, bool) { return eth.BlockID{}, false } +func (m *algoMockLogsDB) FirstSealedBlock() (suptypes.BlockSeal, error) { + if m.firstSealedBlockErr != nil { + return suptypes.BlockSeal{}, m.firstSealedBlockErr + } + return m.firstSealedBlock, nil +} +func (m *algoMockLogsDB) FindSealedBlock(number uint64) (suptypes.BlockSeal, error) { + return suptypes.BlockSeal{}, nil +} +func (m *algoMockLogsDB) OpenBlock(blockNum uint64) (eth.BlockRef, uint32, map[uint32]*suptypes.ExecutingMessage, error) { + if m.openBlockErr != nil { + return eth.BlockRef{}, 0, nil, m.openBlockErr + } + return m.openBlockRef, m.openBlockLogCnt, m.openBlockExecMsg, nil +} +func (m *algoMockLogsDB) Contains(query suptypes.ContainsQuery) (suptypes.BlockSeal, error) { + if m.containsErr != nil { + return suptypes.BlockSeal{}, m.containsErr + } + return m.containsSeal, nil +} +func (m *algoMockLogsDB) AddLog(logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *suptypes.ExecutingMessage) error { + return nil +} +func (m *algoMockLogsDB) SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error { + return nil +} +func (m *algoMockLogsDB) Close() error { return nil } + +var _ LogsDB = (*algoMockLogsDB)(nil) + +// testBlockInfo implements eth.BlockInfo for testing +type testBlockInfo struct { + hash common.Hash + parentHash common.Hash + number uint64 + timestamp uint64 +} + +func (m 
*testBlockInfo) Hash() common.Hash { return m.hash } +func (m *testBlockInfo) ParentHash() common.Hash { return m.parentHash } +func (m *testBlockInfo) Coinbase() common.Address { return common.Address{} } +func (m *testBlockInfo) Root() common.Hash { return common.Hash{} } +func (m *testBlockInfo) NumberU64() uint64 { return m.number } +func (m *testBlockInfo) Time() uint64 { return m.timestamp } +func (m *testBlockInfo) MixDigest() common.Hash { return common.Hash{} } +func (m *testBlockInfo) BaseFee() *big.Int { return big.NewInt(1) } +func (m *testBlockInfo) BlobBaseFee(chainConfig *params.ChainConfig) *big.Int { return big.NewInt(1) } +func (m *testBlockInfo) ExcessBlobGas() *uint64 { return nil } +func (m *testBlockInfo) ReceiptHash() common.Hash { return common.Hash{} } +func (m *testBlockInfo) GasUsed() uint64 { return 0 } +func (m *testBlockInfo) GasLimit() uint64 { return 30000000 } +func (m *testBlockInfo) BlobGasUsed() *uint64 { return nil } +func (m *testBlockInfo) ParentBeaconRoot() *common.Hash { return nil } +func (m *testBlockInfo) WithdrawalsRoot() *common.Hash { return nil } +func (m *testBlockInfo) HeaderRLP() ([]byte, error) { return nil, nil } +func (m *testBlockInfo) Header() *types.Header { return nil } +func (m *testBlockInfo) ID() eth.BlockID { return eth.BlockID{Hash: m.hash, Number: m.number} } + +var _ eth.BlockInfo = (*testBlockInfo)(nil) diff --git a/op-supernode/supernode/activity/interop/interop.go b/op-supernode/supernode/activity/interop/interop.go index e54b66bae1c..4bf777cec78 100644 --- a/op-supernode/supernode/activity/interop/interop.go +++ b/op-supernode/supernode/activity/interop/interop.go @@ -39,8 +39,10 @@ type Interop struct { log log.Logger chains map[eth.ChainID]cc.ChainContainer activationTimestamp uint64 + dataDir string verifiedDB *VerifiedDB + logsDBs map[eth.ChainID]LogsDB mu sync.RWMutex ctx context.Context @@ -68,10 +70,29 @@ func New( log.Error("failed to open verified DB", "err", err) return nil } + + // 
Initialize logsDBs for each chain + logsDBs := make(map[eth.ChainID]LogsDB) + for chainID := range chains { + logsDB, err := openLogsDB(log, chainID, dataDir) + if err != nil { + log.Error("failed to open logs DB for chain", "chainID", chainID, "err", err) + // Clean up already created logsDBs + for _, db := range logsDBs { + _ = db.Close() + } + _ = verifiedDB.Close() + return nil + } + logsDBs[chainID] = logsDB + } + i := &Interop{ log: log, chains: chains, verifiedDB: verifiedDB, + logsDBs: logsDBs, + dataDir: dataDir, currentL1: eth.BlockID{}, activationTimestamp: activationTimestamp, } @@ -119,6 +140,12 @@ func (i *Interop) Stop(ctx context.Context) error { if i.cancel != nil { i.cancel() } + // Close all logsDBs + for chainID, db := range i.logsDBs { + if err := db.Close(); err != nil { + i.log.Error("failed to close logs DB", "chainID", chainID, "err", err) + } + } if i.verifiedDB != nil { return i.verifiedDB.Close() } @@ -261,6 +288,14 @@ func (i *Interop) handleResult(result Result) error { return nil } +// invalidateBlock handles an invalid block by notifying the chain to reorg. +func (i *Interop) invalidateBlock(chainID eth.ChainID, blockID eth.BlockID) error { + // TODO(#18944): Implement block invalidation + // This should trigger the chain container to reorg away from the invalid block + i.log.Warn("invalidateBlock called but not implemented", "chainID", chainID, "blockID", blockID) + return nil +} + // checkChainsReady checks if all chains are ready to process the next timestamp. // Queries all chains in parallel for better performance. 
func (i *Interop) checkChainsReady(ts uint64) (map[eth.ChainID]eth.BlockID, error) { @@ -297,26 +332,6 @@ func (i *Interop) checkChainsReady(ts uint64) (map[eth.ChainID]eth.BlockID, erro return blocksAtTimestamp, nil } -// TODO(#18743): Interop Algorithm -func (i *Interop) loadLogs(ts uint64) error { - return nil -} - -// TODO(#18743): Interop Algorithm -func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) { - result := Result{Timestamp: ts, L2Heads: make(map[eth.ChainID]eth.BlockID)} - for _, chain := range i.chains { - blockID := blocksAtTimestamp[chain.ID()] - result.L2Heads[chain.ID()] = blockID - } - return result, nil -} - -// TODO(#18944): Invalidate Block -func (i *Interop) invalidateBlock(chainID eth.ChainID, blockID eth.BlockID) error { - return nil -} - func (i *Interop) commitVerifiedResult(timestamp uint64, verifiedResult VerifiedResult) error { return i.verifiedDB.Commit(verifiedResult) } diff --git a/op-supernode/supernode/activity/interop/interop_test.go b/op-supernode/supernode/activity/interop/interop_test.go index 2c51af6bfdf..79680fe0262 100644 --- a/op-supernode/supernode/activity/interop/interop_test.go +++ b/op-supernode/supernode/activity/interop/interop_test.go @@ -3,6 +3,7 @@ package interop import ( "context" "errors" + "math/big" "sync" "testing" "time" @@ -10,976 +11,925 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-supernode/supernode/activity" cc "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container" + suptypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" "github.com/stretchr/testify/require" ) -// mockChainContainer implements cc.ChainContainer for testing 
-type mockChainContainer struct { - id eth.ChainID +// ============================================================================= +// TestNew +// ============================================================================= - currentL1 eth.BlockRef - currentL1Err error +func TestNew(t *testing.T) { + t.Parallel() - blockAtTimestamp eth.L2BlockRef - blockAtTimestampErr error + t.Run("valid inputs initializes all components", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - mu sync.Mutex -} + chains := map[eth.ChainID]cc.ChainContainer{ + eth.ChainIDFromUInt64(10): newMockChainContainer(10), + eth.ChainIDFromUInt64(8453): newMockChainContainer(8453), + } -func newMockChainContainer(id uint64) *mockChainContainer { - return &mockChainContainer{ - id: eth.ChainIDFromUInt64(id), - } -} + interop := New(testLogger(), 1000, chains, dataDir) -func (m *mockChainContainer) ID() eth.ChainID { return m.id } + require.NotNil(t, interop) + require.Equal(t, uint64(1000), interop.activationTimestamp) + require.NotNil(t, interop.verifiedDB) + require.Len(t, interop.chains, 2) + require.Len(t, interop.logsDBs, 2) + require.NotNil(t, interop.verifyFn) -func (m *mockChainContainer) Start(ctx context.Context) error { return nil } -func (m *mockChainContainer) Stop(ctx context.Context) error { return nil } -func (m *mockChainContainer) Pause(ctx context.Context) error { return nil } -func (m *mockChainContainer) Resume(ctx context.Context) error { return nil } + // Verify logsDBs populated for each chain + for chainID := range chains { + require.Contains(t, interop.logsDBs, chainID) + require.NotNil(t, interop.logsDBs[chainID]) + } + }) -func (m *mockChainContainer) RegisterVerifier(v activity.VerificationActivity) { -} -func (m *mockChainContainer) BlockAtTimestamp(ctx context.Context, ts uint64, label eth.BlockLabel) (eth.L2BlockRef, error) { - m.mu.Lock() - defer m.mu.Unlock() - return m.blockAtTimestamp, m.blockAtTimestampErr -} + t.Run("invalid dataDir returns 
nil", func(t *testing.T) { + t.Parallel() -func (m *mockChainContainer) VerifiedAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) { - return eth.BlockID{}, eth.BlockID{}, nil -} -func (m *mockChainContainer) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, error) { - return eth.BlockID{}, nil -} -func (m *mockChainContainer) OptimisticAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) { - return eth.BlockID{}, eth.BlockID{}, nil -} -func (m *mockChainContainer) OutputRootAtL2BlockNumber(ctx context.Context, l2BlockNum uint64) (eth.Bytes32, error) { - return eth.Bytes32{}, nil -} -func (m *mockChainContainer) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*eth.OutputResponse, error) { - return nil, nil -} -func (m *mockChainContainer) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { - m.mu.Lock() - defer m.mu.Unlock() - if m.currentL1Err != nil { - return nil, m.currentL1Err - } - return ð.SyncStatus{CurrentL1: m.currentL1}, nil -} -func (m *mockChainContainer) RewindEngine(ctx context.Context, timestamp uint64) error { - return nil -} - -var _ cc.ChainContainer = (*mockChainContainer)(nil) + interop := New(testLogger(), 1000, map[eth.ChainID]cc.ChainContainer{}, "/nonexistent/path") -// Helper to create a test logger -func testLogger() gethlog.Logger { - return gethlog.New() + require.Nil(t, interop) + }) } // ============================================================================= -// Constructor Tests +// TestStartStop // ============================================================================= -func TestNew_ValidInputs(t *testing.T) { +func TestStartStop(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - chains := map[eth.ChainID]cc.ChainContainer{ - eth.ChainIDFromUInt64(10): newMockChainContainer(10), - } + t.Run("Start blocks until context cancelled", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - interop := New(testLogger(), 1000, chains, dataDir) + mock 
:= newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + mock.blockAtTimestamp = eth.L2BlockRef{Number: 50} - require.NotNil(t, interop) - require.Equal(t, uint64(1000), interop.activationTimestamp) - require.NotNil(t, interop.verifiedDB) - require.Equal(t, eth.BlockID{}, interop.currentL1) // starts empty - require.Len(t, interop.chains, 1) -} + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) -func TestNew_InvalidDataDir(t *testing.T) { - t.Parallel() - // Use a path that can't be written to - invalidDir := "/nonexistent/path/that/cannot/exist/db" + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- interop.Start(ctx) }() - chains := map[eth.ChainID]cc.ChainContainer{} + // Wait for start + require.Eventually(t, func() bool { + interop.mu.RLock() + defer interop.mu.RUnlock() + return interop.started + }, 5*time.Second, 100*time.Millisecond) - interop := New(testLogger(), 1000, chains, invalidDir) + cancel() - // New returns nil when DB fails to open - require.Nil(t, interop) -} + var err error + require.Eventually(t, func() bool { + select { + case err = <-done: + return true + default: + return false + } + }, 5*time.Second, 100*time.Millisecond) + require.ErrorIs(t, err, context.Canceled) + }) -func TestNew_EmptyChains(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + t.Run("double Start blocked", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - chains := map[eth.ChainID]cc.ChainContainer{} + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - interop := New(testLogger(), 0, chains, dataDir) + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) - require.NotNil(t, interop) - require.Empty(t, 
interop.chains) -} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -func TestNew_MultipleChains(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + go func() { _ = interop.Start(ctx) }() - chains := map[eth.ChainID]cc.ChainContainer{ - eth.ChainIDFromUInt64(10): newMockChainContainer(10), - eth.ChainIDFromUInt64(8453): newMockChainContainer(8453), - eth.ChainIDFromUInt64(420): newMockChainContainer(420), - } + require.Eventually(t, func() bool { + interop.mu.RLock() + defer interop.mu.RUnlock() + return interop.started + }, 5*time.Second, 100*time.Millisecond) - interop := New(testLogger(), 500, chains, dataDir) + ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel2() - require.NotNil(t, interop) - require.Len(t, interop.chains, 3) -} + err := interop.Start(ctx2) + require.ErrorIs(t, err, context.DeadlineExceeded) + }) -// ============================================================================= -// Lifecycle Tests -// ============================================================================= + t.Run("Stop cancels running Start and closes DB", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() -func TestStart_BlocksUntilContextCanceled(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + mock.blockAtTimestampErr = ethereum.NotFound - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - mock.blockAtTimestamp = eth.L2BlockRef{Number: 50} + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + done := make(chan error, 1) + go func() { done <- 
interop.Start(context.Background()) }() - ctx, cancel := context.WithCancel(context.Background()) - - done := make(chan error, 1) - go func() { - done <- interop.Start(ctx) - }() - - // Wait for it to start the loop - require.Eventually(t, func() bool { - interop.mu.RLock() - defer interop.mu.RUnlock() - return interop.started - }, 5*time.Second, 100*time.Millisecond, "Start should mark as started") - - // Cancel and verify it exits - cancel() - - var err error - require.Eventually(t, func() bool { - select { - case err = <-done: - return true - default: - return false - } - }, 5*time.Second, 100*time.Millisecond, "Start should exit after context cancellation") + require.Eventually(t, func() bool { + interop.mu.RLock() + defer interop.mu.RUnlock() + return interop.started + }, 5*time.Second, 100*time.Millisecond) - require.ErrorIs(t, err, context.Canceled) -} + err := interop.Stop(context.Background()) + require.NoError(t, err) -func TestStart_AlreadyStarted(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + require.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, 5*time.Second, 100*time.Millisecond) - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + // Verify DB is closed + _, err = interop.verifiedDB.Has(100) + require.Error(t, err) + }) +} - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) +// ============================================================================= +// TestCollectCurrentL1 +// ============================================================================= - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func TestCollectCurrentL1(t *testing.T) { + t.Parallel() - // Start first instance - go func() { - _ = interop.Start(ctx) - }() + t.Run("returns minimum L1 across multiple chains", func(t *testing.T) { + 
t.Parallel() + dataDir := t.TempDir() - // Wait for it to mark as started - require.Eventually(t, func() bool { - interop.mu.RLock() - defer interop.mu.RUnlock() - return interop.started - }, 5*time.Second, 100*time.Millisecond, "Start should mark as started") + mock1 := newMockChainContainer(10) + mock1.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x2")} - // Try to start again - should block on context and return deadline exceeded - ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel2() + mock2 := newMockChainContainer(8453) + mock2.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} // minimum - err := interop.Start(ctx2) - require.ErrorIs(t, err, context.DeadlineExceeded) -} + chains := map[eth.ChainID]cc.ChainContainer{mock1.id: mock1, mock2.id: mock2} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() -func TestStop_ClosesVerifiedDB(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + l1, err := interop.collectCurrentL1() - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + require.NoError(t, err) + require.Equal(t, uint64(100), l1.Number) + require.Equal(t, common.HexToHash("0x1"), l1.Hash) + }) - err := interop.Stop(context.Background()) - require.NoError(t, err) + t.Run("single chain returns its L1", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - // Verify DB is closed by trying to use it (should fail) - _, err = interop.verifiedDB.Has(100) - require.Error(t, err) // LevelDB returns error on closed DB -} + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 500, Hash: common.HexToHash("0x5")} -func TestStop_CancelsRunningContext(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, 
dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - mock.blockAtTimestampErr = ethereum.NotFound // Keep it in "not ready" state + l1, err := interop.collectCurrentL1() - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + require.NoError(t, err) + require.Equal(t, uint64(500), l1.Number) + }) - ctx := context.Background() - - done := make(chan error, 1) - go func() { - done <- interop.Start(ctx) - }() - - // Wait for it to start - require.Eventually(t, func() bool { - interop.mu.RLock() - defer interop.mu.RUnlock() - return interop.started - }, 5*time.Second, 100*time.Millisecond, "Start should mark as started") - - // Stop should cancel the internal context - err := interop.Stop(context.Background()) - require.NoError(t, err) - - // Verify Start exited - require.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, 5*time.Second, 100*time.Millisecond, "Start should exit after Stop is called") -} + t.Run("chain error propagated", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() -func TestStop_NilCancel(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + mock := newMockChainContainer(10) + mock.currentL1Err = errors.New("chain not synced") - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + + l1, err := interop.collectCurrentL1() - // Stop without ever starting - cancel is nil - err := interop.Stop(context.Background()) - require.NoError(t, err) + require.Error(t, err) + require.Contains(t, err.Error(), "not ready") + 
require.Equal(t, eth.BlockID{}, l1) + }) } // ============================================================================= -// collectCurrentL1 Tests +// TestCheckChainsReady // ============================================================================= -func TestCollectCurrentL1_ReturnsMinimum(t *testing.T) { +func TestCheckChainsReady(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - mock1 := newMockChainContainer(10) - mock1.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x2")} + t.Run("all chains ready returns blocks", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - mock2 := newMockChainContainer(8453) - mock2.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} // minimum + mock1 := newMockChainContainer(10) + mock1.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, - } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + mock2 := newMockChainContainer(8453) + mock2.blockAtTimestamp = eth.L2BlockRef{Number: 200, Hash: common.HexToHash("0x2")} - l1, err := interop.collectCurrentL1() + chains := map[eth.ChainID]cc.ChainContainer{mock1.id: mock1, mock2.id: mock2} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - require.NoError(t, err) - require.Equal(t, uint64(100), l1.Number) - require.Equal(t, common.HexToHash("0x1"), l1.Hash) -} + blocks, err := interop.checkChainsReady(1000) -func TestCollectCurrentL1_ChainNotReady_Error(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + require.NoError(t, err) + require.Len(t, blocks, 2) + require.NotEqual(t, common.Hash{}, blocks[mock1.id].Hash) + require.NotEqual(t, common.Hash{}, blocks[mock2.id].Hash) + }) - mock := newMockChainContainer(10) - mock.currentL1Err = errors.New("chain not synced") + t.Run("one 
chain not ready returns error", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + mock1 := newMockChainContainer(10) + mock1.blockAtTimestamp = eth.L2BlockRef{Number: 100} - l1, err := interop.collectCurrentL1() + mock2 := newMockChainContainer(8453) + mock2.blockAtTimestampErr = ethereum.NotFound - require.Error(t, err) - require.Contains(t, err.Error(), "not ready") - require.Equal(t, eth.BlockID{}, l1) -} + chains := map[eth.ChainID]cc.ChainContainer{mock1.id: mock1, mock2.id: mock2} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() -func TestCollectCurrentL1_EmptyChains(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + blocks, err := interop.checkChainsReady(1000) - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() - - l1, err := interop.collectCurrentL1() + require.Error(t, err) + require.Nil(t, blocks) + }) - require.NoError(t, err) - require.Equal(t, eth.BlockID{}, l1) -} + t.Run("parallel execution works", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() -func TestCollectCurrentL1_SingleChain(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 500, Hash: common.HexToHash("0x5")} + chains := make(map[eth.ChainID]cc.ChainContainer) + for i := 0; i < 5; i++ { + mock := newMockChainContainer(uint64(10 + i)) + mock.blockAtTimestamp = eth.L2BlockRef{Number: uint64(100 + i)} + chains[mock.id] = mock + } - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + interop 
:= New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - l1, err := interop.collectCurrentL1() + blocks, err := interop.checkChainsReady(1000) - require.NoError(t, err) - require.Equal(t, uint64(500), l1.Number) - require.Equal(t, common.HexToHash("0x5"), l1.Hash) + require.NoError(t, err) + require.Len(t, blocks, 5) + }) } // ============================================================================= -// checkChainsReady Tests +// TestProgressInterop // ============================================================================= -func TestCheckChainsReady_AllReady(t *testing.T) { +func TestProgressInterop(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - mock1 := newMockChainContainer(10) - mock1.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + t.Run("not initialized uses activation timestamp", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - mock2 := newMockChainContainer(8453) - mock2.blockAtTimestamp = eth.L2BlockRef{Number: 200, Hash: common.HexToHash("0x2")} + mock := newMockChainContainer(10) + mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, - } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 5000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - blocks, err := interop.checkChainsReady(1000) + var capturedTimestamp uint64 + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + capturedTimestamp = ts + return Result{Timestamp: ts, L2Heads: blocks}, nil + } - require.NoError(t, err) - require.Len(t, blocks, 2) - require.Equal(t, uint64(100), blocks[mock1.id].Number) - require.Equal(t, uint64(200), 
blocks[mock2.id].Number) -} + result, err := interop.progressInterop() -func TestCheckChainsReady_OneNotReady(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + require.NoError(t, err) + require.Equal(t, uint64(5000), result.Timestamp) + require.Equal(t, uint64(5000), capturedTimestamp) + }) + + t.Run("initialized uses next timestamp", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + mock := newMockChainContainer(10) + mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{Timestamp: ts, L2Heads: blocks}, nil + } - mock1 := newMockChainContainer(10) - mock1.blockAtTimestamp = eth.L2BlockRef{Number: 100} + // First progress + result1, err := interop.progressInterop() + require.NoError(t, err) + require.Equal(t, uint64(1000), result1.Timestamp) - mock2 := newMockChainContainer(8453) - mock2.blockAtTimestampErr = ethereum.NotFound // Not ready + // Commit + err = interop.handleResult(result1) + require.NoError(t, err) - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, - } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + // Second progress should use next timestamp + result2, err := interop.progressInterop() + require.NoError(t, err) + require.Equal(t, uint64(1001), result2.Timestamp) + }) - blocks, err := interop.checkChainsReady(1000) + t.Run("chains not ready returns empty result", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - require.Error(t, err) - require.Nil(t, blocks) -} + mock := newMockChainContainer(10) + mock.blockAtTimestampErr = ethereum.NotFound -func TestCheckChainsReady_EmptyChains(t *testing.T) 
{ - t.Parallel() - dataDir := t.TempDir() + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + result, err := interop.progressInterop() - blocks, err := interop.checkChainsReady(1000) + require.NoError(t, err) + require.True(t, result.IsEmpty()) + }) - require.NoError(t, err) - require.Empty(t, blocks) -} + t.Run("chain error propagated", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() -func TestCheckChainsReady_ParallelQueries(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + mock := newMockChainContainer(10) + mock.blockAtTimestampErr = errors.New("internal error") - // Create multiple chains to test parallel execution - var mocks []*mockChainContainer - chains := make(map[eth.ChainID]cc.ChainContainer) + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - for i := 0; i < 5; i++ { - mock := newMockChainContainer(uint64(10 + i)) - mock.blockAtTimestamp = eth.L2BlockRef{Number: uint64(100 + i)} - mocks = append(mocks, mock) - chains[mock.id] = mock - } + result, err := interop.progressInterop() - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + require.Error(t, err) + require.True(t, result.IsEmpty()) + }) - blocks, err := interop.checkChainsReady(1000) + t.Run("verifyFn error propagated", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - require.NoError(t, err) - require.Len(t, blocks, 5) + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} + mock.blockAtTimestamp = 
eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} - // Verify all chains were queried - for _, mock := range mocks { - require.Contains(t, blocks, mock.id) - } + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 100, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{}, errors.New("verification failed") + } + + result, err := interop.progressInterop() + + require.Error(t, err) + require.Contains(t, err.Error(), "verification failed") + require.True(t, result.IsEmpty()) + }) } // ============================================================================= -// progressInterop Tests +// TestVerifiedAtTimestamp // ============================================================================= -func TestProgressInterop_NotInitialized_UsesActivationTimestamp(t *testing.T) { +func TestVerifiedAtTimestamp(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 5000, chains, dataDir) // activation at 5000 - require.NotNil(t, interop) - interop.ctx = context.Background() - - result, err := interop.progressInterop() - require.NoError(t, err) - require.False(t, result.IsEmpty()) - require.Equal(t, uint64(5000), result.Timestamp) - require.True(t, result.IsValid()) -} - -func TestProgressInterop_Initialized_UsesNextTimestamp(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + t.Run("before activation always verified", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - mock := newMockChainContainer(10) - mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + interop := New(testLogger(), 1000, map[eth.ChainID]cc.ChainContainer{}, 
dataDir) + require.NotNil(t, interop) - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + verified, err := interop.VerifiedAtTimestamp(999) + require.NoError(t, err) + require.True(t, verified) - // First progress - returns result for timestamp 1000 - result1, err := interop.progressInterop() - require.NoError(t, err) - require.Equal(t, uint64(1000), result1.Timestamp) + verified, err = interop.VerifiedAtTimestamp(0) + require.NoError(t, err) + require.True(t, verified) + }) - // Commit the result so DB is initialized - err = interop.handleResult(result1) - require.NoError(t, err) + t.Run("at/after activation not verified until committed", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - // Second progress - should return result for timestamp 1001 - result2, err := interop.progressInterop() - require.NoError(t, err) - require.Equal(t, uint64(1001), result2.Timestamp) -} + interop := New(testLogger(), 1000, map[eth.ChainID]cc.ChainContainer{}, dataDir) + require.NotNil(t, interop) -func TestProgressInterop_ChainsNotReady_ReturnsEmptyResult(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + verified, err := interop.VerifiedAtTimestamp(1000) + require.NoError(t, err) + require.False(t, verified) - mock := newMockChainContainer(10) - mock.blockAtTimestampErr = ethereum.NotFound // Not ready + verified, err = interop.VerifiedAtTimestamp(9999) + require.NoError(t, err) + require.False(t, verified) + }) + + t.Run("committed timestamp verified", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + mock := newMockChainContainer(10) + mock.blockAtTimestamp = eth.L2BlockRef{Number: 100} + + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + interop.verifyFn = func(ts uint64, blocks 
map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{Timestamp: ts, L2Heads: blocks}, nil + } - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + result, err := interop.progressInterop() + require.NoError(t, err) - result, err := interop.progressInterop() + err = interop.handleResult(result) + require.NoError(t, err) - require.NoError(t, err) // Returns nil error when chains not ready - require.True(t, result.IsEmpty()) + verified, err := interop.VerifiedAtTimestamp(1000) + require.NoError(t, err) + require.True(t, verified) + }) } -func TestProgressInterop_ChainError(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.blockAtTimestampErr = errors.New("internal error") +// ============================================================================= +// TestHandleResult +// ============================================================================= - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() +func TestHandleResult(t *testing.T) { + t.Parallel() - result, err := interop.progressInterop() + t.Run("empty result is no-op", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - require.Error(t, err) - require.Contains(t, err.Error(), "internal error") - require.True(t, result.IsEmpty()) -} + interop := New(testLogger(), 1000, map[eth.ChainID]cc.ChainContainer{}, dataDir) + require.NotNil(t, interop) -// ============================================================================= -// CurrentL1 Tests -// ============================================================================= + err := interop.handleResult(Result{}) + require.NoError(t, err) -func TestCurrentL1_ReturnsStoredValue(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + 
has, err := interop.verifiedDB.Has(0) + require.NoError(t, err) + require.False(t, has) + }) - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + t.Run("valid result commits to DB with correct data", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - interop.currentL1 = eth.BlockID{Number: 100, Hash: common.HexToHash("0x1")} + mock := newMockChainContainer(10) + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) - result := interop.CurrentL1() + validResult := Result{ + Timestamp: 1000, + L1Head: eth.BlockID{Number: 100, Hash: common.HexToHash("0xL1")}, + L2Heads: map[eth.ChainID]eth.BlockID{ + mock.id: {Number: 500, Hash: common.HexToHash("0xL2")}, + }, + } - require.Equal(t, uint64(100), result.Number) - require.Equal(t, common.HexToHash("0x1"), result.Hash) -} + err := interop.handleResult(validResult) + require.NoError(t, err) -func TestCurrentL1_EmptyReturnsZero(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + has, err := interop.verifiedDB.Has(1000) + require.NoError(t, err) + require.True(t, has) - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + retrieved, err := interop.verifiedDB.Get(1000) + require.NoError(t, err) + require.Equal(t, validResult.Timestamp, retrieved.Timestamp) + require.Equal(t, validResult.L1Head, retrieved.L1Head) + require.Equal(t, validResult.L2Heads[mock.id], retrieved.L2Heads[mock.id]) + }) + + t.Run("invalid result does not commit", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + mock := newMockChainContainer(10) + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + + invalidResult := Result{ + Timestamp: 1000, + L1Head: eth.BlockID{Number: 100, Hash: 
common.HexToHash("0xL1")}, + L2Heads: map[eth.ChainID]eth.BlockID{ + mock.id: {Number: 500, Hash: common.HexToHash("0xL2")}, + }, + InvalidHeads: map[eth.ChainID]eth.BlockID{ + mock.id: {Number: 500, Hash: common.HexToHash("0xBAD")}, + }, + } - result := interop.CurrentL1() + err := interop.handleResult(invalidResult) + require.NoError(t, err) - require.Equal(t, eth.BlockID{}, result) + has, err := interop.verifiedDB.Has(1000) + require.NoError(t, err) + require.False(t, has) + }) } // ============================================================================= -// VerifiedAtTimestamp Tests +// TestProgressAndRecord // ============================================================================= -func TestVerifiedAtTimestamp_Exists(t *testing.T) { +func TestProgressAndRecord(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - mock := newMockChainContainer(10) - mock.blockAtTimestamp = eth.L2BlockRef{Number: 100} + t.Run("empty result sets L1 to collected minimum", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() + mock1 := newMockChainContainer(10) + mock1.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x2")} + mock1.blockAtTimestampErr = ethereum.NotFound - // Progress to get result for timestamp 1000 - result, err := interop.progressInterop() - require.NoError(t, err) + mock2 := newMockChainContainer(8453) + mock2.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} + mock2.blockAtTimestampErr = ethereum.NotFound - // Commit the result to DB - err = interop.handleResult(result) - require.NoError(t, err) + chains := map[eth.ChainID]cc.ChainContainer{mock1.id: mock1, mock2.id: mock2} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() - verified, err := 
interop.VerifiedAtTimestamp(1000) + require.Equal(t, eth.BlockID{}, interop.currentL1) - require.NoError(t, err) - require.True(t, verified) -} + err := interop.progressAndRecord() + require.NoError(t, err) -func TestVerifiedAtTimestamp_NotExists(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + require.Equal(t, uint64(100), interop.currentL1.Number) + require.Equal(t, common.HexToHash("0x1"), interop.currentL1.Hash) + }) - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + t.Run("valid result sets L1 to result L1Head", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - verified, err := interop.VerifiedAtTimestamp(9999) + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x200")} + mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0xL2")} - require.NoError(t, err) - require.False(t, verified) -} + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() -// ============================================================================= -// verifyInteropMessages Tests -// ============================================================================= + expectedL1Head := eth.BlockID{Number: 150, Hash: common.HexToHash("0xL1Result")} + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{Timestamp: ts, L1Head: expectedL1Head, L2Heads: blocks}, nil + } -func TestVerifyInteropMessages_CopiesBlocks(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + err := interop.progressAndRecord() + require.NoError(t, err) - mock1 := newMockChainContainer(10) - mock2 := newMockChainContainer(8453) + require.Equal(t, expectedL1Head.Number, interop.currentL1.Number) + require.Equal(t, expectedL1Head.Hash, interop.currentL1.Hash) + }) + + 
t.Run("invalid result does not update L1", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x200")} + mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0xL2")} + + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + + initialL1 := eth.BlockID{Number: 50, Hash: common.HexToHash("0x50")} + interop.currentL1 = initialL1 + + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{ + Timestamp: ts, + L1Head: eth.BlockID{Number: 999, Hash: common.HexToHash("0xShouldNotBeUsed")}, + L2Heads: blocks, + InvalidHeads: map[eth.ChainID]eth.BlockID{mock.id: {Number: 100}}, + }, nil + } - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, - } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + err := interop.progressAndRecord() + require.NoError(t, err) - blocksAtTimestamp := map[eth.ChainID]eth.BlockID{ - mock1.id: {Number: 100, Hash: common.HexToHash("0x1")}, - mock2.id: {Number: 200, Hash: common.HexToHash("0x2")}, - } + require.Equal(t, initialL1.Number, interop.currentL1.Number) + require.Equal(t, initialL1.Hash, interop.currentL1.Hash) + }) + + t.Run("errors propagated", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() - result, err := interop.verifyInteropMessages(1000, blocksAtTimestamp) + mock := newMockChainContainer(10) + mock.currentL1Err = errors.New("L1 sync error") - require.NoError(t, err) - require.Equal(t, uint64(1000), result.Timestamp) - require.Len(t, result.L2Heads, 2) - require.Equal(t, blocksAtTimestamp[mock1.id], result.L2Heads[mock1.id]) - require.Equal(t, blocksAtTimestamp[mock2.id], result.L2Heads[mock2.id]) - require.True(t, result.IsValid()) // No invalid 
heads in stub implementation + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + interop := New(testLogger(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + + err := interop.progressAndRecord() + require.Error(t, err) + }) } // ============================================================================= -// handleResult Tests +// TestInterop_FullCycle // ============================================================================= -func TestHandleResult_EmptyResult_ReturnsNil(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - chains := map[eth.ChainID]cc.ChainContainer{} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - - emptyResult := Result{} - require.True(t, emptyResult.IsEmpty()) - - err := interop.handleResult(emptyResult) - - require.NoError(t, err) - // Empty result should not commit anything to DB - has, err := interop.verifiedDB.Has(0) - require.NoError(t, err) - require.False(t, has) -} - -func TestHandleResult_ValidResult_CommitsToDb(t *testing.T) { +func TestInterop_FullCycle(t *testing.T) { t.Parallel() dataDir := t.TempDir() mock := newMockChainContainer(10) + mock.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} + mock.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} + chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) + interop := New(testLogger(), 100, chains, dataDir) require.NotNil(t, interop) + interop.ctx = context.Background() - validResult := Result{ - Timestamp: 1000, - L1Head: eth.BlockID{Number: 100, Hash: common.HexToHash("0xL1")}, - L2Heads: map[eth.ChainID]eth.BlockID{ - mock.id: {Number: 500, Hash: common.HexToHash("0xL2")}, - }, - InvalidHeads: nil, // No invalid heads = valid result + // Verify logsDB is empty initially + _, hasBlocks := interop.logsDBs[mock.id].LatestSealedBlock() + require.False(t, hasBlocks) + + // 
Stub verifyFn + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + return Result{Timestamp: ts, L2Heads: blocks}, nil } - require.True(t, validResult.IsValid()) - require.False(t, validResult.IsEmpty()) - err := interop.handleResult(validResult) + // Run 3 cycles + for i := 0; i < 3; i++ { + l1, err := interop.collectCurrentL1() + require.NoError(t, err) + require.Equal(t, uint64(1000), l1.Number) - require.NoError(t, err) - // Valid result should be committed to DB - has, err := interop.verifiedDB.Has(1000) - require.NoError(t, err) - require.True(t, has) -} + result, err := interop.progressInterop() + require.NoError(t, err) + require.False(t, result.IsEmpty()) -func TestHandleResult_InvalidResult_DoesNotCommitToDb(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() + err = interop.handleResult(result) + require.NoError(t, err) + } - mock := newMockChainContainer(10) - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) + // Verify timestamps committed with correct L2Heads + for ts := uint64(100); ts <= 102; ts++ { + has, err := interop.verifiedDB.Has(ts) + require.NoError(t, err) + require.True(t, has) - invalidResult := Result{ - Timestamp: 1000, - L1Head: eth.BlockID{Number: 100, Hash: common.HexToHash("0xL1")}, - L2Heads: map[eth.ChainID]eth.BlockID{ - mock.id: {Number: 500, Hash: common.HexToHash("0xL2")}, - }, - InvalidHeads: map[eth.ChainID]eth.BlockID{ - mock.id: {Number: 500, Hash: common.HexToHash("0xBAD")}, // Has invalid heads - }, + retrieved, err := interop.verifiedDB.Get(ts) + require.NoError(t, err) + require.Equal(t, ts, retrieved.Timestamp) + require.Contains(t, retrieved.L2Heads, mock.id) + require.Equal(t, ts, retrieved.L2Heads[mock.id].Number) } - require.False(t, invalidResult.IsValid()) - require.False(t, invalidResult.IsEmpty()) - - err := interop.handleResult(invalidResult) - require.NoError(t, err) - // 
Invalid results trigger block invalidation but are NOT committed to the verified DB - has, err := interop.verifiedDB.Has(1000) - require.NoError(t, err) - require.False(t, has) + // Verify logsDB populated + latestBlock, hasBlocks := interop.logsDBs[mock.id].LatestSealedBlock() + require.True(t, hasBlocks) + require.Equal(t, uint64(102), latestBlock.Number) } -func TestHandleResult_InvalidResult_MultipleInvalidHeads(t *testing.T) { +// ============================================================================= +// TestResult_IsEmpty +// ============================================================================= + +func TestResult_IsEmpty(t *testing.T) { t.Parallel() - dataDir := t.TempDir() - mock1 := newMockChainContainer(10) - mock2 := newMockChainContainer(8453) - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, + tests := []struct { + name string + result Result + isEmpty bool + }{ + {"zero value", Result{}, true}, + {"only timestamp", Result{Timestamp: 1000}, true}, + {"with L1Head", Result{Timestamp: 1000, L1Head: eth.BlockID{Number: 100}}, false}, + {"with L2Heads", Result{Timestamp: 1000, L2Heads: map[eth.ChainID]eth.BlockID{eth.ChainIDFromUInt64(10): {Number: 50}}}, false}, + {"with InvalidHeads", Result{Timestamp: 1000, InvalidHeads: map[eth.ChainID]eth.BlockID{eth.ChainIDFromUInt64(10): {Number: 50}}}, false}, } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - invalidResult := Result{ - Timestamp: 1000, - L1Head: eth.BlockID{Number: 100, Hash: common.HexToHash("0xL1")}, - L2Heads: map[eth.ChainID]eth.BlockID{ - mock1.id: {Number: 500, Hash: common.HexToHash("0xL2a")}, - mock2.id: {Number: 600, Hash: common.HexToHash("0xL2b")}, - }, - InvalidHeads: map[eth.ChainID]eth.BlockID{ - mock1.id: {Number: 500, Hash: common.HexToHash("0xBAD1")}, - mock2.id: {Number: 600, Hash: common.HexToHash("0xBAD2")}, - }, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + 
require.Equal(t, tt.isEmpty, tt.result.IsEmpty()) + }) } - - err := interop.handleResult(invalidResult) - - require.NoError(t, err) - // Invalid results trigger block invalidation but are NOT committed to the verified DB - has, err := interop.verifiedDB.Has(1000) - require.NoError(t, err) - require.False(t, has) } // ============================================================================= -// progressAndRecord L1 Update Tests +// Mock Types // ============================================================================= -func TestProgressAndRecord_EmptyResult_SetsL1ToCollectedMinimum(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock1 := newMockChainContainer(10) - mock1.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x2")} - mock1.blockAtTimestampErr = ethereum.NotFound // Chains not ready -> empty result +type mockBlockInfo struct { + hash common.Hash + parentHash common.Hash + number uint64 + timestamp uint64 +} + +func (m *mockBlockInfo) Hash() common.Hash { return m.hash } +func (m *mockBlockInfo) ParentHash() common.Hash { return m.parentHash } +func (m *mockBlockInfo) Coinbase() common.Address { return common.Address{} } +func (m *mockBlockInfo) Root() common.Hash { return common.Hash{} } +func (m *mockBlockInfo) NumberU64() uint64 { return m.number } +func (m *mockBlockInfo) Time() uint64 { return m.timestamp } +func (m *mockBlockInfo) MixDigest() common.Hash { return common.Hash{} } +func (m *mockBlockInfo) BaseFee() *big.Int { return big.NewInt(1) } +func (m *mockBlockInfo) BlobBaseFee(chainConfig *params.ChainConfig) *big.Int { return big.NewInt(1) } +func (m *mockBlockInfo) ExcessBlobGas() *uint64 { return nil } +func (m *mockBlockInfo) ReceiptHash() common.Hash { return common.Hash{} } +func (m *mockBlockInfo) GasUsed() uint64 { return 0 } +func (m *mockBlockInfo) GasLimit() uint64 { return 30000000 } +func (m *mockBlockInfo) BlobGasUsed() *uint64 { return nil } +func (m *mockBlockInfo) ParentBeaconRoot() *common.Hash 
{ return nil } +func (m *mockBlockInfo) WithdrawalsRoot() *common.Hash { return nil } +func (m *mockBlockInfo) HeaderRLP() ([]byte, error) { return nil, nil } +func (m *mockBlockInfo) Header() *types.Header { return nil } +func (m *mockBlockInfo) ID() eth.BlockID { return eth.BlockID{Hash: m.hash, Number: m.number} } + +var _ eth.BlockInfo = (*mockBlockInfo)(nil) - mock2 := newMockChainContainer(8453) - mock2.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} // This is minimum - mock2.blockAtTimestampErr = ethereum.NotFound - - chains := map[eth.ChainID]cc.ChainContainer{ - mock1.id: mock1, - mock2.id: mock2, - } - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() +type mockChainContainer struct { + id eth.ChainID - // Verify currentL1 starts empty - require.Equal(t, eth.BlockID{}, interop.currentL1) + currentL1 eth.BlockRef + currentL1Err error - err := interop.progressAndRecord() + blockAtTimestamp eth.L2BlockRef + blockAtTimestampErr error - require.NoError(t, err) - // When result is empty, currentL1 should be set to the collected minimum - require.Equal(t, uint64(100), interop.currentL1.Number) - require.Equal(t, common.HexToHash("0x1"), interop.currentL1.Hash) + lastRequestedTimestamp uint64 + mu sync.Mutex } -func TestProgressAndRecord_ValidResult_SetsL1ToResultL1Head(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x200")} - mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0xL2")} - - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() +func newMockChainContainer(id uint64) *mockChainContainer { + return &mockChainContainer{id: eth.ChainIDFromUInt64(id)} +} - // Override verifyFn to return a valid result 
with a specific L1Head - expectedL1Head := eth.BlockID{Number: 150, Hash: common.HexToHash("0xL1Result")} - interop.verifyFn = func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) { - return Result{ - Timestamp: ts, - L1Head: expectedL1Head, - L2Heads: map[eth.ChainID]eth.BlockID{ - mock.id: blocksAtTimestamp[mock.id], - }, - InvalidHeads: nil, // valid result - }, nil +func (m *mockChainContainer) ID() eth.ChainID { return m.id } +func (m *mockChainContainer) Start(ctx context.Context) error { return nil } +func (m *mockChainContainer) Stop(ctx context.Context) error { return nil } +func (m *mockChainContainer) Pause(ctx context.Context) error { return nil } +func (m *mockChainContainer) Resume(ctx context.Context) error { return nil } +func (m *mockChainContainer) RegisterVerifier(v activity.VerificationActivity) { +} +func (m *mockChainContainer) BlockAtTimestamp(ctx context.Context, ts uint64, label eth.BlockLabel) (eth.L2BlockRef, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.blockAtTimestampErr != nil { + return eth.L2BlockRef{}, m.blockAtTimestampErr } - - // Verify currentL1 starts empty - require.Equal(t, eth.BlockID{}, interop.currentL1) - - err := interop.progressAndRecord() - - require.NoError(t, err) - // When result is valid (non-empty), currentL1 should be set to result.L1Head - require.Equal(t, expectedL1Head.Number, interop.currentL1.Number) - require.Equal(t, expectedL1Head.Hash, interop.currentL1.Hash) + m.lastRequestedTimestamp = ts + ref := m.blockAtTimestamp + ref.Time = ts + ref.Number = ts + ref.Hash = common.BigToHash(big.NewInt(int64(ts))) + return ref, nil } - -func TestProgressAndRecord_InvalidResult_DoesNotUpdateL1(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 200, Hash: common.HexToHash("0x200")} - mock.blockAtTimestamp = eth.L2BlockRef{Number: 100, Hash: common.HexToHash("0xL2")} - - chains := 
map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() - - // Set an initial currentL1 value - initialL1 := eth.BlockID{Number: 50, Hash: common.HexToHash("0x50")} - interop.currentL1 = initialL1 - - // Override verifyFn to return an invalid result (has InvalidHeads) - interop.verifyFn = func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) { - return Result{ - Timestamp: ts, - L1Head: eth.BlockID{Number: 999, Hash: common.HexToHash("0xShouldNotBeUsed")}, - L2Heads: map[eth.ChainID]eth.BlockID{ - mock.id: blocksAtTimestamp[mock.id], - }, - InvalidHeads: map[eth.ChainID]eth.BlockID{ - mock.id: {Number: 100, Hash: common.HexToHash("0xBAD")}, // marks result as invalid - }, - }, nil +func (m *mockChainContainer) VerifiedAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) { + return eth.BlockID{}, eth.BlockID{}, nil +} +func (m *mockChainContainer) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, error) { + return eth.BlockID{}, nil +} +func (m *mockChainContainer) OptimisticAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) { + return eth.BlockID{}, eth.BlockID{}, nil +} +func (m *mockChainContainer) OutputRootAtL2BlockNumber(ctx context.Context, l2BlockNum uint64) (eth.Bytes32, error) { + return eth.Bytes32{}, nil +} +func (m *mockChainContainer) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*eth.OutputResponse, error) { + return nil, nil +} +func (m *mockChainContainer) FetchReceipts(ctx context.Context, blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) { + m.mu.Lock() + defer m.mu.Unlock() + ts := m.lastRequestedTimestamp + var parentHash common.Hash + if ts > 0 { + parentHash = common.BigToHash(big.NewInt(int64(ts - 1))) } - - err := interop.progressAndRecord() - - require.NoError(t, err) - // When result is invalid, currentL1 should NOT be updated 
(remains at initial value) - require.Equal(t, initialL1.Number, interop.currentL1.Number) - require.Equal(t, initialL1.Hash, interop.currentL1.Hash) + blockInfo := &mockBlockInfo{ + hash: blockID.Hash, + parentHash: parentHash, + number: blockID.Number, + timestamp: ts, + } + return blockInfo, types.Receipts{}, nil } - -func TestProgressAndRecord_CollectL1Error_ReturnsError(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1Err = errors.New("L1 sync error") - - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() - - err := interop.progressAndRecord() - - require.Error(t, err) - require.Contains(t, err.Error(), "not ready") +func (m *mockChainContainer) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) { + m.mu.Lock() + defer m.mu.Unlock() + if m.currentL1Err != nil { + return nil, m.currentL1Err + } + return ð.SyncStatus{CurrentL1: m.currentL1}, nil } - -func TestProgressAndRecord_ProgressInteropError_ReturnsError(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 100, Hash: common.HexToHash("0x1")} - mock.blockAtTimestampErr = errors.New("internal chain error") - - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 1000, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() - - err := interop.progressAndRecord() - - require.Error(t, err) - require.Contains(t, err.Error(), "internal chain error") +func (m *mockChainContainer) RewindEngine(ctx context.Context, timestamp uint64) error { + return nil } +func (m *mockChainContainer) BlockTime() uint64 { return 1 } -// ============================================================================= -// Integration Tests -// 
============================================================================= - -func TestInterop_FullCycle(t *testing.T) { - t.Parallel() - dataDir := t.TempDir() - - mock := newMockChainContainer(10) - mock.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} - mock.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} - - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} - interop := New(testLogger(), 100, chains, dataDir) - require.NotNil(t, interop) - interop.ctx = context.Background() +var _ cc.ChainContainer = (*mockChainContainer)(nil) - // Simulate multiple interop cycles - for i := 0; i < 3; i++ { - // Collect L1 (returns minimum across chains) - l1, err := interop.collectCurrentL1() - require.NoError(t, err) - require.Equal(t, uint64(1000), l1.Number) +func testLogger() gethlog.Logger { + return gethlog.New() +} - // Progress and get result - result, err := interop.progressInterop() - require.NoError(t, err) - require.False(t, result.IsEmpty()) +// mockLogsDBForInterop implements LogsDB for interop tests +type mockLogsDBForInterop struct { + openBlockRef eth.BlockRef + openBlockLogCnt uint32 + openBlockExecMsg map[uint32]*suptypes.ExecutingMessage + openBlockErr error + containsSeal suptypes.BlockSeal + containsErr error +} - // Handle the result (commits to DB) - err = interop.handleResult(result) - require.NoError(t, err) +func (m *mockLogsDBForInterop) LatestSealedBlock() (eth.BlockID, bool) { return eth.BlockID{}, false } +func (m *mockLogsDBForInterop) FirstSealedBlock() (suptypes.BlockSeal, error) { + return suptypes.BlockSeal{}, nil +} +func (m *mockLogsDBForInterop) FindSealedBlock(number uint64) (suptypes.BlockSeal, error) { + return suptypes.BlockSeal{}, nil +} +func (m *mockLogsDBForInterop) OpenBlock(blockNum uint64) (eth.BlockRef, uint32, map[uint32]*suptypes.ExecutingMessage, error) { + if m.openBlockErr != nil { + return eth.BlockRef{}, 0, nil, m.openBlockErr } - - // Verify timestamps 
were committed sequentially - for ts := uint64(100); ts <= 102; ts++ { - has, err := interop.verifiedDB.Has(ts) - require.NoError(t, err) - require.True(t, has, "timestamp %d should be verified", ts) + return m.openBlockRef, m.openBlockLogCnt, m.openBlockExecMsg, nil +} +func (m *mockLogsDBForInterop) Contains(query suptypes.ContainsQuery) (suptypes.BlockSeal, error) { + if m.containsErr != nil { + return suptypes.BlockSeal{}, m.containsErr } + return m.containsSeal, nil +} +func (m *mockLogsDBForInterop) AddLog(logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *suptypes.ExecutingMessage) error { + return nil } +func (m *mockLogsDBForInterop) SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error { + return nil +} +func (m *mockLogsDBForInterop) Close() error { return nil } + +var _ LogsDB = (*mockLogsDBForInterop)(nil) diff --git a/op-supernode/supernode/activity/interop/logdb.go b/op-supernode/supernode/activity/interop/logdb.go new file mode 100644 index 00000000000..07c50173476 --- /dev/null +++ b/op-supernode/supernode/activity/interop/logdb.go @@ -0,0 +1,222 @@ +package interop + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// LogsDB is the interface for interacting with a chain's logs database. +// *logs.DB implements this interface. +type LogsDB interface { + // LatestSealedBlock returns the latest sealed block ID, or false if no blocks are sealed. + LatestSealedBlock() (eth.BlockID, bool) + // FirstSealedBlock returns the first block seal in the DB. 
+ FirstSealedBlock() (types.BlockSeal, error) + // FindSealedBlock returns the block seal for the given block number. + FindSealedBlock(number uint64) (types.BlockSeal, error) + // OpenBlock returns the block reference, log count, and executing messages for a block. + OpenBlock(blockNum uint64) (ref eth.BlockRef, logCount uint32, execMsgs map[uint32]*types.ExecutingMessage, err error) + // Contains checks if an initiating message exists in the database. + // Returns the block seal if found, or an error (ErrConflict if not found, ErrFuture if not yet indexed). + Contains(query types.ContainsQuery) (types.BlockSeal, error) + // AddLog adds a log entry to the database. + AddLog(logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error + // SealBlock seals a block in the database. + SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error + // Close closes the database. + Close() error +} + +// Compile-time check that *logs.DB implements LogsDB. +var _ LogsDB = (*logs.DB)(nil) + +// noopLogsDBMetrics implements the logs.Metrics interface with no-op methods. +type noopLogsDBMetrics struct{} + +func (n *noopLogsDBMetrics) RecordDBEntryCount(kind string, count int64) {} +func (n *noopLogsDBMetrics) RecordDBSearchEntriesRead(count int64) {} + +// openLogsDB opens a logs.DB for the given chain in the data directory. 
+func openLogsDB(logger log.Logger, chainID eth.ChainID, dataDir string) (LogsDB, error) { + chainDir := filepath.Join(dataDir, fmt.Sprintf("chain-%s", chainID)) + if err := os.MkdirAll(chainDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create chain directory: %w", err) + } + + dbPath := filepath.Join(chainDir, "logs.db") + db, err := logs.NewFromFile(logger, &noopLogsDBMetrics{}, chainID, dbPath, true) + if err != nil { + return nil, fmt.Errorf("failed to open logs DB for chain %s: %w", chainID, err) + } + + logger.Info("Initialized logs DB", "chain", chainID, "path", dbPath) + return db, nil +} + +var ( + // ErrPreviousTimestampNotSealed is returned when loadLogs is called but the + // previous timestamp has not been sealed in the logsDB. + ErrPreviousTimestampNotSealed = errors.New("previous timestamp not sealed in logsDB") + + // ErrParentHashMismatch is returned when the block's parent hash does not match + // the hash of the last sealed block in the logsDB. + ErrParentHashMismatch = errors.New("block parent hash does not match logsDB") +) + +// loadLogs loads and persists logs for the given timestamp for all chains. +// The previous timestamp MUST already be sealed in the database; if not, an error is returned. +// For the activation timestamp (first timestamp), the logsDB must be empty. 
+func (i *Interop) loadLogs(ts uint64) error { + for chainID, chain := range i.chains { + db := i.logsDBs[chainID] + + // Verify the previous timestamp is sealed (or DB is empty for activation timestamp) + // Returns the hash of the previous sealed block, or nil if DB is empty + latestBlock, hasBlocks, err := i.verifyCanAddTimestamp(chainID, db, ts, chain.BlockTime()) + if err != nil { + return err + } + + // Get the block at timestamp ts + block, err := chain.BlockAtTimestamp(i.ctx, ts, eth.Safe) + if err != nil { + return fmt.Errorf("chain %s: failed to get block at timestamp %d: %w", chainID, ts, err) + } + + // Fetch receipts for the block + blockInfo, receipts, err := chain.FetchReceipts(i.ctx, block.ID()) + if err != nil { + return fmt.Errorf("chain %s: failed to fetch receipts for block %d: %w", chainID, block.Number, err) + } + + // if the database has blocks, check if we can skip or need to verify continuity + if hasBlocks { + // if the latest block is the same or beyond the block we are loading, skip loading + if latestBlock.Number >= block.Number { + continue + } + + // Verify chain continuity: block's parent must match the last sealed block + if blockInfo.ParentHash() != latestBlock.Hash { + return fmt.Errorf("chain %s: block %d parent hash %s does not match logsDB last sealed block hash %s: %w", + chainID, block.Number, blockInfo.ParentHash(), latestBlock.Hash, ErrParentHashMismatch) + } + } + + // Process logs and seal the block + // If DB is empty (!hasBlocks), this is the first block - treat it as genesis for the logsDB + isFirstBlock := !hasBlocks + if err := i.processBlockLogs(db, blockInfo, receipts, isFirstBlock); err != nil { + return fmt.Errorf("chain %s: failed to process block logs for block %d: %w", chainID, block.Number, err) + } + + i.log.Debug("loaded logs for chain", + "chain", chainID, + "block", block.Number, + "timestamp", ts, + ) + } + + return nil +} + +func (i *Interop) verifyCanAddTimestamp(chainID eth.ChainID, db LogsDB, ts 
uint64, blockTime uint64) (eth.BlockID, bool, error) {
+	latestBlock, hasBlocks := db.LatestSealedBlock()
+
+	// If no blocks in DB:
+	// - At activation timestamp: OK, proceed to load the first block
+	// - Not at activation timestamp: ERROR, we're missing data
+	if !hasBlocks {
+		if ts == i.activationTimestamp {
+			return eth.BlockID{}, hasBlocks, nil
+		}
+		return eth.BlockID{}, hasBlocks, fmt.Errorf("chain %s: logsDB is empty but expected blocks before timestamp %d: %w",
+			chainID, ts, ErrPreviousTimestampNotSealed)
+	}
+
+	// DB has blocks - fall through to normal timestamp checks below
+	// This handles the case where we restart at activation timestamp but the logsDB already has data
+
+	// determine the timestamp of the last sealed block
+	seal, err := db.FindSealedBlock(latestBlock.Number)
+	if err != nil {
+		return eth.BlockID{}, hasBlocks, fmt.Errorf("chain %s: failed to find sealed block %d: %w", chainID, latestBlock.Number, err)
+	}
+
+	// if the last sealed block is already at or after the timestamp in question, return success
+	if seal.Timestamp >= ts {
+		return latestBlock, hasBlocks, nil
+	}
+
+	gap := ts - seal.Timestamp
+
+	// if there is more than a block time of gap, we cannot append the timestamp to the database
+	if gap > blockTime {
+		return eth.BlockID{}, hasBlocks, fmt.Errorf("chain %s: the prior block timestamp %d (%d minus block time %d) is not sealed (last sealed block timestamp: %d): %w",
+			chainID, ts-blockTime, ts, blockTime, seal.Timestamp, ErrPreviousTimestampNotSealed)
+	}
+
+	// if the gap is less than a block time, we can append the timestamp to the database, but warn the caller
+	if gap < blockTime {
+		i.log.Warn("verifyCanAddTimestamp: requested for timestamp which is not a multiple of block time",
+			"chain", chainID,
+			"timestamp", ts,
+			"block_time", blockTime,
+			"gap", gap,
+		)
+	}
+
+	return latestBlock, hasBlocks, nil
+}
+
+// processBlockLogs processes the receipts for a block and stores the logs in the database.
+// If isFirstBlock is true, this is the first block being added to the logsDB (at activation timestamp), +// and we treat it as genesis by using an empty parent block. This allows the logsDB to start at any +// block number, not just genesis. +func (i *Interop) processBlockLogs(db LogsDB, blockInfo eth.BlockInfo, receipts gethTypes.Receipts, isFirstBlock bool) error { + blockNum := blockInfo.NumberU64() + blockID := eth.BlockID{Hash: blockInfo.Hash(), Number: blockNum} + parentHash := blockInfo.ParentHash() + + // For the first block in the logsDB (activation block), use empty parent to treat it as genesis. + // This allows OpenBlock to work correctly even when we start at a non-genesis block. + parentBlock := eth.BlockID{Hash: parentHash, Number: blockNum - 1} + sealParentHash := parentHash + if blockNum == 0 || isFirstBlock { + parentBlock = eth.BlockID{} + sealParentHash = common.Hash{} + } + + var logIndex uint32 + for _, receipt := range receipts { + for _, l := range receipt.Logs { + logHash := processors.LogToLogHash(l) + + // Decode executing message if present (nil if not an executing message) + execMsg, _ := processors.DecodeExecutingMessageLog(l) + + if err := db.AddLog(logHash, parentBlock, logIndex, execMsg); err != nil { + return fmt.Errorf("failed to add log %d: %w", logIndex, err) + } + logIndex++ + } + } + + // Seal the block - use empty parent hash for first block + if err := db.SealBlock(sealParentHash, blockID, blockInfo.Time()); err != nil { + return fmt.Errorf("failed to seal block: %w", err) + } + + return nil +} diff --git a/op-supernode/supernode/activity/interop/logdb_test.go b/op-supernode/supernode/activity/interop/logdb_test.go new file mode 100644 index 00000000000..47c2db75fe7 --- /dev/null +++ b/op-supernode/supernode/activity/interop/logdb_test.go @@ -0,0 +1,600 @@ +package interop + +import ( + "context" + "errors" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + gethlog 
"github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supernode/supernode/activity" + cc "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container" + suptypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// ============================================================================= +// TestLogsDB_Persistence +// ============================================================================= + +func TestLogsDB_Persistence(t *testing.T) { + t.Parallel() + + t.Run("data survives close and reopen", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + chainID := eth.ChainIDFromUInt64(10) + + // Create and populate a logsDB + { + db, err := openLogsDB(gethlog.New(), chainID, dataDir) + require.NoError(t, err) + + // Seal parent block + parentBlock := eth.BlockID{Hash: common.Hash{0x01}, Number: 99} + err = db.SealBlock(common.Hash{}, parentBlock, 998) + require.NoError(t, err) + + // Add a log + logHash := common.Hash{0x02} + err = db.AddLog(logHash, parentBlock, 0, nil) + require.NoError(t, err) + + // Seal block 100 + block100 := eth.BlockID{Hash: common.Hash{0x03}, Number: 100} + err = db.SealBlock(parentBlock.Hash, block100, 1000) + require.NoError(t, err) + + err = db.Close() + require.NoError(t, err) + } + + // Reopen and verify persistence + { + db, err := openLogsDB(gethlog.New(), chainID, dataDir) + require.NoError(t, err) + defer db.Close() + + latestBlock, ok := db.LatestSealedBlock() + require.True(t, ok) + require.Equal(t, uint64(100), latestBlock.Number) + require.Equal(t, common.Hash{0x03}, latestBlock.Hash) + } + }) + + t.Run("multiple chains are isolated", func(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + chainID1 := eth.ChainIDFromUInt64(10) + chainID2 := eth.ChainIDFromUInt64(8453) + + db1, err := openLogsDB(gethlog.New(), chainID1, dataDir) + 
require.NoError(t, err) + defer db1.Close() + + db2, err := openLogsDB(gethlog.New(), chainID2, dataDir) + require.NoError(t, err) + defer db2.Close() + + // Seal different blocks on each chain + parentBlock1 := eth.BlockID{Hash: common.Hash{0x01}, Number: 99} + err = db1.SealBlock(common.Hash{}, parentBlock1, 998) + require.NoError(t, err) + + block1 := eth.BlockID{Hash: common.Hash{0x02}, Number: 100} + err = db1.SealBlock(parentBlock1.Hash, block1, 1000) + require.NoError(t, err) + + parentBlock2 := eth.BlockID{Hash: common.Hash{0x11}, Number: 199} + err = db2.SealBlock(common.Hash{}, parentBlock2, 1998) + require.NoError(t, err) + + block2 := eth.BlockID{Hash: common.Hash{0x12}, Number: 200} + err = db2.SealBlock(parentBlock2.Hash, block2, 2000) + require.NoError(t, err) + + // Verify each chain has its own data + latestBlock1, ok := db1.LatestSealedBlock() + require.True(t, ok) + require.Equal(t, uint64(100), latestBlock1.Number) + + latestBlock2, ok := db2.LatestSealedBlock() + require.True(t, ok) + require.Equal(t, uint64(200), latestBlock2.Number) + }) +} + +// ============================================================================= +// TestVerifyPreviousTimestampSealed +// ============================================================================= + +func TestVerifyPreviousTimestampSealed(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + activationTS uint64 + queryTS uint64 + blockTime uint64 + dbHasBlocks bool + sealTimestamp uint64 + findSealErr error + wantErr bool + wantErrIs error + wantHashNil bool + }{ + { + name: "activation timestamp with empty DB returns nil hash", + activationTS: 1000, + queryTS: 1000, + blockTime: 1, + dbHasBlocks: false, + wantErr: false, + wantHashNil: true, + }, + { + name: "activation timestamp with non-empty DB succeeds (restart case)", + activationTS: 1000, + queryTS: 1000, + blockTime: 1, + dbHasBlocks: true, + sealTimestamp: 1000, // DB has block at activation timestamp + wantErr: false, + 
wantHashNil: false, + }, + { + name: "non-activation timestamp with empty DB errors", + activationTS: 1000, + queryTS: 1001, + blockTime: 1, + dbHasBlocks: false, + wantErr: true, + wantErrIs: ErrPreviousTimestampNotSealed, + wantHashNil: true, + }, + { + name: "seal timestamp == query timestamp succeeds (already sealed)", + activationTS: 1000, + queryTS: 1001, + blockTime: 1, + dbHasBlocks: true, + sealTimestamp: 1001, // Same as queryTS - already past this timestamp + wantErr: false, + wantHashNil: false, + }, + { + name: "seal timestamp > query timestamp succeeds (already past)", + activationTS: 1000, + queryTS: 1001, + blockTime: 1, + dbHasBlocks: true, + sealTimestamp: 1005, // Past queryTS + wantErr: false, + wantHashNil: false, + }, + { + name: "seal timestamp < query timestamp (exact ts-1) succeeds", + activationTS: 1000, + queryTS: 1001, + blockTime: 1, + dbHasBlocks: true, + sealTimestamp: 1000, // gap = 1, blockTime = 1 + wantErr: false, + wantHashNil: false, + }, + { + name: "seal timestamp within block time succeeds", + activationTS: 1000, + queryTS: 1002, + blockTime: 2, // blockTime = 2 + dbHasBlocks: true, + sealTimestamp: 1000, // gap = 2, blockTime = 2 - OK + wantErr: false, + wantHashNil: false, + }, + { + name: "gap exceeds block time errors", + activationTS: 1000, + queryTS: 1003, + blockTime: 2, // blockTime = 2 + dbHasBlocks: true, + sealTimestamp: 1000, // gap = 3, blockTime = 2 - ERROR + wantErr: true, + wantErrIs: ErrPreviousTimestampNotSealed, + wantHashNil: true, + }, + { + name: "FindSealedBlock error propagated", + activationTS: 1000, + queryTS: 1001, + blockTime: 1, + dbHasBlocks: true, + findSealErr: errors.New("database error"), + wantErr: true, + wantHashNil: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + interop := &Interop{ + log: gethlog.New(), + activationTimestamp: tt.activationTS, + } + chainID := eth.ChainIDFromUInt64(10) + expectedHash := common.Hash{0x01} + db := 
&mockLogsDB{ + hasBlocks: tt.dbHasBlocks, + latestBlock: eth.BlockID{Hash: expectedHash, Number: 100}, + seal: suptypes.BlockSeal{ + Hash: expectedHash, + Number: 100, + Timestamp: tt.sealTimestamp, + }, + findSealErr: tt.findSealErr, + } + + block, _, err := interop.verifyCanAddTimestamp(chainID, db, tt.queryTS, tt.blockTime) + + if tt.wantErr { + require.Error(t, err) + if tt.wantErrIs != nil { + require.ErrorIs(t, err, tt.wantErrIs) + } + } else { + require.NoError(t, err) + } + + if tt.wantHashNil { + require.Equal(t, common.Hash{}, block.Hash, "expected zero hash") + } else { + require.NotEqual(t, common.Hash{}, block.Hash, "expected non-zero hash") + require.Equal(t, expectedHash, block.Hash) + } + }) + } +} + +// ============================================================================= +// TestProcessBlockLogs +// ============================================================================= + +func TestProcessBlockLogs(t *testing.T) { + t.Parallel() + + t.Run("empty receipts seals block with no logs", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: common.Hash{0x01}, + number: 100, + timestamp: 1000, + } + + err := interop.processBlockLogs(db, blockInfo, types.Receipts{}, false) + + require.NoError(t, err) + require.NotNil(t, db.sealBlockCall) + require.Equal(t, common.Hash{0x01}, db.sealBlockCall.parentHash) + require.Equal(t, uint64(100), db.sealBlockCall.block.Number) + require.Equal(t, uint64(1000), db.sealBlockCall.timestamp) + require.Equal(t, 0, db.addLogCalls) + }) + + t.Run("multiple logs extracted from receipts", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: common.Hash{0x01}, + number: 100, + timestamp: 1000, + } + + receipts := types.Receipts{ + &types.Receipt{ + Logs: []*types.Log{ + 
{Address: common.Address{0xAA}, Data: []byte{0x01}}, + {Address: common.Address{0xBB}, Data: []byte{0x02}}, + }, + }, + &types.Receipt{ + Logs: []*types.Log{ + {Address: common.Address{0xCC}, Data: []byte{0x03}}, + }, + }, + } + + err := interop.processBlockLogs(db, blockInfo, receipts, false) + + require.NoError(t, err) + require.Equal(t, 3, db.addLogCalls) + require.NotNil(t, db.sealBlockCall) + }) + + t.Run("genesis block handled correctly", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x01}, + parentHash: common.Hash{}, // Genesis has no parent + number: 0, + timestamp: 1000, + } + + err := interop.processBlockLogs(db, blockInfo, types.Receipts{}, true) + + require.NoError(t, err) + require.NotNil(t, db.sealBlockCall) + require.Equal(t, uint64(0), db.sealBlockCall.block.Number) + }) + + t.Run("first block at non-zero number uses empty parent", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: common.Hash{0x01}, // Real parent hash + number: 10, // Non-zero block number + timestamp: 1000, + } + + // isFirstBlock=true should use empty parent for both AddLog and SealBlock + // This allows the logsDB to treat this block as its genesis + err := interop.processBlockLogs(db, blockInfo, types.Receipts{}, true) + + require.NoError(t, err) + require.NotNil(t, db.sealBlockCall) + // Both AddLog and SealBlock should use empty parent for first block + require.Equal(t, common.Hash{}, db.sealBlockCall.parentHash) + }) + + t.Run("AddLog error propagated", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{addLogErr: errors.New("add log failed")} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: common.Hash{0x01}, + number: 100, + timestamp: 1000, + } + receipts := types.Receipts{ 
+ &types.Receipt{ + Logs: []*types.Log{{Address: common.Address{0xAA}}}, + }, + } + + err := interop.processBlockLogs(db, blockInfo, receipts, false) + + require.Error(t, err) + require.Contains(t, err.Error(), "add log failed") + }) + + t.Run("SealBlock error propagated", func(t *testing.T) { + t.Parallel() + + interop := &Interop{log: gethlog.New()} + db := &mockLogsDB{sealBlockErr: errors.New("seal failed")} + blockInfo := &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: common.Hash{0x01}, + number: 100, + timestamp: 1000, + } + + err := interop.processBlockLogs(db, blockInfo, types.Receipts{}, false) + + require.Error(t, err) + require.Contains(t, err.Error(), "seal failed") + }) +} + +// ============================================================================= +// TestLoadLogs_ParentHashMismatch +// ============================================================================= + +func TestLoadLogs_ParentHashMismatch(t *testing.T) { + t.Parallel() + dataDir := t.TempDir() + + chainID := eth.ChainIDFromUInt64(10) + firstBlockHash := common.Hash{0x01} + wrongParentHash := common.Hash{0xFF} + + callCount := 0 + mockChain := &statefulMockChainContainer{ + id: chainID, + blockAtTimestampFn: func(ts uint64) (eth.L2BlockRef, error) { + if ts == 1000 { + return eth.L2BlockRef{ + Hash: firstBlockHash, + Number: 100, + Time: 1000, + }, nil + } + return eth.L2BlockRef{ + Hash: common.Hash{0x02}, + Number: 101, + Time: 1001, + }, nil + }, + fetchReceiptsFn: func(blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) { + callCount++ + if callCount == 1 { + return &testBlockInfo{ + hash: firstBlockHash, + parentHash: common.Hash{}, + number: 100, + timestamp: 1000, + }, types.Receipts{}, nil + } + return &testBlockInfo{ + hash: common.Hash{0x02}, + parentHash: wrongParentHash, // Wrong parent! 
+ number: 101, + timestamp: 1001, + }, types.Receipts{}, nil + }, + } + + chains := map[eth.ChainID]cc.ChainContainer{chainID: mockChain} + interop := New(gethlog.New(), 1000, chains, dataDir) + require.NotNil(t, interop) + interop.ctx = context.Background() + defer func() { _ = interop.Stop(context.Background()) }() + + // Load logs for activation timestamp + err := interop.loadLogs(1000) + require.NoError(t, err) + + // Try to load logs for 1001 - should fail due to parent hash mismatch + err = interop.loadLogs(1001) + require.Error(t, err) + require.ErrorIs(t, err, ErrParentHashMismatch) +} + +// ============================================================================= +// Mock Types for LogsDB Tests +// ============================================================================= + +type mockLogsDB struct { + latestBlock eth.BlockID + hasBlocks bool + seal suptypes.BlockSeal + findSealErr error + addLogErr error + sealBlockErr error + addLogCalls int + sealBlockCall *sealBlockCall + + firstSealedBlock suptypes.BlockSeal + firstSealedBlockErr error + + openBlockRef eth.BlockRef + openBlockLogCnt uint32 + openBlockExecMsg map[uint32]*suptypes.ExecutingMessage + openBlockErr error + + containsSeal suptypes.BlockSeal + containsErr error +} + +type sealBlockCall struct { + parentHash common.Hash + block eth.BlockID + timestamp uint64 +} + +func (m *mockLogsDB) LatestSealedBlock() (eth.BlockID, bool) { + return m.latestBlock, m.hasBlocks +} + +func (m *mockLogsDB) FirstSealedBlock() (suptypes.BlockSeal, error) { + if m.firstSealedBlockErr != nil { + return suptypes.BlockSeal{}, m.firstSealedBlockErr + } + return m.firstSealedBlock, nil +} + +func (m *mockLogsDB) FindSealedBlock(number uint64) (suptypes.BlockSeal, error) { + if m.findSealErr != nil { + return suptypes.BlockSeal{}, m.findSealErr + } + return m.seal, nil +} + +func (m *mockLogsDB) OpenBlock(blockNum uint64) (eth.BlockRef, uint32, map[uint32]*suptypes.ExecutingMessage, error) { + if m.openBlockErr != 
nil { + return eth.BlockRef{}, 0, nil, m.openBlockErr + } + return m.openBlockRef, m.openBlockLogCnt, m.openBlockExecMsg, nil +} + +func (m *mockLogsDB) Contains(query suptypes.ContainsQuery) (suptypes.BlockSeal, error) { + if m.containsErr != nil { + return suptypes.BlockSeal{}, m.containsErr + } + return m.containsSeal, nil +} + +func (m *mockLogsDB) AddLog(logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *suptypes.ExecutingMessage) error { + m.addLogCalls++ + return m.addLogErr +} + +func (m *mockLogsDB) SealBlock(parentHash common.Hash, block eth.BlockID, timestamp uint64) error { + m.sealBlockCall = &sealBlockCall{ + parentHash: parentHash, + block: block, + timestamp: timestamp, + } + return m.sealBlockErr +} + +func (m *mockLogsDB) Close() error { + return nil +} + +var _ LogsDB = (*mockLogsDB)(nil) + +// statefulMockChainContainer allows dynamic behavior based on test state +type statefulMockChainContainer struct { + id eth.ChainID + blockAtTimestampFn func(ts uint64) (eth.L2BlockRef, error) + fetchReceiptsFn func(blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) +} + +func (m *statefulMockChainContainer) ID() eth.ChainID { return m.id } +func (m *statefulMockChainContainer) Start(ctx context.Context) error { return nil } +func (m *statefulMockChainContainer) Stop(ctx context.Context) error { return nil } +func (m *statefulMockChainContainer) Pause(ctx context.Context) error { return nil } +func (m *statefulMockChainContainer) Resume(ctx context.Context) error { return nil } +func (m *statefulMockChainContainer) RegisterVerifier(v activity.VerificationActivity) { +} +func (m *statefulMockChainContainer) BlockAtTimestamp(ctx context.Context, ts uint64, label eth.BlockLabel) (eth.L2BlockRef, error) { + return m.blockAtTimestampFn(ts) +} +func (m *statefulMockChainContainer) VerifiedAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) { + return eth.BlockID{}, eth.BlockID{}, nil +} +func (m 
*statefulMockChainContainer) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, error) {
+	return eth.BlockID{}, nil
+}
+func (m *statefulMockChainContainer) OptimisticAt(ctx context.Context, ts uint64) (eth.BlockID, eth.BlockID, error) {
+	return eth.BlockID{}, eth.BlockID{}, nil
+}
+func (m *statefulMockChainContainer) OutputRootAtL2BlockNumber(ctx context.Context, l2BlockNum uint64) (eth.Bytes32, error) {
+	return eth.Bytes32{}, nil
+}
+func (m *statefulMockChainContainer) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*eth.OutputResponse, error) {
+	return nil, nil
+}
+func (m *statefulMockChainContainer) FetchReceipts(ctx context.Context, blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) {
+	return m.fetchReceiptsFn(blockID)
+}
+func (m *statefulMockChainContainer) SyncStatus(ctx context.Context) (*eth.SyncStatus, error) {
+	return &eth.SyncStatus{}, nil
+}
+func (m *statefulMockChainContainer) BlockTime() uint64 { return 1 }
+func (m *statefulMockChainContainer) RewindEngine(ctx context.Context, timestamp uint64) error {
+	return nil
+}
+
+var _ cc.ChainContainer = (*statefulMockChainContainer)(nil)
diff --git a/op-supernode/supernode/activity/interop/types.go b/op-supernode/supernode/activity/interop/types.go
index 629457dd2ce..252aa0f7e34 100644
--- a/op-supernode/supernode/activity/interop/types.go
+++ b/op-supernode/supernode/activity/interop/types.go
@@ -1,8 +1,6 @@
 package interop
 
 import (
-	"errors"
-
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 )
 
@@ -32,8 +30,6 @@ func (r *Result) IsEmpty() bool {
 	return r.L1Head == (eth.BlockID{}) && len(r.L2Heads) == 0 && len(r.InvalidHeads) == 0
 }
 
-var ErrInvalidResult = errors.New("result is invalid")
-
 func (r *Result) ToVerifiedResult() VerifiedResult {
 	return VerifiedResult{
 		Timestamp: r.Timestamp,
diff --git a/op-supernode/supernode/activity/superroot/superroot_test.go b/op-supernode/supernode/activity/superroot/superroot_test.go
index 41d563e6f86..443162d5a46 100644
--- a/op-supernode/supernode/activity/superroot/superroot_test.go +++ b/op-supernode/supernode/activity/superroot/superroot_test.go @@ -10,6 +10,7 @@ import ( cc "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) @@ -87,10 +88,16 @@ func (m *mockCC) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, return eth.BlockID{}, nil } +func (m *mockCC) FetchReceipts(ctx context.Context, blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) { + return nil, nil, nil +} + func (m *mockCC) ID() eth.ChainID { return eth.ChainIDFromUInt64(10) } +func (m *mockCC) BlockTime() uint64 { return 1 } + var _ cc.ChainContainer = (*mockCC)(nil) func TestSuperroot_AtTimestamp_Succeeds(t *testing.T) { diff --git a/op-supernode/supernode/chain_container/chain_container.go b/op-supernode/supernode/chain_container/chain_container.go index a9993278ad5..ddba119119a 100644 --- a/op-supernode/supernode/chain_container/chain_container.go +++ b/op-supernode/supernode/chain_container/chain_container.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container/engine_controller" "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container/virtual_node" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" "github.com/prometheus/client_golang/prometheus" ) @@ -43,6 +44,11 @@ type ChainContainer interface { // RewindEngine rewinds the engine to the highest block with timestamp less than or equal to the given timestamp. RewindEngine(ctx context.Context, timestamp uint64) error RegisterVerifier(v activity.VerificationActivity) + // FetchReceipts fetches the receipts for a given block by hash. 
+ // Returns block info and receipts, or an error if the block or receipts cannot be fetched. + FetchReceipts(ctx context.Context, blockHash eth.BlockID) (eth.BlockInfo, types.Receipts, error) + // BlockTime returns the block time in seconds for this chain. + BlockTime() uint64 } type virtualNodeFactory func(cfg *opnodecfg.Config, log gethlog.Logger, initOverrides *rollupNode.InitializationOverrides, appVersion string) virtual_node.VirtualNode @@ -373,6 +379,22 @@ func (c *simpleChainContainer) OptimisticOutputAtTimestamp(ctx context.Context, return out, nil } +// FetchReceipts fetches the receipts for a given block by hash. +func (c *simpleChainContainer) FetchReceipts(ctx context.Context, blockID eth.BlockID) (eth.BlockInfo, types.Receipts, error) { + if c.engine == nil { + return nil, nil, engine_controller.ErrNoEngineClient + } + return c.engine.FetchReceipts(ctx, blockID.Hash) +} + +// BlockTime returns the block time in seconds for this chain from the rollup config. +func (c *simpleChainContainer) BlockTime() uint64 { + if c.vncfg == nil { + return 0 + } + return c.vncfg.Rollup.BlockTime +} + // attachInProcRollupClient creates a new in-proc rollup RPC client bound to the current rpcHandler. // It will close any existing client before replacing it. 
func (c *simpleChainContainer) attachInProcRollupClient() error { diff --git a/op-supernode/supernode/chain_container/chain_container_test.go b/op-supernode/supernode/chain_container/chain_container_test.go index 4c8c5db640e..3f00cbfcc40 100644 --- a/op-supernode/supernode/chain_container/chain_container_test.go +++ b/op-supernode/supernode/chain_container/chain_container_test.go @@ -19,6 +19,8 @@ import ( "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container/engine_controller" "github.com/ethereum-optimism/optimism/op-supernode/supernode/chain_container/virtual_node" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) @@ -135,6 +137,10 @@ func (m *mockEngineController) OutputV0AtBlockNumber(ctx context.Context, num ui return nil, nil } +func (m *mockEngineController) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { + return nil, nil, nil +} + func (m *mockEngineController) Close() error { return nil } diff --git a/op-supernode/supernode/chain_container/engine_controller/engine_controller.go b/op-supernode/supernode/chain_container/engine_controller/engine_controller.go index 696a88ccbf2..58053ef64c2 100644 --- a/op-supernode/supernode/chain_container/engine_controller/engine_controller.go +++ b/op-supernode/supernode/chain_container/engine_controller/engine_controller.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" ) @@ -25,6 +26,8 @@ type EngineController interface { OutputV0AtBlockNumber(ctx context.Context, num uint64) (*eth.OutputV0, error) // RewindToTimestamp rewinds the L2 execution layer to block at or before the given 
timestamp. RewindToTimestamp(ctx context.Context, timestamp uint64) error + // FetchReceipts fetches the receipts for a given block by hash. + FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) // Close releases any underlying RPC resources. Close() error } @@ -37,6 +40,7 @@ type l2Provider interface { PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayloadEnvelope, error) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceState, attr *eth.PayloadAttributes) (*eth.ForkchoiceUpdatedResult, error) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) + FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) Close() } @@ -151,6 +155,13 @@ func (e *simpleEngineController) OutputV0AtBlockNumber(ctx context.Context, num return e.l2.OutputV0AtBlockNumber(ctx, num) } +func (e *simpleEngineController) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { + if e.l2 == nil { + return nil, nil, ErrNoEngineClient + } + return e.l2.FetchReceipts(ctx, blockHash) +} + func (e *simpleEngineController) Close() error { if e.l2 != nil { e.l2.Close() diff --git a/op-supernode/supernode/chain_container/engine_controller/engine_controller_test.go b/op-supernode/supernode/chain_container/engine_controller/engine_controller_test.go index d756be2dff4..9794fd95d91 100644 --- a/op-supernode/supernode/chain_container/engine_controller/engine_controller_test.go +++ b/op-supernode/supernode/chain_container/engine_controller/engine_controller_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" gethlog "github.com/ethereum/go-ethereum/log" "github.com/stretchr/testify/require" ) @@ -147,6 +148,9 @@ func (m 
*mockL2) ForkchoiceUpdate(ctx context.Context, state *eth.ForkchoiceStat
 	}
 	return &eth.ForkchoiceUpdatedResult{PayloadStatus: eth.PayloadStatusV1{Status: eth.ExecutionValid}}, nil
 }
+func (m *mockL2) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) {
+	return nil, nil, nil
+}
 func (m *mockL2) Close() {
 }
 func (m *mockL2) NewPayload(ctx context.Context, payload *eth.ExecutionPayload, parentBeaconBlockRoot *common.Hash) (*eth.PayloadStatusV1, error) {