diff --git a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go index 6b3fed0c7e..b5ffc43d20 100644 --- a/cmd/swarm/swarm-smoke/feed_upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/feed_upload_and_sync.go @@ -26,11 +26,11 @@ const ( feedRandomDataLength = 8 ) -func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func feedUploadAndSyncCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- feedUploadAndSync(ctx, tuid) + errc <- feedUploadAndSync(ctx) }() select { @@ -46,7 +46,7 @@ func feedUploadAndSyncCmd(ctx *cli.Context, tuid string) error { } } -func feedUploadAndSync(c *cli.Context, tuid string) error { +func feedUploadAndSync(c *cli.Context) error { log.Info("generating and uploading feeds to " + httpEndpoint(hosts[0]) + " and syncing") // create a random private key to sign updates with and derive the address @@ -272,7 +272,7 @@ func feedUploadAndSync(c *cli.Context, tuid string) error { ruid := uuid.New()[:8] go func(url string, endpoint string, ruid string) { for { - err := fetch(url, endpoint, fileHash, ruid, "") + err := fetch(url, endpoint, fileHash, ruid) if err != nil { continue } diff --git a/cmd/swarm/swarm-smoke/main.go b/cmd/swarm/swarm-smoke/main.go index 860fbcc1dd..2c1dd65a06 100644 --- a/cmd/swarm/swarm-smoke/main.go +++ b/cmd/swarm/swarm-smoke/main.go @@ -37,17 +37,17 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - inputSeed int - syncDelay int - httpPort int - wsPort int - verbosity int - timeout int - single bool - trackTimeout int + allhosts string + hosts []string + filesize int + syncDelay int + inputSeed int + httpPort int + wsPort int + verbosity int + timeout int + single bool + onlyUpload bool ) func main() { @@ -101,7 +101,7 @@ func main() { }, cli.IntFlag{ Name: "timeout", - Value: 120, + Value: 180, Usage: "timeout in seconds after which kill the process", Destination: &timeout, }, @@ -110,11 +110,10 @@ func main() { Usage: "whether to fetch content from a single node or from all nodes", Destination: &single, }, - cli.IntFlag{ - Name: "track-timeout", - Value: 5, - Usage: "timeout in seconds to wait for GetAllReferences to return", - Destination: &trackTimeout, + cli.BoolFlag{ + Name: "only-upload", + Usage: "whether to only upload content to a single node without fetching", + Destination: &onlyUpload, }, } diff --git a/cmd/swarm/swarm-smoke/sliding_window.go b/cmd/swarm/swarm-smoke/sliding_window.go index d589124bd1..ab082c5435 100644 --- a/cmd/swarm/swarm-smoke/sliding_window.go +++ b/cmd/swarm/swarm-smoke/sliding_window.go @@ -35,11 +35,11 @@ type uploadResult struct { digest []byte } -func slidingWindowCmd(ctx *cli.Context, tuid string) error { +func slidingWindowCmd(ctx *cli.Context) error { errc := make(chan error) go func() { - errc <- slidingWindow(ctx, tuid) + errc <- slidingWindow(ctx) }() err := <-errc @@ -49,10 +49,10 @@ func slidingWindowCmd(ctx *cli.Context, tuid string) error { return err } -func slidingWindow(ctx *cli.Context, tuid string) error { +func slidingWindow(ctx *cli.Context) error { var hashes []uploadResult //swarm hashes of the uploads nodes := len(hosts) - log.Info("sliding window test started", "tuid", tuid, "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) + log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout) uploadedBytes := 0 networkDepth := 0 errored := false @@ -107,7 +107,7 @@ outer: start = time.Now() // fetch hangs when swarm dies out, so we have to jump through a bit more hoops to actually // catch the timeout, but also allow this retry logic - err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid, "") + err := fetch(v.hash, httpEndpoint(hosts[idx]), v.digest, ruid) if err != nil { log.Error("error fetching hash", "err", err) continue diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index 6c20a4fa63..6a434a0b2f 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -23,22 +23,20 @@ import ( "io/ioutil" "math/rand" "os" - "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { +func uploadAndSyncCmd(ctx *cli.Context) error { // use input seed if it has been set if inputSeed != 0 { seed = inputSeed @@ -49,7 +47,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { errc := make(chan error) go func() { - errc <- uploadAndSync(ctx, randomBytes, tuid) + errc <- uploadAndSync(ctx, randomBytes) }() var err error @@ -65,7 +63,7 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { } // trigger debug functionality on randomBytes - e := trackChunks(randomBytes[:]) + e := trackChunks(randomBytes[:], true) if e != nil { log.Error(e.Error()) } @@ -73,50 +71,84 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { return err } -func trackChunks(testData []byte) error { +func trackChunks(testData []byte, submitMetrics bool) error { addrs, err := getAllRefs(testData) if err != nil { return err } for i, ref := range addrs { - log.Trace(fmt.Sprintf("ref %d", i), "ref", ref) + log.Debug(fmt.Sprintf("ref %d", i), "ref", ref) } + var globalYes, globalNo int + var globalMu sync.Mutex + var hasErr bool + + var wg sync.WaitGroup + wg.Add(len(hosts)) + for _, host := range hosts { - httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + host := host + go func() { + defer wg.Done() + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + rpcClient, err := rpc.DialContext(ctx, httpHost) + if rpcClient != nil { + defer rpcClient.Close() + } + if err != nil { + log.Error("error dialing host", "err", err, "host", httpHost) + hasErr = true + return + } - hostChunks := []string{} + var hostChunks string + err = rpcClient.Call(&hostChunks, "bzz_has", addrs) + if err != nil { + log.Error("error calling rpc client", "err", err, "host", httpHost) + hasErr = true + return + } - rpcClient, err := rpc.Dial(httpHost) - if err != nil { - log.Error("error dialing host", "err", err, "host", httpHost) - continue - } + yes, no := 0, 0 + for _, val := range hostChunks { + if val == '1' { + yes++ + } else { + no++ + } + } - var hasInfo []api.HasInfo - err = rpcClient.Call(&hasInfo, "bzz_has", addrs) - if err != nil { - log.Error("error calling rpc client", "err", err, "host", httpHost) - continue - } + if no == 0 { + log.Info("host reported to have all chunks", "host", host) + } - count := 0 - for _, info := range hasInfo { - if info.Has { - hostChunks = append(hostChunks, "1") - } else { - hostChunks = append(hostChunks, "0") - count++ + log.Debug("chunks", "chunks", hostChunks, "yes", yes, "no", no, "host", host) + + if submitMetrics { + globalMu.Lock() + globalYes += yes + globalNo += no + globalMu.Unlock() } - } + }() + } - if count == 0 { - log.Info("host reported to have all chunks", "host", host) - } + wg.Wait() + + if !hasErr && submitMetrics { + // remove the chunks stored on the uploader node + globalYes -= len(addrs) - log.Trace("chunks", "chunks", strings.Join(hostChunks, ""), "host", host) + metrics.GetOrRegisterCounter("deployment.chunks.yes", nil).Inc(int64(globalYes)) + metrics.GetOrRegisterCounter("deployment.chunks.no", nil).Inc(int64(globalNo)) + metrics.GetOrRegisterCounter("deployment.chunks.refs", nil).Inc(int64(len(addrs))) } + return nil } @@ -130,15 +162,13 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) { if err != nil { return nil, err } - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) - defer cancel() reader := bytes.NewReader(testData) - return fileStore.GetAllReferences(ctx, reader, false) + return fileStore.GetAllReferences(context.Background(), reader, false) } -func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { - log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed) +func uploadAndSync(c *cli.Context, randomBytes []byte) error { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) t1 := time.Now() hash, err := upload(randomBytes, httpEndpoint(hosts[0])) @@ -155,53 +185,91 @@ func uploadAndSync(c *cli.Context, randomBytes []byte, tuid string) error { return err } - log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) - time.Sleep(time.Duration(syncDelay) * time.Second) + waitToSync() - wg := sync.WaitGroup{} - if single { - randIndex := 1 + rand.Intn(len(hosts)-1) - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) + log.Debug("chunks before fetch attempt", "hash", hash) - metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(hosts[randIndex]), ruid) - } else { - for _, endpoint := range hosts[1:] { - ruid := uuid.New()[:8] - wg.Add(1) - go func(endpoint string, ruid string) { - for { - start := time.Now() - err := fetch(hash, endpoint, fhash, ruid, tuid) - if err != nil { - continue - } - ended := time.Since(start) - - metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended) - log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) - wg.Done() - return - } - }(httpEndpoint(endpoint), ruid) + err = trackChunks(randomBytes, false) + if err != nil { + log.Error(err.Error()) + } + + if onlyUpload { + log.Debug("only-upload is true, stoppping test", "hash", hash) + return nil + } + + randIndex := 1 + rand.Intn(len(hosts)-1) + + for { + start := time.Now() + err := fetch(hash, httpEndpoint(hosts[randIndex]), fhash, "") + if err != nil { + time.Sleep(2 * time.Second) + continue } + ended := time.Since(start) + + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) + log.Info("fetch successful", "took", ended, "endpoint", httpEndpoint(hosts[randIndex])) + break } - wg.Wait() - log.Info("all hosts synced random file successfully") return nil } + +func isSyncing(wsHost string) (bool, error) { + rpcClient, err := rpc.Dial(wsHost) + if rpcClient != nil { + defer rpcClient.Close() + } + + if err != nil { + log.Error("error dialing host", "err", err) + return false, err + } + + var isSyncing bool + err = rpcClient.Call(&isSyncing, "bzz_isSyncing") + if err != nil { + log.Error("error calling host for isSyncing", "err", err) + return false, err + } + + log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing) + + return isSyncing, nil +} + +func waitToSync() { + t1 := time.Now() + + ns := uint64(1) + + for ns > 0 { + time.Sleep(3 * time.Second) + + notSynced := uint64(0) + var wg sync.WaitGroup + wg.Add(len(hosts)) + for i := 0; i < len(hosts); i++ { + i := i + go func(idx int) { + stillSyncing, err := isSyncing(wsEndpoint(hosts[idx])) + + if stillSyncing || err != nil { + atomic.AddUint64(¬Synced, 1) + } + wg.Done() + }(i) + } + wg.Wait() + + ns = atomic.LoadUint64(¬Synced) + } + + t2 := time.Since(t1) + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.wait-for-sync.deployment", nil).Update(t2) +} diff --git a/cmd/swarm/swarm-smoke/upload_speed.go b/cmd/swarm/swarm-smoke/upload_speed.go index 20bf7b86ca..047ea00925 100644 --- a/cmd/swarm/swarm-smoke/upload_speed.go +++ b/cmd/swarm/swarm-smoke/upload_speed.go @@ -28,14 +28,14 @@ import ( cli "gopkg.in/urfave/cli.v1" ) -func uploadSpeedCmd(ctx *cli.Context, tuid string) error { - log.Info("uploading to "+hosts[0], "tuid", tuid, "seed", seed) +func uploadSpeedCmd(ctx *cli.Context) error { + log.Info("uploading to "+hosts[0], "seed", seed) randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) go func() { - errc <- uploadSpeed(ctx, tuid, randomBytes) + errc <- uploadSpeed(ctx, randomBytes) }() select { @@ -53,7 +53,7 @@ func uploadSpeedCmd(ctx *cli.Context, tuid string) error { } } -func uploadSpeed(c *cli.Context, tuid string, data []byte) error { +func uploadSpeed(c *cli.Context, data []byte) error { t1 := time.Now() hash, err := upload(data, hosts[0]) if err != nil { diff --git a/cmd/swarm/swarm-smoke/util.go b/cmd/swarm/swarm-smoke/util.go index 87abb44b0b..b95f993e8e 100644 --- a/cmd/swarm/swarm-smoke/util.go +++ b/cmd/swarm/swarm-smoke/util.go @@ -38,7 +38,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/api/client" "github.com/ethereum/go-ethereum/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" - "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -59,28 +58,25 @@ func wsEndpoint(host string) string { return fmt.Sprintf("ws://%s:%d", host, wsPort) } -func wrapCliCommand(name string, command func(*cli.Context, string) error) func(*cli.Context) error { +func wrapCliCommand(name string, command func(*cli.Context) error) func(*cli.Context) error { return func(ctx *cli.Context) error { log.PrintOrigins(true) log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(false)))) - // test uuid - tuid := uuid.New()[:8] - commandName = name hosts = strings.Split(allhosts, ",") defer func(now time.Time) { totalTime := time.Since(now) - log.Info("total time", "tuid", tuid, "time", totalTime, "kb", filesize) + log.Info("total time", "time", totalTime, "kb", filesize) metrics.GetOrRegisterResettingTimer(name+".total-time", nil).Update(totalTime) }(time.Now()) - log.Info("smoke test starting", "tuid", tuid, "task", name, "timeout", timeout) + log.Info("smoke test starting", "task", name, "timeout", timeout) metrics.GetOrRegisterCounter(name, nil).Inc(1) - return command(ctx, tuid) + return command(ctx) } } @@ -142,11 +138,11 @@ func fetchFeed(topic string, user string, endpoint string, original []byte, ruid } // fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string, tuid string) error { +func fetch(hash string, endpoint string, original []byte, ruid string) error { ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") defer sp.Finish() - log.Info("http get request", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash) + log.Info("http get request", "ruid", ruid, "endpoint", endpoint, "hash", hash) var tn time.Time reqUri := endpoint + "/bzz:/" + hash + "/" @@ -170,7 +166,7 @@ func fetch(hash string, endpoint string, original []byte, ruid string, tuid stri log.Error(err.Error(), "ruid", ruid) return err } - log.Info("http get response", "tuid", tuid, "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) + log.Info("http get response", "ruid", ruid, "endpoint", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) if res.StatusCode != 200 { err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) diff --git a/swarm/api/inspector.go b/swarm/api/inspector.go index 2ae6b4da83..c4151bf20a 100644 --- a/swarm/api/inspector.go +++ b/swarm/api/inspector.go @@ -19,7 +19,11 @@ package api import ( "context" "fmt" + "strings" + "time" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/storage" ) @@ -47,25 +51,34 @@ func (inspector *Inspector) ListKnown() []string { return res } -type HasInfo struct { - Addr string `json:"address"` - Has bool `json:"has"` +func (inspector *Inspector) IsSyncing() bool { + lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) + + // last received chunks msg time + lrct := time.Unix(0, lastReceivedChunksMsg.Value()) + + // if last received chunks msg time is after now-15sec. (i.e. within the last 15sec.) then we say that the node is still syncing + // technically this is not correct, because this might have been a retrieve request, but for the time being it works for our purposes + // because we know we are not making retrieve requests on the node while checking this + return lrct.After(time.Now().Add(-15 * time.Second)) } // Has checks whether each chunk address is present in the underlying datastore, // the bool in the returned structs indicates if the underlying datastore has // the chunk stored with the given address (true), or not (false) -func (inspector *Inspector) Has(chunkAddresses []storage.Address) []HasInfo { - results := make([]HasInfo, 0) +func (inspector *Inspector) Has(chunkAddresses []storage.Address) string { + hostChunks := []string{} for _, addr := range chunkAddresses { - res := HasInfo{} - res.Addr = addr.String() has, err := inspector.netStore.Has(context.Background(), addr) if err != nil { - has = false + log.Error(err.Error()) + } + if has { + hostChunks = append(hostChunks, "1") + } else { + hostChunks = append(hostChunks, "0") } - res.Has = has - results = append(results, res) } - return results + + return strings.Join(hostChunks, "") } diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 304f9cd778..e700459d21 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/pot" sv "github.com/ethereum/go-ethereum/swarm/version" @@ -138,6 +139,9 @@ func (e *entry) Hex() string { func (k *Kademlia) Register(peers ...*BzzAddr) error { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.register", nil).Inc(1) + var known, size int for _, p := range peers { log.Trace("kademlia trying to register", "addr", p) @@ -164,8 +168,6 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { return newEntry(p) } - log.Trace("found among known peers, underlay addr is same, do nothing", "new", p, "old", e.BzzAddr) - return v }) if found { @@ -186,6 +188,9 @@ func (k *Kademlia) Register(peers ...*BzzAddr) error { func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, changed bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.suggestpeer", nil).Inc(1) + radius := neighbourhoodRadiusForPot(k.conns, k.NeighbourhoodSize, k.base) // collect undersaturated bins in ascending order of number of connected peers // and from shallow to deep (ascending order of PO) @@ -297,6 +302,9 @@ func (k *Kademlia) SuggestPeer() (suggestedPeer *BzzAddr, saturationDepth int, c func (k *Kademlia) On(p *Peer) (uint8, bool) { k.lock.Lock() defer k.lock.Unlock() + + metrics.GetOrRegisterCounter("kad.on", nil).Inc(1) + var ins bool k.conns, _, _, _ = pot.Swap(k.conns, p, Pof, func(v pot.Val) pot.Val { // if not found live @@ -320,7 +328,6 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { k.addrCountC <- k.addrs.Size() } } - log.Trace(k.string()) // calculate if depth of saturation changed depth := uint8(k.saturation()) var changed bool @@ -608,7 +615,7 @@ func (k *Kademlia) string() string { if len(sv.GitCommit) > 0 { rows = append(rows, fmt.Sprintf("commit hash: %s", sv.GitCommit)) } - rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr()[:3])) + rows = append(rows, fmt.Sprintf("%v KΛÐΞMLIΛ hive: queen's address: %x", time.Now().UTC().Format(time.UnixDate), k.BaseAddr())) rows = append(rows, fmt.Sprintf("population: %d (%d), NeighbourhoodSize: %d, MinBinSize: %d, MaxBinSize: %d", k.conns.Size(), k.addrs.Size(), k.NeighbourhoodSize, k.MinBinSize, k.MaxBinSize)) liverows := make([]string, k.MaxProxDisplay) diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index b4663eee5e..93b9901381 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -541,7 +541,7 @@ func TestKademliaHiveString(t *testing.T) { tk.Register("10000000", "10000001") tk.MaxProxDisplay = 8 h := tk.String() - expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" + expH := "\n=========================================================================\nMon Feb 27 12:10:28 UTC 2017 KΛÐΞMLIΛ hive: queen's address: 0000000000000000000000000000000000000000000000000000000000000000\npopulation: 2 (4), NeighbourhoodSize: 2, MinBinSize: 2, MaxBinSize: 4\n============ DEPTH: 0 ==========================================\n000 0 | 2 8100 (0) 8000 (0)\n001 1 4000 | 1 4000 (0)\n002 1 2000 | 1 2000 (0)\n003 0 | 0\n004 0 | 0\n005 0 | 0\n006 0 | 0\n007 0 | 0\n=========================================================================" if expH[104:] != h[104:] { t.Fatalf("incorrect hive output. expected %v, got %v", expH, h) } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 0596667239..521e9d19a8 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" @@ -45,6 +46,8 @@ var ( requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) + + lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) ) type Delivery struct { @@ -225,6 +228,9 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int processReceivedChunksCount.Inc(1) + // record the last time we received a chunk delivery message + lastReceivedChunksMsg.Update(time.Now().UnixNano()) + var msg *ChunkDeliveryMsg var mode chunk.ModePut switch r := req.(type) {