Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
{{range $i, $skn := .SrvKeyspaceNames}}
<tr>
<td>{{github.com_youtube_vitess_vtctld_srv_cell $skn.Cell}}</td>
<td>{{if $skn.LastError}}<b>{{$skn.LastError}}</b><br/>Client: {{$skn.LastErrorHTML}}{{else}}{{range $j, $value := $skn.Value}}{{github.com_youtube_vitess_vtctld_srv_keyspace $skn.Cell $value}}&nbsp;{{end}}{{end}}</td>
<td>{{if $skn.LastError}}<b>{{$skn.LastError}}</b>{{else}}{{range $j, $value := $skn.Value}}{{github.com_youtube_vitess_vtctld_srv_keyspace $skn.Cell $value}}&nbsp;{{end}}{{end}}</td>
</tr>
{{end}}
</table>
Expand All @@ -45,7 +45,7 @@ var (
<tr>
<td>{{github.com_youtube_vitess_vtctld_srv_cell $sk.Cell}}</td>
<td>{{github.com_youtube_vitess_vtctld_srv_keyspace $sk.Cell $sk.Keyspace}}</td>
<td>{{if $sk.LastError}}<b>{{$sk.LastError}}</b><br/>Client: {{$sk.LastErrorHTML}}{{else}}{{$sk.StatusAsHTML}}{{end}}</td>
<td>{{if $sk.LastError}}<b>{{$sk.LastError}}</b>{{else}}{{$sk.StatusAsHTML}}{{end}}</td>
</tr>
{{end}}
</table>
Expand All @@ -67,7 +67,7 @@ var (
<td>{{github.com_youtube_vitess_vtctld_srv_keyspace $ep.Cell $ep.Keyspace}}</td>
<td>{{github.com_youtube_vitess_vtctld_srv_shard $ep.Cell $ep.Keyspace $ep.Shard}}</td>
<td>{{github.com_youtube_vitess_vtctld_srv_type $ep.Cell $ep.Keyspace $ep.Shard $ep.TabletType}}</td>
<td>{{if $ep.LastError}}<b>{{$ep.LastError}}</b><br/>Client: {{$ep.LastErrorHTML}}{{else}}{{$ep.StatusAsHTML}}{{end}}</td>
<td>{{if $ep.LastError}}<b>{{$ep.LastError}}</b>{{else}}{{$ep.StatusAsHTML}}{{end}}</td>
</tr>
{{end}}
</table>
Expand Down
31 changes: 15 additions & 16 deletions go/vt/vtgate/gorpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"flag"
"time"

"github.com/youtube/vitess/go/vt/callinfo"
"github.com/youtube/vitess/go/vt/rpc"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vtgate"
Expand All @@ -32,47 +31,47 @@ func (vtg *VTGate) Execute(ctx context.Context, query *proto.Query, reply *proto
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Execute(callinfo.RPCWrapCallInfo(ctx), query, reply)
return vtg.server.Execute(ctx, query, reply)
}

// ExecuteShard is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) ExecuteShard(ctx context.Context, query *proto.QueryShard, reply *proto.QueryResult) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteShard(callinfo.RPCWrapCallInfo(ctx), query, reply)
return vtg.server.ExecuteShard(ctx, query, reply)
}

// ExecuteKeyspaceIds is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, query *proto.KeyspaceIdQuery, reply *proto.QueryResult) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteKeyspaceIds(callinfo.RPCWrapCallInfo(ctx), query, reply)
return vtg.server.ExecuteKeyspaceIds(ctx, query, reply)
}

// ExecuteKeyRanges is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, query *proto.KeyRangeQuery, reply *proto.QueryResult) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteKeyRanges(callinfo.RPCWrapCallInfo(ctx), query, reply)
return vtg.server.ExecuteKeyRanges(ctx, query, reply)
}

// ExecuteEntityIds is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, query *proto.EntityIdsQuery, reply *proto.QueryResult) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteEntityIds(callinfo.RPCWrapCallInfo(ctx), query, reply)
return vtg.server.ExecuteEntityIds(ctx, query, reply)
}

// ExecuteBatchShard is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) ExecuteBatchShard(ctx context.Context, batchQuery *proto.BatchQueryShard, reply *proto.QueryResultList) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteBatchShard(callinfo.RPCWrapCallInfo(ctx), batchQuery, reply)
return vtg.server.ExecuteBatchShard(ctx, batchQuery, reply)
}

// ExecuteBatchKeyspaceIds is the RPC version of
Expand All @@ -81,21 +80,21 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, batchQuery *prot
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteBatchKeyspaceIds(callinfo.RPCWrapCallInfo(ctx), batchQuery, reply)
return vtg.server.ExecuteBatchKeyspaceIds(ctx, batchQuery, reply)
}

// StreamExecute is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
return vtg.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), query, func(value *proto.QueryResult) error {
return vtg.server.StreamExecute(ctx, query, func(value *proto.QueryResult) error {
return sendReply(value)
})
}

// StreamExecuteShard is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) StreamExecuteShard(ctx context.Context, query *proto.QueryShard, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
return vtg.server.StreamExecuteShard(callinfo.RPCWrapCallInfo(ctx), query, func(value *proto.QueryResult) error {
return vtg.server.StreamExecuteShard(ctx, query, func(value *proto.QueryResult) error {
return sendReply(value)
})
}
Expand All @@ -104,7 +103,7 @@ func (vtg *VTGate) StreamExecuteShard(ctx context.Context, query *proto.QuerySha
// vtgateservice.VTGateService method
func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, query *proto.KeyRangeQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
return vtg.server.StreamExecuteKeyRanges(callinfo.RPCWrapCallInfo(ctx), query, func(value *proto.QueryResult) error {
return vtg.server.StreamExecuteKeyRanges(ctx, query, func(value *proto.QueryResult) error {
return sendReply(value)
})
}
Expand All @@ -113,7 +112,7 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, query *proto.KeyR
// vtgateservice.VTGateService method
func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, query *proto.KeyspaceIdQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
return vtg.server.StreamExecuteKeyspaceIds(callinfo.RPCWrapCallInfo(ctx), query, func(value *proto.QueryResult) error {
return vtg.server.StreamExecuteKeyspaceIds(ctx, query, func(value *proto.QueryResult) error {
return sendReply(value)
})
}
Expand All @@ -123,31 +122,31 @@ func (vtg *VTGate) Begin(ctx context.Context, noInput *rpc.Unused, outSession *p
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Begin(callinfo.RPCWrapCallInfo(ctx), outSession)
return vtg.server.Begin(ctx, outSession)
}

// Commit is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) Commit(ctx context.Context, inSession *proto.Session, noOutput *rpc.Unused) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Commit(callinfo.RPCWrapCallInfo(ctx), inSession)
return vtg.server.Commit(ctx, inSession)
}

// Rollback is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) Rollback(ctx context.Context, inSession *proto.Session, noOutput *rpc.Unused) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Rollback(callinfo.RPCWrapCallInfo(ctx), inSession)
return vtg.server.Rollback(ctx, inSession)
}

// SplitQuery is the RPC version of vtgateservice.VTGateService method
func (vtg *VTGate) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) (err error) {
defer vtg.server.HandlePanic(&err)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.SplitQuery(callinfo.RPCWrapCallInfo(ctx), req, reply)
return vtg.server.SplitQuery(ctx, req, reply)
}

// New returns a new VTGate service
Expand Down
93 changes: 46 additions & 47 deletions go/vt/vtgate/srv_topo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
log "github.com/golang/glog"

"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/callinfo"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -469,10 +468,10 @@ func (server *ResilientSrvTopoServer) GetEndPoints(context context.Context, cell

// SrvKeyspaceNamesCacheStatus is the current value for SrvKeyspaceNames
type SrvKeyspaceNamesCacheStatus struct {
Cell string
Value []string
LastError error
LastErrorHTML template.HTML
Cell string
Value []string
LastError error
LastErrorContext context.Context
}

// SrvKeyspaceNamesCacheStatusList is used for sorting
Expand All @@ -495,11 +494,11 @@ func (skncsl SrvKeyspaceNamesCacheStatusList) Swap(i, j int) {

// SrvKeyspaceCacheStatus is the current value for a SrvKeyspace object
type SrvKeyspaceCacheStatus struct {
Cell string
Keyspace string
Value *topo.SrvKeyspace
LastError error
LastErrorHTML template.HTML
Cell string
Keyspace string
Value *topo.SrvKeyspace
LastError error
LastErrorContext context.Context
}

// StatusAsHTML returns an HTML version of our status.
Expand Down Expand Up @@ -554,12 +553,12 @@ func (skcsl SrvKeyspaceCacheStatusList) Swap(i, j int) {

// SrvShardCacheStatus is the current value for a SrvShard object
type SrvShardCacheStatus struct {
Cell string
Keyspace string
Shard string
Value *topo.SrvShard
LastError error
LastErrorHTML template.HTML
Cell string
Keyspace string
Shard string
Value *topo.SrvShard
LastError error
LastErrorContext context.Context
}

// StatusAsHTML returns an HTML version of our status.
Expand Down Expand Up @@ -604,14 +603,14 @@ func (sscsl SrvShardCacheStatusList) Swap(i, j int) {

// EndPointsCacheStatus is the current value for an EndPoints object
type EndPointsCacheStatus struct {
Cell string
Keyspace string
Shard string
TabletType topo.TabletType
Value *topo.EndPoints
OriginalValue *topo.EndPoints
LastError error
LastErrorHTML template.HTML
Cell string
Keyspace string
Shard string
TabletType topo.TabletType
Value *topo.EndPoints
OriginalValue *topo.EndPoints
LastError error
LastErrorContext context.Context
}

// StatusAsHTML returns an HTML version of our status.
Expand Down Expand Up @@ -699,50 +698,50 @@ func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCache
for _, entry := range server.srvKeyspaceNamesCache {
entry.mutex.Lock()
result.SrvKeyspaceNames = append(result.SrvKeyspaceNames, &SrvKeyspaceNamesCacheStatus{
Cell: entry.cell,
Value: entry.value,
LastError: entry.lastError,
LastErrorHTML: callinfo.HTMLFromContext(entry.lastErrorContext),
Cell: entry.cell,
Value: entry.value,
LastError: entry.lastError,
LastErrorContext: entry.lastErrorContext,
})
entry.mutex.Unlock()
}

for _, entry := range server.srvKeyspaceCache {
entry.mutex.Lock()
result.SrvKeyspaces = append(result.SrvKeyspaces, &SrvKeyspaceCacheStatus{
Cell: entry.cell,
Keyspace: entry.keyspace,
Value: entry.value,
LastError: entry.lastError,
LastErrorHTML: callinfo.HTMLFromContext(entry.lastErrorContext),
Cell: entry.cell,
Keyspace: entry.keyspace,
Value: entry.value,
LastError: entry.lastError,
LastErrorContext: entry.lastErrorContext,
})
entry.mutex.Unlock()
}

for _, entry := range server.srvShardCache {
entry.mutex.Lock()
result.SrvShards = append(result.SrvShards, &SrvShardCacheStatus{
Cell: entry.cell,
Keyspace: entry.keyspace,
Shard: entry.shard,
Value: entry.value,
LastError: entry.lastError,
LastErrorHTML: callinfo.HTMLFromContext(entry.lastErrorContext),
Cell: entry.cell,
Keyspace: entry.keyspace,
Shard: entry.shard,
Value: entry.value,
LastError: entry.lastError,
LastErrorContext: entry.lastErrorContext,
})
entry.mutex.Unlock()
}

for _, entry := range server.endPointsCache {
entry.mutex.Lock()
result.EndPoints = append(result.EndPoints, &EndPointsCacheStatus{
Cell: entry.cell,
Keyspace: entry.keyspace,
Shard: entry.shard,
TabletType: entry.tabletType,
Value: entry.value,
OriginalValue: entry.originalValue,
LastError: entry.lastError,
LastErrorHTML: callinfo.HTMLFromContext(entry.lastErrorContext),
Cell: entry.cell,
Keyspace: entry.keyspace,
Shard: entry.shard,
TabletType: entry.tabletType,
Value: entry.value,
OriginalValue: entry.originalValue,
LastError: entry.lastError,
LastErrorContext: entry.lastErrorContext,
})
entry.mutex.Unlock()
}
Expand Down
8 changes: 1 addition & 7 deletions test/vtgatev2_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,20 +714,14 @@ def tablet_start(self, tablet, tablet_type, lameduck_period='0.5s'):
# target_tablet_type=tablet_type)

def test_status_with_error(self):
"""Tests that the status page loads correctly after a VTGate error.

More than anything, this is a smoke test for CallInfo working correctly.
"""
return
"""Tests that the status page loads correctly after a VTGate error."""
vtgate_conn = get_connection()
cursor = vtgate_conn.cursor('INVALID_KEYSPACE', 'replica', keyspace_ids=['0'])
# We expect to see a DatabaseError due to an invalid keyspace
with self.assertRaises(dbexceptions.DatabaseError):
cursor.execute('select * from vt_insert_test', {})
# Page should have loaded successfully
self.assertIn('</html>', utils.get_status(vtgate_port))
# Verify that the ContextHTML is loaded (i.e., verify that CallInfo works)
self.assertIn('RemoteAddr', utils.get_status(vtgate_port))

def test_tablet_restart_read(self):
try:
Expand Down