Skip to content

Commit

Permalink
Merge pull request #246 from ethpandaops/pk910/update-node-version
Browse files Browse the repository at this point in the history
refresh client versions in regular intervals
  • Loading branch information
pk910 authored Feb 13, 2025
2 parents 63802c1 + a00d5d0 commit e49f6b5
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 65 deletions.
2 changes: 1 addition & 1 deletion clients/consensus/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Client struct {
finalizedRoot phase0.Root
finalizedEpoch phase0.Epoch
lastFinalityUpdateEpoch phase0.Epoch
lastPeerUpdateEpoch phase0.Epoch
lastMetadataUpdate time.Time
lastSyncUpdateEpoch phase0.Epoch
peers []*v1.Peer
blockDispatcher Dispatcher[*v1.BlockEvent]
Expand Down
40 changes: 21 additions & 19 deletions clients/consensus/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,9 @@ func (client *Client) checkClient() error {
return fmt.Errorf("initialization of attestantio/go-eth2-client failed: %w", err)
}

// get node version
nodeVersion, err := client.rpcClient.GetNodeVersion(ctx)
if err != nil {
return fmt.Errorf("error while fetching node version: %v", err)
}

client.versionStr = nodeVersion
client.parseClientVersion(nodeVersion)

// update node peers
if err = client.updateNodePeers(ctx); err != nil {
return fmt.Errorf("could not get node peers for %s: %v", client.endpointConfig.Name, err)
// update node metadata
if err = client.updateNodeMetadata(ctx); err != nil {
return fmt.Errorf("could not get node metadata for %s: %v", client.endpointConfig.Name, err)
}

// get & compare genesis
Expand Down Expand Up @@ -223,12 +214,12 @@ func (client *Client) runClientLogic() error {
}()
}

if currentEpoch-client.lastPeerUpdateEpoch >= 1 {
client.lastPeerUpdateEpoch = currentEpoch
if time.Since(client.lastMetadataUpdate) >= 5*time.Minute {
client.lastMetadataUpdate = time.Now()
go func() {
// update node peers
if err = client.updateNodePeers(client.clientCtx); err != nil {
client.logger.Errorf("could not get node peers for %s: %v", client.endpointConfig.Name, err)
if err = client.updateNodeMetadata(client.clientCtx); err != nil {
client.logger.Errorf("could not get node metadata for %s: %v", client.endpointConfig.Name, err)
} else {
client.logger.WithFields(logrus.Fields{"epoch": currentEpoch, "peers": len(client.peers)}).Debug("updated consensus node peers")
}
Expand Down Expand Up @@ -258,23 +249,34 @@ func (client *Client) updateSynchronizationStatus(ctx context.Context) error {
return nil
}

func (client *Client) updateNodePeers(ctx context.Context) error {
func (client *Client) updateNodeMetadata(ctx context.Context) error {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
defer cancel()

var err error
client.lastMetadataUpdate = time.Now()

// get node version
nodeVersion, err := client.rpcClient.GetNodeVersion(ctx)
if err != nil {
return fmt.Errorf("error while fetching node version: %v", err)
}

client.versionStr = nodeVersion
client.parseClientVersion(nodeVersion)

// get node identity
client.nodeIdentity, err = client.rpcClient.GetNodeIdentity(ctx)
if err != nil {
return fmt.Errorf("could not get node peer id: %v", err)
}

// get node peers
peers, err := client.rpcClient.GetNodePeers(ctx)
if err != nil {
return fmt.Errorf("could not get peers: %v", err)
}
client.peers = peers
client.lastPeerUpdateEpoch = client.pool.chainState.CurrentEpoch()

return nil
}
Expand Down
46 changes: 23 additions & 23 deletions clients/execution/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,29 @@ type ClientConfig struct {
}

type Client struct {
pool *Pool
clientIdx uint16
endpointConfig *ClientConfig
clientCtx context.Context
clientCtxCancel context.CancelFunc
rpcClient *rpc.ExecutionClient
logger *logrus.Entry
isOnline bool
isSyncing bool
versionStr string
clientType ClientType
lastEvent time.Time
lastFilterPoll time.Time
lastPeersUpdate time.Time
blockFilterId rpc.BlockFilterId
retryCounter uint64
lastError error
headMutex sync.RWMutex
headHash common.Hash
headNumber uint64
nodeInfo *p2p.NodeInfo
peers []*p2p.PeerInfo
didFetchPeers bool
pool *Pool
clientIdx uint16
endpointConfig *ClientConfig
clientCtx context.Context
clientCtxCancel context.CancelFunc
rpcClient *rpc.ExecutionClient
logger *logrus.Entry
isOnline bool
isSyncing bool
versionStr string
clientType ClientType
lastEvent time.Time
lastFilterPoll time.Time
lastMetadataUpdate time.Time
blockFilterId rpc.BlockFilterId
retryCounter uint64
lastError error
headMutex sync.RWMutex
headHash common.Hash
headNumber uint64
nodeInfo *p2p.NodeInfo
peers []*p2p.PeerInfo
didFetchPeers bool
}

func (pool *Pool) newPoolClient(clientIdx uint16, endpoint *ClientConfig) (*Client, error) {
Expand Down
44 changes: 22 additions & 22 deletions clients/execution/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,10 @@ func (client *Client) checkClient() error {
return fmt.Errorf("initialization of execution client failed: %w", err)
}

// get node version
nodeVersion, err := client.rpcClient.GetClientVersion(ctx)
if err != nil {
return fmt.Errorf("error while fetching node version: %v", err)
}

client.versionStr = nodeVersion
client.parseClientVersion(nodeVersion)

// get peers
err = client.updateNodePeers(ctx)
// get node metadata
err = client.updateNodeMetadata(ctx)
if err != nil {
client.logger.Warnf("error updating node peers: %v", err)
client.logger.Warnf("error updating node metadata: %v", err)
}

// get & compare chain specs
Expand Down Expand Up @@ -100,13 +91,22 @@ func (client *Client) checkClient() error {
return nil
}

func (client *Client) updateNodePeers(ctx context.Context) error {
func (client *Client) updateNodeMetadata(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

client.lastPeersUpdate = time.Now()
client.lastMetadataUpdate = time.Now()

// get node version
nodeVersion, err := client.rpcClient.GetClientVersion(ctx)
if err != nil {
return fmt.Errorf("error while fetching node version: %v", err)
}

client.versionStr = nodeVersion
client.parseClientVersion(nodeVersion)

var err error
// get node peers
client.nodeInfo, err = client.rpcClient.GetAdminNodeInfo(ctx)
if err != nil {
client.didFetchPeers = false
Expand Down Expand Up @@ -173,11 +173,11 @@ func (client *Client) runClientLogic() error {
pollTimeout = 12*time.Second - pollTimeout
}

peerRefreshTimeout := time.Since(client.lastPeersUpdate)
if peerRefreshTimeout > 5*time.Minute {
peerRefreshTimeout = 0
metadataRefreshTimeout := time.Since(client.lastMetadataUpdate)
if metadataRefreshTimeout > 5*time.Minute {
metadataRefreshTimeout = 0
} else {
peerRefreshTimeout = 5*time.Minute - peerRefreshTimeout
metadataRefreshTimeout = 5*time.Minute - metadataRefreshTimeout
}

select {
Expand Down Expand Up @@ -221,10 +221,10 @@ func (client *Client) runClientLogic() error {
}

client.lastEvent = time.Now()
case <-time.After(peerRefreshTimeout):
err := client.updateNodePeers(client.clientCtx)
case <-time.After(metadataRefreshTimeout):
err := client.updateNodeMetadata(client.clientCtx)
if err != nil {
client.logger.Warnf("error updating node peers: %v", err)
client.logger.Warnf("error updating node metadata: %v", err)
}

}
Expand Down

0 comments on commit e49f6b5

Please sign in to comment.