diff --git a/go.mod b/go.mod index ae588e7cb2..8e2cbebde4 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 + github.com/tilinna/clock v1.0.2 github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6 diff --git a/go.sum b/go.sum index b00075ba4b..53aa19a0e9 100644 --- a/go.sum +++ b/go.sum @@ -249,6 +249,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI= +github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef h1:luEzjJzktS9eU0CmI0uApXHLP/lKzOoRPrJhd71J8ik= github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o= diff --git a/pss/internal/ticker/ticker.go b/pss/internal/ticker/ticker.go new file mode 100644 index 0000000000..24e1184967 --- /dev/null +++ b/pss/internal/ticker/ticker.go @@ -0,0 +1,54 @@ +package ticker + +import ( + "errors" + "time" + + "github.com/tilinna/clock" +) + +// Config defines the necessary information and dependencies to instantiate a Ticker +type Config struct { + Clock clock.Clock + Interval time.Duration + Callback func() +} + +// Ticker represents a periodic timer that invokes a callback +type Ticker struct { + quitC chan struct{} +} + +// ErrAlreadyStopped is returned if this service was already stopped and Stop() is called again +var ErrAlreadyStopped = errors.New("Already stopped") + +// New builds a ticker that will call the given callback function periodically +func New(config *Config) *Ticker { + + tk := &Ticker{ + quitC: make(chan struct{}), + } + ticker := config.Clock.NewTicker(config.Interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + config.Callback() + case <-tk.quitC: + return + } + } + }() + return tk +} + +// Stop stops the timer and releases the goroutine running it. +func (tk *Ticker) Stop() error { + if tk.quitC == nil { + return ErrAlreadyStopped + } + close(tk.quitC) + tk.quitC = nil + return nil +} diff --git a/pss/internal/ticker/ticker_test.go b/pss/internal/ticker/ticker_test.go new file mode 100644 index 0000000000..7942ad37fd --- /dev/null +++ b/pss/internal/ticker/ticker_test.go @@ -0,0 +1,47 @@ +package ticker_test + +import ( + "sync" + "testing" + "time" + + "github.com/ethersphere/swarm/pss/internal/ticker" + "github.com/tilinna/clock" +) + +// TestNewTicker tests whether the ticker calls a callback function periodically +func TestNewTicker(t *testing.T) { + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + interval := 10 * time.Second + + wg := sync.WaitGroup{} + wg.Add(10) + tickWait := make(chan bool) + + testTicker := ticker.New(&ticker.Config{ + Interval: interval, + Clock: testClock, + Callback: func() { + wg.Done() + tickWait <- true + }, + }) + + for i := 0; i < 10; i++ { + testClock.Add(interval) + <-tickWait + } + + wg.Wait() + err = testTicker.Stop() + if err != nil { + t.Fatal(err) + } + + err = testTicker.Stop() + if err != ticker.ErrAlreadyStopped { + t.Fatal("Expected Stop() to return ticker.ErrAlreadyStopped when trying to stop an already stopped ticker") + } +} diff --git a/pss/internal/ttlset/ttlset.go b/pss/internal/ttlset/ttlset.go new file mode 100644 index 0000000000..2f87a2917a --- /dev/null +++ b/pss/internal/ttlset/ttlset.go @@ -0,0 +1,84 @@ +package ttlset + +import ( + "sync" + "time" + + "github.com/tilinna/clock" +) + +// Config defines the TTLSet configuration +type Config struct { + EntryTTL time.Duration // time after which items are removed + Clock clock.Clock // time reference +} + +// TTLSet implements a Set that automatically removes expired keys +// after a predefined expiration time +type TTLSet struct { + Config + set map[interface{}]setEntry + lock sync.RWMutex +} + +type setEntry struct { + expiresAt time.Time +} + +// New instances a TTLSet +func New(config *Config) *TTLSet { + ts := &TTLSet{ + set: make(map[interface{}]setEntry), + Config: *config, + } + return ts +} + +// Add adds a new key to the set +func (ts *TTLSet) Add(key interface{}) error { + var entry setEntry + var ok bool + + ts.lock.Lock() + defer ts.lock.Unlock() + + if entry, ok = ts.set[key]; !ok { + entry = setEntry{} + } + entry.expiresAt = ts.Clock.Now().Add(ts.EntryTTL) + ts.set[key] = entry + return nil +} + +// Has returns whether or not a key is already/still in the set +func (ts *TTLSet) Has(key interface{}) bool { + ts.lock.Lock() + defer ts.lock.Unlock() + + entry, ok := ts.set[key] + if ok { + if entry.expiresAt.After(ts.Clock.Now()) { + return true + } + delete(ts.set, key) // since we're holding the lock, take the chance to delete a expired record + } + return false +} + +// GC will remove expired entries from the set +func (ts *TTLSet) GC() { + ts.lock.Lock() + defer ts.lock.Unlock() + for k, v := range ts.set { + if v.expiresAt.Before(ts.Clock.Now()) { + delete(ts.set, k) + } + } +} + +// Count returns the number of entries in the set +func (ts *TTLSet) Count() int { + ts.lock.Lock() + defer ts.lock.Unlock() + return len(ts.set) +} diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go new file mode 100644 index 0000000000..4079169678 --- /dev/null +++ b/pss/internal/ttlset/ttlset_test.go @@ -0,0 +1,150 @@ +package ttlset_test + +import ( + "testing" + "time" + + "github.com/ethersphere/swarm/pss/internal/ttlset" + "github.com/tilinna/clock" +) + +func TestTTLSet(t *testing.T) { + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + + testEntryTTL := 10 * time.Second + testSet := ttlset.New(&ttlset.Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }) + + key1 := "some key" + key2 := "some other key" + + // check adding a key to the set + err = testSet.Add(key1) + if err != nil { + t.Fatal((err)) + } + + // check if the key is now there: + hasKey := testSet.Has(key1) + if !(hasKey == true) { + t.Fatal("key1 should've been in the set, but Has() returned false") + } + + // check if Has() returns false when asked about a key that was never added: + hasKey = testSet.Has("some made up key") + if !(hasKey == false) { + t.Fatal("Has() should have returned false when presented with a key that was never added") + } + + // Let some time pass, but not enough to have the key expire: + testClock.Add(testEntryTTL / 2) + + // check if the key is still there: + hasKey = testSet.Has(key1) + if !(hasKey == true) { + t.Fatal("key1 should've been in the set, but Has() returned false") + } + + // Let some time pass well beyond the expiry time, so key1 expires: + testClock.Add(testEntryTTL * 2) + + // Add another key to the set: + err = testSet.Add(key2) + if err != nil { + t.Fatal((err)) + } + + hasKey = testSet.Has(key1) + if !(hasKey == false) { + t.Fatal("key1 should've been removed from the set, but Has() returned true") + } + + hasKey = testSet.Has(key2) + if !(hasKey == true) { + t.Fatal("key should remain in the set, but Has() returned false") + } + + // Let some time pass well beyond key2's expiry time, so key2 expires: + testClock.Add(testEntryTTL * 2) + + hasKey = testSet.Has(key2) + if !(hasKey == false) { + t.Fatal("key2 should have been wiped, but Has() returned true") + } +} + +func TestGC(t *testing.T) { + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + + testEntryTTL := 10 * time.Second + testSet := ttlset.New(&ttlset.Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }) + + key1 := "some key" + key2 := "some later key" + + // check adding a message to the cache + err = testSet.Add(key1) + if err != nil { + t.Fatal((err)) + } + + // move the clock 2 seconds + testClock.Add(2 * time.Second) + + // add a second key which will have a later expiration time + err = testSet.Add(key2) + if err != nil { + t.Fatal((err)) + } + + count := testSet.Count() + if !(count == 2) { + t.Fatal("Expected the set to contain 2 keys") + } + + testSet.GC() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired. + + count = testSet.Count() + if !(count == 2) { + t.Fatal("Expected the set to still contain 2 keys") + } + + //Now, move the clock forward 9 seconds. This will expire key1 but still keep key2 + testClock.Add(9 * time.Second) + testSet.GC() // invoke the internal cleaning function, which should wipe only key1 + count = testSet.Count() + if !(count == 1) { + t.Fatal("Expected the set to now have only 1 key") + } + + //Verify if key1 was wiped but key2 persists: + hasKey := testSet.Has(key1) + if !(hasKey == false) { + t.Fatal("Expected the set to have removed key1") + } + + hasKey = testSet.Has(key2) + if !(hasKey == true) { + t.Fatal("Expected the set to still contain key2") + } + + //Now, move the clock some more time. This will wipe key2 + testClock.Add(7 * time.Second) + testSet.GC() // invoke the internal cleaning function, which should wipe only key1 + + count = testSet.Count() + // verify the map is now empty + if !(count == 0) { + t.Fatal("Expected the set to be empty") + } + +} diff --git a/pss/pss.go b/pss/pss.go index ed8287d8a0..a895be495d 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -36,9 +36,10 @@ import ( "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/internal/ticker" + "github.com/ethersphere/swarm/pss/internal/ttlset" "github.com/ethersphere/swarm/pss/message" - "github.com/ethersphere/swarm/storage" - "golang.org/x/crypto/sha3" + "github.com/tilinna/clock" ) const ( @@ -50,7 +51,6 @@ const ( defaultOutboxCapacity = 100000 protocolName = "pss" protocolVersion = 2 - hasherCount = 8 ) var ( @@ -66,13 +66,6 @@ var spec = &protocols.Spec{ }, } -// cache is used for preventing backwards routing -// will also be instrumental in flood guard mechanism -// and mailbox implementation -type cacheEntry struct { - expiresAt time.Time -} - // abstraction to enable access to p2p.protocols.Peer.Send type senderPeer interface { Info() *p2p.PeerInfo @@ -201,6 +194,8 @@ func (o outbox) reenqueue(slot int) { type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore + forwardCache *ttlset.TTLSet + gcTicker *ticker.Ticker privateKey *ecdsa.PrivateKey // pss can have it's own independent key auxAPIs []rpc.API // builtins (handshake, test) can add APIs @@ -209,17 +204,13 @@ type Pss struct { peers map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer peersMu sync.RWMutex - fwdCache map[message.Digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg - fwdCacheMu sync.RWMutex - cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) - msgTTL time.Duration - capstring string - outbox outbox + msgTTL time.Duration + capstring string + outbox outbox // message handling handlers map[message.Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex - hashPool sync.Pool topicHandlerCaps map[message.Topic]*handlerCaps // caches capabilities of each topic's handlers topicHandlerCapsMu sync.RWMutex @@ -239,6 +230,9 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { if params.privateKey == nil { return nil, errors.New("missing private key for pss") } + + clock := clock.Realtime() //TODO: Clock should be injected by Params so it can be mocked. + c := p2p.Cap{ Name: protocolName, Version: protocolVersion, @@ -251,27 +245,26 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { quitC: make(chan struct{}), peers: make(map[string]*protocols.Peer), - fwdCache: make(map[message.Digest]cacheEntry), - cacheTTL: params.CacheTTL, msgTTL: params.MsgTTL, capstring: c.String(), handlers: make(map[message.Topic]map[*handler]bool), topicHandlerCaps: make(map[message.Topic]*handlerCaps), - - hashPool: sync.Pool{ - New: func() interface{} { - return sha3.NewLegacyKeccak256() - }, - }, } + ps.forwardCache = ttlset.New(&ttlset.Config{ + EntryTTL: params.CacheTTL, + Clock: clock, + }) + ps.gcTicker = ticker.New(&ticker.Config{ + Clock: clock, + Interval: params.CacheTTL, + Callback: func() { + ps.forwardCache.GC() + metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) + }, + }) ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward) - for i := 0; i < hasherCount; i++ { - hashfunc := storage.MakeHashFunc(storage.DefaultHash)() - ps.hashPool.Put(hashfunc) - } - return ps, nil } @@ -282,13 +275,9 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { func (p *Pss) Start(srv *p2p.Server) error { go func() { ticker := time.NewTicker(defaultCleanInterval) - cacheTicker := time.NewTicker(p.cacheTTL) defer ticker.Stop() - defer cacheTicker.Stop() for { select { - case <-cacheTicker.C: - p.cleanFwdCache() case <-ticker.C: p.cleanKeys() case <-p.quitC: @@ -307,6 +296,9 @@ func (p *Pss) Start(srv *p2p.Server) error { func (p *Pss) Stop() error { log.Info("Pss shutting down") + if err := p.gcTicker.Stop(); err != nil { + return err + } close(p.quitC) return nil } @@ -819,23 +811,6 @@ func (p *Pss) forward(msg *message.Message) error { return nil } } - -///////////////////////////////////////////////////////////////////// -// SECTION: Caching -///////////////////////////////////////////////////////////////////// - -// cleanFwdCache is used to periodically remove expired entries from the forward cache -func (p *Pss) cleanFwdCache() { - metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - for k, v := range p.fwdCache { - if v.expiresAt.Before(time.Now()) { - delete(p.fwdCache, k) - } - } -} - func label(b []byte) string { return fmt.Sprintf("%04x", b[:2]) } @@ -843,38 +818,18 @@ func label(b []byte) string { // add a message to the cache func (p *Pss) addFwdCache(msg *message.Message) error { defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) - - var entry cacheEntry - var ok bool - - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - - digest := msg.Digest() - if entry, ok = p.fwdCache[digest]; !ok { - entry = cacheEntry{} - } - entry.expiresAt = time.Now().Add(p.cacheTTL) - p.fwdCache[digest] = entry - return nil + return p.forwardCache.Add(msg.Digest()) } // check if message is in the cache func (p *Pss) checkFwdCache(msg *message.Message) bool { - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - - digest := msg.Digest() - entry, ok := p.fwdCache[digest] - if ok { - if entry.expiresAt.After(time.Now()) { - log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest)) - metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1) - return true - } - metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1) + hit := p.forwardCache.Has(msg.Digest()) + if hit { + metrics.GetOrRegisterCounter("pss.checkfwdcache.hit", nil).Inc(1) + } else { + metrics.GetOrRegisterCounter("pss.checkfwdcache.miss", nil).Inc(1) } - return false + return hit } func validateAddress(addr PssAddress) error { diff --git a/pss/pss_test.go b/pss/pss_test.go index c73428404d..624b1f33fa 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/ecdsa" "encoding/binary" - "encoding/hex" "errors" "fmt" "math/rand" @@ -105,91 +104,6 @@ func TestAPITopic(t *testing.T) { } // test if we can insert into cache, match items with cache and cache expiry -func TestCache(t *testing.T) { - var err error - to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") - privkey, err := ethCrypto.GenerateKey() - if err != nil { - t.Fatal(err) - } - ps := newTestPss(privkey, nil, nil) - defer ps.Stop() - pp := NewParams().WithPrivateKey(privkey) - data := []byte("foo") - datatwo := []byte("bar") - datathree := []byte("baz") - wparams := &crypto.WrapParams{ - Sender: privkey, - Receiver: &privkey.PublicKey, - } - env, err := ps.Crypto.Wrap(data, wparams) - msg := &message.Message{ - Payload: env, - To: to, - Topic: PingTopic, - } - envtwo, err := ps.Crypto.Wrap(datatwo, wparams) - msgtwo := &message.Message{ - Payload: envtwo, - To: to, - Topic: PingTopic, - } - envthree, err := ps.Crypto.Wrap(datathree, wparams) - msgthree := &message.Message{ - Payload: envthree, - To: to, - Topic: PingTopic, - } - - digestone := msg.Digest() - if err != nil { - t.Fatalf("could not store cache msgone: %v", err) - } - digesttwo := msgtwo.Digest() - if err != nil { - t.Fatalf("could not store cache msgtwo: %v", err) - } - digestthree := msgthree.Digest() - if err != nil { - t.Fatalf("could not store cache msgthree: %v", err) - } - - if digestone == digesttwo { - t.Fatalf("different msgs return same hash: %d", digesttwo) - } - - // check the cache - err = ps.addFwdCache(msg) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if !ps.checkFwdCache(msg) { - t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg) - } - - if ps.checkFwdCache(msgtwo) { - t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo) - } - - time.Sleep(pp.CacheTTL + 1*time.Second) - err = ps.addFwdCache(msgthree) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if ps.checkFwdCache(msg) { - t.Fatalf("message %v should have expired from cache but checkCache returned true", msg) - } - - if _, ok := ps.fwdCache[digestthree]; !ok { - t.Fatalf("unexpired message should be in the cache: %v", digestthree) - } - - if _, ok := ps.fwdCache[digesttwo]; ok { - t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo) - } -} // matching of address hints; whether a message could be or is for the node func TestAddressMatch(t *testing.T) { diff --git a/vendor/github.com/tilinna/clock/.gitignore b/vendor/github.com/tilinna/clock/.gitignore new file mode 100644 index 0000000000..ee1e57aef6 --- /dev/null +++ b/vendor/github.com/tilinna/clock/.gitignore @@ -0,0 +1 @@ +debug.test diff --git a/vendor/github.com/tilinna/clock/.travis.yml b/vendor/github.com/tilinna/clock/.travis.yml new file mode 100644 index 0000000000..26dae0dd55 --- /dev/null +++ b/vendor/github.com/tilinna/clock/.travis.yml @@ -0,0 +1,8 @@ +language: go +go: +- 1.11.x +- 1.10.x +- 1.9.x +- 1.8.x + +script: go test -v ./... diff --git a/vendor/github.com/tilinna/clock/LICENSE b/vendor/github.com/tilinna/clock/LICENSE new file mode 100644 index 0000000000..3b4833ea16 --- /dev/null +++ b/vendor/github.com/tilinna/clock/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Timo Linna + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/tilinna/clock/README.md b/vendor/github.com/tilinna/clock/README.md new file mode 100644 index 0000000000..15bc816133 --- /dev/null +++ b/vendor/github.com/tilinna/clock/README.md @@ -0,0 +1,40 @@ +# clock [![GoDoc](https://godoc.org/github.com/tilinna/clock?status.png)](https://godoc.org/github.com/tilinna/clock) [![Go Report Card](https://goreportcard.com/badge/github.com/tilinna/clock)](https://goreportcard.com/report/github.com/tilinna/clock) + +A Go (golang) library for mocking standard time, optionally also with context.Context. + +## Basic Usage + +```go +// Use clock.Realtime() in production +mock := clock.NewMock(time.Date(2018, 1, 1, 10, 0, 0, 0, time.UTC)) +fmt.Println("Time is now", mock.Now()) +timer := mock.NewTimer(15 * time.Second) +mock.Add(25 * time.Second) +fmt.Println("Time is now", mock.Now()) +fmt.Println("Timeout was", <-timer.C) +// Output: +// Time is now 2018-01-01 10:00:00 +0000 UTC +// Time is now 2018-01-01 10:00:25 +0000 UTC +// Timeout was 2018-01-01 10:00:15 +0000 UTC +``` + +## Context Usage + +```go +start := time.Date(2018, 1, 1, 10, 0, 0, 0, time.UTC) +mock := clock.NewMock(start) +fmt.Println("now:", mock.Now()) +ctx, cfn := mock.DeadlineContext(context.Background(), start.Add(time.Hour)) +defer cfn() +fmt.Println("err:", ctx.Err()) +dl, _ := ctx.Deadline() +mock.Set(dl) +fmt.Println("now:", clock.Now(ctx)) +<-ctx.Done() +fmt.Println("err:", ctx.Err()) +// Output: +// now: 2018-01-01 10:00:00 +0000 UTC +// err: +// now: 2018-01-01 11:00:00 +0000 UTC +// err: context deadline exceeded +``` diff --git a/vendor/github.com/tilinna/clock/clock.go b/vendor/github.com/tilinna/clock/clock.go new file mode 100644 index 0000000000..54f100fa9b --- /dev/null +++ b/vendor/github.com/tilinna/clock/clock.go @@ -0,0 +1,111 @@ +// Package clock implements a library for mocking time. +// +// Usage +// +// Include a Clock variable on your application and initialize it with +// a Realtime() by default. Then use the Clock for all time-related API +// calls. So instead of time.NewTimer(), say myClock.NewTimer(). +// +// On a test setup, override or inject the variable with a Mock instance +// and use it to control how the time behaves during each test phase. +// +// To mock context.WithTimeout and context.WithDeadline, use the included +// Context, TimeoutContext and DeadlineContext methods. +// +// The Context method is also useful in cases where you need to pass a +// Clock via an 'func(ctx Context, ..)' API you can't change yourself. +// The FromContext method will then return the associated Clock instance. +// Alternatively, use the context'ed methods like Sleep(ctx) directly. +// +// All methods are safe for concurrent use. +package clock + +import ( + "context" + "time" +) + +// Clock represents an interface to the functions in the standard time and context packages. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + NewTicker(d time.Duration) *Ticker + NewTimer(d time.Duration) *Timer + Now() time.Time + Since(t time.Time) time.Duration + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Until(t time.Time) time.Duration + + // DeadlineContext returns a copy of the parent context with the associated + // Clock deadline adjusted to be no later than d. + DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) + + // TimeoutContext returns DeadlineContext(parent, Now(parent).Add(timeout)). + TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) +} + +type clock struct{} + +var realtime = clock{} + +// Realtime returns the standard real-time Clock. +func Realtime() Clock { + return realtime +} + +func (clock) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +func (clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (clock) NewTicker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{ + C: t.C, + ticker: t, + } +} + +func (clock) NewTimer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{ + C: t.C, + timer: t, + } +} + +func (clock) Now() time.Time { + return time.Now() +} + +func (clock) Since(t time.Time) time.Duration { + return time.Since(t) +} + +func (clock) Sleep(d time.Duration) { + time.Sleep(d) +} + +func (clock) Tick(d time.Duration) <-chan time.Time { + // Using time.Tick would trigger a vet tool warning. + if d <= 0 { + return nil + } + return time.NewTicker(d).C +} + +func (clock) Until(t time.Time) time.Duration { + return time.Until(t) +} + +func (clock) DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) { + return context.WithDeadline(parent, d) +} + +func (clock) TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, timeout) +} diff --git a/vendor/github.com/tilinna/clock/context.go b/vendor/github.com/tilinna/clock/context.go new file mode 100644 index 0000000000..d9ec72e7ac --- /dev/null +++ b/vendor/github.com/tilinna/clock/context.go @@ -0,0 +1,76 @@ +package clock + +import ( + "context" + "time" +) + +type clockKey struct{} + +// Context returns a copy of parent in which the Clock is associated with. +func Context(parent context.Context, c Clock) context.Context { + return context.WithValue(parent, clockKey{}, c) +} + +// FromContext returns the Clock associated with the context, or Realtime(). +func FromContext(ctx context.Context) Clock { + if c, ok := ctx.Value(clockKey{}).(Clock); ok { + return c + } + return Realtime() +} + +// After is a convenience wrapper for FromContext(ctx).After. +func After(ctx context.Context, d time.Duration) <-chan time.Time { + return FromContext(ctx).After(d) +} + +// AfterFunc is a convenience wrapper for FromContext(ctx).AfterFunc. +func AfterFunc(ctx context.Context, d time.Duration, f func()) *Timer { + return FromContext(ctx).AfterFunc(d, f) +} + +// NewTicker is a convenience wrapper for FromContext(ctx).NewTicker. +func NewTicker(ctx context.Context, d time.Duration) *Ticker { + return FromContext(ctx).NewTicker(d) +} + +// NewTimer is a convenience wrapper for FromContext(ctx).NewTimer. +func NewTimer(ctx context.Context, d time.Duration) *Timer { + return FromContext(ctx).NewTimer(d) +} + +// Now is a convenience wrapper for FromContext(ctx).Now. +func Now(ctx context.Context) time.Time { + return FromContext(ctx).Now() +} + +// Since is a convenience wrapper for FromContext(ctx).Since. +func Since(ctx context.Context, t time.Time) time.Duration { + return FromContext(ctx).Since(t) +} + +// Sleep is a convenience wrapper for FromContext(ctx).Sleep. +func Sleep(ctx context.Context, d time.Duration) { + FromContext(ctx).Sleep(d) +} + +// Tick is a convenience wrapper for FromContext(ctx).Tick. +func Tick(ctx context.Context, d time.Duration) <-chan time.Time { + return FromContext(ctx).Tick(d) +} + +// Until is a convenience wrapper for FromContext(ctx).Until. +func Until(ctx context.Context, t time.Time) time.Duration { + return FromContext(ctx).Until(t) +} + +// DeadlineContext is a convenience wrapper for FromContext(ctx).DeadlineContext. +func DeadlineContext(ctx context.Context, d time.Time) (context.Context, context.CancelFunc) { + return FromContext(ctx).DeadlineContext(ctx, d) +} + +// TimeoutContext is a convenience wrapper for FromContext(ctx).TimeoutContext. +func TimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return FromContext(ctx).TimeoutContext(ctx, timeout) +} diff --git a/vendor/github.com/tilinna/clock/go.mod b/vendor/github.com/tilinna/clock/go.mod new file mode 100644 index 0000000000..f9e7912a2c --- /dev/null +++ b/vendor/github.com/tilinna/clock/go.mod @@ -0,0 +1,3 @@ +module github.com/tilinna/clock + +go 1.8 diff --git a/vendor/github.com/tilinna/clock/heap.go b/vendor/github.com/tilinna/clock/heap.go new file mode 100644 index 0000000000..3daf01af8b --- /dev/null +++ b/vendor/github.com/tilinna/clock/heap.go @@ -0,0 +1,83 @@ +package clock + +import ( + "container/heap" + "time" +) + +type mockTimer struct { + deadline time.Time + fire func() time.Duration + mock *Mock + heapIndex int +} + +const removed = -1 + +func newMockTimer(m *Mock, d time.Time) *mockTimer { + return &mockTimer{ + deadline: d, + mock: m, + heapIndex: removed, + } +} + +func (t mockTimer) stopped() bool { + return t.heapIndex == removed +} + +// timerHeap implements mockTimers with a heap. +type timerHeap []*mockTimer + +func (h timerHeap) Len() int { return len(h) } + +func (h timerHeap) Less(i, j int) bool { + return h[i].deadline.Before(h[j].deadline) +} + +func (h timerHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j +} + +func (h *timerHeap) Push(x interface{}) { + n := len(*h) + t := x.(*mockTimer) + t.heapIndex = n + *h = append(*h, t) +} + +func (h *timerHeap) Pop() interface{} { + old := *h + n := len(old) + t := old[n-1] + t.heapIndex = removed + *h = old[0 : n-1] + return t +} + +func (h *timerHeap) start(t *mockTimer) { + heap.Push(h, t) +} + +func (h *timerHeap) stop(t *mockTimer) { + if !t.stopped() { + heap.Remove(h, t.heapIndex) + } +} + +func (h *timerHeap) reset(t *mockTimer) { + if !t.stopped() { + heap.Fix(h, t.heapIndex) + } else { + heap.Push(h, t) + } +} + +func (h timerHeap) next() *mockTimer { + if len(h) == 0 { + return nil + } + return h[0] +} diff --git a/vendor/github.com/tilinna/clock/mock.go b/vendor/github.com/tilinna/clock/mock.go new file mode 100644 index 0000000000..7127d7dfe2 --- /dev/null +++ b/vendor/github.com/tilinna/clock/mock.go @@ -0,0 +1,177 @@ +package clock + +import ( + "context" + "sync" + "time" +) + +type mockTimers interface { + start(t *mockTimer) + stop(t *mockTimer) + reset(t *mockTimer) + next() *mockTimer +} + +// Mock implements a Clock that only moves with Add, AddNext and Set. +// +// The clock can be suspended with Lock and resumed with Unlock. +// While suspended, all attempts to use the API will block. +// +// To increase predictability, all Mock methods acquire +// and release the Mutex only once during their execution. +type Mock struct { + sync.Mutex + now time.Time + mockTimers +} + +// NewMock returns a new Mock with current time set to now. +// +// Use Realtime to get the real-time Clock. +func NewMock(now time.Time) *Mock { + return &Mock{ + now: now, + mockTimers: &timerHeap{}, + } +} + +// Add advances the current time by duration d and fires all expired timers. +// +// Returns the new current time. +// To increase predictability and speed, Tickers are ticked only once per call. +func (m *Mock) Add(d time.Duration) time.Time { + m.Lock() + defer m.Unlock() + now, _ := m.set(m.now.Add(d)) + return now +} + +// AddNext advances the current time to the next available timer deadline +// and fires all expired timers. +// +// Returns the new current time and the advanced duration. +func (m *Mock) AddNext() (time.Time, time.Duration) { + m.Lock() + defer m.Unlock() + t := m.next() + if t == nil { + return m.now, 0 + } + return m.set(t.deadline) +} + +// Set advances the current time to t and fires all expired timers. +// +// Returns the advanced duration. +// To increase predictability and speed, Tickers are ticked only once per call. +func (m *Mock) Set(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + _, d := m.set(t) + return d +} + +func (m *Mock) set(now time.Time) (time.Time, time.Duration) { + cur := m.now + for { + t := m.next() + if t == nil || t.deadline.After(now) { + m.now = now + return m.now, m.now.Sub(cur) + } + m.now = t.deadline + if d := t.fire(); d == 0 { + // Timers are always stopped. + m.stop(t) + } else { + // Ticker's next deadline is set to the first tick after the new now. + dd := (now.Sub(m.now)/d + 1) * d + t.deadline = m.now.Add(dd) + m.reset(t) + } + } +} + +// Now returns the current mocked time. +func (m *Mock) Now() time.Time { + m.Lock() + defer m.Unlock() + return m.now +} + +// Since returns the time elapsed since t. +func (m *Mock) Since(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + return m.now.Sub(t) +} + +// Until returns the duration until t. +func (m *Mock) Until(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + return t.Sub(m.now) +} + +// DeadlineContext implements Clock. +func (m *Mock) DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) { + m.Lock() + defer m.Unlock() + return m.deadlineContext(parent, d) +} + +// TimeoutContext implements Clock. +func (m *Mock) TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + m.Lock() + defer m.Unlock() + return m.deadlineContext(parent, m.now.Add(timeout)) +} + +func (m *Mock) deadlineContext(parent context.Context, deadline time.Time) (context.Context, context.CancelFunc) { + cancelCtx, cancel := context.WithCancel(Context(parent, m)) + if pd, ok := parent.Deadline(); ok && !pd.After(deadline) { + return cancelCtx, cancel + } + ctx := &mockCtx{ + Context: cancelCtx, + done: make(chan struct{}), + deadline: deadline, + } + t := m.newTimerFunc(deadline, nil) + go func() { + select { + case <-t.C: + ctx.err = context.DeadlineExceeded + case <-cancelCtx.Done(): + ctx.err = cancelCtx.Err() + defer t.Stop() + } + close(ctx.done) + }() + return ctx, cancel +} + +type mockCtx struct { + context.Context + deadline time.Time + done chan struct{} + err error +} + +func (ctx *mockCtx) Deadline() (time.Time, bool) { + return ctx.deadline, true +} + +func (ctx *mockCtx) Done() <-chan struct{} { + return ctx.done +} + +func (ctx *mockCtx) Err() error { + select { + case <-ctx.done: + return ctx.err + default: + return nil + } +} diff --git a/vendor/github.com/tilinna/clock/ticker.go b/vendor/github.com/tilinna/clock/ticker.go new file mode 100644 index 0000000000..421c049568 --- /dev/null +++ b/vendor/github.com/tilinna/clock/ticker.go @@ -0,0 +1,63 @@ +package clock + +import ( + "errors" + "time" +) + +// Ticker represents a time.Ticker. +type Ticker struct { + C <-chan time.Time + ticker *time.Ticker + *mockTimer +} + +// NewTicker returns a new Ticker containing a channel that will send the +// current time with a period specified by the duration d. +func (m *Mock) NewTicker(d time.Duration) *Ticker { + m.Lock() + defer m.Unlock() + if d <= 0 { + panic(errors.New("non-positive interval for NewTicker")) + } + return m.newTicker(d) +} + +// Tick is a convenience wrapper for NewTicker providing access to the ticking +// channel only. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + m.Lock() + defer m.Unlock() + if d <= 0 { + return nil + } + return m.newTicker(d).C +} + +func (m *Mock) newTicker(d time.Duration) *Ticker { + c := make(chan time.Time, 1) + t := &Ticker{ + C: c, + mockTimer: newMockTimer(m, m.now.Add(d)), + } + t.fire = func() time.Duration { + select { + case c <- m.now: + default: + } + return d + } + m.start(t.mockTimer) + return t +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + return + } + t.mock.Lock() + defer t.mock.Unlock() + t.mock.stop(t.mockTimer) +} diff --git a/vendor/github.com/tilinna/clock/timer.go b/vendor/github.com/tilinna/clock/timer.go new file mode 100644 index 0000000000..f03a127505 --- /dev/null +++ b/vendor/github.com/tilinna/clock/timer.go @@ -0,0 +1,109 @@ +package clock + +import "time" + +// Timer represents a time.Timer. +type Timer struct { + C <-chan time.Time + timer *time.Timer + *mockTimer +} + +// After waits for the duration to elapse and then sends the current time on +// the returned channel. +// +// A negative or zero duration fires the underlying timer immediately. +func (m *Mock) After(d time.Duration) <-chan time.Time { + return m.NewTimer(d).C +} + +// AfterFunc waits for the duration to elapse and then calls f in its own goroutine. +// It returns a Timer that can be used to cancel the call using its Stop method. +// +// A negative or zero duration fires the timer immediately. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + m.Lock() + defer m.Unlock() + return m.newTimerFunc(m.now.Add(d), f) +} + +// NewTimer creates a new Timer that will send the current time on its channel +// after at least duration d. +// +// A negative or zero duration fires the timer immediately. +func (m *Mock) NewTimer(d time.Duration) *Timer { + m.Lock() + defer m.Unlock() + return m.newTimerFunc(m.now.Add(d), nil) +} + +// Sleep pauses the current goroutine for at least the duration d. +// +// A negative or zero duration causes Sleep to return immediately. +func (m *Mock) Sleep(d time.Duration) { + <-m.After(d) +} + +func (m *Mock) newTimerFunc(deadline time.Time, afterFunc func()) *Timer { + t := &Timer{ + mockTimer: newMockTimer(m, deadline), + } + if afterFunc != nil { + t.fire = func() time.Duration { + go afterFunc() + return 0 + } + } else { + c := make(chan time.Time, 1) + t.C = c + t.fire = func() time.Duration { + select { + case c <- m.now: + default: + } + return 0 + } + } + if !t.deadline.After(m.now) { + t.fire() + } else { + m.start(t.mockTimer) + } + return t +} + +// Stop prevents the Timer from firing. +// It returns true if the call stops the timer, false if the timer has already +// expired or been stopped. +func (t *Timer) Stop() bool { + if t.timer != nil { + return t.timer.Stop() + } + t.mock.Lock() + defer t.mock.Unlock() + wasActive := !t.mockTimer.stopped() + t.mock.stop(t.mockTimer) + return wasActive +} + +// Reset changes the timer to expire after duration d. +// It returns true if the timer had been active, false if the timer had +// expired or been stopped. +// +// A negative or zero duration fires the timer immediately. +func (t *Timer) Reset(d time.Duration) bool { + if t.timer != nil { + return t.timer.Reset(d) + } + t.mock.Lock() + defer t.mock.Unlock() + wasActive := !t.mockTimer.stopped() + t.deadline = t.mock.now.Add(d) + if !t.deadline.After(t.mock.now) { + t.fire() + t.mock.stop(t.mockTimer) + } else { + t.mock.reset(t.mockTimer) + } + return wasActive +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1d8ee48661..c95558cc6f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -328,6 +328,8 @@ github.com/syndtr/goleveldb/leveldb/filter github.com/syndtr/goleveldb/leveldb/journal github.com/syndtr/goleveldb/leveldb/memdb github.com/syndtr/goleveldb/leveldb/table +# github.com/tilinna/clock v1.0.2 +github.com/tilinna/clock # github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef github.com/tyler-smith/go-bip39 github.com/tyler-smith/go-bip39/wordlists