Skip to content

Commit

Permalink
feat(explorer): make possible to run sync in a separate process (#3224)
Browse files Browse the repository at this point in the history
Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler authored Aug 12, 2024
1 parent 4dfa085 commit 9729d2a
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 67 deletions.
19 changes: 16 additions & 3 deletions core/cli/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type ExplorerCMD struct {
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"`

WithSync bool `env:"LOCALAI_WITH_SYNC,WITH_SYNC" default:"false" help:"Enable sync with the network" group:"api"`
OnlySync bool `env:"LOCALAI_ONLY_SYNC,ONLY_SYNC" default:"false" help:"Only sync with the network" group:"api"`
}

func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
Expand All @@ -27,10 +30,20 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {
if err != nil {
return err
}
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)

go ds.Start(context.Background())
appHTTP := http.Explorer(db, ds)
if e.WithSync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
go ds.Start(context.Background(), true)
}

if e.OnlySync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
ctx := context.Background()

return ds.Start(ctx, false)
}

appHTTP := http.Explorer(db)

return appHTTP.Listen(e.Address)
}
59 changes: 39 additions & 20 deletions core/explorer/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,58 +7,83 @@ import (
"os"
"sort"
"sync"

"github.com/gofrs/flock"
)

// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
type Database struct {
sync.RWMutex
path string
data map[string]TokenData
path string
data map[string]TokenData
flock *flock.Flock
sync.Mutex
}

// TokenData is a p2p network token with a name and description.
type TokenData struct {
Name string `json:"name"`
Description string `json:"description"`
Clusters []ClusterData
Failures int
}

type ClusterData struct {
Workers []string
Type string
NetworkID string
}

// NewDatabase creates a new Database with the given path.
func NewDatabase(path string) (*Database, error) {
fileLock := flock.New(path + ".lock")
db := &Database{
data: make(map[string]TokenData),
path: path,
data: make(map[string]TokenData),
path: path,
flock: fileLock,
}
return db, db.load()
}

// Get retrieves a Token from the Database by its token.
func (db *Database) Get(token string) (TokenData, bool) {
db.RLock()
defer db.RUnlock()
db.flock.Lock() // we are making sure that the file is not being written to
defer db.flock.Unlock()
db.Lock() // we are making sure that is safe if called by another instance in the same process
defer db.Unlock()
db.load()
t, ok := db.data[token]
return t, ok
}

// Set stores a Token in the Database by its token.
func (db *Database) Set(token string, t TokenData) error {
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
db.data[token] = t
db.Unlock()

return db.Save()
return db.save()
}

// Delete removes a Token from the Database by its token.
func (db *Database) Delete(token string) error {
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
delete(db.data, token)
db.Unlock()
return db.Save()
return db.save()
}

func (db *Database) TokenList() []string {
db.RLock()
defer db.RUnlock()
db.flock.Lock()
defer db.flock.Unlock()
db.Lock()
defer db.Unlock()
db.load()
tokens := []string{}
for k := range db.data {
tokens = append(tokens, k)
Expand All @@ -74,9 +99,6 @@ func (db *Database) TokenList() []string {

// load reads the Database from disk.
func (db *Database) load() error {
db.Lock()
defer db.Unlock()

if _, err := os.Stat(db.path); os.IsNotExist(err) {
return nil
}
Expand All @@ -91,10 +113,7 @@ func (db *Database) load() error {
}

// Save writes the Database to disk.
func (db *Database) Save() error {
db.RLock()
defer db.RUnlock()

func (db *Database) save() error {
// Marshal db.data into JSON
// Write the JSON to the file
f, err := os.Create(db.path)
Expand Down
49 changes: 16 additions & 33 deletions core/explorer/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,10 @@ import (
type DiscoveryServer struct {
sync.Mutex
database *Database
networkState *NetworkState
connectionTime time.Duration
failures map[string]int
errorThreshold int
}

type NetworkState struct {
Networks map[string]Network
}

func (s *DiscoveryServer) NetworkState() *NetworkState {
s.Lock()
defer s.Unlock()
return s.networkState
}

// NewDiscoveryServer creates a new DiscoveryServer with the given Database.
// it keeps the db state in sync with the network state
func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer {
Expand All @@ -44,11 +32,7 @@ func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *
return &DiscoveryServer{
database: db,
connectionTime: dur,
networkState: &NetworkState{
Networks: map[string]Network{},
},
errorThreshold: failureThreshold,
failures: make(map[string]int),
}
}

Expand Down Expand Up @@ -116,10 +100,10 @@ func (s *DiscoveryServer) runBackground() {

if hasWorkers {
s.Lock()
s.networkState.Networks[token] = Network{
Clusters: ledgerK,
}
delete(s.failures, token)
data, _ := s.database.Get(token)
(&data).Clusters = ledgerK
(&data).Failures = 0
s.database.Set(token, data)
s.Unlock()
} else {
s.failedToken(token)
Expand All @@ -132,27 +116,23 @@ func (s *DiscoveryServer) runBackground() {
func (s *DiscoveryServer) failedToken(token string) {
s.Lock()
defer s.Unlock()
s.failures[token]++
data, _ := s.database.Get(token)
(&data).Failures++
s.database.Set(token, data)
}

func (s *DiscoveryServer) deleteFailedConnections() {
s.Lock()
defer s.Unlock()
for k, v := range s.failures {
if v > s.errorThreshold {
log.Info().Any("network", k).Msg("Network has been removed from the database")
s.database.Delete(k)
delete(s.failures, k)
for _, t := range s.database.TokenList() {
data, _ := s.database.Get(t)
if data.Failures > s.errorThreshold {
log.Info().Any("token", t).Msg("Token has been removed from the database")
s.database.Delete(t)
}
}
}

type ClusterData struct {
Workers []string
Type string
NetworkID string
}

func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) {
clusters := map[string]ClusterData{}

Expand Down Expand Up @@ -217,14 +197,17 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch
}

// Start the discovery server. This is meant to be run in to a goroutine.
func (s *DiscoveryServer) Start(ctx context.Context) error {
func (s *DiscoveryServer) Start(ctx context.Context, keepRunning bool) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled")
default:
// Collect data
s.runBackground()
if !keepRunning {
return nil
}
}
}
}
11 changes: 4 additions & 7 deletions core/http/endpoints/explorer/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

func Dashboard() func(*fiber.Ctx) error {
return func(c *fiber.Ctx) error {

summary := fiber.Map{
"Title": "LocalAI API - " + internal.PrintableVersion(),
"Version": internal.PrintableVersion(),
Expand All @@ -34,26 +33,24 @@ type AddNetworkRequest struct {
}

type Network struct {
explorer.Network
explorer.TokenData
Token string `json:"token"`
}

func ShowNetworks(db *explorer.Database, ds *explorer.DiscoveryServer) func(*fiber.Ctx) error {
func ShowNetworks(db *explorer.Database) func(*fiber.Ctx) error {
return func(c *fiber.Ctx) error {
networkState := ds.NetworkState()
results := []Network{}
for token, network := range networkState.Networks {
for _, token := range db.TokenList() {
networkData, exists := db.Get(token) // get the token data
hasWorkers := false
for _, cluster := range network.Clusters {
for _, cluster := range networkData.Clusters {
if len(cluster.Workers) > 0 {
hasWorkers = true
break
}
}
if exists && hasWorkers {
results = append(results, Network{Network: network, TokenData: networkData, Token: token})
results = append(results, Network{TokenData: networkData, Token: token})
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/http/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/mudler/LocalAI/core/http/routes"
)

func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer) *fiber.App {
func Explorer(db *explorer.Database) *fiber.App {

fiberCfg := fiber.Config{
Views: renderEngine(),
Expand All @@ -22,7 +22,7 @@ func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer)

app := fiber.New(fiberCfg)

routes.RegisterExplorerRoutes(app, db, discoveryServer)
routes.RegisterExplorerRoutes(app, db)

httpFS := http.FS(embedDirStatic)

Expand Down
4 changes: 2 additions & 2 deletions core/http/routes/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/mudler/LocalAI/core/http/endpoints/explorer"
)

func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database, ds *coreExplorer.DiscoveryServer) {
func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database) {
app.Get("/", explorer.Dashboard())
app.Post("/network/add", explorer.AddNetwork(db))
app.Get("/networks", explorer.ShowNetworks(db, ds))
app.Get("/networks", explorer.ShowNetworks(db))
}
1 change: 1 addition & 0 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
if ndService, found := service[nd.Name]; !found {
if !nd.IsOnline() {
// if node is offline and not present, do nothing
zlog.Debug().Msgf("Node %s is offline", nd.ID)
return
}
newCtxm, cancel := context.WithCancel(ctx)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/gofrs/flock v0.12.1 // indirect
github.com/labstack/echo/v4 v4.12.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ github.com/gofiber/template/html/v2 v2.1.2 h1:wkK/mYJ3nIhongTkG3t0QgV4ADdgOYJYVS
github.com/gofiber/template/html/v2 v2.1.2/go.mod h1:E98Z/FzvpaSib06aWEgYk6GXNf3ctoyaJH8yW5ay5ak=
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand Down

0 comments on commit 9729d2a

Please sign in to comment.