diff --git a/Dockerfile b/Dockerfile index 5163eba..f9dfa62 100644 --- a/Dockerfile +++ b/Dockerfile @@ -22,3 +22,4 @@ RUN apk add --no-cache ca-certificates COPY --from=builder /app/indexer / ENTRYPOINT ["/indexer"] +EXPOSE 2112 diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 29acf1e..b91e114 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -186,11 +186,16 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques req.Header.Set("x-dune-api-key", c.cfg.APIKey) req.Header.Set("x-dune-batch-size", fmt.Sprintf("%d", request.BatchSize)) req = req.WithContext(ctx) + + t0 := time.Now() resp, err := c.httpClient.Do(req) if err != nil { + observeSendBlocksRequestErr(err, request.BatchSize, t0) return err } defer resp.Body.Close() + observeSendBlocksRequestCode(resp.StatusCode, request.BatchSize, t0) + responseStatus = resp.Status if resp.StatusCode != http.StatusOK { diff --git a/client/duneapi/metrics.go b/client/duneapi/metrics.go new file mode 100644 index 0000000..df5dcc2 --- /dev/null +++ b/client/duneapi/metrics.go @@ -0,0 +1,82 @@ +package duneapi + +import ( + "errors" + "net/url" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var metricSendBlockCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "node_indexer", + Subsystem: "dune_client", + Name: "sent_block_total", + Help: "Total number of blocks sent in requests", + }, + []string{"status"}, +) + +var metricSendBlockBatchSize = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "node_indexer", + Subsystem: "dune_client", + Name: "block_per_batch", + Help: "Number of blocks per batch", + Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128, 256}, + }, + []string{"status"}, +) + +var metricSendRequestsCount = promauto.NewCounterVec( + + prometheus.CounterOpts{ + Namespace: "node_indexer", + Subsystem: "dune_client", + Name: 
"send_requests_total", + Help: "Number of send requests", + }, + []string{"status"}, +) + +var metricSendBlockBatchDurationMillis = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "node_indexer", + Subsystem: "dune_client", + Name: "send_block_batch_duration_millis", + Help: "Duration of a send block batch request in milliseconds", + Buckets: []float64{10, 25, 50, 100, 250, 500, 1000, 2000, 4000}, + }, + []string{"status"}, +) + +func observeSendBlocksRequest(status string, numberOfBlocks int, t0 time.Time) { + metricSendBlockCount.WithLabelValues(status).Add(float64(numberOfBlocks)) // sent_block_total counts blocks, so add the batch size + metricSendBlockBatchSize.WithLabelValues(status).Observe(float64(numberOfBlocks)) + metricSendBlockBatchDurationMillis.WithLabelValues(status).Observe(float64(time.Since(t0).Milliseconds())) + metricSendRequestsCount.WithLabelValues(status).Inc() // send_requests_total counts requests, so exactly one per call +} + +func observeSendBlocksRequestCode(statusCode int, numberOfBlocks int, t0 time.Time) { + observeSendBlocksRequest(strconv.Itoa(statusCode), numberOfBlocks, t0) +} + +func observeSendBlocksRequestErr(err error, numberOfBlocks int, t0 time.Time) { + observeSendBlocksRequest(errorToStatus(err), numberOfBlocks, t0) +} + +func errorToStatus(err error) string { + status := "unknown_error" + var urlErr *url.Error + if errors.As(err, &urlErr) { + if urlErr.Timeout() { + status = "timeout" + } else { + status = "connection_refused" + } + } + return status +} diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 37bae2f..e156eb0 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -119,11 +119,16 @@ func (c *rpcClient) getResponseBody( req.Header.Set(k, v) } } + + t0 := time.Now() resp, err := c.client.Do(req) if err != nil { + observeRPCRequestErr(err, method, t0) return fmt.Errorf("failed to send request for method %s: %w", method, err) } defer resp.Body.Close() + observeRPCRequestCode(resp.StatusCode, method, t0) + if resp.StatusCode != http.StatusOK { return fmt.Errorf("response for
method %s has status code %d", method, resp.StatusCode) } diff --git a/client/jsonrpc/metrics.go b/client/jsonrpc/metrics.go new file mode 100644 index 0000000..498ebe5 --- /dev/null +++ b/client/jsonrpc/metrics.go @@ -0,0 +1,58 @@ +package jsonrpc + +import ( + "errors" + "net/url" + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var rpcRequestCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "node_indexer", + Subsystem: "rpc_client", + Name: "request_total", + Help: "Total number of RPC node requests", + }, + []string{"status", "method"}, +) + +var rpcRequestDurationMillis = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "node_indexer", + Subsystem: "rpc_client", + Name: "request_duration_millis", + Help: "Duration of RPC node requests in milliseconds", + Buckets: []float64{10, 25, 50, 100, 250, 500, 1000, 2000, 4000}, + }, + []string{"status", "method"}, +) + +func observeRPCRequest(status string, method string, t0 time.Time) { + rpcRequestCount.WithLabelValues(status, method).Inc() + rpcRequestDurationMillis.WithLabelValues(status, method).Observe(float64(time.Since(t0).Milliseconds())) +} + +func observeRPCRequestCode(statusCode int, method string, t0 time.Time) { + observeRPCRequest(strconv.Itoa(statusCode), method, t0) +} + +func observeRPCRequestErr(err error, method string, t0 time.Time) { + observeRPCRequest(errorToStatus(err), method, t0) +} + +func errorToStatus(err error) string { + status := "unknown_error" + var urlErr *url.Error + if errors.As(err, &urlErr) { + if urlErr.Timeout() { + status = "timeout" + } else { + status = "connection_refused" + } + } + return status +} diff --git a/cmd/main.go b/cmd/main.go index dd15eae..89db1b2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,7 @@ import ( "context" stdlog "log" "log/slog" + "net/http" "os" "os/signal" "strings" @@ -14,6 +15,8 @@ import ( "syscall" "time" + 
"github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/duneanalytics/blockchain-ingester/client/duneapi" "github.com/duneanalytics/blockchain-ingester/client/jsonrpc" "github.com/duneanalytics/blockchain-ingester/config" @@ -100,6 +103,17 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) + // Serve Prometheus metrics on :2112/metrics in the background; a listener failure is fatal. + go func() { + http.Handle("/metrics", promhttp.Handler()) + // Keep err local to this goroutine: assigning main's err from here is a data race. + err := http.ListenAndServe(":2112", nil) + if err != nil { + cancel() + stdlog.Fatal(err) + } + }() + // Get stored progress unless config indicates we should start from 0 var startBlockNumber int64 // Default to -1 to start where the ingester left off diff --git a/go.mod b/go.mod index 26973a4..eed1f14 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,22 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/jessevdk/go-flags v1.5.0 github.com/klauspost/compress v1.17.8 + github.com/prometheus/client_golang v1.19.1 github.com/stretchr/testify v1.9.0 golang.org/x/sync v0.7.0 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.48.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect golang.org/x/sys v0.20.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 1835ce7..d9a2477 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,8 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= @@ -6,6 +11,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= @@ -16,12 +23,26 @@ github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LF github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod 
h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= +github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= @@ -29,7 +50,10 @@ golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/ingester/mainloop.go b/ingester/mainloop.go index b417d38..ee2629d 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -25,7 +25,8 @@ import ( // The SendBlocks goroutine receives all blocks on an unbuffered channel, // but buffers them in a map until they can be sent in order. func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error { - // + registerIngesterMetrics(i) + if i.cfg.DLQOnly { i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0 } else { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 8209f1e..b4a6261 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/duneanalytics/blockchain-ingester/lib/dlq" "github.com/duneanalytics/blockchain-ingester/ingester" @@ -22,6 +24,7 @@ import ( ) func TestRunUntilCancel(t *testing.T) { + t.Cleanup(resetDefaultRegistry) ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(10) sentBlockNumber := int64(0) @@ -92,6 +95,7 @@ func TestRunUntilCancel(t *testing.T) { } func TestProduceBlockNumbers(t *testing.T) { + t.Cleanup(resetDefaultRegistry) duneapi := 
&duneapi_mock.BlockchainIngesterMock{ PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { return nil @@ -136,6 +140,7 @@ func TestProduceBlockNumbers(t *testing.T) { } func TestSendBlocks(t *testing.T) { + t.Cleanup(resetDefaultRegistry) sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { @@ -202,6 +207,7 @@ func TestSendBlocks(t *testing.T) { // TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time. func TestRunBlocksOutOfOrder(t *testing.T) { + t.Cleanup(resetDefaultRegistry) ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(1000) sentBlockNumber := int64(0) @@ -274,6 +280,7 @@ func TestRunBlocksOutOfOrder(t *testing.T) { // TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block func TestRunRPCNodeFails(t *testing.T) { + t.Cleanup(resetDefaultRegistry) ctx, cancel := context.WithCancel(context.Background()) defer cancel() maxBlockNumber := int64(1000) @@ -323,6 +330,7 @@ func TestRunRPCNodeFails(t *testing.T) { // TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block func TestRunFailsIfNoConcurrentRequests(t *testing.T) { + t.Cleanup(resetDefaultRegistry) logOutput := io.Discard ing := ingester.New( slog.New(slog.NewTextHandler(logOutput, nil)), @@ -341,6 +349,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) { } func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) { + t.Cleanup(resetDefaultRegistry) logOutput := io.Discard ing := ingester.New( slog.New(slog.NewTextHandler(logOutput, nil)), @@ -360,6 +369,7 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) { } func TestRunWithDLQ(t *testing.T) { + t.Cleanup(resetDefaultRegistry) ctx, cancel := 
context.WithCancel(context.Background()) maxBlockNumber := int64(1000) startBlockNumber := int64(10) @@ -523,3 +533,10 @@ func lenSyncMap(m *sync.Map) int { }) return length } + +// resetDefaultRegistry resets the default Prometheus registry. +func resetDefaultRegistry() { + registry := prometheus.NewRegistry() + prometheus.DefaultRegisterer = registry + prometheus.DefaultGatherer = registry +} diff --git a/ingester/metrics.go b/ingester/metrics.go new file mode 100644 index 0000000..de34c3d --- /dev/null +++ b/ingester/metrics.go @@ -0,0 +1,50 @@ +package ingester + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +func registerIngesterMetrics(i *ingester) { + registerLatestBlockNumberGauge(func() int64 { + return atomic.LoadInt64(&i.info.LatestBlockNumber) + }) + registerIngestedBlockNumberGauge(func() int64 { + return atomic.LoadInt64(&i.info.IngestedBlockNumber) + }) + registerDlqSizeGauge(func() int { + return i.dlq.Size() + }) +} + +func registerLatestBlockNumberGauge(function func() int64) { + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "node_indexer", + Name: "latest_block_number", + Help: "The latest known block number for the chain", + }, func() float64 { + return float64(function()) + }) +} + +func registerIngestedBlockNumberGauge(function func() int64) { + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "node_indexer", + Name: "ingested_block_number", + Help: "The highest block number ingested so far", + }, func() float64 { + return float64(function()) + }) +} + +func registerDlqSizeGauge(function func() int) { + promauto.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "node_indexer", + Name: "dlq_size", + Help: "The number of blocks in the DLQ", + }, func() float64 { + return float64(function()) + }) +}