From 029b8c96702f73d8225b31f060aeab6adb4ad898 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 10 Sep 2019 15:34:37 +0200 Subject: [PATCH 1/8] pss: refactor forward_cache. Started extracting into own package --- pss/forwardcache/forward_cache.go | 106 +++++++++++++++++++++++++ pss/forwardcache/forward_cache_test.go | 92 +++++++++++++++++++++ pss/forwardcache/mock.go | 29 +++++++ pss/pss.go | 106 ++++--------------------- pss/pss_test.go | 85 -------------------- 5 files changed, 241 insertions(+), 177 deletions(-) create mode 100644 pss/forwardcache/forward_cache.go create mode 100644 pss/forwardcache/forward_cache_test.go create mode 100644 pss/forwardcache/mock.go diff --git a/pss/forwardcache/forward_cache.go b/pss/forwardcache/forward_cache.go new file mode 100644 index 0000000000..3c5254651d --- /dev/null +++ b/pss/forwardcache/forward_cache.go @@ -0,0 +1,106 @@ +package forwardcache + +import ( + "fmt" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/pss/message" + "sync" + "time" +) + +// ForwardCache is used for preventing backwards routing +// will also be instrumental in flood guard mechanism +// and mailbox implementation +type ForwardCache interface { + AddFwdCache(msg *message.Message) error + CheckFwdCache(msg *message.Message) bool +} + +type Config struct { + CacheTTL time.Duration + QuitC chan struct{} +} + +type forwardCache struct { + Config + fwdCache map[message.Digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg + fwdCacheMu sync.RWMutex +} + +type cacheEntry struct { + expiresAt time.Time +} + +func NewForwardCache(config *Config) *forwardCache { + fc := &forwardCache{ + fwdCache: make(map[message.Digest]cacheEntry), + Config: *config, + } + return fc + +} + +// add a message to the cache +func (fc *forwardCache) AddFwdCache(msg *message.Message) error { + defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) + + var entry cacheEntry + var ok bool + + fc.fwdCacheMu.Lock() + defer fc.fwdCacheMu.Unlock() + + digest := msg.Digest() + if entry, ok = fc.fwdCache[digest]; !ok { + entry = cacheEntry{} + } + entry.expiresAt = time.Now().Add(fc.CacheTTL) + fc.fwdCache[digest] = entry + return nil +} + +// check if message is in the cache +func (fc *forwardCache) CheckFwdCache(msg *message.Message) bool { + fc.fwdCacheMu.Lock() + defer fc.fwdCacheMu.Unlock() + + digest := msg.Digest() + entry, ok := fc.fwdCache[digest] + if ok { + if entry.expiresAt.After(time.Now()) { + log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest)) + metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1) + return true + } + metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1) + } + return false +} + +// cleanFwdCache is used to periodically remove expired entries from the forward cache +func (fc *forwardCache) cleanFwdCache() { + metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) + fc.fwdCacheMu.Lock() + defer fc.fwdCacheMu.Unlock() + for k, v := range fc.fwdCache { + if v.expiresAt.Before(time.Now()) { + delete(fc.fwdCache, k) + } + } +} + +func (fc *forwardCache) startCacheCleaner() { + go func() { + cacheTicker := time.NewTicker(fc.CacheTTL) + defer cacheTicker.Stop() + for { + select { + case <-cacheTicker.C: + fc.cleanFwdCache() + case <-fc.QuitC: + return + } + } + }() +} diff --git a/pss/forwardcache/forward_cache_test.go b/pss/forwardcache/forward_cache_test.go new file mode 100644 index 0000000000..e2fddfbd32 --- /dev/null +++ b/pss/forwardcache/forward_cache_test.go @@ -0,0 +1,92 @@ +package forwardcache + +import ( + "context" + "encoding/hex" + "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/message" + "testing" + "time" +) + +func TestCache(t *testing.T) { + var err error + to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + cryptoBackend := crypto.New() + keys, err := cryptoBackend.NewKeyPair(ctx) + privkey, err := cryptoBackend.GetPrivateKey(keys) + if err != nil { + t.Fatal(err) + } + + testCache := NewMockForwardCache(nil) + + data := []byte("foo") + datatwo := []byte("bar") + datathree := []byte("baz") + wparams := &crypto.WrapParams{ + Src: privkey, + Dst: &privkey.PublicKey, + } + env, err := cryptoBackend.WrapMessage(data, wparams) + msg := &message.Message{ + Payload: env, + To: to, + Topic: message.Topic{}, + } + envtwo, err := cryptoBackend.WrapMessage(datatwo, wparams) + msgtwo := &message.Message{ + Payload: envtwo, + To: to, + Topic: message.Topic{}, + } + envthree, err := cryptoBackend.WrapMessage(datathree, wparams) + msgthree := &message.Message{ + Payload: envthree, + To: to, + Topic: message.Topic{}, + } + + digestone := msg.Digest() + digesttwo := msgtwo.Digest() + digestthree := msgthree.Digest() + + if digestone == digesttwo { + t.Fatalf("different msgs return same hash: %d", digesttwo) + } + + // check the cache + err = testCache.AddFwdCache(msg) + if err != nil { + t.Fatalf("write to pss expire cache failed: %v", err) + } + + if !testCache.CheckFwdCache(msg) { + t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg) + } + + if testCache.CheckFwdCache(msgtwo) { + t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo) + } + + time.Sleep(testCache.CacheTTL + 1*time.Second) + err = testCache.AddFwdCache(msgthree) + if err != nil { + t.Fatalf("write to pss expire cache failed: %v", err) + } + + if testCache.CheckFwdCache(msg) { + t.Fatalf("message %v should have expired from cache but checkCache returned true", msg) + } + + if _, ok := testCache.fwdCache[digestthree]; !ok { + t.Fatalf("unexpired message should be in the cache: %v", digestthree) + } + + if _, ok := testCache.fwdCache[digesttwo]; ok { + t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo) + } +} diff --git a/pss/forwardcache/mock.go b/pss/forwardcache/mock.go new file mode 100644 index 0000000000..e8d8449ee7 --- /dev/null +++ b/pss/forwardcache/mock.go @@ -0,0 +1,29 @@ +package forwardcache + +import ( + "github.com/ethersphere/swarm/pss/message" + "time" +) + +const ( + defaultCacheTTL = time.Second * 10 +) + +func NewMockForwardCache(config *Config) *forwardCache { + if config == nil { + config = &Config{ + CacheTTL: defaultCacheTTL, + QuitC: nil, + } + } + + if &config.CacheTTL == nil { + config.CacheTTL = defaultCacheTTL + } + fc := &forwardCache{ + fwdCache: make(map[message.Digest]cacheEntry), + Config: *config, + } + return fc + +} diff --git a/pss/pss.go b/pss/pss.go index ed8287d8a0..f06ff0b82f 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "github.com/ethersphere/swarm/pss/forwardcache" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -37,8 +39,6 @@ import ( "github.com/ethersphere/swarm/pot" "github.com/ethersphere/swarm/pss/crypto" "github.com/ethersphere/swarm/pss/message" - "github.com/ethersphere/swarm/storage" - "golang.org/x/crypto/sha3" ) const ( @@ -50,7 +50,6 @@ const ( defaultOutboxCapacity = 100000 protocolName = "pss" protocolVersion = 2 - hasherCount = 8 ) var ( @@ -66,13 +65,6 @@ var spec = &protocols.Spec{ }, } -// cache is used for preventing backwards routing -// will also be instrumental in flood guard mechanism -// and mailbox implementation -type cacheEntry struct { - expiresAt time.Time -} - // abstraction to enable access to p2p.protocols.Peer.Send type senderPeer interface { Info() *p2p.PeerInfo @@ -201,6 +193,7 @@ func (o outbox) reenqueue(slot int) { type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore + forwardcache.ForwardCache privateKey *ecdsa.PrivateKey // pss can have it's own independent key auxAPIs []rpc.API // builtins (handshake, test) can add APIs @@ -209,17 +202,13 @@ type Pss struct { peers map[string]*protocols.Peer // keep track of all peers sitting on the pssmsg routing layer peersMu sync.RWMutex - fwdCache map[message.Digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg - fwdCacheMu sync.RWMutex - cacheTTL time.Duration // how long to keep messages in fwdCache (not implemented) - msgTTL time.Duration - capstring string - outbox outbox + msgTTL time.Duration + capstring string + outbox outbox // message handling handlers map[message.Topic]map[*handler]bool // topic and version based pss payload handlers. See pss.Handle() handlersMu sync.RWMutex - hashPool sync.Pool topicHandlerCaps map[message.Topic]*handlerCaps // caches capabilities of each topic's handlers topicHandlerCapsMu sync.RWMutex @@ -251,27 +240,18 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { quitC: make(chan struct{}), peers: make(map[string]*protocols.Peer), - fwdCache: make(map[message.Digest]cacheEntry), - cacheTTL: params.CacheTTL, msgTTL: params.MsgTTL, capstring: c.String(), handlers: make(map[message.Topic]map[*handler]bool), topicHandlerCaps: make(map[message.Topic]*handlerCaps), - - hashPool: sync.Pool{ - New: func() interface{} { - return sha3.NewLegacyKeccak256() - }, - }, } + ps.ForwardCache = forwardcache.NewForwardCache(&forwardcache.Config{ + CacheTTL: params.CacheTTL, + QuitC: ps.quitC, + }) ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward) - for i := 0; i < hasherCount; i++ { - hashfunc := storage.MakeHashFunc(storage.DefaultHash)() - ps.hashPool.Put(hashfunc) - } - return ps, nil } @@ -282,13 +262,9 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { func (p *Pss) Start(srv *p2p.Server) error { go func() { ticker := time.NewTicker(defaultCleanInterval) - cacheTicker := time.NewTicker(p.cacheTTL) defer ticker.Stop() - defer cacheTicker.Stop() for { select { - case <-cacheTicker.C: - p.cleanFwdCache() case <-ticker.C: p.cleanKeys() case <-p.quitC: @@ -472,11 +448,11 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error { log.Warn("pss filtered expired message", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", hex.EncodeToString(pssmsg.To)) return nil } - if p.checkFwdCache(pssmsg) { + if p.CheckFwdCache(pssmsg) { log.Trace("pss relay block-cache match (process)", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", (hex.EncodeToString(pssmsg.To))) return nil } - p.addFwdCache(pssmsg) + p.AddFwdCache(pssmsg) psstopic := pssmsg.Topic @@ -642,7 +618,7 @@ func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte) error pssMsg.Payload = msg pssMsg.Topic = topic - p.addFwdCache(pssMsg) + p.AddFwdCache(pssMsg) return p.enqueue(pssMsg) } @@ -811,7 +787,7 @@ func (p *Pss) forward(msg *message.Message) error { }) // cache the message - p.addFwdCache(msg) + p.AddFwdCache(msg) if sent == 0 { return errors.New("unable to forward to any peers") @@ -819,64 +795,10 @@ func (p *Pss) forward(msg *message.Message) error { return nil } } - -///////////////////////////////////////////////////////////////////// -// SECTION: Caching -///////////////////////////////////////////////////////////////////// - -// cleanFwdCache is used to periodically remove expired entries from the forward cache -func (p *Pss) cleanFwdCache() { - metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - for k, v := range p.fwdCache { - if v.expiresAt.Before(time.Now()) { - delete(p.fwdCache, k) - } - } -} - func label(b []byte) string { return fmt.Sprintf("%04x", b[:2]) } -// add a message to the cache -func (p *Pss) addFwdCache(msg *message.Message) error { - defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) - - var entry cacheEntry - var ok bool - - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - - digest := msg.Digest() - if entry, ok = p.fwdCache[digest]; !ok { - entry = cacheEntry{} - } - entry.expiresAt = time.Now().Add(p.cacheTTL) - p.fwdCache[digest] = entry - return nil -} - -// check if message is in the cache -func (p *Pss) checkFwdCache(msg *message.Message) bool { - p.fwdCacheMu.Lock() - defer p.fwdCacheMu.Unlock() - - digest := msg.Digest() - entry, ok := p.fwdCache[digest] - if ok { - if entry.expiresAt.After(time.Now()) { - log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest)) - metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1) - return true - } - metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1) - } - return false -} - func validateAddress(addr PssAddress) error { if len(addr) > addressLength { return errors.New("address too long") diff --git a/pss/pss_test.go b/pss/pss_test.go index c73428404d..b1deb94808 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -105,91 +105,6 @@ func TestAPITopic(t *testing.T) { } // test if we can insert into cache, match items with cache and cache expiry -func TestCache(t *testing.T) { - var err error - to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") - privkey, err := ethCrypto.GenerateKey() - if err != nil { - t.Fatal(err) - } - ps := newTestPss(privkey, nil, nil) - defer ps.Stop() - pp := NewParams().WithPrivateKey(privkey) - data := []byte("foo") - datatwo := []byte("bar") - datathree := []byte("baz") - wparams := &crypto.WrapParams{ - Sender: privkey, - Receiver: &privkey.PublicKey, - } - env, err := ps.Crypto.Wrap(data, wparams) - msg := &message.Message{ - Payload: env, - To: to, - Topic: PingTopic, - } - envtwo, err := ps.Crypto.Wrap(datatwo, wparams) - msgtwo := &message.Message{ - Payload: envtwo, - To: to, - Topic: PingTopic, - } - envthree, err := ps.Crypto.Wrap(datathree, wparams) - msgthree := &message.Message{ - Payload: envthree, - To: to, - Topic: PingTopic, - } - - digestone := msg.Digest() - if err != nil { - t.Fatalf("could not store cache msgone: %v", err) - } - digesttwo := msgtwo.Digest() - if err != nil { - t.Fatalf("could not store cache msgtwo: %v", err) - } - digestthree := msgthree.Digest() - if err != nil { - t.Fatalf("could not store cache msgthree: %v", err) - } - - if digestone == digesttwo { - t.Fatalf("different msgs return same hash: %d", digesttwo) - } - - // check the cache - err = ps.addFwdCache(msg) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if !ps.checkFwdCache(msg) { - t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg) - } - - if ps.checkFwdCache(msgtwo) { - t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo) - } - - time.Sleep(pp.CacheTTL + 1*time.Second) - err = ps.addFwdCache(msgthree) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if ps.checkFwdCache(msg) { - t.Fatalf("message %v should have expired from cache but checkCache returned true", msg) - } - - if _, ok := ps.fwdCache[digestthree]; !ok { - t.Fatalf("unexpired message should be in the cache: %v", digestthree) - } - - if _, ok := ps.fwdCache[digesttwo]; ok { - t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo) - } -} // matching of address hints; whether a message could be or is for the node func TestAddressMatch(t *testing.T) { From 251ae3992a28aa6925556bb49ce635356b3658ae Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 10 Sep 2019 18:33:42 +0200 Subject: [PATCH 2/8] pss: Refactored forwardCache into a TTLSet --- pss/forwardcache/forward_cache.go | 106 ---------------- pss/forwardcache/forward_cache_test.go | 92 -------------- pss/forwardcache/mock.go | 29 ----- pss/internal/ttlset/ttlset.go | 131 ++++++++++++++++++++ pss/internal/ttlset/ttlset_test.go | 81 ++++++++++++ pss/internal/ttlset/ttlset_whitebox_test.go | 108 ++++++++++++++++ pss/pss.go | 28 +++-- pss/pss_test.go | 1 - 8 files changed, 338 insertions(+), 238 deletions(-) delete mode 100644 pss/forwardcache/forward_cache.go delete mode 100644 pss/forwardcache/forward_cache_test.go delete mode 100644 pss/forwardcache/mock.go create mode 100644 pss/internal/ttlset/ttlset.go create mode 100644 pss/internal/ttlset/ttlset_test.go create mode 100644 pss/internal/ttlset/ttlset_whitebox_test.go diff --git a/pss/forwardcache/forward_cache.go b/pss/forwardcache/forward_cache.go deleted file mode 100644 index 3c5254651d..0000000000 --- a/pss/forwardcache/forward_cache.go +++ /dev/null @@ -1,106 +0,0 @@ -package forwardcache - -import ( - "fmt" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethersphere/swarm/log" - "github.com/ethersphere/swarm/pss/message" - "sync" - "time" -) - -// ForwardCache is used for preventing backwards routing -// will also be instrumental in flood guard mechanism -// and mailbox implementation -type ForwardCache interface { - AddFwdCache(msg *message.Message) error - CheckFwdCache(msg *message.Message) bool -} - -type Config struct { - CacheTTL time.Duration - QuitC chan struct{} -} - -type forwardCache struct { - Config - fwdCache map[message.Digest]cacheEntry // checksum of unique fields from pssmsg mapped to expiry, cache to determine whether to drop msg - fwdCacheMu sync.RWMutex -} - -type cacheEntry struct { - expiresAt time.Time -} - -func NewForwardCache(config *Config) *forwardCache { - fc := &forwardCache{ - fwdCache: make(map[message.Digest]cacheEntry), - Config: *config, - } - return fc - -} - -// add a message to the cache -func (fc *forwardCache) AddFwdCache(msg *message.Message) error { - defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) - - var entry cacheEntry - var ok bool - - fc.fwdCacheMu.Lock() - defer fc.fwdCacheMu.Unlock() - - digest := msg.Digest() - if entry, ok = fc.fwdCache[digest]; !ok { - entry = cacheEntry{} - } - entry.expiresAt = time.Now().Add(fc.CacheTTL) - fc.fwdCache[digest] = entry - return nil -} - -// check if message is in the cache -func (fc *forwardCache) CheckFwdCache(msg *message.Message) bool { - fc.fwdCacheMu.Lock() - defer fc.fwdCacheMu.Unlock() - - digest := msg.Digest() - entry, ok := fc.fwdCache[digest] - if ok { - if entry.expiresAt.After(time.Now()) { - log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest)) - metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1) - return true - } - metrics.GetOrRegisterCounter("pss.checkfwdcache.expired", nil).Inc(1) - } - return false -} - -// cleanFwdCache is used to periodically remove expired entries from the forward cache -func (fc *forwardCache) cleanFwdCache() { - metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) - fc.fwdCacheMu.Lock() - defer fc.fwdCacheMu.Unlock() - for k, v := range fc.fwdCache { - if v.expiresAt.Before(time.Now()) { - delete(fc.fwdCache, k) - } - } -} - -func (fc *forwardCache) startCacheCleaner() { - go func() { - cacheTicker := time.NewTicker(fc.CacheTTL) - defer cacheTicker.Stop() - for { - select { - case <-cacheTicker.C: - fc.cleanFwdCache() - case <-fc.QuitC: - return - } - } - }() -} diff --git a/pss/forwardcache/forward_cache_test.go b/pss/forwardcache/forward_cache_test.go deleted file mode 100644 index e2fddfbd32..0000000000 --- a/pss/forwardcache/forward_cache_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package forwardcache - -import ( - "context" - "encoding/hex" - "github.com/ethersphere/swarm/pss/crypto" - "github.com/ethersphere/swarm/pss/message" - "testing" - "time" -) - -func TestCache(t *testing.T) { - var err error - to, _ := hex.DecodeString("08090a0b0c0d0e0f1011121314150001020304050607161718191a1b1c1d1e1f") - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - cryptoBackend := crypto.New() - keys, err := cryptoBackend.NewKeyPair(ctx) - privkey, err := cryptoBackend.GetPrivateKey(keys) - if err != nil { - t.Fatal(err) - } - - testCache := NewMockForwardCache(nil) - - data := []byte("foo") - datatwo := []byte("bar") - datathree := []byte("baz") - wparams := &crypto.WrapParams{ - Src: privkey, - Dst: &privkey.PublicKey, - } - env, err := cryptoBackend.WrapMessage(data, wparams) - msg := &message.Message{ - Payload: env, - To: to, - Topic: message.Topic{}, - } - envtwo, err := cryptoBackend.WrapMessage(datatwo, wparams) - msgtwo := &message.Message{ - Payload: envtwo, - To: to, - Topic: message.Topic{}, - } - envthree, err := cryptoBackend.WrapMessage(datathree, wparams) - msgthree := &message.Message{ - Payload: envthree, - To: to, - Topic: message.Topic{}, - } - - digestone := msg.Digest() - digesttwo := msgtwo.Digest() - digestthree := msgthree.Digest() - - if digestone == digesttwo { - t.Fatalf("different msgs return same hash: %d", digesttwo) - } - - // check the cache - err = testCache.AddFwdCache(msg) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if !testCache.CheckFwdCache(msg) { - t.Fatalf("message %v should have EXPIRE record in cache but checkCache returned false", msg) - } - - if testCache.CheckFwdCache(msgtwo) { - t.Fatalf("message %v should NOT have EXPIRE record in cache but checkCache returned true", msgtwo) - } - - time.Sleep(testCache.CacheTTL + 1*time.Second) - err = testCache.AddFwdCache(msgthree) - if err != nil { - t.Fatalf("write to pss expire cache failed: %v", err) - } - - if testCache.CheckFwdCache(msg) { - t.Fatalf("message %v should have expired from cache but checkCache returned true", msg) - } - - if _, ok := testCache.fwdCache[digestthree]; !ok { - t.Fatalf("unexpired message should be in the cache: %v", digestthree) - } - - if _, ok := testCache.fwdCache[digesttwo]; ok { - t.Fatalf("expired message should have been cleared from the cache: %v", digesttwo) - } -} diff --git a/pss/forwardcache/mock.go b/pss/forwardcache/mock.go deleted file mode 100644 index e8d8449ee7..0000000000 --- a/pss/forwardcache/mock.go +++ /dev/null @@ -1,29 +0,0 @@ -package forwardcache - -import ( - "github.com/ethersphere/swarm/pss/message" - "time" -) - -const ( - defaultCacheTTL = time.Second * 10 -) - -func NewMockForwardCache(config *Config) *forwardCache { - if config == nil { - config = &Config{ - CacheTTL: defaultCacheTTL, - QuitC: nil, - } - } - - if &config.CacheTTL == nil { - config.CacheTTL = defaultCacheTTL - } - fc := &forwardCache{ - fwdCache: make(map[message.Digest]cacheEntry), - Config: *config, - } - return fc - -} diff --git a/pss/internal/ttlset/ttlset.go b/pss/internal/ttlset/ttlset.go new file mode 100644 index 0000000000..b3c43f216c --- /dev/null +++ b/pss/internal/ttlset/ttlset.go @@ -0,0 +1,131 @@ +package ttlset + +import ( + "errors" + "sync" + "time" + + "github.com/tilinna/clock" +) + +// TTLSet implements a Set that automatically removes expired items +// after a predefined expiration time +type TTLSet interface { + Add(key interface{}) error // Add adds a new key to the set + Has(key interface{}) bool // Check returns whether or not the key is already/still in the set + Start() error // Start launches this service + Stop() error // Stop will close the service and release all resources +} + +// Config defines the TTLSet configuration +type Config struct { + EntryTTL time.Duration // time after which items are removed + Clock clock.Clock // time reference +} + +type setEntry struct { + expiresAt time.Time +} + +type ttlSet struct { + Config + quitC chan struct{} + set map[interface{}]setEntry + lock sync.RWMutex +} + +// ErrAlreadyStarted is returned if this service was already started and Start() is called again +var ErrAlreadyStarted = errors.New("Already started") + +// ErrAlreadyStopped is returned if this service was already stopped and Stop() is called again +var ErrAlreadyStopped = errors.New("Already stopped") + +// New instances a the default ForwardCache implementation +func New(config *Config) TTLSet { + ts := &ttlSet{ + set: make(map[interface{}]setEntry), + Config: *config, + } + return ts +} + +// Add adds a new key to the set +func (ts *ttlSet) Add(key interface{}) error { + var entry setEntry + var ok bool + + ts.lock.Lock() + defer ts.lock.Unlock() + + if entry, ok = ts.set[key]; !ok { + entry = setEntry{} + } + entry.expiresAt = ts.Clock.Now().Add(ts.EntryTTL) + ts.set[key] = entry + return nil +} + +// Has returns whether or not a key is already/still in the set +func (ts *ttlSet) Has(key interface{}) bool { + ts.lock.Lock() + defer ts.lock.Unlock() + + entry, ok := ts.set[key] + if ok { + if entry.expiresAt.After(ts.Clock.Now()) { + return true + } + delete(ts.set, key) // since we're holding the lock, take the chance to delete a expired record + } + return false +} + +// clean is used to periodically remove expired entries from the set +func (ts *ttlSet) clean() { + ts.lock.Lock() + defer ts.lock.Unlock() + for k, v := range ts.set { + if v.expiresAt.Before(ts.Clock.Now()) { + delete(ts.set, k) + } + } +} + +func (ts *ttlSet) newTicker(callback func()) { + ticker := ts.Clock.NewTicker(ts.EntryTTL) + + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + callback() + case <-ts.quitC: + return + } + } + }() +} + +// Start launches this service +func (ts *ttlSet) Start() error { + if ts.quitC != nil { + return ErrAlreadyStarted + } + ts.quitC = make(chan struct{}) + ts.newTicker(func() { + ts.clean() + }) + + return nil +} + +// Stop will close the service and release all resources +func (ts *ttlSet) Stop() error { + if ts.quitC == nil { + return ErrAlreadyStopped + } + close(ts.quitC) + ts.quitC = nil + return nil +} diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go new file mode 100644 index 0000000000..9ae79943e6 --- /dev/null +++ b/pss/internal/ttlset/ttlset_test.go @@ -0,0 +1,81 @@ +package ttlset_test + +import ( + "testing" + "time" + + "github.com/epiclabs-io/ut" + "github.com/ethersphere/swarm/pss/internal/ttlset" + "github.com/tilinna/clock" +) + +func TestTTLSet(tx *testing.T) { + t := ut.BeginTest(tx, false) // set to true to generate test results + defer t.FinishTest() + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + + testEntryTTL := 10 * time.Second + testSet := ttlset.New(&ttlset.Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }) + + // start the service + err = testSet.Start() + t.Ok(err) + + // starting again must return an error + err = testSet.Start() + t.MustFailWith(err, ttlset.ErrAlreadyStarted) + + key1 := "some key" + key2 := "some other key" + + // check adding a key to the set + err = testSet.Add(key1) + t.Ok(err) + + // check if the key is now there: + hasKey := testSet.Has(key1) + t.Assert(hasKey == true, "key1 should've been in the set, but Has() returned false") + + // check if Has() returns false when asked about a key that was never added: + hasKey = testSet.Has("some made up key") + t.Assert(hasKey == false, "Has() should have returned false when presented with a key that was never added") + + // Let some time pass, but not enough to have the key expire: + testClock.Add(testEntryTTL / 2) + + // check if the key is still there: + hasKey = testSet.Has(key1) + t.Assert(hasKey == true, "key1 should've been in the set, but Has() returned false") + + // Let some time pass well beyond the expiry time, so key1 expires: + testClock.Add(testEntryTTL * 2) + + // Add another key to the set: + err = testSet.Add(key2) + t.Ok(err) + + hasKey = testSet.Has(key1) + t.Assert(hasKey == false, "key1 should've been removed from the set, but Has() returned true") + + hasKey = testSet.Has(key2) + t.Assert(hasKey == true, "key should remain in the set, but Has() returned false") + + // Let some time pass well beyond key2's expiry time, so key2 expires: + testClock.Add(testEntryTTL * 2) + + hasKey = testSet.Has(key2) + t.Assert(hasKey == false, "key2 should have been wiped, but Has() returned true") + + // stop the service + err = testSet.Stop() + t.Ok(err) + + // stopping again must return an error + err = testSet.Stop() + t.MustFailWith(err, ttlset.ErrAlreadyStopped) +} diff --git a/pss/internal/ttlset/ttlset_whitebox_test.go b/pss/internal/ttlset/ttlset_whitebox_test.go new file mode 100644 index 0000000000..4c8b86b046 --- /dev/null +++ b/pss/internal/ttlset/ttlset_whitebox_test.go @@ -0,0 +1,108 @@ +package ttlset + +import ( + "sync" + "testing" + "time" + + "github.com/epiclabs-io/ut" + "github.com/tilinna/clock" +) + +// white-box testing for automatic cleaning feature + +func TestClean(tx *testing.T) { + t := ut.BeginTest(tx, false) // set to true to generate test results + defer t.FinishTest() + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + + testEntryTTL := 10 * time.Second + testSet := New(&Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }).(*ttlSet) + + key1 := "some key" + key2 := "some later key" + + // check adding a message to the cache + err = testSet.Add(key1) + t.Ok(err) + + // move the clock 2 seconds + testClock.Add(2 * time.Second) + + // add a second key which will have a later expiration time + err = testSet.Add(key2) + t.Ok(err) + + // Check if both keys were added to the internal map. + _, hasKey := testSet.set[key1] + t.Assert(hasKey == true, "Expected the set to contain key1") + _, hasKey = testSet.set[key2] + t.Assert(hasKey == true, "Expected the set to contain key2") + + testSet.clean() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired. + + // Thus, check if both keys are still in the internal map: + _, hasKey = testSet.set[key1] + t.Assert(hasKey == true, "Expected the set to still contain key1") + _, hasKey = testSet.set[key2] + t.Assert(hasKey == true, "Expected the set to still contain key2") + + //Now, move the clock forward 9 seconds. This will have the effect of wiping key1 but keeping key2 + testClock.Add(9 * time.Second) + testSet.clean() // invoke the internal cleaning function, which should wipe only key1 + + //Verify if key1 was wiped but key2 persists: + _, hasKey = testSet.set[key1] + t.Assert(hasKey == false, "Expected the set to have removed key1") + _, hasKey = testSet.set[key2] + t.Assert(hasKey == true, "Expected the set to still contain key2") + + //Now, move the clock some more time. This will wipe key2 + testClock.Add(7 * time.Second) + testSet.clean() // invoke the internal cleaning function, which should wipe only key1 + + // verify the map is now empty + t.Assert(len(testSet.set) == 0, "Expected the set to be empty") + +} + +// TestNewTicker tests whether newTicker calls the callback function periodically +func TestNewTicker(tx *testing.T) { + t := ut.BeginTest(tx, false) // set to true to generate test results + defer t.FinishTest() + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + + testEntryTTL := 1 * time.Second + testSet := New(&Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }).(*ttlSet) + + err = testSet.Start() + t.Ok(err) + + wg := sync.WaitGroup{} + wg.Add(10) + tickWait := make(chan bool) + testSet.newTicker(func() { + wg.Done() + tickWait <- true + }) + + for i := 0; i < 10; i++ { + testClock.Add(testEntryTTL) + <-tickWait + } + + wg.Wait() + err = testSet.Stop() + t.Ok(err) + +} diff --git a/pss/pss.go b/pss/pss.go index f06ff0b82f..e902b74bee 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -26,8 +26,6 @@ import ( "sync" "time" - "github.com/ethersphere/swarm/pss/forwardcache" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -38,7 +36,9 @@ import ( "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/internal/ttlset" "github.com/ethersphere/swarm/pss/message" + "github.com/tilinna/clock" ) const ( @@ -193,7 +193,7 @@ func (o outbox) reenqueue(slot int) { type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore - forwardcache.ForwardCache + forwardCache ttlset.TTLSet privateKey *ecdsa.PrivateKey // pss can have it's own independent key auxAPIs []rpc.API // builtins (handshake, test) can add APIs @@ -246,9 +246,9 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { handlers: make(map[message.Topic]map[*handler]bool), topicHandlerCaps: make(map[message.Topic]*handlerCaps), } - ps.ForwardCache = forwardcache.NewForwardCache(&forwardcache.Config{ - CacheTTL: params.CacheTTL, - QuitC: ps.quitC, + ps.forwardCache = ttlset.New(&ttlset.Config{ + EntryTTL: params.CacheTTL, + Clock: clock.Realtime(), }) ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward) @@ -273,6 +273,10 @@ func (p *Pss) Start(srv *p2p.Server) error { } }() + if err := p.forwardCache.Start(); err != nil { + return err + } + // Forward outbox messages go p.outbox.processOutbox() @@ -283,6 +287,9 @@ func (p *Pss) Start(srv *p2p.Server) error { func (p *Pss) Stop() error { log.Info("Pss shutting down") + if err := p.forwardCache.Stop(); err != nil { + return err + } close(p.quitC) return nil } @@ -448,11 +455,12 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error { log.Warn("pss filtered expired message", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", hex.EncodeToString(pssmsg.To)) return nil } - if p.CheckFwdCache(pssmsg) { + msgDigest := pssmsg.Digest() + if p.forwardCache.Has(msgDigest) { log.Trace("pss relay block-cache match (process)", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", (hex.EncodeToString(pssmsg.To))) return nil } - p.AddFwdCache(pssmsg) + p.forwardCache.Add(msgDigest) psstopic := pssmsg.Topic @@ -618,7 +626,7 @@ func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte) error pssMsg.Payload = msg pssMsg.Topic = topic - p.AddFwdCache(pssMsg) + p.forwardCache.Add(pssMsg.Digest()) return p.enqueue(pssMsg) } @@ -787,7 +795,7 @@ func (p *Pss) forward(msg *message.Message) error { }) // cache the message - p.AddFwdCache(msg) + p.forwardCache.Add(msg.Digest()) if sent == 0 { return errors.New("unable to forward to any peers") diff --git a/pss/pss_test.go b/pss/pss_test.go index b1deb94808..624b1f33fa 100644 --- a/pss/pss_test.go +++ b/pss/pss_test.go @@ -21,7 +21,6 @@ import ( "context" "crypto/ecdsa" "encoding/binary" - "encoding/hex" "errors" "fmt" "math/rand" From 24a21473e8f0ee94667d3661871b6576124c4514 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 10 Sep 2019 18:46:22 +0200 Subject: [PATCH 3/8] Added tilina/clock mockable time package for testing --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index ae588e7cb2..8e2cbebde4 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 + github.com/tilinna/clock v1.0.2 github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6 diff --git a/go.sum b/go.sum index b00075ba4b..53aa19a0e9 100644 --- a/go.sum +++ b/go.sum @@ -249,6 +249,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo= github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI= +github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao= github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef h1:luEzjJzktS9eU0CmI0uApXHLP/lKzOoRPrJhd71J8ik= github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o= From 8c7a8e5affe8f80971fbc904706049003aa0c785 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Wed, 11 Sep 2019 15:52:33 +0200 Subject: [PATCH 4/8] pss/ttlset: carved out ticker for testability --- pss/internal/ttlset/ticker/ticker.go | 54 ++++++++++++ pss/internal/ttlset/ticker/ticker_test.go | 47 ++++++++++ pss/internal/ttlset/ttlset.go | 95 +++++++-------------- pss/internal/ttlset/ttlset_test.go | 10 +-- pss/internal/ttlset/ttlset_whitebox_test.go | 39 +-------- pss/pss.go | 6 +- 6 files changed, 133 insertions(+), 118 deletions(-) create mode 100644 pss/internal/ttlset/ticker/ticker.go create mode 100644 pss/internal/ttlset/ticker/ticker_test.go diff --git a/pss/internal/ttlset/ticker/ticker.go b/pss/internal/ttlset/ticker/ticker.go new file mode 100644 index 0000000000..24e1184967 --- /dev/null +++ b/pss/internal/ttlset/ticker/ticker.go @@ -0,0 +1,54 @@ +package ticker + +import ( + "errors" + "time" + + "github.com/tilinna/clock" +) + +// Config defines the necessary information and dependencies to instantiate a Ticker +type Config struct { + Clock clock.Clock + Interval time.Duration + Callback func() +} + +// Ticker represents a periodic timer that invokes a callback +type Ticker struct { + quitC chan struct{} +} + +// ErrAlreadyStopped is returned if this service was already stopped and Stop() is called again +var ErrAlreadyStopped = errors.New("Already stopped") + +// New builds a ticker that will call the given callback function periodically +func New(config *Config) *Ticker { + + tk := &Ticker{ + quitC: make(chan struct{}), + } + ticker := config.Clock.NewTicker(config.Interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + config.Callback() + case <-tk.quitC: + return + } + } + }() + return tk +} + +// Stop stops the timer and releases the goroutine running it. +func (tk *Ticker) Stop() error { + if tk.quitC == nil { + return ErrAlreadyStopped + } + close(tk.quitC) + tk.quitC = nil + return nil +} diff --git a/pss/internal/ttlset/ticker/ticker_test.go b/pss/internal/ttlset/ticker/ticker_test.go new file mode 100644 index 0000000000..4f0fb19aa8 --- /dev/null +++ b/pss/internal/ttlset/ticker/ticker_test.go @@ -0,0 +1,47 @@ +package ticker_test + +import ( + "sync" + "testing" + "time" + + "github.com/epiclabs-io/ut" + "github.com/ethersphere/swarm/pss/internal/ttlset/ticker" + "github.com/tilinna/clock" +) + +// TestNewTicker tests whether the ticker calls a callback function periodically +func TestNewTicker(tx *testing.T) { + t := ut.BeginTest(tx, false) // set to true to generate test results + defer t.FinishTest() + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) + interval := 10 * time.Second + + wg := sync.WaitGroup{} + wg.Add(10) + tickWait := make(chan bool) + + testTicker := ticker.New(&ticker.Config{ + Interval: interval, + Clock: testClock, + Callback: func() { + wg.Done() + tickWait <- true + }, + }) + + for i := 0; i < 10; i++ { + testClock.Add(interval) + <-tickWait + } + + wg.Wait() + err = testTicker.Stop() + t.Ok(err) + + err = testTicker.Stop() + t.MustFailWith(err, ticker.ErrAlreadyStopped) + +} diff --git a/pss/internal/ttlset/ttlset.go b/pss/internal/ttlset/ttlset.go index b3c43f216c..79e5f34219 100644 --- a/pss/internal/ttlset/ttlset.go +++ b/pss/internal/ttlset/ttlset.go @@ -1,56 +1,53 @@ package ttlset import ( - "errors" "sync" "time" + "github.com/ethersphere/swarm/pss/internal/ttlset/ticker" "github.com/tilinna/clock" ) -// TTLSet implements a Set that automatically removes expired items -// after a predefined expiration time -type TTLSet interface { - Add(key interface{}) error // Add adds a new key to the set - Has(key interface{}) bool // Check returns whether or not the key is already/still in the set - Start() error // Start launches this service - Stop() error // Stop will close the service and release all resources -} - // Config defines the TTLSet configuration type Config struct { EntryTTL time.Duration // time after which items are removed Clock clock.Clock // time reference } -type setEntry struct { - expiresAt time.Time -} - -type ttlSet struct { +// TTLSet implements a Set that automatically removes expired keys +// after a predefined expiration time +type TTLSet struct { Config - quitC chan struct{} - set map[interface{}]setEntry - lock sync.RWMutex + set map[interface{}]setEntry + lock sync.RWMutex + ticker *ticker.Ticker } -// ErrAlreadyStarted is returned if this service was already started and Start() is called again -var ErrAlreadyStarted = errors.New("Already started") - -// ErrAlreadyStopped is returned if this service was already stopped and Stop() is called again -var ErrAlreadyStopped = errors.New("Already stopped") +type setEntry struct { + expiresAt time.Time +} -// New instances a the default ForwardCache implementation -func New(config *Config) TTLSet { - ts := &ttlSet{ +// New instances a TTLSet +func New(config *Config) *TTLSet { + ts := &TTLSet{ set: make(map[interface{}]setEntry), Config: *config, } + + ticker := ticker.New(&ticker.Config{ + Interval: config.EntryTTL, + Clock: config.Clock, + Callback: func() { + ts.clean() + }, + }) + ts.ticker = ticker + return ts } // Add adds a new key to the set -func (ts *ttlSet) Add(key interface{}) error { +func (ts *TTLSet) Add(key interface{}) error { var entry setEntry var ok bool @@ -66,7 +63,7 @@ func (ts *ttlSet) Add(key interface{}) error { } // Has returns whether or not a key is already/still in the set -func (ts *ttlSet) Has(key interface{}) bool { +func (ts *TTLSet) Has(key interface{}) bool { ts.lock.Lock() defer ts.lock.Unlock() @@ -80,8 +77,8 @@ func (ts *ttlSet) Has(key interface{}) bool { return false } -// clean is used to periodically remove expired entries from the set -func (ts *ttlSet) clean() { +// clean is used internally to periodically remove expired entries from the set +func (ts *TTLSet) clean() { ts.lock.Lock() defer ts.lock.Unlock() for k, v := range ts.set { @@ -91,41 +88,7 @@ func (ts *ttlSet) clean() { } } -func (ts *ttlSet) newTicker(callback func()) { - ticker := ts.Clock.NewTicker(ts.EntryTTL) - - go func() { - defer ticker.Stop() - for { - select { - case <-ticker.C: - callback() - case <-ts.quitC: - return - } - } - }() -} - -// Start launches this service -func (ts *ttlSet) Start() error { - if ts.quitC != nil { - return ErrAlreadyStarted - } - ts.quitC = make(chan struct{}) - ts.newTicker(func() { - ts.clean() - }) - - return nil -} - // Stop will close the service and release all resources -func (ts *ttlSet) Stop() error { - if ts.quitC == nil { - return ErrAlreadyStopped - } - close(ts.quitC) - ts.quitC = nil - return nil +func (ts *TTLSet) Stop() error { + return ts.ticker.Stop() } diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go index 9ae79943e6..813b84ca77 100644 --- a/pss/internal/ttlset/ttlset_test.go +++ b/pss/internal/ttlset/ttlset_test.go @@ -22,14 +22,6 @@ func TestTTLSet(tx *testing.T) { Clock: testClock, }) - // start the service - err = testSet.Start() - t.Ok(err) - - // starting again must return an error - err = testSet.Start() - t.MustFailWith(err, ttlset.ErrAlreadyStarted) - key1 := "some key" key2 := "some other key" @@ -77,5 +69,5 @@ func TestTTLSet(tx *testing.T) { // stopping again must return an error err = testSet.Stop() - t.MustFailWith(err, ttlset.ErrAlreadyStopped) + t.MustFail(err, "Expected Stop() to fail if the service is already stopped") } diff --git a/pss/internal/ttlset/ttlset_whitebox_test.go b/pss/internal/ttlset/ttlset_whitebox_test.go index 4c8b86b046..7bbd3eeb16 100644 --- a/pss/internal/ttlset/ttlset_whitebox_test.go +++ b/pss/internal/ttlset/ttlset_whitebox_test.go @@ -1,7 +1,6 @@ package ttlset import ( - "sync" "testing" "time" @@ -22,7 +21,7 @@ func TestClean(tx *testing.T) { testSet := New(&Config{ EntryTTL: testEntryTTL, Clock: testClock, - }).(*ttlSet) + }) key1 := "some key" key2 := "some later key" @@ -70,39 +69,3 @@ func TestClean(tx *testing.T) { t.Assert(len(testSet.set) == 0, "Expected the set to be empty") } - -// TestNewTicker tests whether newTicker calls the callback function periodically -func TestNewTicker(tx *testing.T) { - t := ut.BeginTest(tx, false) // set to true to generate test results - defer t.FinishTest() - var err error - - testClock := clock.NewMock(time.Unix(0, 0)) - - testEntryTTL := 1 * time.Second - testSet := New(&Config{ - EntryTTL: testEntryTTL, - Clock: testClock, - }).(*ttlSet) - - err = testSet.Start() - t.Ok(err) - - wg := sync.WaitGroup{} - wg.Add(10) - tickWait := make(chan bool) - testSet.newTicker(func() { - wg.Done() - tickWait <- true - }) - - for i := 0; i < 10; i++ { - testClock.Add(testEntryTTL) - <-tickWait - } - - wg.Wait() - err = testSet.Stop() - t.Ok(err) - -} diff --git a/pss/pss.go b/pss/pss.go index e902b74bee..3591bc514a 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -193,7 +193,7 @@ func (o outbox) reenqueue(slot int) { type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore - forwardCache ttlset.TTLSet + forwardCache *ttlset.TTLSet privateKey *ecdsa.PrivateKey // pss can have it's own independent key auxAPIs []rpc.API // builtins (handshake, test) can add APIs @@ -273,10 +273,6 @@ func (p *Pss) Start(srv *p2p.Server) error { } }() - if err := p.forwardCache.Start(); err != nil { - return err - } - // Forward outbox messages go p.outbox.processOutbox() From fed32b693e36191ec363d35dfc4adfb303886257 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Wed, 11 Sep 2019 16:35:20 +0200 Subject: [PATCH 5/8] pss: Added metrics back --- pss/internal/ttlset/ttlset.go | 4 ++++ pss/internal/ttlset/ttlset_test.go | 7 +++++++ pss/pss.go | 31 ++++++++++++++++++++++++------ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/pss/internal/ttlset/ttlset.go b/pss/internal/ttlset/ttlset.go index 79e5f34219..6c1b7abf9a 100644 --- a/pss/internal/ttlset/ttlset.go +++ b/pss/internal/ttlset/ttlset.go @@ -12,6 +12,7 @@ import ( type Config struct { EntryTTL time.Duration // time after which items are removed Clock clock.Clock // time reference + OnClean func() // Callback that will be fired every time items are automatically removed on expiry } // TTLSet implements a Set that automatically removes expired keys @@ -86,6 +87,9 @@ func (ts *TTLSet) clean() { delete(ts.set, k) } } + if ts.Config.OnClean != nil { + ts.Config.OnClean() + } } // Stop will close the service and release all resources diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go index 813b84ca77..4017006307 100644 --- a/pss/internal/ttlset/ttlset_test.go +++ b/pss/internal/ttlset/ttlset_test.go @@ -15,11 +15,15 @@ func TestTTLSet(tx *testing.T) { var err error testClock := clock.NewMock(time.Unix(0, 0)) + waitClean := make(chan bool) testEntryTTL := 10 * time.Second testSet := ttlset.New(&ttlset.Config{ EntryTTL: testEntryTTL, Clock: testClock, + OnClean: func() { + waitClean <- true + }, }) key1 := "some key" @@ -47,6 +51,8 @@ func TestTTLSet(tx *testing.T) { // Let some time pass well beyond the expiry time, so key1 expires: testClock.Add(testEntryTTL * 2) + <-waitClean // Will only continue if the clean function was indeed called + // Add another key to the set: err = testSet.Add(key2) t.Ok(err) @@ -59,6 +65,7 @@ func TestTTLSet(tx *testing.T) { // Let some time pass well beyond key2's expiry time, so key2 expires: testClock.Add(testEntryTTL * 2) + <-waitClean // Will only continue if the clean function was indeed called hasKey = testSet.Has(key2) t.Assert(hasKey == false, "key2 should have been wiped, but Has() returned true") diff --git a/pss/pss.go b/pss/pss.go index 3591bc514a..5afa193c6d 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -248,7 +248,10 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { } ps.forwardCache = ttlset.New(&ttlset.Config{ EntryTTL: params.CacheTTL, - Clock: clock.Realtime(), + Clock: clock.Realtime(), //TODO: Clock should be injected by Params so it can be mocked. + OnClean: func() { + metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) + }, }) ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward) @@ -451,12 +454,11 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error { log.Warn("pss filtered expired message", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", hex.EncodeToString(pssmsg.To)) return nil } - msgDigest := pssmsg.Digest() - if p.forwardCache.Has(msgDigest) { + if p.checkFwdCache(pssmsg) { log.Trace("pss relay block-cache match (process)", "from", hex.EncodeToString(p.Kademlia.BaseAddr()), "to", (hex.EncodeToString(pssmsg.To))) return nil } - p.forwardCache.Add(msgDigest) + p.addFwdCache(pssmsg) psstopic := pssmsg.Topic @@ -622,7 +624,7 @@ func (p *Pss) SendRaw(address PssAddress, topic message.Topic, msg []byte) error pssMsg.Payload = msg pssMsg.Topic = topic - p.forwardCache.Add(pssMsg.Digest()) + p.addFwdCache(pssMsg) return p.enqueue(pssMsg) } @@ -791,7 +793,7 @@ func (p *Pss) forward(msg *message.Message) error { }) // cache the message - p.forwardCache.Add(msg.Digest()) + p.addFwdCache(msg) if sent == 0 { return errors.New("unable to forward to any peers") @@ -803,6 +805,23 @@ func label(b []byte) string { return fmt.Sprintf("%04x", b[:2]) } +// add a message to the cache +func (p *Pss) addFwdCache(msg *message.Message) error { + defer metrics.GetOrRegisterResettingTimer("pss.addfwdcache", nil).UpdateSince(time.Now()) + return p.forwardCache.Add(msg.Digest()) +} + +// check if message is in the cache +func (p *Pss) checkFwdCache(msg *message.Message) bool { + hit := p.forwardCache.Has(msg.Digest()) + if hit { + metrics.GetOrRegisterCounter("pss.checkfwdcache.hit", nil).Inc(1) + } else { + metrics.GetOrRegisterCounter("pss.checkfwdcache.miss", nil).Inc(1) + } + return hit +} + func validateAddress(addr PssAddress) error { if len(addr) > addressLength { return errors.New("address too long") From 77c7a74615a498cdd4e6a9f4856b01837296ac1d Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Thu, 12 Sep 2019 10:17:46 +0200 Subject: [PATCH 6/8] pss: take ticker out of ttlset --- pss/internal/{ttlset => }/ticker/ticker.go | 0 .../{ttlset => }/ticker/ticker_test.go | 2 +- pss/internal/ttlset/ttlset.go | 32 +++------ pss/internal/ttlset/ttlset_test.go | 65 +++++++++++++---- pss/internal/ttlset/ttlset_whitebox_test.go | 71 ------------------- pss/pss.go | 16 ++++- 6 files changed, 76 insertions(+), 110 deletions(-) rename pss/internal/{ttlset => }/ticker/ticker.go (100%) rename pss/internal/{ttlset => }/ticker/ticker_test.go (93%) delete mode 100644 pss/internal/ttlset/ttlset_whitebox_test.go diff --git a/pss/internal/ttlset/ticker/ticker.go b/pss/internal/ticker/ticker.go similarity index 100% rename from pss/internal/ttlset/ticker/ticker.go rename to pss/internal/ticker/ticker.go diff --git a/pss/internal/ttlset/ticker/ticker_test.go b/pss/internal/ticker/ticker_test.go similarity index 93% rename from pss/internal/ttlset/ticker/ticker_test.go rename to pss/internal/ticker/ticker_test.go index 4f0fb19aa8..7090b45bba 100644 --- a/pss/internal/ttlset/ticker/ticker_test.go +++ b/pss/internal/ticker/ticker_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/epiclabs-io/ut" - "github.com/ethersphere/swarm/pss/internal/ttlset/ticker" + "github.com/ethersphere/swarm/pss/internal/ticker" "github.com/tilinna/clock" ) diff --git a/pss/internal/ttlset/ttlset.go b/pss/internal/ttlset/ttlset.go index 6c1b7abf9a..2f87a2917a 100644 --- a/pss/internal/ttlset/ttlset.go +++ b/pss/internal/ttlset/ttlset.go @@ -4,7 +4,6 @@ import ( "sync" "time" - "github.com/ethersphere/swarm/pss/internal/ttlset/ticker" "github.com/tilinna/clock" ) @@ -12,16 +11,14 @@ import ( type Config struct { EntryTTL time.Duration // time after which items are removed Clock clock.Clock // time reference - OnClean func() // Callback that will be fired every time items are automatically removed on expiry } // TTLSet implements a Set that automatically removes expired keys // after a predefined expiration time type TTLSet struct { Config - set map[interface{}]setEntry - lock sync.RWMutex - ticker *ticker.Ticker + set map[interface{}]setEntry + lock sync.RWMutex } type setEntry struct { @@ -34,16 +31,6 @@ func New(config *Config) *TTLSet { set: make(map[interface{}]setEntry), Config: *config, } - - ticker := ticker.New(&ticker.Config{ - Interval: config.EntryTTL, - Clock: config.Clock, - Callback: func() { - ts.clean() - }, - }) - ts.ticker = ticker - return ts } @@ -78,8 +65,8 @@ func (ts *TTLSet) Has(key interface{}) bool { return false } -// clean is used internally to periodically remove expired entries from the set -func (ts *TTLSet) clean() { +// GC will remove expired entries from the set +func (ts *TTLSet) GC() { ts.lock.Lock() defer ts.lock.Unlock() for k, v := range ts.set { @@ -87,12 +74,11 @@ func (ts *TTLSet) clean() { delete(ts.set, k) } } - if ts.Config.OnClean != nil { - ts.Config.OnClean() - } } -// Stop will close the service and release all resources -func (ts *TTLSet) Stop() error { - return ts.ticker.Stop() +// Count returns the number of entries in the set +func (ts *TTLSet) Count() int { + ts.lock.Lock() + defer ts.lock.Unlock() + return len(ts.set) } diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go index 4017006307..04dcbf92ef 100644 --- a/pss/internal/ttlset/ttlset_test.go +++ b/pss/internal/ttlset/ttlset_test.go @@ -15,15 +15,11 @@ func TestTTLSet(tx *testing.T) { var err error testClock := clock.NewMock(time.Unix(0, 0)) - waitClean := make(chan bool) testEntryTTL := 10 * time.Second testSet := ttlset.New(&ttlset.Config{ EntryTTL: testEntryTTL, Clock: testClock, - OnClean: func() { - waitClean <- true - }, }) key1 := "some key" @@ -51,8 +47,6 @@ func TestTTLSet(tx *testing.T) { // Let some time pass well beyond the expiry time, so key1 expires: testClock.Add(testEntryTTL * 2) - <-waitClean // Will only continue if the clean function was indeed called - // Add another key to the set: err = testSet.Add(key2) t.Ok(err) @@ -65,16 +59,63 @@ func TestTTLSet(tx *testing.T) { // Let some time pass well beyond key2's expiry time, so key2 expires: testClock.Add(testEntryTTL * 2) - <-waitClean // Will only continue if the clean function was indeed called hasKey = testSet.Has(key2) t.Assert(hasKey == false, "key2 should have been wiped, but Has() returned true") +} + +func TestGC(tx *testing.T) { + t := ut.BeginTest(tx, false) // set to true to generate test results + defer t.FinishTest() + var err error + + testClock := clock.NewMock(time.Unix(0, 0)) - // stop the service - err = testSet.Stop() + testEntryTTL := 10 * time.Second + testSet := ttlset.New(&ttlset.Config{ + EntryTTL: testEntryTTL, + Clock: testClock, + }) + + key1 := "some key" + key2 := "some later key" + + // check adding a message to the cache + err = testSet.Add(key1) t.Ok(err) - // stopping again must return an error - err = testSet.Stop() - t.MustFail(err, "Expected Stop() to fail if the service is already stopped") + // move the clock 2 seconds + testClock.Add(2 * time.Second) + + // add a second key which will have a later expiration time + err = testSet.Add(key2) + t.Ok(err) + + count := testSet.Count() + t.Assert(count == 2, "Expected the set to contain 2 keys") + + testSet.GC() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired. + + count = testSet.Count() + t.Assert(count == 2, "Expected the set to still contain 2 keys") + + //Now, move the clock forward 9 seconds. This will expire key1 but still keep key2 + testClock.Add(9 * time.Second) + testSet.GC() // invoke the internal cleaning function, which should wipe only key1 + count = testSet.Count() + t.Assert(count == 1, "Expected the set to now have only 1 key") + //Verify if key1 was wiped but key2 persists: + hasKey := testSet.Has(key1) + t.Assert(hasKey == false, "Expected the set to have removed key1") + hasKey = testSet.Has(key2) + t.Assert(hasKey == true, "Expected the set to still contain key2") + + //Now, move the clock some more time. This will wipe key2 + testClock.Add(7 * time.Second) + testSet.GC() // invoke the internal cleaning function, which should wipe only key1 + + count = testSet.Count() + // verify the map is now empty + t.Assert(count == 0, "Expected the set to be empty") + } diff --git a/pss/internal/ttlset/ttlset_whitebox_test.go b/pss/internal/ttlset/ttlset_whitebox_test.go deleted file mode 100644 index 7bbd3eeb16..0000000000 --- a/pss/internal/ttlset/ttlset_whitebox_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package ttlset - -import ( - "testing" - "time" - - "github.com/epiclabs-io/ut" - "github.com/tilinna/clock" -) - -// white-box testing for automatic cleaning feature - -func TestClean(tx *testing.T) { - t := ut.BeginTest(tx, false) // set to true to generate test results - defer t.FinishTest() - var err error - - testClock := clock.NewMock(time.Unix(0, 0)) - - testEntryTTL := 10 * time.Second - testSet := New(&Config{ - EntryTTL: testEntryTTL, - Clock: testClock, - }) - - key1 := "some key" - key2 := "some later key" - - // check adding a message to the cache - err = testSet.Add(key1) - t.Ok(err) - - // move the clock 2 seconds - testClock.Add(2 * time.Second) - - // add a second key which will have a later expiration time - err = testSet.Add(key2) - t.Ok(err) - - // Check if both keys were added to the internal map. - _, hasKey := testSet.set[key1] - t.Assert(hasKey == true, "Expected the set to contain key1") - _, hasKey = testSet.set[key2] - t.Assert(hasKey == true, "Expected the set to contain key2") - - testSet.clean() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired. - - // Thus, check if both keys are still in the internal map: - _, hasKey = testSet.set[key1] - t.Assert(hasKey == true, "Expected the set to still contain key1") - _, hasKey = testSet.set[key2] - t.Assert(hasKey == true, "Expected the set to still contain key2") - - //Now, move the clock forward 9 seconds. This will have the effect of wiping key1 but keeping key2 - testClock.Add(9 * time.Second) - testSet.clean() // invoke the internal cleaning function, which should wipe only key1 - - //Verify if key1 was wiped but key2 persists: - _, hasKey = testSet.set[key1] - t.Assert(hasKey == false, "Expected the set to have removed key1") - _, hasKey = testSet.set[key2] - t.Assert(hasKey == true, "Expected the set to still contain key2") - - //Now, move the clock some more time. This will wipe key2 - testClock.Add(7 * time.Second) - testSet.clean() // invoke the internal cleaning function, which should wipe only key1 - - // verify the map is now empty - t.Assert(len(testSet.set) == 0, "Expected the set to be empty") - -} diff --git a/pss/pss.go b/pss/pss.go index 5afa193c6d..a895be495d 100644 --- a/pss/pss.go +++ b/pss/pss.go @@ -36,6 +36,7 @@ import ( "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/pot" "github.com/ethersphere/swarm/pss/crypto" + "github.com/ethersphere/swarm/pss/internal/ticker" "github.com/ethersphere/swarm/pss/internal/ttlset" "github.com/ethersphere/swarm/pss/message" "github.com/tilinna/clock" @@ -194,6 +195,7 @@ type Pss struct { *network.Kademlia // we can get the Kademlia address from this *KeyStore forwardCache *ttlset.TTLSet + gcTicker *ticker.Ticker privateKey *ecdsa.PrivateKey // pss can have it's own independent key auxAPIs []rpc.API // builtins (handshake, test) can add APIs @@ -228,6 +230,9 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { if params.privateKey == nil { return nil, errors.New("missing private key for pss") } + + clock := clock.Realtime() //TODO: Clock should be injected by Params so it can be mocked. + c := p2p.Cap{ Name: protocolName, Version: protocolVersion, @@ -248,8 +253,13 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) { } ps.forwardCache = ttlset.New(&ttlset.Config{ EntryTTL: params.CacheTTL, - Clock: clock.Realtime(), //TODO: Clock should be injected by Params so it can be mocked. - OnClean: func() { + Clock: clock, + }) + ps.gcTicker = ticker.New(&ticker.Config{ + Clock: clock, + Interval: params.CacheTTL, + Callback: func() { + ps.forwardCache.GC() metrics.GetOrRegisterCounter("pss.cleanfwdcache", nil).Inc(1) }, }) @@ -286,7 +296,7 @@ func (p *Pss) Start(srv *p2p.Server) error { func (p *Pss) Stop() error { log.Info("Pss shutting down") - if err := p.forwardCache.Stop(); err != nil { + if err := p.gcTicker.Stop(); err != nil { return err } close(p.quitC) From b0232cc9dc5b270b17bc62818ad25fd44844e5b2 Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 17 Sep 2019 15:39:43 +0200 Subject: [PATCH 7/8] pss: removed ut --- pss/internal/ticker/ticker_test.go | 14 +++--- pss/internal/ttlset/ttlset_test.go | 75 +++++++++++++++++++++--------- 2 files changed, 59 insertions(+), 30 deletions(-) diff --git a/pss/internal/ticker/ticker_test.go b/pss/internal/ticker/ticker_test.go index 7090b45bba..7942ad37fd 100644 --- a/pss/internal/ticker/ticker_test.go +++ b/pss/internal/ticker/ticker_test.go @@ -5,15 +5,12 @@ import ( "testing" "time" - "github.com/epiclabs-io/ut" "github.com/ethersphere/swarm/pss/internal/ticker" "github.com/tilinna/clock" ) // TestNewTicker tests whether the ticker calls a callback function periodically -func TestNewTicker(tx *testing.T) { - t := ut.BeginTest(tx, false) // set to true to generate test results - defer t.FinishTest() +func TestNewTicker(t *testing.T) { var err error testClock := clock.NewMock(time.Unix(0, 0)) @@ -39,9 +36,12 @@ func TestNewTicker(tx *testing.T) { wg.Wait() err = testTicker.Stop() - t.Ok(err) + if err != nil { + t.Fatal(err) + } err = testTicker.Stop() - t.MustFailWith(err, ticker.ErrAlreadyStopped) - + if err != ticker.ErrAlreadyStopped { + t.Fatal("Expected Stop() to return ticker.ErrAlreadyStopped when trying to stop an already stopped ticker") + } } diff --git a/pss/internal/ttlset/ttlset_test.go b/pss/internal/ttlset/ttlset_test.go index 04dcbf92ef..4079169678 100644 --- a/pss/internal/ttlset/ttlset_test.go +++ b/pss/internal/ttlset/ttlset_test.go @@ -4,14 +4,11 @@ import ( "testing" "time" - "github.com/epiclabs-io/ut" "github.com/ethersphere/swarm/pss/internal/ttlset" "github.com/tilinna/clock" ) -func TestTTLSet(tx *testing.T) { - t := ut.BeginTest(tx, false) // set to true to generate test results - defer t.FinishTest() +func TestTTLSet(t *testing.T) { var err error testClock := clock.NewMock(time.Unix(0, 0)) @@ -27,46 +24,60 @@ func TestTTLSet(tx *testing.T) { // check adding a key to the set err = testSet.Add(key1) - t.Ok(err) + if err != nil { + t.Fatal((err)) + } // check if the key is now there: hasKey := testSet.Has(key1) - t.Assert(hasKey == true, "key1 should've been in the set, but Has() returned false") + if !(hasKey == true) { + t.Fatal("key1 should've been in the set, but Has() returned false") + } // check if Has() returns false when asked about a key that was never added: hasKey = testSet.Has("some made up key") - t.Assert(hasKey == false, "Has() should have returned false when presented with a key that was never added") + if !(hasKey == false) { + t.Fatal("Has() should have returned false when presented with a key that was never added") + } // Let some time pass, but not enough to have the key expire: testClock.Add(testEntryTTL / 2) // check if the key is still there: hasKey = testSet.Has(key1) - t.Assert(hasKey == true, "key1 should've been in the set, but Has() returned false") + if !(hasKey == true) { + t.Fatal("key1 should've been in the set, but Has() returned false") + } // Let some time pass well beyond the expiry time, so key1 expires: testClock.Add(testEntryTTL * 2) // Add another key to the set: err = testSet.Add(key2) - t.Ok(err) + if err != nil { + t.Fatal((err)) + } hasKey = testSet.Has(key1) - t.Assert(hasKey == false, "key1 should've been removed from the set, but Has() returned true") + if !(hasKey == false) { + t.Fatal("key1 should've been removed from the set, but Has() returned true") + } hasKey = testSet.Has(key2) - t.Assert(hasKey == true, "key should remain in the set, but Has() returned false") + if !(hasKey == true) { + t.Fatal("key should remain in the set, but Has() returned false") + } // Let some time pass well beyond key2's expiry time, so key2 expires: testClock.Add(testEntryTTL * 2) hasKey = testSet.Has(key2) - t.Assert(hasKey == false, "key2 should have been wiped, but Has() returned true") + if !(hasKey == false) { + t.Fatal("key2 should have been wiped, but Has() returned true") + } } -func TestGC(tx *testing.T) { - t := ut.BeginTest(tx, false) // set to true to generate test results - defer t.FinishTest() +func TestGC(t *testing.T) { var err error testClock := clock.NewMock(time.Unix(0, 0)) @@ -82,33 +93,49 @@ func TestGC(tx *testing.T) { // check adding a message to the cache err = testSet.Add(key1) - t.Ok(err) + if err != nil { + t.Fatal((err)) + } // move the clock 2 seconds testClock.Add(2 * time.Second) // add a second key which will have a later expiration time err = testSet.Add(key2) - t.Ok(err) + if err != nil { + t.Fatal((err)) + } count := testSet.Count() - t.Assert(count == 2, "Expected the set to contain 2 keys") + if !(count == 2) { + t.Fatal("Expected the set to contain 2 keys") + } testSet.GC() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired. count = testSet.Count() - t.Assert(count == 2, "Expected the set to still contain 2 keys") + if !(count == 2) { + t.Fatal("Expected the set to still contain 2 keys") + } //Now, move the clock forward 9 seconds. This will expire key1 but still keep key2 testClock.Add(9 * time.Second) testSet.GC() // invoke the internal cleaning function, which should wipe only key1 count = testSet.Count() - t.Assert(count == 1, "Expected the set to now have only 1 key") + if !(count == 1) { + t.Fatal("Expected the set to now have only 1 key") + } + //Verify if key1 was wiped but key2 persists: hasKey := testSet.Has(key1) - t.Assert(hasKey == false, "Expected the set to have removed key1") + if !(hasKey == false) { + t.Fatal("Expected the set to have removed key1") + } + hasKey = testSet.Has(key2) - t.Assert(hasKey == true, "Expected the set to still contain key2") + if !(hasKey == true) { + t.Fatal("Expected the set to still contain key2") + } //Now, move the clock some more time. This will wipe key2 testClock.Add(7 * time.Second) @@ -116,6 +143,8 @@ func TestGC(tx *testing.T) { count = testSet.Count() // verify the map is now empty - t.Assert(count == 0, "Expected the set to be empty") + if !(count == 0) { + t.Fatal("Expected the set to be empty") + } } From 5205a71d572dc401b7bf91152d585f2cc393267b Mon Sep 17 00:00:00 2001 From: Javier Peletier Date: Tue, 17 Sep 2019 15:46:01 +0200 Subject: [PATCH 8/8] pss: vendor in tilina/clock --- vendor/github.com/tilinna/clock/.gitignore | 1 + vendor/github.com/tilinna/clock/.travis.yml | 8 + vendor/github.com/tilinna/clock/LICENSE | 21 +++ vendor/github.com/tilinna/clock/README.md | 40 +++++ vendor/github.com/tilinna/clock/clock.go | 111 ++++++++++++ vendor/github.com/tilinna/clock/context.go | 76 +++++++++ vendor/github.com/tilinna/clock/go.mod | 3 + vendor/github.com/tilinna/clock/heap.go | 83 +++++++++ vendor/github.com/tilinna/clock/mock.go | 177 ++++++++++++++++++++ vendor/github.com/tilinna/clock/ticker.go | 63 +++++++ vendor/github.com/tilinna/clock/timer.go | 109 ++++++++++++ vendor/modules.txt | 2 + 12 files changed, 694 insertions(+) create mode 100644 vendor/github.com/tilinna/clock/.gitignore create mode 100644 vendor/github.com/tilinna/clock/.travis.yml create mode 100644 vendor/github.com/tilinna/clock/LICENSE create mode 100644 vendor/github.com/tilinna/clock/README.md create mode 100644 vendor/github.com/tilinna/clock/clock.go create mode 100644 vendor/github.com/tilinna/clock/context.go create mode 100644 vendor/github.com/tilinna/clock/go.mod create mode 100644 vendor/github.com/tilinna/clock/heap.go create mode 100644 vendor/github.com/tilinna/clock/mock.go create mode 100644 vendor/github.com/tilinna/clock/ticker.go create mode 100644 vendor/github.com/tilinna/clock/timer.go diff --git a/vendor/github.com/tilinna/clock/.gitignore b/vendor/github.com/tilinna/clock/.gitignore new file mode 100644 index 0000000000..ee1e57aef6 --- /dev/null +++ b/vendor/github.com/tilinna/clock/.gitignore @@ -0,0 +1 @@ +debug.test diff --git a/vendor/github.com/tilinna/clock/.travis.yml b/vendor/github.com/tilinna/clock/.travis.yml new file mode 100644 index 0000000000..26dae0dd55 --- /dev/null +++ b/vendor/github.com/tilinna/clock/.travis.yml @@ -0,0 +1,8 @@ +language: go +go: +- 1.11.x +- 1.10.x +- 1.9.x +- 1.8.x + +script: go test -v ./... diff --git a/vendor/github.com/tilinna/clock/LICENSE b/vendor/github.com/tilinna/clock/LICENSE new file mode 100644 index 0000000000..3b4833ea16 --- /dev/null +++ b/vendor/github.com/tilinna/clock/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Timo Linna + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/tilinna/clock/README.md b/vendor/github.com/tilinna/clock/README.md new file mode 100644 index 0000000000..15bc816133 --- /dev/null +++ b/vendor/github.com/tilinna/clock/README.md @@ -0,0 +1,40 @@ +# clock [![GoDoc](https://godoc.org/github.com/tilinna/clock?status.png)](https://godoc.org/github.com/tilinna/clock) [![Go Report Card](https://goreportcard.com/badge/github.com/tilinna/clock)](https://goreportcard.com/report/github.com/tilinna/clock) + +A Go (golang) library for mocking standard time, optionally also with context.Context. + +## Basic Usage + +```go +// Use clock.Realtime() in production +mock := clock.NewMock(time.Date(2018, 1, 1, 10, 0, 0, 0, time.UTC)) +fmt.Println("Time is now", mock.Now()) +timer := mock.NewTimer(15 * time.Second) +mock.Add(25 * time.Second) +fmt.Println("Time is now", mock.Now()) +fmt.Println("Timeout was", <-timer.C) +// Output: +// Time is now 2018-01-01 10:00:00 +0000 UTC +// Time is now 2018-01-01 10:00:25 +0000 UTC +// Timeout was 2018-01-01 10:00:15 +0000 UTC +``` + +## Context Usage + +```go +start := time.Date(2018, 1, 1, 10, 0, 0, 0, time.UTC) +mock := clock.NewMock(start) +fmt.Println("now:", mock.Now()) +ctx, cfn := mock.DeadlineContext(context.Background(), start.Add(time.Hour)) +defer cfn() +fmt.Println("err:", ctx.Err()) +dl, _ := ctx.Deadline() +mock.Set(dl) +fmt.Println("now:", clock.Now(ctx)) +<-ctx.Done() +fmt.Println("err:", ctx.Err()) +// Output: +// now: 2018-01-01 10:00:00 +0000 UTC +// err: +// now: 2018-01-01 11:00:00 +0000 UTC +// err: context deadline exceeded +``` diff --git a/vendor/github.com/tilinna/clock/clock.go b/vendor/github.com/tilinna/clock/clock.go new file mode 100644 index 0000000000..54f100fa9b --- /dev/null +++ b/vendor/github.com/tilinna/clock/clock.go @@ -0,0 +1,111 @@ +// Package clock implements a library for mocking time. +// +// Usage +// +// Include a Clock variable on your application and initialize it with +// a Realtime() by default. Then use the Clock for all time-related API +// calls. So instead of time.NewTimer(), say myClock.NewTimer(). +// +// On a test setup, override or inject the variable with a Mock instance +// and use it to control how the time behaves during each test phase. +// +// To mock context.WithTimeout and context.WithDeadline, use the included +// Context, TimeoutContext and DeadlineContext methods. +// +// The Context method is also useful in cases where you need to pass a +// Clock via an 'func(ctx Context, ..)' API you can't change yourself. +// The FromContext method will then return the associated Clock instance. +// Alternatively, use the context'ed methods like Sleep(ctx) directly. +// +// All methods are safe for concurrent use. +package clock + +import ( + "context" + "time" +) + +// Clock represents an interface to the functions in the standard time and context packages. +type Clock interface { + After(d time.Duration) <-chan time.Time + AfterFunc(d time.Duration, f func()) *Timer + NewTicker(d time.Duration) *Ticker + NewTimer(d time.Duration) *Timer + Now() time.Time + Since(t time.Time) time.Duration + Sleep(d time.Duration) + Tick(d time.Duration) <-chan time.Time + Until(t time.Time) time.Duration + + // DeadlineContext returns a copy of the parent context with the associated + // Clock deadline adjusted to be no later than d. + DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) + + // TimeoutContext returns DeadlineContext(parent, Now(parent).Add(timeout)). + TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) +} + +type clock struct{} + +var realtime = clock{} + +// Realtime returns the standard real-time Clock. +func Realtime() Clock { + return realtime +} + +func (clock) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +func (clock) AfterFunc(d time.Duration, f func()) *Timer { + return &Timer{timer: time.AfterFunc(d, f)} +} + +func (clock) NewTicker(d time.Duration) *Ticker { + t := time.NewTicker(d) + return &Ticker{ + C: t.C, + ticker: t, + } +} + +func (clock) NewTimer(d time.Duration) *Timer { + t := time.NewTimer(d) + return &Timer{ + C: t.C, + timer: t, + } +} + +func (clock) Now() time.Time { + return time.Now() +} + +func (clock) Since(t time.Time) time.Duration { + return time.Since(t) +} + +func (clock) Sleep(d time.Duration) { + time.Sleep(d) +} + +func (clock) Tick(d time.Duration) <-chan time.Time { + // Using time.Tick would trigger a vet tool warning. + if d <= 0 { + return nil + } + return time.NewTicker(d).C +} + +func (clock) Until(t time.Time) time.Duration { + return time.Until(t) +} + +func (clock) DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) { + return context.WithDeadline(parent, d) +} + +func (clock) TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(parent, timeout) +} diff --git a/vendor/github.com/tilinna/clock/context.go b/vendor/github.com/tilinna/clock/context.go new file mode 100644 index 0000000000..d9ec72e7ac --- /dev/null +++ b/vendor/github.com/tilinna/clock/context.go @@ -0,0 +1,76 @@ +package clock + +import ( + "context" + "time" +) + +type clockKey struct{} + +// Context returns a copy of parent in which the Clock is associated with. +func Context(parent context.Context, c Clock) context.Context { + return context.WithValue(parent, clockKey{}, c) +} + +// FromContext returns the Clock associated with the context, or Realtime(). +func FromContext(ctx context.Context) Clock { + if c, ok := ctx.Value(clockKey{}).(Clock); ok { + return c + } + return Realtime() +} + +// After is a convenience wrapper for FromContext(ctx).After. +func After(ctx context.Context, d time.Duration) <-chan time.Time { + return FromContext(ctx).After(d) +} + +// AfterFunc is a convenience wrapper for FromContext(ctx).AfterFunc. +func AfterFunc(ctx context.Context, d time.Duration, f func()) *Timer { + return FromContext(ctx).AfterFunc(d, f) +} + +// NewTicker is a convenience wrapper for FromContext(ctx).NewTicker. +func NewTicker(ctx context.Context, d time.Duration) *Ticker { + return FromContext(ctx).NewTicker(d) +} + +// NewTimer is a convenience wrapper for FromContext(ctx).NewTimer. +func NewTimer(ctx context.Context, d time.Duration) *Timer { + return FromContext(ctx).NewTimer(d) +} + +// Now is a convenience wrapper for FromContext(ctx).Now. +func Now(ctx context.Context) time.Time { + return FromContext(ctx).Now() +} + +// Since is a convenience wrapper for FromContext(ctx).Since. +func Since(ctx context.Context, t time.Time) time.Duration { + return FromContext(ctx).Since(t) +} + +// Sleep is a convenience wrapper for FromContext(ctx).Sleep. +func Sleep(ctx context.Context, d time.Duration) { + FromContext(ctx).Sleep(d) +} + +// Tick is a convenience wrapper for FromContext(ctx).Tick. +func Tick(ctx context.Context, d time.Duration) <-chan time.Time { + return FromContext(ctx).Tick(d) +} + +// Until is a convenience wrapper for FromContext(ctx).Until. +func Until(ctx context.Context, t time.Time) time.Duration { + return FromContext(ctx).Until(t) +} + +// DeadlineContext is a convenience wrapper for FromContext(ctx).DeadlineContext. +func DeadlineContext(ctx context.Context, d time.Time) (context.Context, context.CancelFunc) { + return FromContext(ctx).DeadlineContext(ctx, d) +} + +// TimeoutContext is a convenience wrapper for FromContext(ctx).TimeoutContext. +func TimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return FromContext(ctx).TimeoutContext(ctx, timeout) +} diff --git a/vendor/github.com/tilinna/clock/go.mod b/vendor/github.com/tilinna/clock/go.mod new file mode 100644 index 0000000000..f9e7912a2c --- /dev/null +++ b/vendor/github.com/tilinna/clock/go.mod @@ -0,0 +1,3 @@ +module github.com/tilinna/clock + +go 1.8 diff --git a/vendor/github.com/tilinna/clock/heap.go b/vendor/github.com/tilinna/clock/heap.go new file mode 100644 index 0000000000..3daf01af8b --- /dev/null +++ b/vendor/github.com/tilinna/clock/heap.go @@ -0,0 +1,83 @@ +package clock + +import ( + "container/heap" + "time" +) + +type mockTimer struct { + deadline time.Time + fire func() time.Duration + mock *Mock + heapIndex int +} + +const removed = -1 + +func newMockTimer(m *Mock, d time.Time) *mockTimer { + return &mockTimer{ + deadline: d, + mock: m, + heapIndex: removed, + } +} + +func (t mockTimer) stopped() bool { + return t.heapIndex == removed +} + +// timerHeap implements mockTimers with a heap. +type timerHeap []*mockTimer + +func (h timerHeap) Len() int { return len(h) } + +func (h timerHeap) Less(i, j int) bool { + return h[i].deadline.Before(h[j].deadline) +} + +func (h timerHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].heapIndex = i + h[j].heapIndex = j +} + +func (h *timerHeap) Push(x interface{}) { + n := len(*h) + t := x.(*mockTimer) + t.heapIndex = n + *h = append(*h, t) +} + +func (h *timerHeap) Pop() interface{} { + old := *h + n := len(old) + t := old[n-1] + t.heapIndex = removed + *h = old[0 : n-1] + return t +} + +func (h *timerHeap) start(t *mockTimer) { + heap.Push(h, t) +} + +func (h *timerHeap) stop(t *mockTimer) { + if !t.stopped() { + heap.Remove(h, t.heapIndex) + } +} + +func (h *timerHeap) reset(t *mockTimer) { + if !t.stopped() { + heap.Fix(h, t.heapIndex) + } else { + heap.Push(h, t) + } +} + +func (h timerHeap) next() *mockTimer { + if len(h) == 0 { + return nil + } + return h[0] +} diff --git a/vendor/github.com/tilinna/clock/mock.go b/vendor/github.com/tilinna/clock/mock.go new file mode 100644 index 0000000000..7127d7dfe2 --- /dev/null +++ b/vendor/github.com/tilinna/clock/mock.go @@ -0,0 +1,177 @@ +package clock + +import ( + "context" + "sync" + "time" +) + +type mockTimers interface { + start(t *mockTimer) + stop(t *mockTimer) + reset(t *mockTimer) + next() *mockTimer +} + +// Mock implements a Clock that only moves with Add, AddNext and Set. +// +// The clock can be suspended with Lock and resumed with Unlock. +// While suspended, all attempts to use the API will block. +// +// To increase predictability, all Mock methods acquire +// and release the Mutex only once during their execution. +type Mock struct { + sync.Mutex + now time.Time + mockTimers +} + +// NewMock returns a new Mock with current time set to now. +// +// Use Realtime to get the real-time Clock. +func NewMock(now time.Time) *Mock { + return &Mock{ + now: now, + mockTimers: &timerHeap{}, + } +} + +// Add advances the current time by duration d and fires all expired timers. +// +// Returns the new current time. +// To increase predictability and speed, Tickers are ticked only once per call. +func (m *Mock) Add(d time.Duration) time.Time { + m.Lock() + defer m.Unlock() + now, _ := m.set(m.now.Add(d)) + return now +} + +// AddNext advances the current time to the next available timer deadline +// and fires all expired timers. +// +// Returns the new current time and the advanced duration. +func (m *Mock) AddNext() (time.Time, time.Duration) { + m.Lock() + defer m.Unlock() + t := m.next() + if t == nil { + return m.now, 0 + } + return m.set(t.deadline) +} + +// Set advances the current time to t and fires all expired timers. +// +// Returns the advanced duration. +// To increase predictability and speed, Tickers are ticked only once per call. +func (m *Mock) Set(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + _, d := m.set(t) + return d +} + +func (m *Mock) set(now time.Time) (time.Time, time.Duration) { + cur := m.now + for { + t := m.next() + if t == nil || t.deadline.After(now) { + m.now = now + return m.now, m.now.Sub(cur) + } + m.now = t.deadline + if d := t.fire(); d == 0 { + // Timers are always stopped. + m.stop(t) + } else { + // Ticker's next deadline is set to the first tick after the new now. + dd := (now.Sub(m.now)/d + 1) * d + t.deadline = m.now.Add(dd) + m.reset(t) + } + } +} + +// Now returns the current mocked time. +func (m *Mock) Now() time.Time { + m.Lock() + defer m.Unlock() + return m.now +} + +// Since returns the time elapsed since t. +func (m *Mock) Since(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + return m.now.Sub(t) +} + +// Until returns the duration until t. +func (m *Mock) Until(t time.Time) time.Duration { + m.Lock() + defer m.Unlock() + return t.Sub(m.now) +} + +// DeadlineContext implements Clock. +func (m *Mock) DeadlineContext(parent context.Context, d time.Time) (context.Context, context.CancelFunc) { + m.Lock() + defer m.Unlock() + return m.deadlineContext(parent, d) +} + +// TimeoutContext implements Clock. +func (m *Mock) TimeoutContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + m.Lock() + defer m.Unlock() + return m.deadlineContext(parent, m.now.Add(timeout)) +} + +func (m *Mock) deadlineContext(parent context.Context, deadline time.Time) (context.Context, context.CancelFunc) { + cancelCtx, cancel := context.WithCancel(Context(parent, m)) + if pd, ok := parent.Deadline(); ok && !pd.After(deadline) { + return cancelCtx, cancel + } + ctx := &mockCtx{ + Context: cancelCtx, + done: make(chan struct{}), + deadline: deadline, + } + t := m.newTimerFunc(deadline, nil) + go func() { + select { + case <-t.C: + ctx.err = context.DeadlineExceeded + case <-cancelCtx.Done(): + ctx.err = cancelCtx.Err() + defer t.Stop() + } + close(ctx.done) + }() + return ctx, cancel +} + +type mockCtx struct { + context.Context + deadline time.Time + done chan struct{} + err error +} + +func (ctx *mockCtx) Deadline() (time.Time, bool) { + return ctx.deadline, true +} + +func (ctx *mockCtx) Done() <-chan struct{} { + return ctx.done +} + +func (ctx *mockCtx) Err() error { + select { + case <-ctx.done: + return ctx.err + default: + return nil + } +} diff --git a/vendor/github.com/tilinna/clock/ticker.go b/vendor/github.com/tilinna/clock/ticker.go new file mode 100644 index 0000000000..421c049568 --- /dev/null +++ b/vendor/github.com/tilinna/clock/ticker.go @@ -0,0 +1,63 @@ +package clock + +import ( + "errors" + "time" +) + +// Ticker represents a time.Ticker. +type Ticker struct { + C <-chan time.Time + ticker *time.Ticker + *mockTimer +} + +// NewTicker returns a new Ticker containing a channel that will send the +// current time with a period specified by the duration d. +func (m *Mock) NewTicker(d time.Duration) *Ticker { + m.Lock() + defer m.Unlock() + if d <= 0 { + panic(errors.New("non-positive interval for NewTicker")) + } + return m.newTicker(d) +} + +// Tick is a convenience wrapper for NewTicker providing access to the ticking +// channel only. +func (m *Mock) Tick(d time.Duration) <-chan time.Time { + m.Lock() + defer m.Unlock() + if d <= 0 { + return nil + } + return m.newTicker(d).C +} + +func (m *Mock) newTicker(d time.Duration) *Ticker { + c := make(chan time.Time, 1) + t := &Ticker{ + C: c, + mockTimer: newMockTimer(m, m.now.Add(d)), + } + t.fire = func() time.Duration { + select { + case c <- m.now: + default: + } + return d + } + m.start(t.mockTimer) + return t +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. +func (t *Ticker) Stop() { + if t.ticker != nil { + t.ticker.Stop() + return + } + t.mock.Lock() + defer t.mock.Unlock() + t.mock.stop(t.mockTimer) +} diff --git a/vendor/github.com/tilinna/clock/timer.go b/vendor/github.com/tilinna/clock/timer.go new file mode 100644 index 0000000000..f03a127505 --- /dev/null +++ b/vendor/github.com/tilinna/clock/timer.go @@ -0,0 +1,109 @@ +package clock + +import "time" + +// Timer represents a time.Timer. +type Timer struct { + C <-chan time.Time + timer *time.Timer + *mockTimer +} + +// After waits for the duration to elapse and then sends the current time on +// the returned channel. +// +// A negative or zero duration fires the underlying timer immediately. +func (m *Mock) After(d time.Duration) <-chan time.Time { + return m.NewTimer(d).C +} + +// AfterFunc waits for the duration to elapse and then calls f in its own goroutine. +// It returns a Timer that can be used to cancel the call using its Stop method. +// +// A negative or zero duration fires the timer immediately. +func (m *Mock) AfterFunc(d time.Duration, f func()) *Timer { + m.Lock() + defer m.Unlock() + return m.newTimerFunc(m.now.Add(d), f) +} + +// NewTimer creates a new Timer that will send the current time on its channel +// after at least duration d. +// +// A negative or zero duration fires the timer immediately. +func (m *Mock) NewTimer(d time.Duration) *Timer { + m.Lock() + defer m.Unlock() + return m.newTimerFunc(m.now.Add(d), nil) +} + +// Sleep pauses the current goroutine for at least the duration d. +// +// A negative or zero duration causes Sleep to return immediately. +func (m *Mock) Sleep(d time.Duration) { + <-m.After(d) +} + +func (m *Mock) newTimerFunc(deadline time.Time, afterFunc func()) *Timer { + t := &Timer{ + mockTimer: newMockTimer(m, deadline), + } + if afterFunc != nil { + t.fire = func() time.Duration { + go afterFunc() + return 0 + } + } else { + c := make(chan time.Time, 1) + t.C = c + t.fire = func() time.Duration { + select { + case c <- m.now: + default: + } + return 0 + } + } + if !t.deadline.After(m.now) { + t.fire() + } else { + m.start(t.mockTimer) + } + return t +} + +// Stop prevents the Timer from firing. +// It returns true if the call stops the timer, false if the timer has already +// expired or been stopped. +func (t *Timer) Stop() bool { + if t.timer != nil { + return t.timer.Stop() + } + t.mock.Lock() + defer t.mock.Unlock() + wasActive := !t.mockTimer.stopped() + t.mock.stop(t.mockTimer) + return wasActive +} + +// Reset changes the timer to expire after duration d. +// It returns true if the timer had been active, false if the timer had +// expired or been stopped. +// +// A negative or zero duration fires the timer immediately. +func (t *Timer) Reset(d time.Duration) bool { + if t.timer != nil { + return t.timer.Reset(d) + } + t.mock.Lock() + defer t.mock.Unlock() + wasActive := !t.mockTimer.stopped() + t.deadline = t.mock.now.Add(d) + if !t.deadline.After(t.mock.now) { + t.fire() + t.mock.stop(t.mockTimer) + } else { + t.mock.reset(t.mockTimer) + } + return wasActive +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1d8ee48661..c95558cc6f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -328,6 +328,8 @@ github.com/syndtr/goleveldb/leveldb/filter github.com/syndtr/goleveldb/leveldb/journal github.com/syndtr/goleveldb/leveldb/memdb github.com/syndtr/goleveldb/leveldb/table +# github.com/tilinna/clock v1.0.2 +github.com/tilinna/clock # github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef github.com/tyler-smith/go-bip39 github.com/tyler-smith/go-bip39/wordlists