Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,8 +603,9 @@ func TestExecutorShow(t *testing.T) {
buildVarCharRow(KsTestSharded),
buildVarCharRow(KsTestUnsharded),
buildVarCharRow("TestXBadSharding"),
buildVarCharRow(KsTestBadVSchema),
},
RowsAffected: 4,
RowsAffected: 5,
}
if !reflect.DeepEqual(qr, wantqr) {
t.Errorf("show databases:\n%+v, want\n%+v", qr, wantqr)
Expand Down Expand Up @@ -632,9 +633,9 @@ func TestExecutorShow(t *testing.T) {
Fields: buildVarCharFields("Shards"),
Rows: [][]sqltypes.Value{
buildVarCharRow("TestExecutor/-20"),
buildVarCharRow("TestXBadSharding/e0-"),
buildVarCharRow("TestXBadVSchema/e0-"),
},
RowsAffected: 25,
RowsAffected: 33,
}
if !reflect.DeepEqual(qr, wantqr) {
t.Errorf("show vitess_shards:\n%+v, want\n%+v", qr, wantqr)
Expand Down Expand Up @@ -802,9 +803,9 @@ func TestExecutorShow(t *testing.T) {
Fields: buildVarCharFields("Shards"),
Rows: [][]sqltypes.Value{
buildVarCharRow("TestSharded/-20"),
buildVarCharRow("TestXBadSharding/e0-"),
buildVarCharRow("TestXBadVSchema/e0-"),
},
RowsAffected: 17,
RowsAffected: 25,
}
if !reflect.DeepEqual(qr, wantqr) {
t.Errorf("show databases:\n%+v, want\n%+v", qr, wantqr)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ const (
KsTestSharded = "TestSharded"
KsTestUnsharded = "TestUnsharded"
KsTestUnshardedServedFrom = "TestUnshardedServedFrom"
KsTestBadVSchema = "TestXBadVSchema"
)

func init() {
ksToSandbox = make(map[string]*sandbox)
createSandbox(KsTestSharded)
createSandbox(KsTestUnsharded)
createSandbox(KsTestBadVSchema)
tabletconn.RegisterDialer("sandbox", sandboxDialer)
flag.Set("tablet_protocol", "sandbox")
}
Expand Down
66 changes: 38 additions & 28 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type KeyspaceSchema struct {
Keyspace *Keyspace
Tables map[string]*Table
Vindexes map[string]Vindex
Error error
}

// MarshalJSON returns a JSON representation of KeyspaceSchema.
Expand All @@ -97,10 +98,17 @@ func (ks *KeyspaceSchema) MarshalJSON() ([]byte, error) {
Sharded bool `json:"sharded,omitempty"`
Tables map[string]*Table `json:"tables,omitempty"`
Vindexes map[string]Vindex `json:"vindexes,omitempty"`
Error string `json:"error,omitempty"`
}{
Sharded: ks.Keyspace.Sharded,
Tables: ks.Tables,
Vindexes: ks.Vindexes,
Error: func(ks *KeyspaceSchema) string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok here, but should be used with care because inline functions incur additional overhead.

if ks.Error == nil {
return ""
}
return ks.Error.Error()
}(ks),
})
}

Expand All @@ -118,14 +126,8 @@ func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema, err error) {
Keyspaces: make(map[string]*KeyspaceSchema),
}
buildKeyspaces(source, vschema)
err = buildTables(source, vschema)
if err != nil {
return nil, err
}
err = resolveAutoIncrement(source, vschema)
if err != nil {
return nil, err
}
buildTables(source, vschema)
resolveAutoIncrement(source, vschema)
addDual(vschema)
return vschema, nil
}
Expand All @@ -148,11 +150,9 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS
Keyspaces: make(map[string]*KeyspaceSchema),
}
buildKeyspaces(formal, vschema)
err := buildTables(formal, vschema)
if err != nil {
return nil, err
}
return vschema.Keyspaces[keyspace], nil
buildTables(formal, vschema)
err := vschema.Keyspaces[keyspace].Error
return vschema.Keyspaces[keyspace], err
}

// ValidateKeyspace ensures that the keyspace vschema is valid.
Expand All @@ -175,13 +175,16 @@ func buildKeyspaces(source *vschemapb.SrvVSchema, vschema *VSchema) {
}
}

func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) {
outer:
for ksname, ks := range source.Keyspaces {
keyspace := vschema.Keyspaces[ksname].Keyspace
ksvschema := vschema.Keyspaces[ksname]
keyspace := ksvschema.Keyspace
for vname, vindexInfo := range ks.Vindexes {
vindex, err := CreateVindex(vindexInfo.Type, vname, vindexInfo.Params)
if err != nil {
return err
ksvschema.Error = err
continue outer
}
if _, ok := vschema.uniqueVindexes[vname]; ok {
vschema.uniqueVindexes[vname] = nil
Expand All @@ -207,19 +210,22 @@ func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
if table.Pinned != "" {
decoded, err := hex.DecodeString(table.Pinned)
if err != nil {
return fmt.Errorf("could not decode the keyspace id for pin: %v", err)
ksvschema.Error = fmt.Errorf("could not decode the keyspace id for pin: %v", err)
continue outer
}
t.Pinned = decoded
} else if keyspace.Sharded && len(table.ColumnVindexes) == 0 {
return fmt.Errorf("missing primary col vindex for table: %s", tname)
ksvschema.Error = fmt.Errorf("missing primary col vindex for table: %s", tname)
continue outer
}

// Initialize Columns.
colNames := make(map[string]bool)
for _, col := range table.Columns {
name := sqlparser.NewColIdent(col.Name)
if colNames[name.Lowered()] {
return fmt.Errorf("duplicate column name '%v' for table: %s", name, tname)
ksvschema.Error = fmt.Errorf("duplicate column name '%v' for table: %s", name, tname)
continue outer
}
colNames[name.Lowered()] = true
t.Columns = append(t.Columns, Column{Name: name, Type: col.Type})
Expand All @@ -229,7 +235,8 @@ func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
for i, ind := range table.ColumnVindexes {
vindexInfo, ok := ks.Vindexes[ind.Name]
if !ok {
return fmt.Errorf("vindex %s not found for table %s", ind.Name, tname)
ksvschema.Error = fmt.Errorf("vindex %s not found for table %s", ind.Name, tname)
continue outer
}
vindex := vschema.Keyspaces[ksname].Vindexes[ind.Name]
owned := false
Expand All @@ -239,12 +246,14 @@ func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
var columns []sqlparser.ColIdent
if ind.Column != "" {
if len(ind.Columns) > 0 {
return fmt.Errorf("can't use column and columns at the same time in vindex (%s) and table (%s)", ind.Name, tname)
ksvschema.Error = fmt.Errorf("can't use column and columns at the same time in vindex (%s) and table (%s)", ind.Name, tname)
continue outer
}
columns = []sqlparser.ColIdent{sqlparser.NewColIdent(ind.Column)}
} else {
if len(ind.Columns) == 0 {
return fmt.Errorf("must specify at least one column for vindex (%s) and table (%s)", ind.Name, tname)
ksvschema.Error = fmt.Errorf("must specify at least one column for vindex (%s) and table (%s)", ind.Name, tname)
continue outer
}
for _, indCol := range ind.Columns {
columns = append(columns, sqlparser.NewColIdent(indCol))
Expand All @@ -260,10 +269,12 @@ func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
if i == 0 {
// Perform Primary vindex check.
if !columnVindex.Vindex.IsUnique() {
return fmt.Errorf("primary vindex %s is not Unique for table %s", ind.Name, tname)
ksvschema.Error = fmt.Errorf("primary vindex %s is not Unique for table %s", ind.Name, tname)
continue outer
}
if owned {
return fmt.Errorf("primary vindex %s cannot be owned for table %s", ind.Name, tname)
ksvschema.Error = fmt.Errorf("primary vindex %s cannot be owned for table %s", ind.Name, tname)
continue outer
}
}
t.ColumnVindexes = append(t.ColumnVindexes, columnVindex)
Expand All @@ -274,10 +285,9 @@ func buildTables(source *vschemapb.SrvVSchema, vschema *VSchema) error {
t.Ordered = colVindexSorted(t.ColumnVindexes)
}
}
return nil
}

func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) error {
func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) {
for ksname, ks := range source.Keyspaces {
ksvschema := vschema.Keyspaces[ksname]
for tname, table := range ks.Tables {
Expand All @@ -288,12 +298,12 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) error
t.AutoIncrement = &AutoIncrement{Column: sqlparser.NewColIdent(table.AutoIncrement.Column)}
seq, err := vschema.findQualified(table.AutoIncrement.Sequence)
if err != nil {
return fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err)
ksvschema.Error = fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err)
break
}
t.AutoIncrement.Sequence = seq
}
}
return nil
}

// addDual adds dual as a valid table to all keyspaces.
Expand Down
Loading