Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/vt/binlog/keyspace_id_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func newKeyspaceIDResolverFactoryV3(ctx context.Context, ts *topo.Server, keyspa
if col.Name.EqualString(shardingColumnName) {
// We found the column.
return i, &keyspaceIDResolverFactoryV3{
vindex: colVindex.Vindex,
// Only SingleColumn vindexes are returned by FindVindexForSharding.
vindex: colVindex.Vindex.(vindexes.SingleColumn),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can bad things happen if this turns out to be nil?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous function guarantees that the value won't be nil.

}, nil
}
}
Expand All @@ -158,7 +159,7 @@ func newKeyspaceIDResolverFactoryV3(ctx context.Context, ts *topo.Server, keyspa

// keyspaceIDResolverFactoryV3 uses the Vindex to compute the value.
type keyspaceIDResolverFactoryV3 struct {
vindex vindexes.Vindex
vindex vindexes.SingleColumn
}

func (r *keyspaceIDResolverFactoryV3) keyspaceID(v sqltypes.Value) ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Delete struct {
Query string

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Vindex vindexes.SingleColumn
// Values specifies the vindex values to use for routing.
// For now, only one value is specified.
Values []sqltypes.PlanValue
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDeleteEqual(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
Sharded: true,
},
Query: "dummy_delete",
Vindex: vindex,
Vindex: vindex.(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
}

Expand All @@ -142,7 +142,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
Opcode: DeleteEqual,
Keyspace: ks.Keyspace,
Query: "dummy_delete",
Vindex: ks.Vindexes["hash"],
Vindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
Values: []sqltypes.PlanValue{{Value: sqltypes.NewInt64(1)}},
Table: ks.Tables["t1"],
OwnedVindexQuery: "dummy_subquery",
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,19 +391,10 @@ func (ins *Insert) getInsertShardedRoute(vcursor VCursor, bindVars map[string]*q
// keyspace ids. For regular inserts, a failure to find a route
// results in an error. For 'ignore' type inserts, the keyspace
// id is returned as nil, which is used later to drop the corresponding rows.
colVindex := ins.Table.ColumnVindexes[0]
keyspaceIDs, err := ins.processPrimary(vcursor, vindexRowsValues[0], colVindex)
keyspaceIDs, err := ins.processPrimary(vcursor, vindexRowsValues[0], ins.Table.ColumnVindexes[0])
if err != nil {
return nil, nil, vterrors.Wrap(err, "getInsertShardedRoute")
}
// Primary vindex can be owned. If so, go through the processOwned flow.
// If not owned, we don't do processUnowned because there's no need to verify
// the keyspace ids we just generated.
if colVindex.Owned {
if err := ins.processOwned(vcursor, vindexRowsValues[0], colVindex, keyspaceIDs); err != nil {
return nil, nil, vterrors.Wrap(err, "getInsertShardedRoute")
}
}

for vIdx := 1; vIdx < len(ins.Table.ColumnVindexes); vIdx++ {
colVindex := ins.Table.ColumnVindexes[vIdx]
Expand Down
224 changes: 28 additions & 196 deletions go/vt/vtgate/engine/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vtgate/vindexes"

Expand Down Expand Up @@ -655,9 +654,14 @@ func TestInsertShardedGeo(t *testing.T) {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "id,region",
"to": "toc",
},
},
"lookup": {
Type: "lookup_unique",
Params: map[string]string{
"table": "id_idx",
"from": "id",
"to": "keyspace_id",
},
Owner: "t1",
},
Expand All @@ -666,7 +670,10 @@ func TestInsertShardedGeo(t *testing.T) {
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "geo",
Columns: []string{"id", "region"},
Columns: []string{"region", "id"},
}, {
Name: "lookup",
Columns: []string{"id"},
}},
},
},
Expand All @@ -683,20 +690,30 @@ func TestInsertShardedGeo(t *testing.T) {
InsertSharded,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id, region
// colVindex columns: region, id
Values: []sqltypes.PlanValue{{
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(255),
}},
}, {
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(1),
}},
}, {
// rows for region
}},
}, {
// colVindex columns: id
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(255),
Value: sqltypes.NewInt64(1),
}},
}},
}},
Expand All @@ -715,11 +732,9 @@ func TestInsertShardedGeo(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
// ExecutePre proves that keyspace ids are generated, and that they are inserted into the lookup.
`ExecutePre insert into lkp(id, region, toc) values(:id0, :region0, :toc0), (:id1, :region1, :toc1) ` +
`Execute insert into id_idx(id, keyspace_id) values(:id0, :keyspace_id0), (:id1, :keyspace_id1) ` +
`id0: type:INT64 value:"1" id1: type:INT64 value:"1" ` +
`region0: type:INT64 value:"1" region1: type:INT64 value:"255" ` +
`toc0: type:VARBINARY value:"\001\026k@\264J\272K\326" toc1: type:VARBINARY value:"\377\026k@\264J\272K\326" true`,
`keyspace_id0: type:VARBINARY value:"\001\026k@\264J\272K\326" keyspace_id1: type:VARBINARY value:"\377\026k@\264J\272K\326" true`,
`ResolveDestinations sharded [value:"0" value:"1" ] Destinations:DestinationKeyspaceID(01166b40b44aba4bd6),DestinationKeyspaceID(ff166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.20-: prefix mid1 suffix /* vtgate:: keyspace_id:01166b40b44aba4bd6 */ ` +
`{_id0: type:INT64 value:"1" _id1: type:INT64 value:"1" ` +
Expand Down Expand Up @@ -922,104 +937,6 @@ func TestInsertShardedIgnoreOwned(t *testing.T) {
})
}

func TestInsertIgnoreGeo(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"sharded": {
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"geo": {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "id,region",
"to": "toc",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "geo",
Columns: []string{"id", "region"},
}},
},
},
},
},
}
vs, err := vindexes.BuildVSchema(invschema)
if err != nil {
t.Fatal(err)
}
ks := vs.Keyspaces["sharded"]

ins := NewInsert(
InsertShardedIgnore,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id, region
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(2),
}},
}, {
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}, {
Value: sqltypes.NewInt64(2),
}},
}},
}},
ks.Tables["t1"],
"prefix",
[]string{" mid1", " mid2"},
" suffix",
)

ksid0 := sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"to",
"varbinary",
),
"\x00",
)
noresult := &sqltypes.Result{}
vc := &loggingVCursor{
shards: []string{"-20", "20-"},
shardForKsid: []string{"20-", "-20"},
results: []*sqltypes.Result{
// insert lkp
noresult,
// fail one verification (row 2)
ksid0,
noresult,
},
}
_, err = ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
if err != nil {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`ExecutePre insert ignore into lkp(id, region, toc) values(:id0, :region0, :toc0), (:id1, :region1, :toc1) ` +
`id0: type:INT64 value:"1" id1: type:INT64 value:"2" ` +
`region0: type:INT64 value:"1" region1: type:INT64 value:"2" ` +
`toc0: type:VARBINARY value:"\001\026k@\264J\272K\326" toc1: type:VARBINARY value:"\002\006\347\352\"\316\222p\217" true`,
// Row 2 will fail verification. This is what we're testing. The second row should not get inserted.
`ExecutePre select id from lkp where id = :id and toc = :toc id: type:INT64 value:"1" toc: type:VARBINARY value:"\001\026k@\264J\272K\326" false`,
`ExecutePre select id from lkp where id = :id and toc = :toc id: type:INT64 value:"2" toc: type:VARBINARY value:"\002\006\347\352\"\316\222p\217" false`,
`ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(01166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.20-: prefix mid1 suffix /* vtgate:: keyspace_id:01166b40b44aba4bd6 */ ` +
`{_id0: type:INT64 value:"1" _region0: type:INT64 value:"1" } true true`,
})
}

func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
Expand Down Expand Up @@ -1270,91 +1187,6 @@ func TestInsertShardedUnownedVerify(t *testing.T) {
})
}

func TestInsertUnownedGeo(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
"sharded": {
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"primary": {
Type: "hash",
},
"geo": {
Type: "region_experimental",
Params: map[string]string{
"region_bytes": "1",
"table": "lkp",
"from": "other_id,region",
"to": "toc",
},
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "primary",
Columns: []string{"id"},
}, {
Name: "geo",
Columns: []string{"other_id", "region"},
}},
},
},
},
},
}
vs, err := vindexes.BuildVSchema(invschema)
if err != nil {
t.Fatal(err)
}
ks := vs.Keyspaces["sharded"]

ins := NewInsert(
InsertSharded,
ks.Keyspace,
[]sqltypes.PlanValue{{
// colVindex columns: id
Values: []sqltypes.PlanValue{{
// rows for id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(1),
}},
}},
}, {
// colVindex columns: other_id, region
Values: []sqltypes.PlanValue{{
// rows for other_id
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(2),
}},
}, {
// rows for region
Values: []sqltypes.PlanValue{{
Value: sqltypes.NewInt64(3),
}},
}},
}},
ks.Tables["t1"],
"prefix",
[]string{" mid1"},
" suffix",
)

noresult := &sqltypes.Result{}
vc := &loggingVCursor{
shards: []string{"-20", "20-"},
results: []*sqltypes.Result{
// fail verification
noresult,
},
}
_, err = ins.Execute(vc, map[string]*querypb.BindVariable{}, false)
assert.EqualError(t, err, "execInsertSharded: getInsertShardedRoute: values [[INT64(2) INT64(3)]] for column [other_id region] does not map to keyspace ids")
vc.ExpectLog(t, []string{
`ExecutePre select other_id from lkp where other_id = :other_id and toc = :toc other_id: type:INT64 value:"2" toc: type:VARBINARY value:"\026k@\264J\272K\326" false`,
})
}

func TestInsertShardedIgnoreUnownedVerify(t *testing.T) {
invschema := &vschemapb.SrvVSchema{
Keyspaces: map[string]*vschemapb.Keyspace{
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Route struct {
FieldQuery string

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Vindex vindexes.SingleColumn
// Values specifies the vindex values to use for routing.
Values []sqltypes.PlanValue

Expand Down Expand Up @@ -463,7 +463,7 @@ func (route *Route) sort(in *sqltypes.Result) (*sqltypes.Result, error) {
return out, err
}

func resolveSingleShard(vcursor VCursor, vindex vindexes.Vindex, keyspace *vindexes.Keyspace, vindexKey sqltypes.Value) (*srvtopo.ResolvedShard, []byte, error) {
func resolveSingleShard(vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKey sqltypes.Value) (*srvtopo.ResolvedShard, []byte, error) {
destinations, err := vindex.Map(vcursor, []sqltypes.Value{vindexKey})
if err != nil {
return nil, nil, err
Expand Down
Loading