Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions cmd/swarm-smoke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,23 @@ var (
)

var (
allhosts string
hosts []string
filesize int
syncDelay bool
inputSeed int
httpPort int
wsPort int
verbosity int
timeout int
single bool
onlyUpload bool
debug bool
allhosts string
hosts []string
filesize int
syncDelay bool
pushsyncDelay bool
syncMode string
inputSeed int
httpPort int
wsPort int
verbosity int
timeout int
single bool
onlyUpload bool
debug bool
)

func main() {

app := cli.NewApp()
app.Name = "smoke-test"
app.Usage = ""
Expand Down Expand Up @@ -88,6 +89,17 @@ func main() {
Usage: "file size for generated random file in KB",
Destination: &filesize,
},
cli.StringFlag{
Name: "sync-mode",
Value: "pullsync",
Usage: "sync mode - pushsync or pullsync or both",
Destination: &syncMode,
},
cli.BoolFlag{
Name: "pushsync-delay",
Usage: "wait for content to be push synced",
Destination: &pushsyncDelay,
},
cli.BoolFlag{
Name: "sync-delay",
Usage: "wait for content to be synced",
Expand Down
88 changes: 78 additions & 10 deletions cmd/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
"github.com/pborman/uuid"

cli "gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -232,7 +233,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
for i := range addrs {
var foundAt int
maxProx := -1
var maxProxHost string
var maxProxHosts []string
for host := range allHostChunks {
if allHostChunks[host][i] == '1' {
foundAt++
Expand All @@ -247,19 +248,43 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
prox := chunk.Proximity(addrs[i], ba)
if prox > maxProx {
maxProx = prox
maxProxHost = host
maxProxHosts = []string{host}
} else if prox == maxProx {
maxProxHosts = append(maxProxHosts, host)
}
}

if allHostChunks[maxProxHost][i] == '0' {
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
} else {
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
log.Debug("sync mode", "sync mode", syncMode)

if syncMode == "pullsync" || syncMode == "both" {
for _, maxProxHost := range maxProxHosts {
if allHostChunks[maxProxHost][i] == '0' {
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
} else {
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}

// if chunk found at less than 2 hosts, which is actually less that the min size of a NN
if foundAt < 2 {
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
}
}

// if chunk found at less than 2 hosts
if foundAt < 2 {
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
if syncMode == "pushsync" {
var found bool
for _, maxProxHost := range maxProxHosts {
if allHostChunks[maxProxHost][i] == '1' {
found = true
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}

if !found {
for _, maxProxHost := range maxProxHosts {
log.Error("chunk not found at any max prox host", "ref", addrs[i], "hosts", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}
}
}
}
Expand All @@ -284,7 +309,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)

t1 := time.Now()
hash, err := upload(randomBytes, httpEndpoint(hosts[0]))
tag := uuid.New()[:8]
hash, err := uploadWithTag(randomBytes, httpEndpoint(hosts[0]), tag)
if err != nil {
log.Error(err.Error())
return err
Expand All @@ -300,6 +326,11 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {

log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))

// wait to push sync sync
if pushsyncDelay {
waitToPushSynced(tag)
}

// wait to sync and log chunks before fetch attempt, only if syncDelay is set to true
if syncDelay {
waitToSync()
Expand Down Expand Up @@ -338,6 +369,29 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
return nil
}

func isPushSynced(wsHost string, tagname string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
defer rpcClient.Close()
}

if err != nil {
log.Error("error dialing host", "err", err)
return false, err
}

var isSynced bool
err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is still not merged to master, but since it is behind a feature flag, I think it is fine to get the tests part first.

if err != nil {
log.Error("error calling host for isPushSynced", "err", err)
return false, err
}

log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced)

return isSynced, nil
}

func isSyncing(wsHost string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
Expand All @@ -361,6 +415,20 @@ func isSyncing(wsHost string) (bool, error) {
return isSyncing, nil
}

func waitToPushSynced(tagname string) {
for {
synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname)
if err != nil {
log.Error(err.Error())
}

if synced {
return
}
time.Sleep(200 * time.Millisecond)
}
}

func waitToSync() {
t1 := time.Now()

Expand Down
12 changes: 9 additions & 3 deletions cmd/swarm-smoke/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ethersphere/swarm/api/client"
"github.com/ethersphere/swarm/spancontext"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1"
)

Expand Down Expand Up @@ -193,8 +194,13 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error {
return nil
}

// upload an arbitrary byte as a plaintext file to `endpoint` using the api client
// upload an arbitrary byte as a plaintext file to `endpoint` using the api client
func upload(data []byte, endpoint string) (string, error) {
return uploadWithTag(data, endpoint, uuid.New()[:8])
}

// uploadWithTag an arbitrary byte as a plaintext file to `endpoint` using the api client with a given tag
func uploadWithTag(data []byte, endpoint string, tag string) (string, error) {
swarm := client.NewClient(endpoint)
f := &client.File{
ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
Expand All @@ -203,10 +209,10 @@ func upload(data []byte, endpoint string) (string, error) {
Mode: 0660,
Size: int64(len(data)),
},
Tag: tag,
}

// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
return swarm.Upload(f, "", false)
return swarm.TarUpload("", &client.FileUploader{f}, "", false)
}

func digest(r io.Reader) ([]byte, error) {
Expand Down