Skip to content

Commit

Permalink
Add DLQ for reprocessing of failed blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
adammilnesmith committed Jul 12, 2024
1 parent 67dbd31 commit 9760102
Show file tree
Hide file tree
Showing 14 changed files with 585 additions and 18 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ go.work
go.work.sum

# Binary
indexer
indexer
bin

.idea
75 changes: 75 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type BlockchainIngester interface {
// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}
return progress, nil
}

func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
var response BlockchainGapsResponse
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting block gaps failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got block gaps",
"blockGaps", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
bs, _ := io.ReadAll(resp.Body)
responseBody := string(bs)
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

gaps := &models.BlockchainGaps{
Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
return models.BlockGap{
FirstMissing: gap.FirstMissing,
LastMissing: gap.LastMissing,
}
}),
}
return gaps, nil
}

func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
result := make([]U, len(slice))
for i, v := range slice {
result[i] = mapper(v)
}
return result
}
14 changes: 14 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ type BlockchainError struct {
Error string `json:"error"`
Source string `json:"source"`
}

type BlockchainGapsResponse struct {
Gaps []BlockGap `json:"gaps"`
}

// BlockGap declares an inclusive range of missing block numbers
type BlockGap struct {
FirstMissing int64 `json:"first_missing"`
LastMissing int64 `json:"last_missing"`
}

func (b *BlockchainGapsResponse) String() string {
return fmt.Sprintf("%+v", *b)
}
15 changes: 15 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"syscall"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/dlq"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
Expand Down Expand Up @@ -101,6 +103,17 @@ func main() {
startBlockNumber = cfg.BlockHeight
}

dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(cfg.DLQRetryInterval))

if !cfg.DisableGapsQuery {
blockGaps, err := duneClient.GetBlockGaps(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
ingester.AddBlockGaps(dlqBlockNumbers, blockGaps.Gaps)
}
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
Expand All @@ -110,12 +123,14 @@ func main() {
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
},
progress,
dlqBlockNumbers,
)

wg.Add(1)
Expand Down
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
DisableGapsQuery bool `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
PollDLQInterval time.Duration `long:"dlq-poll-interval" env:"DLQ_POLL_INTERVAL" description:"Interval to poll the dlq" default:"300ms"` // nolint:lll
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
go 1.22.2

require (
github.com/emirpasic/gods v1.18.1
github.com/go-errors/errors v1.5.1
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
Expand Down
21 changes: 14 additions & 7 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/dlq"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
Expand All @@ -31,20 +33,26 @@ type Ingester interface {
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error

FetchBlockLoopDLQ(ctx context.Context,
blockNumbers <-chan dlq.Item[int64],
blocks chan<- dlq.Item[models.RPCBlock],
) error

SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error

Close() error
}

const (
defaultMaxBatchSize = 5
defaultReportProgressInterval = 30 * time.Second
)

type Config struct {
MaxConcurrentRequests int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
Expand All @@ -58,6 +66,7 @@ type ingester struct {
dune duneapi.BlockchainIngester
cfg Config
info Info
dlq *dlq.DLQ[int64]
}

func New(
Expand All @@ -66,6 +75,7 @@ func New(
dune duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
dlq *dlq.DLQ[int64],
) Ingester {
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
Expand All @@ -78,13 +88,10 @@ func New(
dune: dune,
cfg: cfg,
info: info,
dlq: dlq,
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

func (i *ingester) Info() Info {
return i.info
}
Loading

0 comments on commit 9760102

Please sign in to comment.