diff --git a/tx-submitter/db/db.go b/tx-submitter/db/db.go new file mode 100644 index 000000000..4a5914c49 --- /dev/null +++ b/tx-submitter/db/db.go @@ -0,0 +1,49 @@ +package db + +import ( + "fmt" + "strconv" + + "morph-l2/tx-submitter/utils" + + "github.com/morph-l2/go-ethereum/ethdb/leveldb" +) + +var ( + ErrKeyNotFound = fmt.Errorf("not found") +) + +type Db struct { + db *leveldb.Database +} + +func New(pathname string) (*Db, error) { + // leveldb + ldb, err := leveldb.New(pathname, 0, 0, "tx-submitter", false) + if err != nil { + return nil, fmt.Errorf("failed to create leveldb: %w", err) + } + return &Db{db: ldb}, nil +} +func (d *Db) GetFloat(key string) (float64, error) { + v, err := d.db.Get([]byte(key)) + if err != nil { + return 0, fmt.Errorf("failed get key from leveldb %s: %w", key, err) + } + res, err := utils.ParseStringToType[float64](string(v)) + if err != nil { + return 0, fmt.Errorf("failed to parse string to float64 %s", err) + } + return res, nil +} +func (d *Db) PutFloat(key string, val float64) error { + valStr := strconv.FormatFloat(val, 'f', -1, 64) + err := d.db.Put([]byte(key), []byte(valStr)) + if err != nil { + return fmt.Errorf("failed to put key into leveldb %w", err) + } + return nil +} +func (d *Db) Close() error { + return d.db.Close() +} diff --git a/tx-submitter/db/db_test.go b/tx-submitter/db/db_test.go new file mode 100644 index 000000000..a7ecac6da --- /dev/null +++ b/tx-submitter/db/db_test.go @@ -0,0 +1,21 @@ +package db + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_Get_Put(t *testing.T) { + db, err := New("test") + require.NoError(t, err) + + expect := 1.123456789012345 + // expectStr := "1.123456789012345" + + err = db.PutFloat("test", expect) + require.NoError(t, err) + v, err := db.GetFloat("test") + require.NoError(t, err) + require.Equal(t, expect, v) +} diff --git a/tx-submitter/entry.go b/tx-submitter/entry.go index 0145243ad..1b9809b3a 100644 --- a/tx-submitter/entry.go +++ b/tx-submitter/entry.go @@ -10,8 +10,10 @@ import ( "os" "os/signal" "strings" + "time" "morph-l2/bindings/bindings" + "morph-l2/tx-submitter/db" "morph-l2/tx-submitter/event" "morph-l2/tx-submitter/iface" "morph-l2/tx-submitter/metrics" @@ -65,6 +67,7 @@ func Main() func(ctx *cli.Context) error { "rough_estimate_base_gas", cfg.RollupTxGasBase, "rough_estimate_per_l1_msg", cfg.RollupTxGasPerL1Msg, "log_level", cfg.LogLevel, + "leveldb_pathname", cfg.LeveldbPathName, ) ctx, cancel := context.WithCancel(context.Background()) @@ -96,14 +99,11 @@ func Main() func(ctx *cli.Context) error { } output = io.MultiWriter(output, logFile) } - logHandler := log.StreamHandler(output, log.TerminalFormat(false)) - logLevel, err := log.LvlFromString(cfg.LogLevel) if err != nil { return err } - log.Root().SetHandler(log.LvlFilterHandler(logLevel, logHandler)) l1RpcClient, err := rpc.Dial(cfg.L1EthRpc) @@ -189,6 +189,11 @@ func Main() func(ctx *cli.Context) error { // start rorator event indexer rotator.StartEventIndexer() + ldb, err := db.New(cfg.LeveldbPathName) + if err != nil { + return fmt.Errorf("failed to connect leveldb: %w", err) + } + // new rollup service sr := services.NewRollup( ctx, @@ -205,11 +210,9 @@ func Main() func(ctx *cli.Context) error { cfg, rsaPriv, rotator, + ldb, ) - // init rollup service - if err := sr.PreCheck(); err != nil { - return err - } + // metrics { if cfg.MetricsServerEnable { @@ -223,11 +226,6 @@ func Main() func(ctx *cli.Context) error { log.Info("metrics server enabled", "host", cfg.MetricsHostname, "port", cfg.MetricsPort) } - // log external sign info - if cfg.ExternalSign { - - } - log.Info("external sign info", "external_sign", cfg.ExternalSign, "appid", cfg.ExternalSignAppid, @@ -236,7 +234,12 @@ func Main() func(ctx *cli.Context) error { "url", cfg.ExternalSignUrl, ) - sr.Start() + err = sr.Start() + for err != nil { + log.Error("rollup service start failed", "error", err) + time.Sleep(time.Second * 5) + err = sr.Start() + } // Catch CTRL-C to ensure a graceful shutdown. interrupt := make(chan os.Signal, 1) diff --git a/tx-submitter/flags/flags.go b/tx-submitter/flags/flags.go index dcfe874ca..522546488 100644 --- a/tx-submitter/flags/flags.go +++ b/tx-submitter/flags/flags.go @@ -292,6 +292,12 @@ var ( Value: 100, EnvVar: prefixEnvVar("EVENT_INDEX_STEP"), } + LeveldbPathNameFlag = cli.StringFlag{ + Name: "leveldb_path_name", + Usage: "The path name of the leveldb", + EnvVar: prefixEnvVar("LEVELDB_PATH_NAME"), + Value: "submitter-leveldb", + } ) var requiredFlags = []cli.Flag{ @@ -345,6 +351,7 @@ var optionalFlags = []cli.Flag{ RotatorBufferFlag, StakingEventStoreFileFlag, EventIndexStepFlag, + LeveldbPathNameFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/tx-submitter/metrics/metrics.go b/tx-submitter/metrics/metrics.go index 05f9d1afe..2b09d467c 100644 --- a/tx-submitter/metrics/metrics.go +++ b/tx-submitter/metrics/metrics.go @@ -19,8 +19,8 @@ type Metrics struct { FinalizeCostSum prometheus.Counter RollupCost prometheus.Gauge FinalizeCost prometheus.Gauge - L1FeeCollectionSum prometheus.Counter - L1FeeCollection prometheus.Gauge + CollectedL1FeeSum prometheus.Counter + CollectedL1Fee prometheus.Gauge IndexerBlockProcessed prometheus.Gauge } @@ -57,14 +57,14 @@ func NewMetrics() *Metrics { Help: "Finalize cost", Namespace: metricsNamespace, }), - L1FeeCollection: promauto.NewGauge(prometheus.GaugeOpts{ - Name: "submitter_l1_fee_collection", - Help: "L1 fee collection", + CollectedL1Fee: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "submitter_collected_l1_fee", + Help: "Collected L1 fee for every batch", Namespace: metricsNamespace, }), - L1FeeCollectionSum: promauto.NewCounter(prometheus.CounterOpts{ - Name: "submitter_l1_fee_collection_sum", - Help: "L1 fee collection", + CollectedL1FeeSum: promauto.NewCounter(prometheus.CounterOpts{ + Name: "submitter_collected_l1_fee_sum", + Help: "Collected L1 fee for all batches committed ", Namespace: metricsNamespace, }), @@ -101,12 +101,12 @@ func (m *Metrics) SetRollupCost(cost float64) { func (m *Metrics) SetFinalizeCost(cost float64) { m.FinalizeCostSum.Add(cost) - m.RollupCost.Set(cost) + m.FinalizeCost.Set(cost) } func (m *Metrics) SetCollectedL1Fee(cost float64) { - m.L1FeeCollectionSum.Add(cost) - m.L1FeeCollection.Set(cost) + m.CollectedL1FeeSum.Add(cost) + m.CollectedL1Fee.Set(cost) } func (m *Metrics) SetIndexerBlockProcessed(blockNumber uint64) { diff --git a/tx-submitter/services/rollup.go b/tx-submitter/services/rollup.go index f18e11b61..63a654123 100644 --- a/tx-submitter/services/rollup.go +++ b/tx-submitter/services/rollup.go @@ -27,6 +27,7 @@ import ( "github.com/tendermint/tendermint/blssignatures" "morph-l2/bindings/bindings" + "morph-l2/tx-submitter/db" "morph-l2/tx-submitter/event" "morph-l2/tx-submitter/iface" "morph-l2/tx-submitter/localpool" @@ -35,38 +36,43 @@ import ( ) const ( - txSlotSize = 32 * 1024 - txMaxSize = 4 * txSlotSize // 128KB - rotatorWait = 3 * time.Second + txSlotSize = 32 * 1024 + txMaxSize = 4 * txSlotSize // 128KB + rotatorWait = 3 * time.Second + rollupSumKey = "rollup_sum" + finalizeSumKey = "finalize_sum" + collectedL1FeeSumKey = "collected_l1_fee_sum" ) type Rollup struct { - ctx context.Context - metrics *metrics.Metrics - + ctx context.Context + metrics *metrics.Metrics l1RpcClient *rpc.Client L1Client iface.Client L2Clients []iface.L2Client Rollup iface.IRollup - - Staking iface.IL1Staking - - chainId *big.Int - privKey *ecdsa.PrivateKey - rollupAddr common.Address - abi *abi.ABI - + Staking iface.IL1Staking + chainId *big.Int + privKey *ecdsa.PrivateKey + rollupAddr common.Address + abi *abi.ABI // rotator - rotator *Rotator - pendingTxs *PendingTxs - + rotator *Rotator + pendingTxs *PendingTxs rollupFinalizeMu sync.Mutex externalRsaPriv *rsa.PrivateKey // cfg cfg utils.Config // signer signer types.Signer - + // leveldb + ldb *db.Db + // rollupFeeSum + rollupFeeSum float64 + // finalizeFeeSum + finalizeFeeSum float64 + // collectedL1FeeSum + collectedL1FeeSum float64 // batchcache batchCache map[uint64]*eth.RPCRollupBatch } @@ -86,6 +92,7 @@ func NewRollup( cfg utils.Config, rsaPriv *rsa.PrivateKey, rotator *Rotator, + ldb *db.Db, ) *Rollup { return &Rollup{ @@ -105,10 +112,16 @@ func NewRollup( signer: types.LatestSignerForChainID(chainId), externalRsaPriv: rsaPriv, batchCache: make(map[uint64]*eth.RPCRollupBatch), + ldb: ldb, } } -func (r *Rollup) Start() { +func (r *Rollup) Start() error { + + // init rollup service + if err := r.PreCheck(); err != nil { + return err + } // journal jn := localpool.New(r.cfg.JournalFilePath) @@ -125,6 +138,12 @@ func (r *Rollup) Start() { r.pendingTxs.Recover(txs, r.abi) } + // init fee metrics sum + err = r.InitFeeMetricsSum() + if err != nil { + return fmt.Errorf("init fee metrics sum failed: %w", err) + } + // metrics go utils.Loop(r.ctx, 10*time.Second, func() { @@ -189,7 +208,7 @@ func (r *Rollup) Start() { } } }) - + return nil } func (r *Rollup) ProcessTx() error { @@ -324,10 +343,21 @@ func (r *Rollup) ProcessTx() error { log.Warn("fee is zero", "hash", rtx.Hash().Hex()) } if method == "commitBatch" { + r.rollupFeeSum += fee + err = r.ldb.PutFloat(rollupSumKey, r.rollupFeeSum) + if err != nil { + log.Warn("put rollup fee sum error", "error", err) + } r.metrics.SetRollupCost(fee) index := utils.ParseParentBatchIndex(rtx.Data()) + 1 batch, ok := r.batchCache[index] if ok { + collectedL1FeeFloat := ToEtherFloat((*big.Int)(batch.CollectedL1Fee)) + r.collectedL1FeeSum += collectedL1FeeFloat + err = r.ldb.PutFloat(collectedL1FeeSumKey, r.collectedL1FeeSum) + if err != nil { + log.Warn("put collected l1 fee sum error", "error", err) + } r.metrics.SetCollectedL1Fee(ToEtherFloat((*big.Int)(batch.CollectedL1Fee))) // remove batch from cache delete(r.batchCache, index) @@ -338,6 +368,11 @@ func (r *Rollup) ProcessTx() error { } } else if method == "finalizeBatch" { + r.finalizeFeeSum += fee + err = r.ldb.PutFloat(finalizeSumKey, r.finalizeFeeSum) + if err != nil { + log.Warn("put finalize fee sum error", "error", err) + } r.metrics.SetFinalizeCost(fee) } } @@ -1235,3 +1270,57 @@ func (r *Rollup) RoughFinalizeGasEstimate() uint64 { func (r *Rollup) GetModuleName() string { return "rollup" } + +func (r *Rollup) InitFeeMetricsSum() error { + // try to init rollupFeeSum & finalizeFeeSum + // read rollupFeeSum + rollupFeeSum, err := r.ldb.GetFloat(rollupSumKey) + if err != nil { + log.Warn("read rollupFeeSum from leveldb failed", "error", err) + if utils.ErrStringMatch(err, db.ErrKeyNotFound) { + err = r.ldb.PutFloat(rollupSumKey, 0) + if err != nil { + return fmt.Errorf("put rollupFeeSum to leveldb failed, key: %s, %w", rollupSumKey, err) + } + } else { + return fmt.Errorf("get data from leveldb faild, key: %s, %w", rollupSumKey, err) + } + } + log.Info("rollupFeeSum: %f", rollupFeeSum) + finalizeFeeSum, err := r.ldb.GetFloat(finalizeSumKey) + if err != nil { + log.Warn("read finalizeFeeSum from leveldb failed", "error", err) + if utils.ErrStringMatch(err, db.ErrKeyNotFound) { + err = r.ldb.PutFloat(finalizeSumKey, 0) + if err != nil { + return fmt.Errorf("put finalizeFeeSum to leveldb failed, key: %s, %w", finalizeSumKey, err) + } + } else { + return fmt.Errorf("get data from leveldb faild, key: %s, %w", finalizeSumKey, err) + } + } + log.Info("finalizeFeeSum: %f", finalizeFeeSum) + collectedL1FeeSum, err := r.ldb.GetFloat(collectedL1FeeSumKey) + if err != nil { + log.Warn("read collectedL1FeeSum from leveldb failed", "error", err) + if utils.ErrStringMatch(err, db.ErrKeyNotFound) { + err = r.ldb.PutFloat(collectedL1FeeSumKey, 0) + if err != nil { + return fmt.Errorf("put collectedL1FeeSum to leveldb failed, key: %s, %w", collectedL1FeeSumKey, err) + } + } else { + return fmt.Errorf("get data from leveldb faild, key: %s, %w", collectedL1FeeSumKey, err) + } + } + r.collectedL1FeeSum = collectedL1FeeSum + log.Info("collectedL1FeeSum: %f", collectedL1FeeSum) + + r.rollupFeeSum = rollupFeeSum + r.finalizeFeeSum = finalizeFeeSum + r.collectedL1FeeSum = collectedL1FeeSum + // set fee sum init val + r.metrics.RollupCostSum.Add(r.rollupFeeSum) + r.metrics.FinalizeCostSum.Add(r.finalizeFeeSum) + r.metrics.CollectedL1FeeSum.Add(r.collectedL1FeeSum) + return nil +} diff --git a/tx-submitter/utils/config.go b/tx-submitter/utils/config.go index 5a6a9c814..3d63fa042 100644 --- a/tx-submitter/utils/config.go +++ b/tx-submitter/utils/config.go @@ -104,6 +104,8 @@ type Config struct { L1StakingDeployedBlockNumber uint64 // event indexer index step EventIndexStep uint64 + // leveldb path name + LeveldbPathName string } // NewConfig parses the DriverConfig from the provided flags or environment variables. @@ -171,6 +173,8 @@ func NewConfig(ctx *cli.Context) (Config, error) { L1StakingDeployedBlockNumber: ctx.GlobalUint64(flags.L1StakingDeployedBlocknumFlag.Name), // index step EventIndexStep: ctx.GlobalUint64(flags.EventIndexStepFlag.Name), + // leveldb path name + LeveldbPathName: ctx.GlobalString(flags.LeveldbPathNameFlag.Name), } return cfg, nil diff --git a/tx-submitter/utils/methods.go b/tx-submitter/utils/methods.go index 12d94c2d3..b2e19782a 100644 --- a/tx-submitter/utils/methods.go +++ b/tx-submitter/utils/methods.go @@ -1,7 +1,10 @@ package utils import ( + "fmt" "math/big" + "reflect" + "strconv" "github.com/morph-l2/go-ethereum" "github.com/morph-l2/go-ethereum/common" @@ -61,3 +64,74 @@ func IntersectionOfAddresses(a, b []common.Address) []common.Address { return intersection } + +func ParseStringToType[T any](s string) (T, error) { + var result T + var err error + + // 获取目标类型的名称 + switch any(result).(type) { + case int: + var v int64 + v, err = strconv.ParseInt(s, 10, 0) + result = reflect.ValueOf(int(v)).Interface().(T) + case int8: + var v int64 + v, err = strconv.ParseInt(s, 10, 8) + result = reflect.ValueOf(int8(v)).Interface().(T) + case int16: + var v int64 + v, err = strconv.ParseInt(s, 10, 16) + result = reflect.ValueOf(int16(v)).Interface().(T) + case int32: + var v int64 + v, err = strconv.ParseInt(s, 10, 32) + result = reflect.ValueOf(int32(v)).Interface().(T) + case int64: + var v int64 + v, err = strconv.ParseInt(s, 10, 64) + result = reflect.ValueOf(v).Interface().(T) + case uint: + var v uint64 + v, err = strconv.ParseUint(s, 10, 0) + result = reflect.ValueOf(uint(v)).Interface().(T) + case uint8: + var v uint64 + v, err = strconv.ParseUint(s, 10, 8) + result = reflect.ValueOf(uint8(v)).Interface().(T) + case uint16: + var v uint64 + v, err = strconv.ParseUint(s, 10, 16) + result = reflect.ValueOf(uint16(v)).Interface().(T) + case uint32: + var v uint64 + v, err = strconv.ParseUint(s, 10, 32) + result = reflect.ValueOf(uint32(v)).Interface().(T) + case uint64: + var v uint64 + v, err = strconv.ParseUint(s, 10, 64) + result = reflect.ValueOf(v).Interface().(T) + case float32: + var v float64 + v, err = strconv.ParseFloat(s, 32) + result = reflect.ValueOf(float32(v)).Interface().(T) + case float64: + var v float64 + v, err = strconv.ParseFloat(s, 64) + result = reflect.ValueOf(v).Interface().(T) + case bool: + var v bool + v, err = strconv.ParseBool(s) + result = reflect.ValueOf(v).Interface().(T) + case string: + result = reflect.ValueOf(s).Interface().(T) + default: + return *new(T), fmt.Errorf("unsupported type: %v", reflect.TypeOf(result)) + } + + if err != nil { + return *new(T), err + } + + return result, nil +} diff --git a/tx-submitter/utils/methods_test.go b/tx-submitter/utils/methods_test.go new file mode 100644 index 000000000..381cb65e4 --- /dev/null +++ b/tx-submitter/utils/methods_test.go @@ -0,0 +1,86 @@ +package utils + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseStringToType(t *testing.T) { + tests := []struct { + input string + expected any + hasError bool + }{ + // int cases + {"123", int(123), false}, + {"-123", int(-123), false}, + {"notanumber", int(0), true}, + + // int8 cases + {"123", int8(123), false}, + {"-128", int8(-128), false}, + {"notanumber", int8(0), true}, + + // int16 cases + {"123", int16(123), false}, + {"32767", int16(32767), false}, + {"notanumber", int16(0), true}, + + // uint cases + {"123", uint(123), false}, + {"notanumber", uint(0), true}, + + // float32 cases + {"123.45", float32(123.45), false}, + {"-123.45", float32(-123.45), false}, + {"notanumber", float32(0), true}, + + // float64 cases + {"123.45", float64(123.45), false}, + {"-123.45", float64(-123.45), false}, + {"notanumber", float64(0), true}, + + // bool cases + {"true", true, false}, + {"false", false, false}, + {"notabool", false, true}, + + // string cases + {"test", "test", false}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%v -> %v", tc.input, tc.expected), func(t *testing.T) { + var result any + var err error + + switch tc.expected.(type) { + case int: + result, err = ParseStringToType[int](tc.input) + case int8: + result, err = ParseStringToType[int8](tc.input) + case int16: + result, err = ParseStringToType[int16](tc.input) + case uint: + result, err = ParseStringToType[uint](tc.input) + case float32: + result, err = ParseStringToType[float32](tc.input) + case float64: + result, err = ParseStringToType[float64](tc.input) + case bool: + result, err = ParseStringToType[bool](tc.input) + case string: + result, err = ParseStringToType[string](tc.input) + } + + if tc.hasError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, result) + } + }) + } +}