diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go index 3f6fc48a605..6c4a07a2483 100644 --- a/go/vt/topo/server.go +++ b/go/vt/topo/server.go @@ -48,6 +48,7 @@ import ( "sync" "golang.org/x/net/context" + "vitess.io/vitess/go/vt/log" ) @@ -175,6 +176,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error if err != nil { return nil, err } + conn = NewStatsConn(GlobalCell, conn) var connReadOnly Conn if factory.HasGlobalReadOnlyCell(serverAddress, root) { @@ -182,6 +184,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error if err != nil { return nil, err } + connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly) } else { connReadOnly = conn } @@ -257,6 +260,7 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) { if err != nil { return nil, fmt.Errorf("failed to create topo connection to %v, %v: %v", ci.ServerAddress, ci.Root, err) } + conn = NewStatsConn(cell, conn) ts.cells[cell] = conn return conn, nil } diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go new file mode 100644 index 00000000000..dadef0e1861 --- /dev/null +++ b/go/vt/topo/stats_conn.go @@ -0,0 +1,157 @@ +/* +Copyright 2018 The Vitess Authors + Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo + +import ( + "time" + + "golang.org/x/net/context" + + "vitess.io/vitess/go/stats" +) + +var _ Conn = (*StatsConn)(nil) + +var ( + topoStatsConnTimings = stats.NewMultiTimings( + "TopologyConnOperations", + "TopologyConnOperations timings", + []string{"Operation", "Cell"}) + + topoStatsConnErrors = stats.NewCountersWithMultiLabels( + "TopologyConnErrors", + "TopologyConnErrors errors per operation", + []string{"Operation", "Cell"}) +) + +// The StatsConn is a wrapper for a Conn that emits stats for every operation +type StatsConn struct { + cell string + conn Conn +} + +// NewStatsConn returns a StatsConn +func NewStatsConn(cell string, conn Conn) *StatsConn { + return &StatsConn{ + cell: cell, + conn: conn, + } +} + +// ListDir is part of the Conn interface +func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) { + startTime := time.Now() + statsKey := []string{"ListDir", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + res, err := st.conn.ListDir(ctx, dirPath, full) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return res, err + } + return res, err +} + +// Create is part of the Conn interface +func (st *StatsConn) Create(ctx context.Context, filePath string, contents []byte) (Version, error) { + startTime := time.Now() + statsKey := []string{"Create", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + res, err := st.conn.Create(ctx, filePath, contents) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return res, err + } + return res, err +} + +// Update is part of the Conn interface +func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byte, version Version) (Version, error) { + startTime := time.Now() + statsKey := []string{"Update", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + res, err := st.conn.Update(ctx, filePath, contents, version) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return res, err + } + return res, err +} + +// Get is part of the Conn interface +func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) { + startTime := time.Now() + statsKey := []string{"Get", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + bytes, version, err := st.conn.Get(ctx, filePath) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return bytes, version, err + } + return bytes, version, err +} + +// Delete is part of the Conn interface +func (st *StatsConn) Delete(ctx context.Context, filePath string, version Version) error { + startTime := time.Now() + statsKey := []string{"Delete", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + err := st.conn.Delete(ctx, filePath, version) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return err + } + return err +} + +// Lock is part of the Conn interface +func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) { + startTime := time.Now() + statsKey := []string{"Lock", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + res, err := st.conn.Lock(ctx, dirPath, contents) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return res, err + } + return res, err +} + +// Watch is part of the Conn interface +func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { + startTime := time.Now() + statsKey := []string{"Watch", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + return st.conn.Watch(ctx, filePath) +} + +// NewMasterParticipation is part of the Conn interface +func (st *StatsConn) NewMasterParticipation(name, id string) (MasterParticipation, error) { + startTime := time.Now() + statsKey := []string{"NewMasterParticipation", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + res, err := st.conn.NewMasterParticipation(name, id) + if err != nil { + topoStatsConnErrors.Add(statsKey, int64(1)) + return res, err + } + return res, err +} + +// Close is part of the Conn interface +func (st *StatsConn) Close() { + startTime := time.Now() + statsKey := []string{"Close", st.cell} + defer topoStatsConnTimings.Record(statsKey, startTime) + st.conn.Close() +} diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go new file mode 100644 index 00000000000..6b7f57c5559 --- /dev/null +++ b/go/vt/topo/stats_conn_test.go @@ -0,0 +1,311 @@ +/* +Copyright 2018 The Vitess Authors + Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo + +import ( + "fmt" + "testing" + + "golang.org/x/net/context" +) + +// The fakeConn is a wrapper for a Conn that emits stats for every operation +type fakeConn struct { + v Version +} + +// ListDir is part of the Conn interface +func (st *fakeConn) ListDir(ctx context.Context, dirPath string, full bool) (res []DirEntry, err error) { + if dirPath == "error" { + return res, fmt.Errorf("Dummy error") + + } + return res, err +} + +// Create is part of the Conn interface +func (st *fakeConn) Create(ctx context.Context, filePath string, contents []byte) (ver Version, err error) { + if filePath == "error" { + return ver, fmt.Errorf("Dummy error") + + } + return ver, err +} + +// Update is part of the Conn interface +func (st *fakeConn) Update(ctx context.Context, filePath string, contents []byte, version Version) (ver Version, err error) { + if filePath == "error" { + return ver, fmt.Errorf("Dummy error") + + } + return ver, err +} + +// Get is part of the Conn interface +func (st *fakeConn) Get(ctx context.Context, filePath string) (bytes []byte, ver Version, err error) { + if filePath == "error" { + return bytes, ver, fmt.Errorf("Dummy error") + + } + return bytes, ver, err +} + +// Delete is part of the Conn interface +func (st *fakeConn) Delete(ctx context.Context, filePath string, version Version) (err error) { + if filePath == "error" { + return fmt.Errorf("Dummy error") + } + return err +} + +// Lock is part of the Conn interface +func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock LockDescriptor, err error) { + if dirPath == "error" { + return lock, fmt.Errorf("Dummy error") + + } + return lock, err +} + +// Watch is part of the Conn interface +func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, cancel CancelFunc) { + return current, changes, cancel +} + +// NewMasterParticipation is part of the Conn interface +func (st *fakeConn) NewMasterParticipation(name, id string) (mp MasterParticipation, err error) { + if name == "error" { + return mp, fmt.Errorf("Dummy error") + + } + return mp, err +} + +// Close is part of the Conn interface +func (st *fakeConn) Close() { +} + +//TestStatsConnTopoListDir emits stats on ListDir +func TestStatsConnTopoListDir(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.ListDir(ctx, "", true) + timingCounts := topoStatsConnTimings.Counts()["ListDir.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["ListDir.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.ListDir(ctx, "error", true) + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["ListDir.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoCreate emits stats on Create +func TestStatsConnTopoCreate(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Create(ctx, "", []byte{}) + timingCounts := topoStatsConnTimings.Counts()["Create.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["Create.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.Create(ctx, "error", []byte{}) + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["Create.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoUpdate emits stats on Update +func TestStatsConnTopoUpdate(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Update(ctx, "", []byte{}, conn.v) + timingCounts := topoStatsConnTimings.Counts()["Update.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["Update.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.Update(ctx, "error", []byte{}, conn.v) + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["Update.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoGet emits stats on Get +func TestStatsConnTopoGet(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Get(ctx, "") + timingCounts := topoStatsConnTimings.Counts()["Get.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["Get.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.Get(ctx, "error") + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["Get.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoDelete emits stats on Delete +func TestStatsConnTopoDelete(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Delete(ctx, "", conn.v) + timingCounts := topoStatsConnTimings.Counts()["Delete.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["Delete.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.Delete(ctx, "error", conn.v) + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["Delete.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoLock emits stats on Lock +func TestStatsConnTopoLock(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Lock(ctx, "", "") + timingCounts := topoStatsConnTimings.Counts()["Lock.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["Lock.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.Lock(ctx, "error", "") + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["Lock.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoWatch emits stats on Watch +func TestStatsConnTopoWatch(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + ctx := context.Background() + + statsConn.Watch(ctx, "") + timingCounts := topoStatsConnTimings.Counts()["Watch.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + +} + +//TestStatsConnTopoNewMasterParticipation emits stats on NewMasterParticipation +func TestStatsConnTopoNewMasterParticipation(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + + statsConn.NewMasterParticipation("", "") + timingCounts := topoStatsConnTimings.Counts()["NewMasterParticipation.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + // error is zero before getting an error + errorCount := topoStatsConnErrors.Counts()["NewMasterParticipation.global"] + if got, want := errorCount, int64(0); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } + + statsConn.NewMasterParticipation("error", "") + + // error stats gets emitted + errorCount = topoStatsConnErrors.Counts()["NewMasterParticipation.global"] + if got, want := errorCount, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +} + +//TestStatsConnTopoClose emits stats on Close +func TestStatsConnTopoClose(t *testing.T) { + conn := &fakeConn{} + statsConn := NewStatsConn("global", conn) + + statsConn.Close() + timingCounts := topoStatsConnTimings.Counts()["Close.global"] + if got, want := timingCounts, int64(1); got != want { + t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want) + } +}