From f83119ad4cf928e640af601b453360b42a2591d7 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 10 Jul 2019 12:28:28 +0200 Subject: [PATCH 1/2] swarm-smoke: fix check max prox hosts for pull/push sync modes --- cmd/swarm-smoke/main.go | 32 +++++++++++++--------- cmd/swarm-smoke/upload_and_sync.go | 43 +++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/cmd/swarm-smoke/main.go b/cmd/swarm-smoke/main.go index 415eebe122..cce9728ab8 100644 --- a/cmd/swarm-smoke/main.go +++ b/cmd/swarm-smoke/main.go @@ -37,18 +37,20 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - syncDelay bool - inputSeed int - httpPort int - wsPort int - verbosity int - timeout int - single bool - onlyUpload bool - debug bool + allhosts string + hosts []string + filesize int + syncDelay bool + pushsyncDelay bool + syncMode string + inputSeed int + httpPort int + wsPort int + verbosity int + timeout int + single bool + onlyUpload bool + debug bool ) func main() { @@ -88,6 +90,12 @@ func main() { Usage: "file size for generated random file in KB", Destination: &filesize, }, + cli.StringFlag{ + Name: "sync-mode", + Value: "pullsync", + Usage: "sync mode - pushsync or pullsync or both", + Destination: &syncMode, + }, cli.BoolFlag{ Name: "sync-delay", Usage: "wait for content to be synced", diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go index 7accef1c01..5680f2a45f 100644 --- a/cmd/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm-smoke/upload_and_sync.go @@ -232,7 +232,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin for i := range addrs { var foundAt int maxProx := -1 - var maxProxHost string + var maxProxHosts []string for host := range allHostChunks { if allHostChunks[host][i] == '1' { foundAt++ @@ -247,19 +247,44 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin prox := chunk.Proximity(addrs[i], ba) if prox > maxProx { maxProx = prox - maxProxHost = host + maxProxHosts = []string{host} + } else if prox == maxProx { + maxProxHosts = append(maxProxHosts, host) } } - if allHostChunks[maxProxHost][i] == '0' { - log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) - } else { - log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + log.Debug("sync mode", "sync mode", syncMode) + + if syncMode == "pullsync" || syncMode == "both" { + for _, maxProxHost := range maxProxHosts { + if allHostChunks[maxProxHost][i] == '0' { + log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } else { + log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } + + // if chunk found at less than 2 hosts + if foundAt < 2 { + log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) + } } - // if chunk found at less than 2 hosts - if foundAt < 2 { - log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) + if syncMode == "pushsync" { + var found bool + for _, maxProxHost := range maxProxHosts { + if allHostChunks[maxProxHost][i] == '0' { + } else { + found = true + log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } + + if !found { + for _, maxProxHost := range maxProxHosts { + log.Error("chunk not found at any max prox host", "ref", addrs[i], "hosts", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } } } } From 27e63d0b186197afc37adf0a4df1f88a120530f3 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 17 Jul 2019 11:54:12 +0200 Subject: [PATCH 2/2] cmd/swarm-smoke: upload with tag; wait to push sync; --- cmd/swarm-smoke/main.go | 6 +++- cmd/swarm-smoke/upload_and_sync.go | 51 +++++++++++++++++++++++++++--- cmd/swarm-smoke/util.go | 12 +++++-- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/cmd/swarm-smoke/main.go b/cmd/swarm-smoke/main.go index cce9728ab8..91e4afc634 100644 --- a/cmd/swarm-smoke/main.go +++ b/cmd/swarm-smoke/main.go @@ -54,7 +54,6 @@ var ( ) func main() { - app := cli.NewApp() app.Name = "smoke-test" app.Usage = "" @@ -96,6 +95,11 @@ func main() { Usage: "sync mode - pushsync or pullsync or both", Destination: &syncMode, }, + cli.BoolFlag{ + Name: "pushsync-delay", + Usage: "wait for content to be push synced", + Destination: &pushsyncDelay, + }, cli.BoolFlag{ Name: "sync-delay", Usage: "wait for content to be synced", diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go index 5680f2a45f..c3ae7d1c86 100644 --- a/cmd/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm-smoke/upload_and_sync.go @@ -35,6 +35,7 @@ import ( "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/testutil" + "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -264,7 +265,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin } } - // if chunk found at less than 2 hosts + // if chunk found at less than 2 hosts, which is actually less that the min size of a NN if foundAt < 2 { log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) } @@ -273,8 +274,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin if syncMode == "pushsync" { var found bool for _, maxProxHost := range maxProxHosts { - if allHostChunks[maxProxHost][i] == '0' { - } else { + if allHostChunks[maxProxHost][i] == '1' { found = true log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) } @@ -309,7 +309,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) t1 := time.Now() - hash, err := upload(randomBytes, httpEndpoint(hosts[0])) + tag := uuid.New()[:8] + hash, err := uploadWithTag(randomBytes, httpEndpoint(hosts[0]), tag) if err != nil { log.Error(err.Error()) return err @@ -325,6 +326,11 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) + // wait to push sync sync + if pushsyncDelay { + waitToPushSynced(tag) + } + // wait to sync and log chunks before fetch attempt, only if syncDelay is set to true if syncDelay { waitToSync() @@ -363,6 +369,29 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { return nil } +func isPushSynced(wsHost string, tagname string) (bool, error) { + rpcClient, err := rpc.Dial(wsHost) + if rpcClient != nil { + defer rpcClient.Close() + } + + if err != nil { + log.Error("error dialing host", "err", err) + return false, err + } + + var isSynced bool + err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname) + if err != nil { + log.Error("error calling host for isPushSynced", "err", err) + return false, err + } + + log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced) + + return isSynced, nil +} + func isSyncing(wsHost string) (bool, error) { rpcClient, err := rpc.Dial(wsHost) if rpcClient != nil { @@ -386,6 +415,20 @@ func isSyncing(wsHost string) (bool, error) { return isSyncing, nil } +func waitToPushSynced(tagname string) { + for { + synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname) + if err != nil { + log.Error(err.Error()) + } + + if synced { + return + } + time.Sleep(200 * time.Millisecond) + } +} + func waitToSync() { t1 := time.Now() diff --git a/cmd/swarm-smoke/util.go b/cmd/swarm-smoke/util.go index f838fa07e5..99f4aa5a4e 100644 --- a/cmd/swarm-smoke/util.go +++ b/cmd/swarm-smoke/util.go @@ -38,6 +38,7 @@ import ( "github.com/ethersphere/swarm/api/client" "github.com/ethersphere/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" + "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -193,8 +194,13 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { return nil } -// upload an arbitrary byte as a plaintext file to `endpoint` using the api client +// upload an arbitrary byte as a plaintext file to `endpoint` using the api client func upload(data []byte, endpoint string) (string, error) { + return uploadWithTag(data, endpoint, uuid.New()[:8]) +} + +// uploadWithTag an arbitrary byte as a plaintext file to `endpoint` using the api client with a given tag +func uploadWithTag(data []byte, endpoint string, tag string) (string, error) { swarm := client.NewClient(endpoint) f := &client.File{ ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), @@ -203,10 +209,10 @@ func upload(data []byte, endpoint string) (string, error) { Mode: 0660, Size: int64(len(data)), }, + Tag: tag, } - // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - return swarm.Upload(f, "", false) + return swarm.TarUpload("", &client.FileUploader{f}, "", false) } func digest(r io.Reader) ([]byte, error) {