diff --git a/backend/common.go b/backend/common.go index 8f4e0fe..2b55735 100644 --- a/backend/common.go +++ b/backend/common.go @@ -18,6 +18,7 @@ package backend import ( "encoding/binary" + "strconv" ) func uint64ToBytes(i uint64) []byte { @@ -25,3 +26,7 @@ func uint64ToBytes(i uint64) []byte { binary.BigEndian.PutUint64(buf[:], i) return buf[:] } + +func uint64ToHex(i uint64) []byte { + return []byte(strconv.FormatUint(i, 16)) +} diff --git a/backend/torrent.go b/backend/torrent.go index 1aa15ad..b8136f1 100644 --- a/backend/torrent.go +++ b/backend/torrent.go @@ -47,14 +47,14 @@ func (fs *ChainDB) SetTorrentProgress(ih string, size uint64) (bool, uint64, err v := buk.Get([]byte(ih)) if v == nil { - err = buk.Put([]byte(ih), []byte(strconv.FormatUint(size, 16))) + err = buk.Put([]byte(ih), uint64ToHex(size)) } else { s, err := strconv.ParseUint(string(v), 16, 64) if err != nil { return err } if size > s { - err = buk.Put([]byte(ih), []byte(strconv.FormatUint(size, 16))) + err = buk.Put([]byte(ih), uint64ToHex(size)) } else { size = s } diff --git a/config.go b/config.go index 2c0b443..912cf50 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,7 @@ package robot import ( + "github.com/CortexFoundation/CortexTheseus/metrics" "time" ) @@ -25,3 +26,10 @@ const ( delay = 12 //params.Delay timeout = 30 * time.Second ) + +var ( + rpcBlockMeter = metrics.NewRegisteredMeter("torrent/block/call", nil) + rpcCurrentMeter = metrics.NewRegisteredMeter("torrent/current/call", nil) + rpcUploadMeter = metrics.NewRegisteredMeter("torrent/upload/call", nil) + rpcReceiptMeter = metrics.NewRegisteredMeter("torrent/receipt/call", nil) +) diff --git a/model_srv.go b/model_srv.go index 7a072f1..f9336d2 100644 --- a/model_srv.go +++ b/model_srv.go @@ -79,6 +79,7 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { start = mclock.Now() final []types.Transaction ) + for _, tx := range b.Txs { if meta := tx.Parse(); meta != nil { log.Debug("Data encounter", "ih", meta.InfoHash, "number", b.Number, "meta", meta) @@ -112,7 +113,7 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { file.LeftSize = remainingSize if _, progress, err := m.fs.AddFile(file); err != nil { return false, err - } else if progress { // && progress { + } else if progress { log.Debug("Update storage success", "ih", file.Meta.InfoHash, "left", file.LeftSize) var bytesRequested uint64 if file.Meta.RawSize > file.LeftSize { @@ -134,10 +135,12 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { final = append(final, tx) } } + if len(final) > 0 && len(final) < len(b.Txs) { log.Debug("Final txs layout", "total", len(b.Txs), "final", len(final), "num", b.Number, "txs", m.fs.Txs()) b.Txs = final } + if record { if err := m.fs.AddBlock(b); err == nil { log.Info("Root has been changed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root()) @@ -145,9 +148,11 @@ func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) { log.Warn("Block added failed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root(), "err", err) } } + if len(b.Txs) > 0 { elapsed := time.Duration(mclock.Now()) - time.Duration(start) log.Trace("Transactions scanning", "count", len(b.Txs), "number", b.Number, "elapsed", common.PrettyDuration(elapsed)) } + return record, nil } diff --git a/monitor.go b/monitor.go index 91b5c47..084cfc2 100644 --- a/monitor.go +++ b/monitor.go @@ -22,7 +22,6 @@ import ( "github.com/CortexFoundation/CortexTheseus/common" "github.com/CortexFoundation/CortexTheseus/common/mclock" "github.com/CortexFoundation/CortexTheseus/log" - "github.com/CortexFoundation/CortexTheseus/metrics" "github.com/CortexFoundation/CortexTheseus/rpc" "github.com/CortexFoundation/robot/backend" "github.com/CortexFoundation/torrentfs/params" @@ -38,13 +37,6 @@ import ( "time" ) -var ( - rpcBlockMeter = metrics.NewRegisteredMeter("torrent/block/call", nil) - rpcCurrentMeter = metrics.NewRegisteredMeter("torrent/current/call", nil) - rpcUploadMeter = metrics.NewRegisteredMeter("torrent/upload/call", nil) - rpcReceiptMeter = metrics.NewRegisteredMeter("torrent/receipt/call", nil) -) - // Monitor observes the data changes on the blockchain and synchronizes. // cl for ipc/rpc communication, dl for download manager, and fs for data storage. type Monitor struct { @@ -188,7 +180,7 @@ func (m *Monitor) Callback() chan any { return m.callback } -func (m *Monitor) loadHistory() error { +/*func (m *Monitor) loadHistory() error { torrents, _ := m.fs.InitTorrents() if m.mode != params.LAZY { for k, v := range torrents { @@ -206,7 +198,7 @@ func (m *Monitor) loadHistory() error { } return nil -} +}*/ func (m *Monitor) download(ctx context.Context, k string, v uint64) error { if m.mode != params.LAZY && m.callback != nil {