Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ package main
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/testutil"

Expand Down Expand Up @@ -88,6 +91,10 @@ func trackChunks(testData []byte, submitMetrics bool) error {
var wg sync.WaitGroup
wg.Add(len(hosts))

var mu sync.Mutex // mutex protecting the allHostsChunks and bzzAddrs maps
allHostChunks := map[string]string{} // host->bitvector of presence for chunks
bzzAddrs := map[string]string{} // host->bzzAddr

for _, host := range hosts {
host := host
go func() {
Expand All @@ -96,6 +103,7 @@ func trackChunks(testData []byte, submitMetrics bool) error {

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rpcClient, err := rpc.DialContext(ctx, httpHost)
if rpcClient != nil {
defer rpcClient.Close()
Expand All @@ -106,14 +114,25 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return
}

var hostChunks string
err = rpcClient.Call(&hostChunks, "bzz_has", addrs)
hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs)
if err != nil {
log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost)
hasErr = true
return
}

bzzAddr, err := getBzzAddrFromHost(rpcClient)
if err != nil {
log.Error("error calling rpc client", "err", err, "host", httpHost)
log.Error("error getting bzz addrs from host", "err", err, "host", httpHost)
hasErr = true
return
}

mu.Lock()
allHostChunks[host] = hostChunks
bzzAddrs[host] = bzzAddr
mu.Unlock()

yes, no := 0, 0
for _, val := range hostChunks {
if val == '1' {
Expand All @@ -140,6 +159,8 @@ func trackChunks(testData []byte, submitMetrics bool) error {

wg.Wait()

checkChunksVsMostProxHosts(addrs, allHostChunks, bzzAddrs)

if !hasErr && submitMetrics {
// remove the chunks stored on the uploader node
globalYes -= len(addrs)
Expand All @@ -152,6 +173,82 @@ func trackChunks(testData []byte, submitMetrics bool) error {
return nil
}

// getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host
func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) {
Copy link
Copy Markdown
Contributor Author

@nonsense nonsense Apr 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getChunksBitVectorFromHost and getBzzAddrFromHost really don't belong here - there should be a package which provides a type-safe bzz client from an rpcClient, which has all APIs exposed in bzz configured in it, but since we don't have one, I am defining them here.

We might want to add this once we start using these APIs a bit more often, but for now, I think its fine for this to live here.

var hostChunks string

err := client.Call(&hostChunks, "bzz_has", addrs)
if err != nil {
return "", err
}

return hostChunks, nil
}

// getBzzAddrFromHost returns the bzzAddr for a given host
func getBzzAddrFromHost(client *rpc.Client) (string, error) {
var hive string

err := client.Call(&hive, "bzz_hive")
if err != nil {
return "", err
}

// we make an ugly assumption about the output format of the hive.String() method
// ideally we should replace this with an API call that returns the bzz addr for a given host,
// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time
return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil
}

// checkChunksVsMostProxHosts is checking:
// 1. whether a chunk has been found at less than 2 hosts. Considering our NN size, this should not happen.
// 2. if a chunk is not found at its closest node. This should also not happen.
// Together with the --only-upload flag, we could run this smoke test and make sure that our syncing
// functionality is correct (without even trying to retrieve the content).
//
// addrs - a slice with all uploaded chunk refs
// allHostChunks - host->bit vector, showing what chunks are present on what hosts
// bzzAddrs - host->bzz address, used when determining the most proximate host for a given chunk
func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[string]string, bzzAddrs map[string]string) {
for k, v := range bzzAddrs {
log.Trace("bzzAddr", "bzz", v, "host", k)
}

for i := range addrs {
var foundAt int
maxProx := -1
var maxProxHost string
for host := range allHostChunks {
if allHostChunks[host][i] == '1' {
foundAt++
}

ba, err := hex.DecodeString(bzzAddrs[host])
if err != nil {
panic(err)
}

// calculate the host closest to any chunk
prox := chunk.Proximity(addrs[i], ba)
if prox > maxProx {
maxProx = prox
maxProxHost = host
}
}

if allHostChunks[maxProxHost][i] == '0' {
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
} else {
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}

// if chunk found at less than 2 hosts
if foundAt < 2 {
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
}
}
}

func getAllRefs(testData []byte) (storage.AddressCollection, error) {
datadir, err := ioutil.TempDir("", "chunk-debug")
if err != nil {
Expand Down