From bdd1aebf72c1bcce0a465826a6397d4d5342513e Mon Sep 17 00:00:00 2001 From: Ankit Kumar Sinha <46916395+a9kitkumarsinha@users.noreply.github.com> Date: Tue, 9 Jun 2020 20:00:46 +0800 Subject: [PATCH] Add stats for error in ingestion and compaction (#29) --- internal/server/server_ingest.go | 5 ++++- internal/storage/compact/compact.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/server/server_ingest.go b/internal/server/server_ingest.go index 434ed06e..8ce447de 100644 --- a/internal/server/server_ingest.go +++ b/internal/server/server_ingest.go @@ -12,6 +12,8 @@ import ( talaria "github.com/kelindar/talaria/proto" ) +const ingestErrorKey = "ingest.error" + // Ingest implements ingress.IngressServer func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*talaria.IngestResponse, error) { defer s.handlePanic() @@ -22,6 +24,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t schema, _ := timeSeriesTable.Schema() blocks, err := block.FromRequestBy(request, timeSeriesConf.HashBy, &schema, s.computed...) if err != nil { + s.monitor.Count1(ctxTag, ingestErrorKey, "type:convert") return nil, errors.Internal("unable to read the block", err) } @@ -30,12 +33,12 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t if appender, ok := t.(table.Appender); ok { for _, block := range blocks { if err := appender.Append(block); err != nil { + s.monitor.Count1(ctxTag, ingestErrorKey, "type:append") return nil, err } } } } - s.monitor.Count("server", "ingestCount", int64(len(blocks))) return nil, nil } diff --git a/internal/storage/compact/compact.go b/internal/storage/compact/compact.go index c5fb35d2..3a6b22d7 100644 --- a/internal/storage/compact/compact.go +++ b/internal/storage/compact/compact.go @@ -153,6 +153,7 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche // Append to the destination ttl := time.Duration(max-now) * time.Second if err = s.dest.Append(key, value, ttl); err != nil { + s.monitor.Count1(ctxTag, "error", "type:append") s.monitor.Error(err) return } @@ -160,9 +161,9 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche // Delete all of the keys that we have appended if err = s.buffer.Delete(keys...); err != nil { + s.monitor.Count1(ctxTag, "error", "type:delete") s.monitor.Error(err) } - s.monitor.Count(ctxTag, "deleteCount", int64(len(keys))) return }) }