Write providers to disk to avoid memory leaks
License: MIT
Signed-off-by: Jeromy <[email protected]>
whyrusleeping committed Jun 16, 2016
1 parent a4801ed commit d3a2034
Showing 3 changed files with 265 additions and 25 deletions.
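In short, the commit swaps the provider manager's unbounded in-memory map for a bounded LRU cache whose entries are also written through to the node's datastore, so evicted entries can be reloaded from disk rather than lost. The sketch below is only an illustration of that write-through pattern, not code from this commit; the memStore type is made up, and the LRU package is the upstream of the gx-vendored golang-lru used in the diff.

// Illustration only: a bounded LRU cache in front of a persistent store,
// mirroring the approach this commit takes for provider records.
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru" // upstream of the gx-vendored golang-lru
)

// memStore is a stand-in for the datastore the DHT writes provider sets to.
type memStore map[string][]byte

// cachedStore keeps recently used entries in memory but always persists writes,
// so evicting a cache entry never loses data.
type cachedStore struct {
	cache *lru.Cache
	disk  memStore
}

func newCachedStore(size int) (*cachedStore, error) {
	c, err := lru.New(size) // only fails for a non-positive size
	if err != nil {
		return nil, err
	}
	return &cachedStore{cache: c, disk: memStore{}}, nil
}

func (cs *cachedStore) Put(k string, v []byte) {
	cs.cache.Add(k, v)
	cs.disk[k] = v // write-through: the store stays authoritative
}

func (cs *cachedStore) Get(k string) ([]byte, bool) {
	if v, ok := cs.cache.Get(k); ok {
		return v.([]byte), true
	}
	v, ok := cs.disk[k] // cache miss (or eviction): reload from the store
	if ok {
		cs.cache.Add(k, v)
	}
	return v, ok
}

func main() {
	cs, _ := newCachedStore(256)
	cs.Put("/providers/QmExample", []byte("provider record"))
	if v, ok := cs.Get("/providers/QmExample"); ok {
		fmt.Printf("%s\n", v)
	}
}

In the commit itself, the disk side is the go-datastore passed into NewProviderManager, and provider sets are serialized with the varint framing added at the bottom of providers.go.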
2 changes: 1 addition & 1 deletion routing/dht/dht.go
@@ -84,7 +84,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Datastore) *IpfsDHT {
dht.ctx = ctx

h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
-dht.providers = NewProviderManager(dht.ctx, dht.self)
+dht.providers = NewProviderManager(dht.ctx, dht.self, dstore)
dht.proc.AddChild(dht.providers.proc)
goprocessctx.CloseAfterContext(dht.proc, ctx)

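Since NewProviderManager now takes a datastore, any caller outside NewDHT (a test, for instance) has to supply one too. The sketch below is not part of this commit: it assumes package-internal access, the gx-vendored import paths used in this diff, and the AddProvider/GetProviders helpers defined elsewhere in providers.go (not shown in these hunks). An unsynchronized MapDatastore is enough here because only the manager's run loop touches it.

// Hypothetical same-package test sketch, not part of this commit.
package dht

import (
	"testing"

	key "github.com/ipfs/go-ipfs/blocks/key"
	peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
	ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

func TestProviderManagerWithDatastore(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// An in-memory datastore stands in for the node's real one.
	dstore := ds.NewMapDatastore()
	local := peer.ID("local peer")

	pm := NewProviderManager(ctx, local, dstore)
	defer pm.proc.Close()

	k := key.Key("a content key")
	prov := peer.ID("some provider")
	pm.AddProvider(ctx, k, prov)

	// The record comes back from the LRU cache here; after a restart it
	// would be reloaded from dstore instead.
	provs := pm.GetProviders(ctx, k)
	if len(provs) != 1 || provs[0] != prov {
		t.Fatal("expected the provider we just added")
	}
}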
230 changes: 212 additions & 18 deletions routing/dht/providers.go
@@ -1,25 +1,35 @@
package dht

import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"time"

key "github.com/ipfs/go-ipfs/blocks/key"
peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query"

context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

var lruCacheSize = 256
var ProvideValidity = time.Hour * 24
var defaultCleanupInterval = time.Hour

type ProviderManager struct {
// all non channel fields are meant to be accessed only within
// the run method
-providers map[key.Key]*providerSet
+providers *lru.Cache
local map[key.Key]struct{}
lpeer peer.ID
dstore ds.Datastore

getlocal chan chan []key.Key
newprovs chan *addProv
@@ -45,11 +55,17 @@ type getProv struct {
resp chan []peer.ID
}

-func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
+func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Datastore) *ProviderManager {
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
-pm.providers = make(map[key.Key]*providerSet)
+pm.dstore = dstore
+cache, err := lru.New(lruCacheSize)
+if err != nil {
+panic(err) //only happens if negative value is passed to lru constructor
+}
+pm.providers = cache

pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.proc = goprocessctx.WithContext(ctx)
@@ -59,6 +75,106 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
return pm
}

const providersKeyPrefix = "/providers/"

func mkProvKey(k key.Key) ds.Key {
return ds.NewKey(providersKeyPrefix + string(k))
}

func (pm *ProviderManager) getProvs(k key.Key) ([]peer.ID, error) {
pset, err := pm.getPset(k)
if err != nil {
return nil, err
}
return pset.providers, nil
}

func (pm *ProviderManager) getPset(k key.Key) (*providerSet, error) {
iprovs, ok := pm.providers.Get(k)
if ok {
provs := iprovs.(*providerSet)
return provs, nil
}
pset, err := pm.loadProvSet(k)
if err != nil {
return nil, err
}
pm.providers.Add(k, pset)
return pset, nil
}

func (pm *ProviderManager) addProv(k key.Key, p peer.ID) error {
iprovs, ok := pm.providers.Get(k)
if !ok {
iprovs = newProviderSet()
pm.providers.Add(k, iprovs)
}
provs := iprovs.(*providerSet)
provs.Add(p)

return pm.storeProvSet(k, provs)
}

func (pm *ProviderManager) storeProvSet(k key.Key, pset *providerSet) error {
buf := new(bytes.Buffer)
_, err := pset.WriteTo(buf)
if err != nil {
return err
}

return pm.dstore.Put(mkProvKey(k), buf.Bytes())
}

func (pm *ProviderManager) loadProvSet(k key.Key) (*providerSet, error) {
val, err := pm.dstore.Get(mkProvKey(k))
if err != nil {
return nil, err
}

valb, ok := val.([]byte)
if !ok {
log.Errorf("value for providers set was not bytes. (instead got %#v)", val)
return nil, fmt.Errorf("value was not bytes!")
}

pset, err := psetFromReader(bytes.NewReader(valb))
if err != nil {
return nil, err
}

return pset, nil
}

func (pm *ProviderManager) deleteProvSet(k key.Key) error {
pm.providers.Remove(k)

return pm.dstore.Delete(mkProvKey(k))
}

func (pm *ProviderManager) getAllProvKeys() ([]key.Key, error) {
res, err := pm.dstore.Query(dsq.Query{
KeysOnly: true,
Filters: []dsq.Filter{dsq.FilterKeyPrefix{Prefix: providersKeyPrefix}},
})

if err != nil {
return nil, err
}

entries, err := res.Rest()
if err != nil {
return nil, err
}

out := make([]key.Key, 0, len(entries))
for _, e := range entries {
prov := key.Key(e.Key[len(providersKeyPrefix):])
out = append(out, prov)
}

return out, nil
}

func (pm *ProviderManager) run() {
tick := time.NewTicker(pm.cleanupInterval)
for {
@@ -67,22 +183,17 @@ func (pm *ProviderManager) run() {
if np.val == pm.lpeer {
pm.local[np.k] = struct{}{}
}
-provs, ok := pm.providers[np.k]
-if !ok {
-provs = newProviderSet()
-pm.providers[np.k] = provs
+err := pm.addProv(np.k, np.val)
+if err != nil {
+log.Error("error adding new providers: ", err)
}
-provs.Add(np.val)

case gp := <-pm.getprovs:
-var parr []peer.ID
-provs, ok := pm.providers[gp.k]
-if ok {
-parr = provs.providers
+provs, err := pm.getProvs(gp.k)
+if err != nil && err != ds.ErrNotFound {
+log.Error("error reading providers: ", err)
}

-gp.resp <- parr
+gp.resp <- provs
case lc := <-pm.getlocal:
var keys []key.Key
for k := range pm.local {
@@ -91,7 +202,17 @@ func (pm *ProviderManager) run() {
lc <- keys

case <-tick.C:
-for k, provs := range pm.providers {
+keys, err := pm.getAllProvKeys()
+if err != nil {
+log.Error("Error loading provider keys: ", err)
+continue
+}
+for _, k := range keys {
+provs, err := pm.getPset(k)
+if err != nil {
+log.Error("error loading known provset: ", err)
+continue
+}
var filtered []peer.ID
for p, t := range provs.set {
if time.Now().Sub(t) > ProvideValidity {
@@ -104,10 +225,12 @@
if len(filtered) > 0 {
provs.providers = filtered
} else {
-delete(pm.providers, k)
+err := pm.deleteProvSet(k)
+if err != nil {
+log.Error("error deleting provider set: ", err)
+}
}
}

case <-pm.proc.Closing():
return
}
@@ -163,3 +286,74 @@ func (ps *providerSet) Add(p peer.ID) {

ps.set[p] = time.Now()
}

func psetFromReader(r io.Reader) (*providerSet, error) {
br, ok := r.(io.ByteReader)
if !ok {
bufr := bufio.NewReader(r)
br = bufr
r = bufr
}

out := newProviderSet()
buf := make([]byte, 128)
for {
v, err := binary.ReadUvarint(br)
if err != nil {
if err == io.EOF {
return out, nil
}
return nil, err
}

_, err = io.ReadFull(r, buf[:v])
if err != nil {
if err == io.EOF {
return out, nil
}
return nil, err
}

pid := peer.ID(buf[:v])

tnsec, err := binary.ReadVarint(br)
if err != nil {
if err == io.EOF {
return out, nil
}
return nil, err
}

t := time.Unix(0, tnsec)
out.set[pid] = t
out.providers = append(out.providers, pid)
}
}

func (ps *providerSet) WriteTo(w io.Writer) (int64, error) {
var written int64
buf := make([]byte, 16)
for _, p := range ps.providers {
t := ps.set[p]
id := []byte(p)
n := binary.PutUvarint(buf, uint64(len(id)))
_, err := w.Write(buf[:n])
if err != nil {
return written, err
}
written += int64(n)
n, err = w.Write(id)
if err != nil {
return written, err
}

written += int64(n)
n = binary.PutVarint(buf, t.UnixNano())
_, err = w.Write(buf[:n])
if err != nil {
return written, err
}
}

return written, nil
}
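The two functions above give the provider set a compact on-disk form: for each provider, a uvarint length prefix, the raw peer ID bytes, then the last-seen time as a varint of UnixNano. A minimal same-package round-trip sketch (not part of the commit, using the package's existing imports) to make the framing concrete:

// roundTripExample is a hypothetical helper, not part of this commit: it
// serializes a providerSet with WriteTo and reads it back with psetFromReader.
func roundTripExample() error {
	orig := newProviderSet()
	orig.Add(peer.ID("peer one"))
	orig.Add(peer.ID("peer two"))

	var buf bytes.Buffer
	if _, err := orig.WriteTo(&buf); err != nil {
		return err
	}

	decoded, err := psetFromReader(&buf)
	if err != nil {
		return err
	}

	if len(decoded.providers) != len(orig.providers) {
		return fmt.Errorf("expected %d providers, got %d",
			len(orig.providers), len(decoded.providers))
	}
	for _, p := range orig.providers {
		// Last-seen timestamps survive at nanosecond precision.
		if !decoded.set[p].Equal(orig.set[p]) {
			return fmt.Errorf("timestamp mismatch for %s", p)
		}
	}
	return nil
}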