Skip to content

Commit

Permalink
Add prometheus metrics (#62)
Browse files Browse the repository at this point in the history
Adds prometheus metrics
  • Loading branch information
adammilnesmith committed Jul 23, 2024
1 parent 5ace847 commit dcefb56
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 2 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ RUN apk add --no-cache ca-certificates

COPY --from=builder /app/indexer /
ENTRYPOINT ["/indexer"]
EXPOSE 2112
5 changes: 5 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,16 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req.Header.Set("x-dune-batch-size", fmt.Sprintf("%d", request.BatchSize))
req = req.WithContext(ctx)

t0 := time.Now()
resp, err := c.httpClient.Do(req)
if err != nil {
observeSendBlocksRequestErr(err, request.BatchSize, t0)
return err
}
defer resp.Body.Close()
observeSendBlocksRequestCode(resp.StatusCode, request.BatchSize, t0)

responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
Expand Down
82 changes: 82 additions & 0 deletions client/duneapi/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package duneapi

import (
"errors"
"net/url"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var metricSendBlockCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "node_indexer",
Subsystem: "dune_client",
Name: "sent_block_total",
Help: "Total number of blocks sent in requests",
},
[]string{"status"},
)

var metricSendBlockBatchSize = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "node_indexer",
Subsystem: "dune_client",
Name: "block_per_batch",
Help: "Number of blocks per batch",
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128, 256},
},
[]string{"status"},
)

var metricSendRequestsCount = promauto.NewCounterVec(

prometheus.CounterOpts{
Namespace: "node_indexer",
Subsystem: "dune_client",
Name: "send_requests_total",
Help: "Number of send requests",
},
[]string{"status"},
)

var metricSendBlockBatchDurationMillis = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "node_indexer",
Subsystem: "dune_client",
Name: "send_block_batch_duration_millis",
Help: "Duration of a send block batch request in milliseconds",
Buckets: []float64{10, 25, 50, 100, 250, 500, 1000, 2000, 4000},
},
[]string{"status"},
)

func observeSendBlocksRequest(status string, numberOfBlocks int, t0 time.Time) {
metricSendBlockCount.WithLabelValues(status).Inc()
metricSendBlockBatchSize.WithLabelValues(status).Observe(float64(numberOfBlocks))
metricSendBlockBatchDurationMillis.WithLabelValues(status).Observe(float64(time.Since(t0).Milliseconds()))
metricSendRequestsCount.WithLabelValues(status).Add(float64(numberOfBlocks))
}

func observeSendBlocksRequestCode(statusCode int, numberOfBlocks int, t0 time.Time) {
observeSendBlocksRequest(strconv.Itoa(statusCode), numberOfBlocks, t0)
}

func observeSendBlocksRequestErr(err error, numberOfBlocks int, t0 time.Time) {
observeSendBlocksRequest(errorToStatus(err), numberOfBlocks, t0)
}

func errorToStatus(err error) string {
status := "unknown_error"
var urlErr *url.Error
if errors.As(err, &urlErr) {
if urlErr.Timeout() {
status = "timeout"
} else {
status = "connection_refused"
}
}
return status
}
5 changes: 5 additions & 0 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ func (c *rpcClient) getResponseBody(
req.Header.Set(k, v)
}
}

t0 := time.Now()
resp, err := c.client.Do(req)
if err != nil {
observeRPCRequestErr(err, method, t0)
return fmt.Errorf("failed to send request for method %s: %w", method, err)
}
defer resp.Body.Close()
observeRPCRequestCode(resp.StatusCode, method, t0)

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("response for method %s has status code %d", method, resp.StatusCode)
}
Expand Down
58 changes: 58 additions & 0 deletions client/jsonrpc/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package jsonrpc

import (
"errors"
"net/url"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var rpcRequestCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "node_indexer",
Subsystem: "rpc_client",
Name: "request_total",
Help: "Total number of RPC node requests",
},
[]string{"status", "method"},
)

var rpcRequestDurationMillis = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "node_indexer",
Subsystem: "rpc_client",
Name: "request_duration_millis",
Help: "Duration of RPC node requests in milliseconds",
Buckets: []float64{10, 25, 50, 100, 250, 500, 1000, 2000, 4000},
},
[]string{"status", "method"},
)

func observeRPCRequest(status string, method string, t0 time.Time) {
rpcRequestCount.WithLabelValues(status, method).Inc()
rpcRequestDurationMillis.WithLabelValues(status, method).Observe(float64(time.Since(t0).Milliseconds()))
}

func observeRPCRequestCode(statusCode int, method string, t0 time.Time) {
observeRPCRequest(strconv.Itoa(statusCode), method, t0)
}

func observeRPCRequestErr(err error, method string, t0 time.Time) {
observeRPCRequest(errorToStatus(err), method, t0)
}

func errorToStatus(err error) string {
status := "unknown_error"
var urlErr *url.Error
if errors.As(err, &urlErr) {
if urlErr.Timeout() {
status = "timeout"
} else {
status = "connection_refused"
}
}
return status
}
12 changes: 12 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"context"
stdlog "log"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
stdsync "sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
Expand Down Expand Up @@ -96,6 +99,15 @@ func main() {

ctx, cancel := context.WithCancel(context.Background())

go func() {
http.Handle("/metrics", promhttp.Handler())
err = http.ListenAndServe(":2112", nil)
if err != nil {
cancel()
stdlog.Fatal(err)
}
}()

// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
github.com/klauspost/compress v1.17.8
github.com/prometheus/client_golang v1.19.1
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/sys v0.20.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
26 changes: 25 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
Expand All @@ -6,6 +11,8 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
Expand All @@ -16,20 +23,37 @@ github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LF
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
3 changes: 2 additions & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
//
registerIngesterMetrics(i)

if i.cfg.DLQOnly {
i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0
} else {
Expand Down
17 changes: 17 additions & 0 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/duneanalytics/blockchain-ingester/lib/dlq"

"github.com/duneanalytics/blockchain-ingester/ingester"
Expand All @@ -22,6 +24,7 @@ import (
)

func TestRunUntilCancel(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(10)
sentBlockNumber := int64(0)
Expand Down Expand Up @@ -92,6 +95,7 @@ func TestRunUntilCancel(t *testing.T) {
}

func TestProduceBlockNumbers(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
duneapi := &duneapi_mock.BlockchainIngesterMock{
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
Expand Down Expand Up @@ -136,6 +140,7 @@ func TestProduceBlockNumbers(t *testing.T) {
}

func TestSendBlocks(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
sentBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
Expand Down Expand Up @@ -202,6 +207,7 @@ func TestSendBlocks(t *testing.T) {
// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunBlocksOutOfOrder(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
Expand Down Expand Up @@ -274,6 +280,7 @@ func TestRunBlocksOutOfOrder(t *testing.T) {

// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block
func TestRunRPCNodeFails(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maxBlockNumber := int64(1000)
Expand Down Expand Up @@ -323,6 +330,7 @@ func TestRunRPCNodeFails(t *testing.T) {

// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block
func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
logOutput := io.Discard
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
Expand All @@ -341,6 +349,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
}

func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
logOutput := io.Discard
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
Expand All @@ -360,6 +369,7 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
}

func TestRunWithDLQ(t *testing.T) {
t.Cleanup(resetDefaultRegistry)
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
startBlockNumber := int64(10)
Expand Down Expand Up @@ -523,3 +533,10 @@ func lenSyncMap(m *sync.Map) int {
})
return length
}

// resetDefaultRegistry resets the default Prometheus registry.
func resetDefaultRegistry() {
registry := prometheus.NewRegistry()
prometheus.DefaultRegisterer = registry
prometheus.DefaultGatherer = registry
}
Loading

0 comments on commit dcefb56

Please sign in to comment.