Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4bb4cb0
p2p/discv5: implement talkRequest
zsfelfoldi Nov 14, 2019
2d11d74
les: implement clientPool.balanceMissing
zsfelfoldi Nov 15, 2019
2b96b4f
les: add talk request handler
zsfelfoldi Nov 15, 2019
95a05d0
p2p/discv5: talkRequest works
zsfelfoldi Nov 20, 2019
3354df8
les: deduplicated client priority checking logic
zsfelfoldi Nov 21, 2019
923fc54
les: implement token sale
zsfelfoldi Nov 23, 2019
1ddc30d
les: add tokensale.go
zsfelfoldi Nov 24, 2019
6ac3b2b
les: lespay added to les/4
zsfelfoldi Nov 24, 2019
ab4a4af
les: implement lespay client side
zsfelfoldi Nov 24, 2019
be4fb57
les: fixed nexClientPool balance iterator
zsfelfoldi Nov 25, 2019
db4bc52
les: lespay API
zsfelfoldi Nov 25, 2019
94bf8de
les: lespay API
zsfelfoldi Nov 27, 2019
8c9f0eb
les: added tokensale info call
zsfelfoldi Nov 27, 2019
9c7e717
les: use single command format for lespay/reply
zsfelfoldi Nov 27, 2019
91b69d0
les: lespay rate limiting
zsfelfoldi Dec 8, 2019
936328f
p2p/discv5: fixed talkResponse send condition
zsfelfoldi Dec 8, 2019
c52605c
les: more lespay API calls
zsfelfoldi Dec 16, 2019
21f760b
les: using stateFeedback instead of BV
zsfelfoldi Dec 17, 2019
0fc06ea
les: implement inactive peer state
zsfelfoldi Dec 20, 2019
6f1abbf
les: fixed peerSet.Disconnect
zsfelfoldi Dec 26, 2019
c0b6ff2
les: drop inactive peers
zsfelfoldi Dec 26, 2019
848683a
1
zsfelfoldi Dec 27, 2019
5486e28
go.sum
zsfelfoldi Jan 3, 2020
d338384
les: fixed deadlock
zsfelfoldi Jan 3, 2020
6ba9c5a
les: missing API functions
zsfelfoldi Jan 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ github.com/influxdata/influxdb v1.2.3-0.20180221223340-01288bdb0883 h1:FSeK4fZCo
github.com/influxdata/influxdb v1.2.3-0.20180221223340-01288bdb0883/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 h1:6OvNmYgJyexcZ3pYbTI9jWx5tHo1Dee/tWbLMfPe2TA=
github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/julienschmidt/httprouter v1.1.1-0.20170430222011-975b5c4c7c21 h1:F/iKcka0K2LgnKy/fgSBf235AETtm1n1TvBzqu40LE0=
github.com/julienschmidt/httprouter v1.1.1-0.20170430222011-975b5c4c7c21/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
43 changes: 43 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var Modules = map[string]string{
"swarmfs": SwarmfsJs,
"txpool": TxpoolJs,
"les": LESJs,
"lespay": LESPAYJs,
}

const ChequebookJs = `
Expand Down Expand Up @@ -856,3 +857,45 @@ web3._extend({
]
});
`

const LESPAYJs = `
web3._extend({
property: 'lespay',
methods:
[
new web3._extend.Method({
name: 'connection',
call: 'lespay_connection',
params: 6
}),
new web3._extend.Method({
name: 'deposit',
call: 'lespay_deposit',
params: 4
}),
new web3._extend.Method({
name: 'buyTokens',
call: 'lespay_buyTokens',
params: 6
}),
new web3._extend.Method({
name: 'getBalance',
call: 'lespay_getBalance',
params: 2
}),
new web3._extend.Method({
name: 'info',
call: 'lespay_info',
params: 2
}),
new web3._extend.Method({
name: 'remoteInfo',
call: 'lespay_remoteInfo',
params: 3
}),
],
properties:
[
]
});
`
174 changes: 169 additions & 5 deletions les/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package les

import (
"context"
"errors"
"fmt"
"math"
"reflect"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)

var (
Expand All @@ -35,8 +38,6 @@ var (
errNoPriority = errors.New("priority too low to raise capacity")
)

const maxBalance = math.MaxInt64

// PrivateLightServerAPI provides an API to access the LES light server.
type PrivateLightServerAPI struct {
server *LesServer
Expand Down Expand Up @@ -150,7 +151,7 @@ func (api *PrivateLightServerAPI) setParams(params map[string]interface{}, clien
setFactor(&negFactors.requestFactor)
case !defParams && name == "capacity":
if capacity, ok := value.(float64); ok && uint64(capacity) >= api.server.minCapacity {
err = api.server.clientPool.setCapacity(client, uint64(capacity))
_, _, err = api.server.clientPool.setCapacity(client.id, client.freeID, uint64(capacity), 0, true)
// Don't have to call factor update explicitly. It's already done
// in setCapacity function.
} else {
Expand Down Expand Up @@ -184,7 +185,7 @@ func (api *PrivateLightServerAPI) SetClientParams(ids []enode.ID, params map[str
if client != nil {
update, err := api.setParams(params, client, nil, nil)
if update {
client.updatePriceFactors()
updatePriceFactors(&client.balanceTracker, client.posFactors, client.negFactors, client.capacity)
}
return err
} else {
Expand Down Expand Up @@ -352,3 +353,166 @@ func (api *PrivateLightAPI) GetCheckpointContractAddress() (string, error) {
}
return api.backend.oracle.config.Address.Hex(), nil
}

type PrivateLespayAPI struct {
peerSet *peerSet
clientHandler *clientHandler
dht *discv5.Network
tokenSale *tokenSale
}

// NewPrivateLespayAPI creates a new LESPAY API.
func NewPrivateLespayAPI(peerSet *peerSet, clientHandler *clientHandler, dht *discv5.Network, tokenSale *tokenSale) *PrivateLespayAPI {
return &PrivateLespayAPI{
peerSet: peerSet,
clientHandler: clientHandler,
dht: dht,
tokenSale: tokenSale,
}
}

func (api *PrivateLespayAPI) makeCall(ctx context.Context, remote bool, nodeStr string, cmd []byte) ([]byte, error) {
var (
id enode.ID
freeID string
peer *peer
node *enode.Node
err error
)
if nodeStr != "" {
if id, err = enode.ParseID(nodeStr); err == nil {
if peer = api.peerSet.Peer(peerIdToString(id)); peer == nil {
return nil, errors.New("peer not connected")
}
freeID = peer.freeClientId()
} else {
var err error
if node, err = enode.Parse(enode.ValidSchemes, nodeStr); err == nil {
id = node.ID()
freeID = node.IP().String()
} else {
return nil, err
}
}
}

if remote {
var (
reply []byte
cancelFn func() bool
)
delivered := make(chan struct{})
if peer != nil {
// remote call to a connected peer through LES
if api.clientHandler == nil {
return nil, errors.New("client handler not available")
}
cancelFn = api.clientHandler.makeLespayCall(peer, cmd, func(r []byte) bool {
reply = r
close(delivered)
return reply != nil
})
} else {
// remote call through UDP TALK
if api.dht == nil {
return nil, errors.New("UDP DHT not available")
}
cancelFn = api.dht.SendTalkRequest(node, "lespay", [][]byte{cmd}, func(payload interface{}) bool {
fmt.Println("dht delivered", payload, reflect.TypeOf(payload))
if replies, ok := payload.([]interface{}); ok && len(replies) == 1 {
reply, ok = replies[0].([]byte)
}
close(delivered)
return reply != nil
})
}
select {
case <-time.After(time.Second * 5):
cancelFn()
return nil, errors.New("timeout")
case <-ctx.Done():
cancelFn()
return nil, ctx.Err()
case <-delivered:
if len(reply) == 0 {
return nil, errors.New("unknown command")
}
return reply, nil
}
} else {
if api.tokenSale == nil {
return nil, errors.New("token sale module not available")
}
// execute call locally
return api.tokenSale.runCommand(cmd, id, freeID), nil
}

}

func (api *PrivateLespayAPI) Connection(ctx context.Context, remote bool, node string, requestedCapacity, stayConnected uint64, paymentModule []string, setCap bool) (results tsConnectionResults, err error) {
params := tsConnectionParams{requestedCapacity, stayConnected, paymentModule, setCap}
enc, _ := rlp.EncodeToBytes(&params)
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, append([]byte{tsConnection}, enc...))
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}

func (api *PrivateLespayAPI) Deposit(ctx context.Context, remote bool, node string, paymentModule string, proofOfPayment []byte) (results tsDepositResults, err error) {
params := tsDepositParams{paymentModule, proofOfPayment}
enc, _ := rlp.EncodeToBytes(&params)
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, append([]byte{tsDeposit}, enc...))
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}

func (api *PrivateLespayAPI) BuyTokens(ctx context.Context, remote bool, node string, maxSpend, minReceive uint64, relative, spendAll bool) (results tsBuyTokensResults, err error) {
params := tsBuyTokensParams{maxSpend, minReceive, relative, spendAll}
enc, _ := rlp.EncodeToBytes(&params)
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, append([]byte{tsBuyTokens}, enc...))
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}

func (api *PrivateLespayAPI) GetBalance(ctx context.Context, remote bool, node string) (results tsGetBalanceResults, err error) {
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, []byte{tsGetBalance})
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}

func (api *PrivateLespayAPI) Info(ctx context.Context, remote bool, node string) (results tsInfoResults, err error) {
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, []byte{tsInfo})
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}

func (api *PrivateLespayAPI) ReceiverInfo(ctx context.Context, remote bool, node string, receiverIDs []string) (results tsReceiverInfoResults, err error) {
params := tsReceiverInfoParams(receiverIDs)
enc, _ := rlp.EncodeToBytes(&params)
var resEnc []byte
resEnc, err = api.makeCall(ctx, remote, node, append([]byte{tsReceiverInfo}, enc...))
if err != nil {
return
}
err = rlp.DecodeBytes(resEnc, &results)
return
}
39 changes: 35 additions & 4 deletions les/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
package les

import (
"math"
"sync"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
)

const maxBalance = math.MaxInt64

const (
balanceCallbackQueue = iota
balanceCallbackZero
Expand Down Expand Up @@ -65,6 +68,7 @@ type balanceCallback struct {
}

// init initializes balanceTracker
// Note: capacity should never be zero
func (bt *balanceTracker) init(clock mclock.Clock, capacity uint64) {
bt.clock = clock
bt.initTime, bt.lastUpdate = clock.Now(), clock.Now() // Init timestamps
Expand Down Expand Up @@ -96,11 +100,36 @@ func (bt *balanceTracker) stop(now mclock.AbsTime) {
// balance is zero then negative balance translates to a positive priority.
func (bt *balanceTracker) balanceToPriority(b balance) int64 {
if b.pos > 0 {
return ^int64(b.pos / bt.capacity)
return -int64(b.pos / bt.capacity)
}
return int64(b.neg)
}

func (bt *balanceTracker) posBalanceMissing(targetPriority int64, targetCapacity uint64, after time.Duration) uint64 {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the function description and relative unit tests.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (bt *balanceTracker) budget(targetPriority int64, targetCapacity uint64, after time.Duration)  uint64 {
	current := bt.balanceToPriority(bt.balance)
	if targetPriority >= current {
		if bt.negTimeFactor == 0 {
			return 0
		}
		diff :=  time.Duration(float64(targetPriority - current)/bt.negTimeFactor)
		if diff >= after {
			return 0
		}
		after -= diff
	}
	// We have to cover the connection expense by positive balance.
	if targetPriority >= 0 {
		targetPriority = -1
	}
	budget := uint64(float64(^targetPriority)*float64(targetCapacity) + float64(after)*bt.timeFactor)
	if budget >= maxBalance {
		return math.MaxUint64 // target not reachable
	}
	if budget > bt.balance.pos {
		return budget - bt.balance.pos
	}
	return 0
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just open a question here. So we can only guarantee the idle connection by the budget. However request can also cost a lot. TBH I think request cost should be much higher than connection cost.

So should we consider the estimated request cost when we calculate the budget? Although we can never have a correct number.

if targetPriority > 0 {
negPrice := uint64(float64(after) * bt.negTimeFactor)
if negPrice+bt.balance.neg < uint64(targetPriority) {
return 0
}
if uint64(targetPriority) > bt.balance.neg && bt.negTimeFactor > 1e-100 {
if negTime := time.Duration(float64(uint64(targetPriority)-bt.balance.neg) / bt.negTimeFactor); negTime < after {
after -= negTime
} else {
after = 0
}
}
targetPriority = 0
}
posRequired := uint64(float64(-targetPriority)*float64(targetCapacity)+float64(after)*bt.timeFactor) + 1
if posRequired >= maxBalance {
return math.MaxUint64 // target not reachable
}
if posRequired > bt.balance.pos {
return posRequired - bt.balance.pos
}
return 0
}

// reducedBalance estimates the reduced balance at a given time in the fututre based
// on the current balance, the time factor and an estimated average request cost per time ratio
func (bt *balanceTracker) reducedBalance(at mclock.AbsTime, avgReqCost float64) balance {
Expand Down Expand Up @@ -136,7 +165,7 @@ func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) {
return 0, false
}
if priority < 0 {
newBalance := uint64(^priority) * bt.capacity
newBalance := uint64(-priority) * bt.capacity
if newBalance > bt.balance.pos {
return 0, false
}
Expand All @@ -161,6 +190,7 @@ func (bt *balanceTracker) timeUntil(priority int64) (time.Duration, bool) {
}

// setCapacity updates the capacity value used for priority calculation
// Note: capacity should never be zero
func (bt *balanceTracker) setCapacity(capacity uint64) {
bt.lock.Lock()
defer bt.lock.Unlock()
Expand Down Expand Up @@ -262,12 +292,12 @@ func (bt *balanceTracker) updateAfter(dt time.Duration) {
}

// requestCost should be called after serving a request for the given peer
func (bt *balanceTracker) requestCost(cost uint64) {
func (bt *balanceTracker) requestCost(cost uint64) uint64 {
bt.lock.Lock()
defer bt.lock.Unlock()

if bt.stopped {
return
return 0
}
now := bt.clock.Now()
bt.addBalance(now)
Expand Down Expand Up @@ -295,6 +325,7 @@ func (bt *balanceTracker) requestCost(cost uint64) {
}
}
bt.sumReqCost += cost
return bt.balance.pos
}

// getBalance returns the current positive and negative balance
Expand Down
Loading