Skip to content

Commit

Permalink
feat(metrics): Add Dgraph txn metrics (commits and discards). (#7339)
Browse files Browse the repository at this point in the history
* feat(metrics): Add metric dgraph_txn_commits_total.

* feat(metrics): Add metric dgraph_txn_discards_total.

* fix: Don't export method or status tags for dgraph_txn metrics.
  • Loading branch information
danielmai authored Jan 25, 2021
1 parent 764803c commit 34a96aa
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 46 deletions.
13 changes: 10 additions & 3 deletions dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,18 @@ func runWithRetriesForResp(method, contentType, url string, body string) (
return qr, respBody, resp, err
}

func commitWithTs(keys, preds []string, ts uint64) error {
func commitWithTs(keys, preds []string, ts uint64, abort bool) error {
url := addr + "/commit"
if ts != 0 {
url += "?startTs=" + strconv.FormatUint(ts, 10)
}
if abort {
if ts != 0 {
url += "&abort=true"
} else {
url += "?abort=true"
}
}

m := make(map[string]interface{})
m["keys"] = keys
Expand Down Expand Up @@ -410,7 +417,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)

// Commit and query.
require.NoError(t, commitWithTs(mr.keys, mr.preds, ts))
require.NoError(t, commitWithTs(mr.keys, mr.preds, ts, false))
data, _, err = queryWithTs(q1, "application/dql", "", 0)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)
Expand Down Expand Up @@ -456,7 +463,7 @@ func TestTransactionBasicNoPreds(t *testing.T) {
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)

// Commit and query.
require.NoError(t, commitWithTs(mr.keys, nil, ts))
require.NoError(t, commitWithTs(mr.keys, nil, ts, false))
data, _, err = queryWithTs(q1, "application/dql", "", 0)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)
Expand Down
125 changes: 102 additions & 23 deletions dgraph/cmd/alpha/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,62 @@ import (
"github.com/stretchr/testify/require"
)

func TestMetricTxnCommits(t *testing.T) {
metricName := "dgraph_txn_commits_total"
mt := `
{
set {
<0x71> <name> "Bob" .
}
}
`

// first normal commit
mr, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

metrics := fetchMetrics(t, metricName)

// second normal commit
mr, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func TestMetricTxnDiscards(t *testing.T) {
metricName := "dgraph_txn_discards_total"
mt := `
{
set {
<0x71> <name> "Bob" .
}
}
`

// first normal commit
mr, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

metrics := fetchMetrics(t, metricName)

// second commit discarded
mr, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, true))

require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func TestMetricTxnAborts(t *testing.T) {
metricName := "dgraph_txn_aborts_total"
mt := `
{
set {
Expand All @@ -37,56 +92,77 @@ func TestMetricTxnAborts(t *testing.T) {
}
`

// Create initial 'dgraph_txn_aborts_total' metric
mr1, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
mr2, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs))
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs, false))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs, false))

// Fetch Metrics
txnAbort1 := fetchMetric(t)
metrics := fetchMetrics(t, metricName)

// Create second 'dgraph_txn_aborts_total' metric
mr1, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
mr2, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs))
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs, false))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs, false))

// Fetch and check updated metrics
require.NoError(t, retryableFetchMetrics(t, txnAbort1+1))
require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func retryableFetchMetrics(t *testing.T, expected int) error {
var txnMetric int
func retryableFetchMetrics(t *testing.T, expected map[string]int) error {
metricList := make([]string, 0)
for metric := range expected {
metricList = append(metricList, metric)
}

for i := 0; i < 10; i++ {
txnMetric = fetchMetric(t)
if expected == txnMetric {
metrics := fetchMetrics(t, metricList...)
found := 0
for expMetric, expCount := range expected {
count, ok := metrics[expMetric]
if !ok {
return fmt.Errorf("expected metric '%s' was not found", expMetric)
}
if count != expCount {
return fmt.Errorf("expected metric '%s' count was %d instead of %d",
expMetric, count, expCount)
}
found++
}
if found == len(metricList) {
return nil
}
time.Sleep(2 * time.Second)
time.Sleep(time.Second * 2)
}

return fmt.Errorf("txnAbort was not incremented. wanted %d, Got %d", expected,
txnMetric)
return fmt.Errorf("metrics were not found")
}

func fetchMetric(t *testing.T) int {
requiredMetric := "dgraph_txn_aborts_total"
func fetchMetrics(t *testing.T, metrics ...string) map[string]int {
req, err := http.NewRequest("GET", addr+"/debug/prometheus_metrics", nil)
require.NoError(t, err)

_, body, _, err := runRequest(req)
require.NoError(t, err)

metricsMap, err := extractMetrics(string(body))
require.NoError(t, err)

txnAbort, ok := metricsMap[requiredMetric]
require.True(t, ok, "the required metric '%s' is not found", requiredMetric)
m, _ := strconv.Atoi(txnAbort.(string))
return m
countMap := make(map[string]int)
for _, metric := range metrics {
if count, ok := metricsMap[metric]; ok {
n, err := strconv.Atoi(count.(string))
require.NoError(t, err)
countMap[metric] = n
} else {
t.Fatalf("the required metric '%s' was not found", metric)
}
}
return countMap
}

func TestMetrics(t *testing.T) {
Expand All @@ -108,6 +184,9 @@ func TestMetrics(t *testing.T) {
"badger_v3_memtable_gets_total", "badger_v3_puts_total", "badger_v3_read_bytes",
"badger_v3_written_bytes",

// Transaction Metrics
"dgraph_txn_aborts_total", "dgraph_txn_commits_total", "dgraph_txn_discards_total",

// Dgraph Memory Metrics
"dgraph_memory_idle_bytes", "dgraph_memory_inuse_bytes", "dgraph_memory_proc_bytes",
"dgraph_memory_alloc_bytes",
Expand Down
16 changes: 11 additions & 5 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func checkSchema(s *pb.SchemaUpdate) error {
return errors.Errorf("Nil schema")
}

if len(s.Predicate) == 0 {
if s.Predicate == "" {
return errors.Errorf("No predicate specified in schema mutation")
}

Expand Down Expand Up @@ -687,7 +687,7 @@ func verifyTypes(ctx context.Context, m *pb.Mutations) error {
// Create a set of all the predicates already present in the schema.
var fields []string
for _, t := range m.Types {
if len(t.TypeName) == 0 {
if t.TypeName == "" {
return errors.Errorf("Type name must be specified in type update")
}

Expand Down Expand Up @@ -742,11 +742,11 @@ func verifyTypes(ctx context.Context, m *pb.Mutations) error {
// typeSanityCheck performs basic sanity checks on the given type update.
func typeSanityCheck(t *pb.TypeUpdate) error {
for _, field := range t.Fields {
if len(field.Predicate) == 0 {
if field.Predicate == "" {
return errors.Errorf("Field in type definition must have a name")
}

if field.ValueType == pb.Posting_OBJECT && len(field.ObjectTypeName) == 0 {
if field.ValueType == pb.Posting_OBJECT && field.ObjectTypeName == "" {
return errors.Errorf(
"Field with value type OBJECT must specify the name of the object type")
}
Expand All @@ -767,6 +767,10 @@ func typeSanityCheck(t *pb.TypeUpdate) error {
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) {
ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork")
defer span.End()
if tc.Aborted {
// The client called Discard
ostats.Record(ctx, x.TxnDiscards.M(1))
}

pl := groups().Leader(0)
if pl == nil {
Expand All @@ -785,9 +789,11 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error)
span.Annotate(attributes, "")

if tctx.Aborted || tctx.CommitTs == 0 {
ostats.Record(context.Background(), x.TxnAborts.M(1))
// The server aborted the txn (not the client)
ostats.Record(ctx, x.TxnAborts.M(1))
return 0, dgo.ErrAborted
}
ostats.Record(ctx, x.TxnCommits.M(1))
return tctx.CommitTs, nil
}

Expand Down
51 changes: 36 additions & 15 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ var (
// MaxAssignedTs records the latest max assigned timestamp.
MaxAssignedTs = stats.Int64("max_assigned_ts",
"Latest max assigned timestamp", stats.UnitDimensionless)
// TxnCommits records count of committed transactions.
TxnCommits = stats.Int64("txn_commits_total",
"Number of transaction commits", stats.UnitDimensionless)
// TxnDiscards records count of discarded transactions.
TxnDiscards = stats.Int64("txn_discards_total",
"Number of transaction discards", stats.UnitDimensionless)
// TxnAborts records count of aborted transactions.
TxnAborts = stats.Int64("txn_aborts_total",
"Number of transaction aborts", stats.UnitDimensionless)
Expand Down Expand Up @@ -177,12 +183,33 @@ var (
Aggregation: view.Count(),
TagKeys: allTagKeys,
},
{
Name: TxnCommits.Name(),
Measure: TxnCommits,
Description: TxnCommits.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: TxnDiscards.Name(),
Measure: TxnDiscards,
Description: TxnDiscards.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: TxnAborts.Name(),
Measure: TxnAborts,
Description: TxnAborts.Description(),
Aggregation: view.Count(),
TagKeys: allTagKeys,
TagKeys: nil,
},
{
Name: ActiveMutations.Name(),
Measure: ActiveMutations,
Description: ActiveMutations.Description(),
Aggregation: view.Sum(),
TagKeys: nil,
},

// Last value aggregations
Expand Down Expand Up @@ -228,13 +255,6 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: ActiveMutations.Name(),
Measure: ActiveMutations,
Description: ActiveMutations.Description(),
Aggregation: view.Sum(),
TagKeys: nil,
},
{
Name: AlphaHealth.Name(),
Measure: AlphaHealth,
Expand Down Expand Up @@ -263,6 +283,14 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
// Raft metrics
{
Name: RaftAppliedIndex.Name(),
Measure: RaftAppliedIndex,
Expand Down Expand Up @@ -305,13 +333,6 @@ var (
Aggregation: view.Count(),
TagKeys: allRaftKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
}
)

Expand Down

0 comments on commit 34a96aa

Please sign in to comment.