Skip to content

Commit 8599e8e

Browse files
committed
Wip p2p enhancements
1 parent 9280060 commit 8599e8e

File tree

6 files changed

+187
-44
lines changed

6 files changed

+187
-44
lines changed

.github/release.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ changelog:
1313
labels:
1414
- bug
1515
- regression
16+
- title: "🖧 P2P area"
17+
labels:
18+
- area/p2p
1619
- title: Exciting New Features 🎉
1720
labels:
1821
- Semver-Minor

core/cli/worker/worker_p2p.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
type P2P struct {
2222
WorkerFlags `embed:""`
23-
Token string `env:"LOCALAI_TOKEN,TOKEN" help:"JSON list of galleries"`
23+
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
2424
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
2525
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
2626
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`

core/p2p/node.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package p2p
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
type NodeData struct {
9+
Name string
10+
ID string
11+
TunnelAddress string
12+
LastSeen time.Time
13+
}
14+
15+
func (d NodeData) IsOnline() bool {
16+
now := time.Now()
17+
// if the node was seen in the last 40 seconds, it's online
18+
return now.Sub(d.LastSeen) < 40*time.Second
19+
}
20+
21+
var mu sync.Mutex
22+
var nodes = map[string]NodeData{}
23+
24+
func GetAvailableNodes() []NodeData {
25+
mu.Lock()
26+
defer mu.Unlock()
27+
var availableNodes = []NodeData{}
28+
for _, v := range nodes {
29+
availableNodes = append(availableNodes, v)
30+
}
31+
return availableNodes
32+
}
33+
34+
func AddNode(node NodeData) {
35+
mu.Lock()
36+
defer mu.Unlock()
37+
nodes[node.ID] = node
38+
}

core/p2p/p2p.go

Lines changed: 100 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ import (
1111
"net"
1212
"os"
1313
"strings"
14+
"sync"
1415
"time"
1516

17+
"github.com/ipfs/go-log"
1618
"github.com/libp2p/go-libp2p/core/peer"
1719
"github.com/mudler/LocalAI/pkg/utils"
20+
"github.com/mudler/edgevpn/pkg/config"
1821
"github.com/mudler/edgevpn/pkg/node"
1922
"github.com/mudler/edgevpn/pkg/protocol"
23+
"github.com/mudler/edgevpn/pkg/services"
2024
"github.com/mudler/edgevpn/pkg/types"
2125
"github.com/phayes/freeport"
22-
23-
"github.com/ipfs/go-log"
24-
"github.com/mudler/edgevpn/pkg/config"
25-
"github.com/mudler/edgevpn/pkg/services"
2626
zlog "github.com/rs/zerolog/log"
2727

2828
"github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,15 @@ func GenerateToken() string {
3434
return newData.Base64()
3535
}
3636

37+
func IsP2PEnabled() bool {
38+
return true
39+
}
40+
41+
func nodeID() string {
42+
hostname, _ := os.Hostname()
43+
return hostname
44+
}
45+
3746
func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, service string) error {
3847

3948
zlog.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
@@ -53,16 +62,16 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
5362
10*time.Second,
5463
func() {
5564
// Retrieve current ID for ip in the blockchain
56-
_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
65+
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
5766
// If mismatch, update the blockchain
58-
if !found {
59-
updatedMap := map[string]interface{}{}
60-
updatedMap[node.Host().ID().String()] = &types.User{
61-
PeerID: node.Host().ID().String(),
62-
Timestamp: time.Now().String(),
63-
}
64-
ledger.Add(protocol.UsersLedgerKey, updatedMap)
67+
//if !found {
68+
updatedMap := map[string]interface{}{}
69+
updatedMap[node.Host().ID().String()] = &types.User{
70+
PeerID: node.Host().ID().String(),
71+
Timestamp: time.Now().String(),
6572
}
73+
ledger.Add(protocol.UsersLedgerKey, updatedMap)
74+
// }
6675
},
6776
)
6877

@@ -142,28 +151,41 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
142151
if err != nil {
143152
return err
144153
}
145-
154+
// TODO: discoveryTunnels should return all the nodes that are available?
155+
// In this way we updated availableNodes here instead of appending
156+
// e.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels
157+
// each time the node is seen
158+
// In this case the below function should be idempotent and just keep track of the nodes
146159
go func() {
147-
totalTunnels := []string{}
148160
for {
149161
select {
150162
case <-ctx.Done():
151163
zlog.Error().Msg("Discoverer stopped")
152164
return
153165
case tunnel := <-tunnels:
166+
AddNode(tunnel)
154167

155-
totalTunnels = append(totalTunnels, tunnel)
156-
os.Setenv("LLAMACPP_GRPC_SERVERS", strings.Join(totalTunnels, ","))
157-
zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", strings.Join(totalTunnels, ","))
168+
var tunnelAddresses []string
169+
for _, v := range nodes {
170+
if v.IsOnline() {
171+
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
172+
}
173+
}
174+
tunnelEnvVar := strings.Join(tunnelAddresses, ",")
175+
176+
os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
177+
zlog.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
178+
179+
zlog.Info().Msgf("Node %s available", tunnel.ID)
158180
}
159181
}
160182
}()
161183

162184
return nil
163185
}
164186

165-
func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
166-
tunnels := make(chan string)
187+
func discoveryTunnels(ctx context.Context, token string) (chan NodeData, error) {
188+
tunnels := make(chan NodeData)
167189

168190
nodeOpts, err := newNodeOpts(token)
169191
if err != nil {
@@ -184,8 +206,14 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
184206
}
185207

186208
// get new services, allocate and return to the channel
209+
210+
// TODO:
211+
// a function ensureServices that:
212+
// - starts a service if not started, if the worker is Online
213+
// - checks that workers are Online, if not cancel the context of allocateLocalService
214+
// - discoveryTunnels should return all the nodes and addresses associated with it
215+
// - the caller should take now care of the fact that we are always returning fresh informations
187216
go func() {
188-
emitted := map[string]bool{}
189217
for {
190218
select {
191219
case <-ctx.Done():
@@ -196,19 +224,15 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
196224
zlog.Debug().Msg("Searching for workers")
197225

198226
data := ledger.LastBlock().Storage["services_localai"]
199-
for k := range data {
227+
for k, v := range data {
200228
zlog.Info().Msgf("Found worker %s", k)
201-
if _, found := emitted[k]; !found {
202-
emitted[k] = true
203-
//discoveredPeers <- k
204-
port, err := freeport.GetFreePort()
205-
if err != nil {
206-
fmt.Print(err)
207-
}
208-
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
209-
go allocateLocalService(ctx, n, tunnelAddress, k)
210-
tunnels <- tunnelAddress
229+
nd := &NodeData{}
230+
if err := v.Unmarshal(nd); err != nil {
231+
zlog.Error().Msg("cannot unmarshal node data")
232+
continue
211233
}
234+
ensureService(ctx, n, nd, k)
235+
tunnels <- *nd
212236
}
213237
}
214238
}
@@ -217,6 +241,41 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
217241
return tunnels, err
218242
}
219243

244+
type nodeServiceData struct {
245+
NodeData NodeData
246+
CancelFunc context.CancelFunc
247+
}
248+
249+
var service = map[string]nodeServiceData{}
250+
var muservice sync.Mutex
251+
252+
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
253+
muservice.Lock()
254+
defer muservice.Unlock()
255+
if ndService, found := service[nd.Name]; !found {
256+
newCtxm, cancel := context.WithCancel(ctx)
257+
service[nd.Name] = nodeServiceData{
258+
NodeData: *nd,
259+
CancelFunc: cancel,
260+
}
261+
// Start the service
262+
port, err := freeport.GetFreePort()
263+
if err != nil {
264+
fmt.Print(err)
265+
}
266+
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
267+
nd.TunnelAddress = tunnelAddress
268+
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
269+
} else {
270+
// Check if the service is still alive
271+
// if not cancel the context
272+
if !ndService.NodeData.IsOnline() {
273+
ndService.CancelFunc()
274+
delete(service, nd.Name)
275+
}
276+
}
277+
}
278+
220279
// This is the P2P worker main
221280
func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
222281
llger := logger.New(log.LevelFatal)
@@ -248,16 +307,20 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
248307

249308
ledger.Announce(
250309
ctx,
251-
10*time.Second,
310+
20*time.Second,
252311
func() {
253312
// Retrieve current ID for ip in the blockchain
254-
_, found := ledger.GetKey("services_localai", name)
313+
//_, found := ledger.GetKey("services_localai", name)
255314
// If mismatch, update the blockchain
256-
if !found {
257-
updatedMap := map[string]interface{}{}
258-
updatedMap[name] = "p2p"
259-
ledger.Add("services_localai", updatedMap)
315+
//if !found {
316+
updatedMap := map[string]interface{}{}
317+
updatedMap[name] = &NodeData{
318+
Name: name,
319+
LastSeen: time.Now(),
320+
ID: nodeID(),
260321
}
322+
ledger.Add("services_localai", updatedMap)
323+
// }
261324
},
262325
)
263326

core/p2p/p2p_disabled.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,11 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
1919
func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
2020
return fmt.Errorf("not implemented")
2121
}
22+
23+
func GetAvailableNodes() []NodeData {
24+
return []NodeData{}
25+
}
26+
27+
func IsP2PEnabled() bool {
28+
return false
29+
}

0 commit comments

Comments
 (0)