Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DLQ for reprocessing of failed blocks #61

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ go.work
go.work.sum

# Binary
indexer
indexer
adammilnesmith marked this conversation as resolved.
Show resolved Hide resolved
bin

adammilnesmith marked this conversation as resolved.
Show resolved Hide resolved
.idea
75 changes: 75 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type BlockchainIngester interface {
// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error)

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -366,3 +368,76 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}
return progress, nil
}

func (c *client) GetBlockGaps(ctx context.Context) (*models.BlockchainGaps, error) {
var response BlockchainGapsResponse
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting block gaps failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got block gaps",
"blockGaps", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/gaps", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // nil: empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
bs, _ := io.ReadAll(resp.Body)
responseBody := string(bs)
// We mutate the global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v with body '%s'", resp.StatusCode, resp.Status, responseBody)
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

gaps := &models.BlockchainGaps{
Gaps: mapSlice(response.Gaps, func(gap BlockGap) models.BlockGap {
return models.BlockGap{
FirstMissing: gap.FirstMissing,
LastMissing: gap.LastMissing,
}
}),
}
helanto marked this conversation as resolved.
Show resolved Hide resolved
return gaps, nil
}

func mapSlice[T any, U any](slice []T, mapper func(T) U) []U {
adammilnesmith marked this conversation as resolved.
Show resolved Hide resolved
result := make([]U, len(slice))
for i, v := range slice {
result[i] = mapper(v)
}
return result
}
14 changes: 14 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,17 @@ type BlockchainError struct {
Error string `json:"error"`
Source string `json:"source"`
}

type BlockchainGapsResponse struct {
Gaps []BlockGap `json:"gaps"`
}

// BlockGap declares an inclusive range of missing block numbers
type BlockGap struct {
FirstMissing int64 `json:"first_missing"`
LastMissing int64 `json:"last_missing"`
adammilnesmith marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *BlockchainGapsResponse) String() string {
return fmt.Sprintf("%+v", *b)
}
44 changes: 37 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/duneanalytics/blockchain-ingester/lib/dlq"
"github.com/duneanalytics/blockchain-ingester/models"
)

Expand Down Expand Up @@ -59,6 +60,19 @@ func main() {
}
defer duneClient.Close()

// Create an extra Dune API client for DLQ processing since it is not thread-safe yet
duneClientDLQ, err := duneapi.New(logger, duneapi.Config{
APIKey: cfg.Dune.APIKey,
URL: cfg.Dune.URL,
BlockchainName: cfg.BlockchainName,
Stack: cfg.RPCStack,
DisableCompression: cfg.DisableCompression,
})
if err != nil {
stdlog.Fatal(err)
}
defer duneClientDLQ.Close()

var wg stdsync.WaitGroup
var rpcClient jsonrpc.BlockchainClient

Expand Down Expand Up @@ -101,21 +115,37 @@ func main() {
startBlockNumber = cfg.BlockHeight
}

dlqBlockNumbers := dlq.NewDLQWithDelay[int64](dlq.RetryDelayLinear(cfg.DLQRetryInterval))

if !cfg.DisableGapsQuery {
blockGaps, err := duneClient.GetBlockGaps(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
ingester.AddBlockGaps(dlqBlockNumbers, blockGaps.Gaps)
}
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
rpcClient,
duneClient,
duneClientDLQ,
ingester.Config{
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
MaxConcurrentRequests: cfg.RPCConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: cfg.RPCNode.SkipFailedBlocks,
DLQOnly: cfg.DLQOnly,
},
progress,
dlqBlockNumbers,
)

wg.Add(1)
Expand Down
11 changes: 8 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
DisableCompression bool `long:"disable-compression" env:"DISABLE_COMPRESSION" description:"disable compression when sending data to Dune"` // nolint:lll
DisableGapsQuery bool `long:"disable-gaps-query" env:"DISABLE_GAPS_QUERY" description:"disable gaps query used to populate the initial DLQ"` // nolint:lll
DLQOnly bool `long:"dlq-only" env:"DLQ_ONLY" description:"Runs just the DLQ processing on its own"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
PollDLQInterval time.Duration `long:"dlq-poll-interval" env:"DLQ_POLL_INTERVAL" description:"Interval to poll the dlq" default:"300ms"` // nolint:lll
DLQRetryInterval time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"25"` // nolint:lll
DLQConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/duneanalytics/blockchain-ingester
go 1.22.2

require (
github.com/emirpasic/gods v1.18.1
github.com/go-errors/errors v1.5.1
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
Expand Down
69 changes: 44 additions & 25 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/dlq"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
Expand All @@ -24,67 +26,84 @@ type Ingester interface {
// It can safely be run concurrently.
FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// SendBlocks consumes RPCBlocks from the channel, reorders them, and sends batches to DuneAPI in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
// ProduceBlockNumbersDLQ sends block numbers from the DLQ to outChan.
// It will run continuously until the context is cancelled.
// When the DLQ does not return an eligible next block, it waits for PollDLQInterval before trying again
ProduceBlockNumbersDLQ(ctx context.Context, outChan chan dlq.Item[int64]) error

// FetchBlockLoopDLQ fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
FetchBlockLoopDLQ(ctx context.Context,
blockNumbers <-chan dlq.Item[int64],
blocks chan<- dlq.Item[models.RPCBlock],
) error

// SendBlocksDLQ pushes one RPCBlock at a time to DuneAPI in the order they are received in
SendBlocksDLQ(ctx context.Context, blocks <-chan dlq.Item[models.RPCBlock]) error

Close() error
}

const (
defaultMaxBatchSize = 5
defaultReportProgressInterval = 30 * time.Second
)

type Config struct {
MaxConcurrentRequests int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
MaxConcurrentRequests int
MaxConcurrentRequestsDLQ int
PollInterval time.Duration
PollDLQInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
dune duneapi.BlockchainIngester
cfg Config
info Info
log *slog.Logger
node jsonrpc.BlockchainClient
dune duneapi.BlockchainIngester
duneDLQ duneapi.BlockchainIngester
cfg Config
info Info
dlq *dlq.DLQ[int64]
}

func New(
log *slog.Logger,
node jsonrpc.BlockchainClient,
dune duneapi.BlockchainIngester,
duneDLQ duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
dlq *dlq.DLQ[int64],
) Ingester {
info := NewInfo(cfg.BlockchainName, cfg.Stack.String())
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
}
ing := &ingester{
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
info: info,
log: log.With("module", "ingester"),
node: node,
dune: dune,
duneDLQ: duneDLQ,
cfg: cfg,
info: info,
dlq: dlq,
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

func (i *ingester) Info() Info {
return i.info
}
Loading
Loading