Skip to content

Commit

Permalink
Add stats for error in ingestion and compaction (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
a9kitkumarsinha committed Jun 9, 2020
1 parent d233adb commit bdd1aeb
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
5 changes: 4 additions & 1 deletion internal/server/server_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
talaria "github.com/kelindar/talaria/proto"
)

const ingestErrorKey = "ingest.error"

// Ingest implements ingress.IngressServer
func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*talaria.IngestResponse, error) {
defer s.handlePanic()
Expand All @@ -22,6 +24,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
schema, _ := timeSeriesTable.Schema()
blocks, err := block.FromRequestBy(request, timeSeriesConf.HashBy, &schema, s.computed...)
if err != nil {
s.monitor.Count1(ctxTag, ingestErrorKey, "type:convert")
return nil, errors.Internal("unable to read the block", err)
}

Expand All @@ -30,12 +33,12 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
if appender, ok := t.(table.Appender); ok {
for _, block := range blocks {
if err := appender.Append(block); err != nil {
s.monitor.Count1(ctxTag, ingestErrorKey, "type:append")
return nil, err
}
}
}
}

s.monitor.Count("server", "ingestCount", int64(len(blocks)))
return nil, nil
}
3 changes: 2 additions & 1 deletion internal/storage/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,17 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche
// Append to the destination
ttl := time.Duration(max-now) * time.Second
if err = s.dest.Append(key, value, ttl); err != nil {
s.monitor.Count1(ctxTag, "error", "type:append")
s.monitor.Error(err)
return
}
}

// Delete all of the keys that we have appended
if err = s.buffer.Delete(keys...); err != nil {
s.monitor.Count1(ctxTag, "error", "type:delete")
s.monitor.Error(err)
}
s.monitor.Count(ctxTag, "deleteCount", int64(len(keys)))
return
})
}
Expand Down

0 comments on commit bdd1aeb

Please sign in to comment.