Skip to content

Commit

Permalink
Skip scanners if nodes are syncing (#66)
Browse files Browse the repository at this point in the history
* add is syncing to execution

* add is syncing to beacon

* skip if execution or consensus are syncing
  • Loading branch information
pablomendezroyo authored Dec 5, 2024
1 parent 2bd942f commit 3c48f85
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 0 deletions.
26 changes: 26 additions & 0 deletions internal/adapters/beaconchain/beaconchain_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,32 @@ func NewBeaconchainAdapter(beaconChainUrl string) *BeaconchainAdapter {
}
}

// GetSyncingStatus checks if the beacon node is syncing.
func (b *BeaconchainAdapter) GetSyncingStatus() (bool, error) {
url := fmt.Sprintf("%s/eth/v1/node/syncing", b.beaconChainUrl)
resp, err := http.Get(url)
if err != nil {
return false, fmt.Errorf("failed to send request to Beaconchain at %s: %w", url, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected status code %d received from Beaconchain", resp.StatusCode)
}

// Parse the response
var result struct {
Data struct {
IsSyncing bool `json:"is_syncing"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return false, fmt.Errorf("failed to decode response from Beaconchain: %w", err)
}

return result.Data.IsSyncing, nil
}

func (b *BeaconchainAdapter) GetValidatorStatus(pubkey string) (domain.ValidatorStatus, error) {
validatorData, err := b.PostStateValidators("finalized", []string{pubkey}, nil)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,22 @@ func TestGetEpochHeaderIntegration(t *testing.T) {
assert.Greater(t, epoch, uint64(0))
t.Logf("Epoch for finalized block: %d", epoch)
}

// TestGetSyncingStatusIntegration tests the GetSyncingStatus method of the BeaconchainAdapter
func TestGetSyncingStatusIntegration(t *testing.T) {
adapter := setupBeaconchainAdapter(t)

// Call the GetSyncingStatus method
isSyncing, err := adapter.GetSyncingStatus()
assert.NoError(t, err)

// Assert the result is a valid boolean
assert.IsType(t, isSyncing, false, "Expected the result to be a boolean")

// Log the result for debugging
if isSyncing {
t.Log("The beacon node is syncing.")
} else {
t.Log("The beacon node is not syncing.")
}
}
44 changes: 44 additions & 0 deletions internal/adapters/execution/execution_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,47 @@ func (e *ExecutionAdapter) GetBlockTimestampByNumber(blockNumber uint64) (uint64

return timestamp, nil
}

// IsSyncing checks if the Ethereum execution client is currently syncing.
func (e *ExecutionAdapter) IsSyncing() (bool, error) {
// Create the request payload for eth_syncing
payload := map[string]interface{}{
"jsonrpc": "2.0",
"method": "eth_syncing",
"params": []interface{}{},
"id": 1,
}

// Marshal the payload to JSON
jsonPayload, err := json.Marshal(payload)
if err != nil {
return false, fmt.Errorf("failed to marshal request payload for eth_syncing: %w", err)
}

// Send the request to the execution client
resp, err := http.Post(e.rpcURL, "application/json", bytes.NewBuffer(jsonPayload))
if err != nil {
return false, fmt.Errorf("failed to send request to execution client at %s: %w", e.rpcURL, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected status code %d received from execution client", resp.StatusCode)
}

// Parse the response
var result struct {
Result interface{} `json:"result"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return false, fmt.Errorf("failed to decode response from execution client: %w", err)
}

// If result is false, the node is not syncing
if result.Result == false {
return false, nil
}

// If result is a map or object, the node is syncing
return true, nil
}
21 changes: 21 additions & 0 deletions internal/adapters/execution/execution_adapter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,24 @@ func TestGetBlockTimestampByNumberIntegration(t *testing.T) {
// Log the timestamp for debugging
t.Logf("Timestamp for block %d: %d (Unix)", blockNumber, timestamp)
}

// TestIsSyncingIntegration tests the IsSyncing method of the ExecutionAdapter
func TestIsSyncingIntegration(t *testing.T) {
// Set up the execution adapter
adapter, err := setupExecutionAdapter(t)
assert.NoError(t, err)

// Call the IsSyncing method
isSyncing, err := adapter.IsSyncing()
assert.NoError(t, err)

// Assert the result is a valid boolean
assert.IsType(t, isSyncing, false, "Expected the result to be a boolean")

// Log the result for debugging
if isSyncing {
t.Log("The Ethereum node is syncing.")
} else {
t.Log("The Ethereum node is not syncing.")
}
}
1 change: 1 addition & 0 deletions internal/application/ports/beaconchain_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ import "lido-events/internal/application/domain"
type Beaconchain interface {
GetValidatorStatus(pubkey string) (domain.ValidatorStatus, error)
GetEpochHeader(blockID string) (uint64, error)
GetSyncingStatus() (bool, error)
}
1 change: 1 addition & 0 deletions internal/application/ports/execution_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package ports
type ExecutionPort interface {
GetMostRecentBlockNumber() (uint64, error)
GetBlockTimestampByNumber(blockNumber uint64) (uint64, error)
IsSyncing() (bool, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ func (ds *DistributionLogUpdatedEventScanner) ScanDistributionLogUpdatedEventsCr

// runScan contains the execution logic for scanning DistributionLogUpdated events
func (ds *DistributionLogUpdatedEventScanner) runScan(ctx context.Context) {
// Skip if node is syncing
isSyncing, err := ds.executionPort.IsSyncing()
if err != nil {
logger.ErrorWithPrefix(ds.servicePrefix, "Error checking if node is syncing: %v", err)
return
}

if isSyncing {
logger.InfoWithPrefix(ds.servicePrefix, "Node is syncing, skipping DistributionLogUpdated scan")
return
}

// Retrieve start and end blocks for scanning
start, err := ds.storagePort.GetDistributionLogLastProcessedBlock()
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions internal/application/services/validatorExitRequestEventScanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ func (vs *ValidatorExitRequestEventScanner) ScanValidatorExitRequestEventsCron(c

// runScan contains the execution logic for scanning ValidatorExitRequest events
func (vs *ValidatorExitRequestEventScanner) runScan(ctx context.Context) {
// Skip if execution or beaconchain nodes are syncing
executionSyncing, err := vs.executionPort.IsSyncing()
if err != nil {
logger.ErrorWithPrefix(vs.servicePrefix, "Error checking if execution node is syncing: %v", err)
return
}
if executionSyncing {
logger.InfoWithPrefix(vs.servicePrefix, "Execution node is syncing, skipping ValidatorExitRequest event scan")
return
}
beaconchainSyncing, err := vs.beaconchainPort.GetSyncingStatus()
if err != nil {
logger.ErrorWithPrefix(vs.servicePrefix, "Error checking if beaconchain node is syncing: %v", err)
return
}
if beaconchainSyncing {
logger.InfoWithPrefix(vs.servicePrefix, "Beaconchain node is syncing, skipping ValidatorExitRequest event scan")
return
}

// Retrieve start and end blocks for scanning
start, err := vs.storagePort.GetValidatorExitRequestLastProcessedBlock()
if err != nil {
Expand Down

0 comments on commit 3c48f85

Please sign in to comment.