Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/ServerConfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ Load-balancer in front of vtgate to scale up (not covered by Vitess). Stateless,
### Parameters

* **cells_to_watch**: which cell vtgate is in and will monitor tablets from. Cross-cell master access needs multiple cells here.
* **keyspaces_to_watch**: Specifies that a vtgate will only be able to perform queries against or view the topology of these keyspaces
* **tablet_types_to_wait**: VTGate waits for at least one serving tablet per tablet type specified here during startup, before listening to the serving port. So VTGate does not serve error. It should match the available tablet types VTGate connects to (master, replica, rdonly).
* **discovery_low_replication_lag**: when replication lags of all VTTablet in a particular shard and tablet type are less than or equal the flag (in seconds), VTGate does not filter them by replication lag and uses all to balance traffic.
* **degraded_threshold (30s)**: a tablet will publish itself as degraded if replication lag exceeds this threshold. This will cause VTGates to choose more up-to-date servers over this one. If all servers are degraded, VTGate resorts to serving from all of them.
Expand Down
116 changes: 116 additions & 0 deletions go/vt/srvtopo/keyspace_filtering_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright 2018 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package srvtopo

import (
"fmt"

"golang.org/x/net/context"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
)

var (
// ErrNilUnderlyingServer is returned when attempting to create a new keyspace
// filtering server if a nil underlying server implementation is provided.
ErrNilUnderlyingServer = fmt.Errorf("Unable to construct filtering server without an underlying server")

// ErrTopoServerNotAvailable is returned if a caller tries to access the
// topo.Server supporting this srvtopo.Server.
ErrTopoServerNotAvailable = fmt.Errorf("Cannot access underlying topology server when keyspace filtering is enabled")
)

// NewKeyspaceFilteringServer constructs a new server based on the provided
// implementation that prevents the specified keyspaces from being exposed
// to consumers of the new Server.
//
// A filtering server will not allow access to the topo.Server to prevent
// updates that may corrupt the global VSchema keyspace.
func NewKeyspaceFilteringServer(underlying Server, selectedKeyspaces []string) (Server, error) {
if underlying == nil {
return nil, ErrNilUnderlyingServer
}

keyspaces := map[string]bool{}
for _, ks := range selectedKeyspaces {
keyspaces[ks] = true
}

return keyspaceFilteringServer{
server: underlying,
selectKeyspaces: keyspaces,
}, nil
}

type keyspaceFilteringServer struct {
server Server
selectKeyspaces map[string]bool
}

// GetTopoServer returns an error; filtering srvtopo.Server consumers may not
// access the underlying topo.Server.
func (ksf keyspaceFilteringServer) GetTopoServer() (*topo.Server, error) {
return nil, ErrTopoServerNotAvailable
}

func (ksf keyspaceFilteringServer) GetSrvKeyspaceNames(
ctx context.Context,
cell string,
) ([]string, error) {
keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell)
ret := make([]string, 0, len(keyspaces))
for _, ks := range keyspaces {
if ksf.selectKeyspaces[ks] {
ret = append(ret, ks)
}
}
return ret, err
}

func (ksf keyspaceFilteringServer) GetSrvKeyspace(
ctx context.Context,
cell,
keyspace string,
) (*topodatapb.SrvKeyspace, error) {
if !ksf.selectKeyspaces[keyspace] {
return nil, topo.NewError(topo.NoNode, keyspace)
}

return ksf.server.GetSrvKeyspace(ctx, cell, keyspace)
}

func (ksf keyspaceFilteringServer) WatchSrvVSchema(
ctx context.Context,
cell string,
callback func(*vschemapb.SrvVSchema, error),
) {
filteringCallback := func(schema *vschemapb.SrvVSchema, err error) {
if schema != nil {
for ks := range schema.Keyspaces {
if !ksf.selectKeyspaces[ks] {
delete(schema.Keyspaces, ks)
}
}
}

callback(schema, err)
}

ksf.server.WatchSrvVSchema(ctx, cell, filteringCallback)
}
229 changes: 229 additions & 0 deletions go/vt/srvtopo/keyspace_filtering_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
Copyright 2018 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package srvtopo

import (
"fmt"
"reflect"
"sync"
"testing"

"golang.org/x/net/context"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/srvtopo/srvtopotest"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
)

var (
stockCell = "some-cell"
stockCtx = context.Background()
stockFilters = []string{"bar", "baz"}
stockKeyspaces = map[string]*topodatapb.SrvKeyspace{
"foo": &topodatapb.SrvKeyspace{ShardingColumnName: "foo"},
"bar": &topodatapb.SrvKeyspace{ShardingColumnName: "bar"},
"baz": &topodatapb.SrvKeyspace{ShardingColumnName: "baz"},
}
stockVSchema = &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"foo": &vschemapb.Keyspace{Sharded: true},
"bar": &vschemapb.Keyspace{Sharded: true},
"baz": &vschemapb.Keyspace{Sharded: false},
},
}
)

func newFiltering(filter []string) (*topo.Server, *srvtopotest.PassthroughSrvTopoServer, Server) {
testServer := srvtopotest.NewPassthroughSrvTopoServer()

testServer.TopoServer = memorytopo.NewServer(stockCell)
testServer.SrvKeyspaceNames = []string{"foo", "bar", "baz"}
testServer.SrvKeyspace = &topodatapb.SrvKeyspace{ShardingColumnName: "test-column"}
testServer.WatchedSrvVSchema = stockVSchema

filtering, _ := NewKeyspaceFilteringServer(testServer, filter)
return testServer.TopoServer, testServer, filtering
}

func TestFilteringServerHandlesNilUnderlying(t *testing.T) {
got, gotErr := NewKeyspaceFilteringServer(nil, []string{})
if got != nil {
t.Errorf("got: %v, wanted: nil server", got)
}
if gotErr != ErrNilUnderlyingServer {
t.Errorf("Bad error returned: got %v wanted %v", gotErr, ErrNilUnderlyingServer)
}
}

func TestFilteringServerReturnsUnderlyingServer(t *testing.T) {
_, _, f := newFiltering(nil)
got, gotErr := f.GetTopoServer()
if got != nil {
t.Errorf("Got non-nil topo.Server from FilteringServer")
}
if gotErr != ErrTopoServerNotAvailable {
t.Errorf("Unexpected error from GetTopoServer; wanted %v but got %v", ErrTopoServerNotAvailable, gotErr)
}
}

func doTestGetSrvKeyspaceNames(
t *testing.T,
f Server,
cell string,
want []string,
wantErr error,
) {
got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell)

if got == nil {
t.Errorf("GetSrvKeyspaceNames failed: should not return nil")
}
if !reflect.DeepEqual(got, want) {
t.Errorf("GetSrvKeyspaceNames failed: want %v, got %v", want, got)
}
if wantErr != gotErr {
t.Errorf("GetSrvKeyspaceNames returned incorrect error: want %v, got %v", wantErr, gotErr)
}
}

func TestFilteringServerGetSrvKeyspameNamesFiltersEverythingOut(t *testing.T) {
_, _, f := newFiltering(nil)
doTestGetSrvKeyspaceNames(t, f, stockCell, []string{}, nil)
}

func TestFilteringServerGetSrvKeyspaceNamesFiltersKeyspaces(t *testing.T) {
_, _, f := newFiltering(stockFilters)
doTestGetSrvKeyspaceNames(t, f, stockCell, stockFilters, nil)
}

func TestFilteringServerGetSrvKeyspaceNamesPassesThroughErrors(t *testing.T) {
_, mock, f := newFiltering(stockFilters)
wantErr := fmt.Errorf("some badcell error")
mock.SrvKeyspaceNamesError = wantErr
doTestGetSrvKeyspaceNames(t, f, "badcell", stockFilters, wantErr)
}

func doTestGetSrvKeyspace(
t *testing.T,
f Server,
cell,
ksName string,
want *topodatapb.SrvKeyspace,
wantErr error,
) {
got, gotErr := f.GetSrvKeyspace(stockCtx, cell, ksName)

gotColumnName := ""
wantColumnName := ""
if got != nil {
gotColumnName = got.ShardingColumnName
}
if want != nil {
wantColumnName = want.ShardingColumnName
}

// a different pointer comes back so compare the expected return by proxy
// of a field we know the value of
if gotColumnName != wantColumnName {
t.Errorf("keyspace incorrect: got %v, want %v", got, want)
}

if wantErr != gotErr {
t.Errorf("returned error incorrect: got %v, want %v", gotErr, wantErr)
}
}

func TestFilteringServerGetSrvKeyspaceReturnsSelectedKeyspaces(t *testing.T) {
_, mock, f := newFiltering(stockFilters)
mock.SrvKeyspace = stockKeyspaces["bar"]
doTestGetSrvKeyspace(t, f, stockCell, "bar", stockKeyspaces["bar"], nil)
}

func TestFilteringServerGetSrvKeyspaceErrorPassthrough(t *testing.T) {
wantErr := fmt.Errorf("some error")
_, mock, f := newFiltering(stockFilters)
mock.SrvKeyspace = stockKeyspaces["bar"]
mock.SrvKeyspaceError = wantErr
doTestGetSrvKeyspace(t, f, "badcell", "bar", stockKeyspaces["bar"], wantErr)
}

func TestFilteringServerGetSrvKeyspaceFilters(t *testing.T) {
wantErr := topo.NewError(topo.NoNode, "foo")
_, mock, f := newFiltering(stockFilters)
mock.SrvKeyspaceError = wantErr
doTestGetSrvKeyspace(t, f, stockCell, "foo", nil, wantErr)
}

func TestFilteringServerWatchSrvVSchemaFiltersPassthroughSrvVSchema(t *testing.T) {
_, mock, f := newFiltering(stockFilters)

allowed := map[string]bool{}
for _, ks := range stockFilters {
allowed[ks] = true
}

// we need to verify that the nested callback actually gets called
wg := sync.WaitGroup{}
wg.Add(1)

cb := func(gotSchema *vschemapb.SrvVSchema, gotErr error) {
// ensure that only selected keyspaces made it into the callback
for name, ks := range gotSchema.Keyspaces {
if !allowed[name] {
t.Errorf("Unexpected keyspace found in callback: %v", ks)
}
wantKS := mock.WatchedSrvVSchema.Keyspaces[name]
if !reflect.DeepEqual(ks, wantKS) {
t.Errorf(
"Expected keyspace to be passed through unmodified: want %#v got %#v",
wantKS,
ks,
)
}
}
wg.Done()
}

f.WatchSrvVSchema(stockCtx, stockCell, cb)
wg.Wait()
}

func TestFilteringServerWatchSrvVSchemaHandlesNilSchema(t *testing.T) {
wantErr := fmt.Errorf("some err")
_, mock, f := newFiltering(stockFilters)
mock.WatchedSrvVSchema = nil
mock.WatchedSrvVSchemaError = wantErr

// we need to verify that the nested callback actually gets called
wg := sync.WaitGroup{}
wg.Add(1)

cb := func(gotSchema *vschemapb.SrvVSchema, gotErr error) {
if gotSchema != nil {
t.Errorf("Expected nil gotSchema: got %#v", gotSchema)
}
if gotErr != wantErr {
t.Errorf("Unexpected error: want %v got %v", wantErr, gotErr)
}
wg.Done()
}

f.WatchSrvVSchema(stockCtx, "other-cell", cb)
wg.Wait()
}
4 changes: 2 additions & 2 deletions go/vt/srvtopo/resilient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServe
}

// GetTopoServer returns the topo.Server that backs the resilient server.
func (server *ResilientServer) GetTopoServer() *topo.Server {
return server.topoServer
func (server *ResilientServer) GetTopoServer() (*topo.Server, error) {
return server.topoServer, nil
}

// GetSrvKeyspaceNames returns all keyspace names for the given cell.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/srvtopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
// the serving graph read-only calls used by clients to resolve
// serving addresses, and to get VSchema.
type Server interface {
// GetTopoServer returns the full topo.Server instance
GetTopoServer() *topo.Server
// GetTopoServer returns the full topo.Server instance.
GetTopoServer() (*topo.Server, error)

// GetSrvKeyspaceNames returns the list of keyspaces served in
// the provided cell.
Expand Down
Loading