Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Add Dgraph txn metrics (commits and discards). #7339

Merged
merged 13 commits into from
Jan 25, 2021
13 changes: 10 additions & 3 deletions dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,18 @@ func runWithRetriesForResp(method, contentType, url string, body string) (
return qr, respBody, resp, err
}

func commitWithTs(keys, preds []string, ts uint64) error {
func commitWithTs(keys, preds []string, ts uint64, abort bool) error {
url := addr + "/commit"
if ts != 0 {
url += "?startTs=" + strconv.FormatUint(ts, 10)
}
if abort {
if ts != 0 {
url += "&abort=true"
} else {
url += "?abort=true"
}
}

m := make(map[string]interface{})
m["keys"] = keys
Expand Down Expand Up @@ -410,7 +417,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)

// Commit and query.
require.NoError(t, commitWithTs(mr.keys, mr.preds, ts))
require.NoError(t, commitWithTs(mr.keys, mr.preds, ts, false))
data, _, err = queryWithTs(q1, "application/dql", "", 0)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)
Expand Down Expand Up @@ -456,7 +463,7 @@ func TestTransactionBasicNoPreds(t *testing.T) {
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)

// Commit and query.
require.NoError(t, commitWithTs(mr.keys, nil, ts))
require.NoError(t, commitWithTs(mr.keys, nil, ts, false))
data, _, err = queryWithTs(q1, "application/dql", "", 0)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)
Expand Down
125 changes: 102 additions & 23 deletions dgraph/cmd/alpha/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,62 @@ import (
"github.com/stretchr/testify/require"
)

func TestMetricTxnCommits(t *testing.T) {
metricName := "dgraph_txn_commits_total"
mt := `
{
set {
<0x71> <name> "Bob" .
}
}
`

// first normal commit
mr, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

metrics := fetchMetrics(t, metricName)

// second normal commit
mr, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func TestMetricTxnDiscards(t *testing.T) {
metricName := "dgraph_txn_discards_total"
mt := `
{
set {
<0x71> <name> "Bob" .
}
}
`

// first normal commit
mr, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, false))

metrics := fetchMetrics(t, metricName)

// second commit discarded
mr, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr.keys, mr.preds, mr.startTs, true))

require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func TestMetricTxnAborts(t *testing.T) {
metricName := "dgraph_txn_aborts_total"
mt := `
{
set {
Expand All @@ -37,56 +92,77 @@ func TestMetricTxnAborts(t *testing.T) {
}
`

// Create initial 'dgraph_txn_aborts_total' metric
mr1, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
mr2, err := mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs))
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs, false))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs, false))

// Fetch Metrics
txnAbort1 := fetchMetric(t)
metrics := fetchMetrics(t, metricName)

// Create second 'dgraph_txn_aborts_total' metric
mr1, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
mr2, err = mutationWithTs(mt, "application/rdf", false, false, 0)
require.NoError(t, err)
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs))
require.NoError(t, commitWithTs(mr1.keys, mr1.preds, mr1.startTs, false))
require.Error(t, commitWithTs(mr2.keys, mr2.preds, mr2.startTs, false))

// Fetch and check updated metrics
require.NoError(t, retryableFetchMetrics(t, txnAbort1+1))
require.NoError(t, retryableFetchMetrics(t, map[string]int{
metricName: metrics[metricName] + 1,
}))
}

func retryableFetchMetrics(t *testing.T, expected int) error {
var txnMetric int
func retryableFetchMetrics(t *testing.T, expected map[string]int) error {
metricList := make([]string, 0)
for metric := range expected {
metricList = append(metricList, metric)
}

for i := 0; i < 10; i++ {
txnMetric = fetchMetric(t)
if expected == txnMetric {
metrics := fetchMetrics(t, metricList...)
found := 0
for expMetric, expCount := range expected {
count, ok := metrics[expMetric]
if !ok {
return fmt.Errorf("expected metric '%s' was not found", expMetric)
}
if count != expCount {
return fmt.Errorf("expected metric '%s' count was %d instead of %d",
expMetric, count, expCount)
}
found++
}
if found == len(metricList) {
return nil
}
time.Sleep(2 * time.Second)
time.Sleep(time.Second * 2)
}

return fmt.Errorf("txnAbort was not incremented. wanted %d, Got %d", expected,
txnMetric)
return fmt.Errorf("metrics were not found")
}

func fetchMetric(t *testing.T) int {
requiredMetric := "dgraph_txn_aborts_total"
func fetchMetrics(t *testing.T, metrics ...string) map[string]int {
req, err := http.NewRequest("GET", addr+"/debug/prometheus_metrics", nil)
require.NoError(t, err)

_, body, _, err := runRequest(req)
require.NoError(t, err)

metricsMap, err := extractMetrics(string(body))
require.NoError(t, err)

txnAbort, ok := metricsMap[requiredMetric]
require.True(t, ok, "the required metric '%s' is not found", requiredMetric)
m, _ := strconv.Atoi(txnAbort.(string))
return m
countMap := make(map[string]int)
for _, metric := range metrics {
if count, ok := metricsMap[metric]; ok {
n, err := strconv.Atoi(count.(string))
require.NoError(t, err)
countMap[metric] = n
} else {
t.Fatalf("the required metric '%s' was not found", metric)
}
}
return countMap
}

func TestMetrics(t *testing.T) {
Expand All @@ -108,6 +184,9 @@ func TestMetrics(t *testing.T) {
"badger_v3_memtable_gets_total", "badger_v3_puts_total", "badger_v3_read_bytes",
"badger_v3_written_bytes",

// Transaction Metrics
"dgraph_txn_aborts_total", "dgraph_txn_commits_total", "dgraph_txn_discards_total",

// Dgraph Memory Metrics
"dgraph_memory_idle_bytes", "dgraph_memory_inuse_bytes", "dgraph_memory_proc_bytes",
"dgraph_memory_alloc_bytes",
Expand Down
1 change: 1 addition & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,7 @@ func (n *node) AmLeader() bool {

func (n *node) monitorRaftMetrics() {
ticker := time.NewTicker(5 * time.Second)

defer ticker.Stop()
for range ticker.C {
curPendingSize := atomic.LoadInt64(&n.pendingSize)
Expand Down
16 changes: 11 additions & 5 deletions worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func checkSchema(s *pb.SchemaUpdate) error {
return errors.Errorf("Nil schema")
}

if len(s.Predicate) == 0 {
if s.Predicate == "" {
return errors.Errorf("No predicate specified in schema mutation")
}

Expand Down Expand Up @@ -687,7 +687,7 @@ func verifyTypes(ctx context.Context, m *pb.Mutations) error {
// Create a set of all the predicates already present in the schema.
var fields []string
for _, t := range m.Types {
if len(t.TypeName) == 0 {
if t.TypeName == "" {
return errors.Errorf("Type name must be specified in type update")
}

Expand Down Expand Up @@ -742,11 +742,11 @@ func verifyTypes(ctx context.Context, m *pb.Mutations) error {
// typeSanityCheck performs basic sanity checks on the given type update.
func typeSanityCheck(t *pb.TypeUpdate) error {
for _, field := range t.Fields {
if len(field.Predicate) == 0 {
if field.Predicate == "" {
return errors.Errorf("Field in type definition must have a name")
}

if field.ValueType == pb.Posting_OBJECT && len(field.ObjectTypeName) == 0 {
if field.ValueType == pb.Posting_OBJECT && field.ObjectTypeName == "" {
return errors.Errorf(
"Field with value type OBJECT must specify the name of the object type")
}
Expand All @@ -767,6 +767,10 @@ func typeSanityCheck(t *pb.TypeUpdate) error {
func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error) {
ctx, span := otrace.StartSpan(ctx, "worker.CommitOverNetwork")
defer span.End()
if tc.Aborted {
// The client called Discard
ostats.Record(ctx, x.TxnDiscards.M(1))
}

pl := groups().Leader(0)
if pl == nil {
Expand All @@ -785,9 +789,11 @@ func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error)
span.Annotate(attributes, "")

if tctx.Aborted || tctx.CommitTs == 0 {
ostats.Record(context.Background(), x.TxnAborts.M(1))
// The server aborted the txn (not the client)
ostats.Record(ctx, x.TxnAborts.M(1))
return 0, dgo.ErrAborted
}
ostats.Record(ctx, x.TxnCommits.M(1))
return tctx.CommitTs, nil
}

Expand Down
51 changes: 36 additions & 15 deletions x/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ var (
// MaxAssignedTs records the latest max assigned timestamp.
MaxAssignedTs = stats.Int64("max_assigned_ts",
"Latest max assigned timestamp", stats.UnitDimensionless)
// TxnCommits records count of committed transactions.
TxnCommits = stats.Int64("txn_commits_total",
"Number of transaction commits", stats.UnitDimensionless)
// TxnDiscards records count of discarded transactions.
TxnDiscards = stats.Int64("txn_discards_total",
"Number of transaction discards", stats.UnitDimensionless)
// TxnAborts records count of aborted transactions.
TxnAborts = stats.Int64("txn_aborts_total",
"Number of transaction aborts", stats.UnitDimensionless)
Expand Down Expand Up @@ -177,12 +183,33 @@ var (
Aggregation: view.Count(),
TagKeys: allTagKeys,
},
{
Name: TxnCommits.Name(),
Measure: TxnCommits,
Description: TxnCommits.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: TxnDiscards.Name(),
Measure: TxnDiscards,
Description: TxnDiscards.Description(),
Aggregation: view.Count(),
TagKeys: nil,
},
{
Name: TxnAborts.Name(),
Measure: TxnAborts,
Description: TxnAborts.Description(),
Aggregation: view.Count(),
TagKeys: allTagKeys,
TagKeys: nil,
},
{
Name: ActiveMutations.Name(),
Measure: ActiveMutations,
Description: ActiveMutations.Description(),
Aggregation: view.Sum(),
TagKeys: nil,
},

// Last value aggregations
Expand Down Expand Up @@ -228,13 +255,6 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: ActiveMutations.Name(),
Measure: ActiveMutations,
Description: ActiveMutations.Description(),
Aggregation: view.Sum(),
TagKeys: nil,
},
{
Name: AlphaHealth.Name(),
Measure: AlphaHealth,
Expand Down Expand Up @@ -263,6 +283,14 @@ var (
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
// Raft metrics
{
Name: RaftAppliedIndex.Name(),
Measure: RaftAppliedIndex,
Expand Down Expand Up @@ -305,13 +333,6 @@ var (
Aggregation: view.Count(),
TagKeys: allRaftKeys,
},
{
Name: MaxAssignedTs.Name(),
Measure: MaxAssignedTs,
Description: MaxAssignedTs.Description(),
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
},
}
)

Expand Down