Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/sqlparser/tracked_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewTrackedBuffer(nodeFormatter func(buf *TrackedBuffer, node SQLNode)) *Tra
}
}

// Convenience function, initiates the writing of a single SQLNode tree by passing
// WriteNode function, initiates the writing of a single SQLNode tree by passing
// through to Myprintf with a default format string
func (buf *TrackedBuffer) WriteNode(node SQLNode) *TrackedBuffer {
buf.Myprintf("%v", node)
Expand Down
95 changes: 59 additions & 36 deletions go/vt/vtgate/engine/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"

"encoding/hex"
"strconv"
"strings"

Expand Down Expand Up @@ -492,25 +491,29 @@ func (route *Route) getInsertShardedRoute(vcursor VCursor, queryConstruct *query
}

inputs := route.Values.([]interface{})
for rowNum, input := range inputs {
allKeys := make([][]interface{}, len(inputs[0].([]interface{})))
for _, input := range inputs {
keys, err := route.resolveKeys(input.([]interface{}), queryConstruct.BindVars)
if err != nil {
return "", nil, fmt.Errorf("getInsertShardedRoute: %v", err)
}
for colNum := 0; colNum < len(keys); colNum++ {
if colNum == 0 {
ksid, err := route.handlePrimary(vcursor, queryConstruct, keys[colNum], route.Table.ColumnVindexes[colNum], queryConstruct.BindVars, rowNum)
if err != nil {
return "", nil, fmt.Errorf("getInsertShardedRoute: %v", err)
}
keyspaceIDs = append(keyspaceIDs, ksid)
} else {
err := route.handleNonPrimary(vcursor, queryConstruct, keys[colNum], route.Table.ColumnVindexes[colNum], queryConstruct.BindVars, keyspaceIDs[rowNum], rowNum)
if err != nil {
return "", nil, fmt.Errorf("getInsertShardedRoute: %v", err)
}
}
allKeys[colNum] = append(allKeys[colNum], keys[colNum])
}
}

keyspaceIDs, err = route.handlePrimary(vcursor, allKeys[0], route.Table.ColumnVindexes[0], queryConstruct.BindVars)
if err != nil {
return "", nil, fmt.Errorf("getInsertShardedRoute: %v", err)
}

for colNum := 1; colNum < len(allKeys); colNum++ {
err := route.handleNonPrimary(vcursor, allKeys[colNum], route.Table.ColumnVindexes[colNum], queryConstruct.BindVars, keyspaceIDs)
if err != nil {
return "", nil, fmt.Errorf("getInsertShardedRoute: %v", err)
}
}
for rowNum := range keyspaceIDs {
shard, err := vcursor.GetShardForKeyspaceID(allShards, keyspaceIDs[rowNum])
routing[shard] = append(routing[shard], route.Mid[rowNum])
if err != nil {
Expand Down Expand Up @@ -744,57 +747,77 @@ func (route *Route) handleGenerate(vcursor VCursor, queryConstruct *queryinfo.Qu
return insertid, nil
}

func (route *Route) handlePrimary(vcursor VCursor, queryConstruct *queryinfo.QueryConstruct, vindexKey interface{}, colVindex *vindexes.ColumnVindex, bv map[string]interface{}, rowNum int) (ksid []byte, err error) {
if vindexKey == nil {
return nil, fmt.Errorf("value must be supplied for column %v", colVindex.Column)
func (route *Route) handlePrimary(vcursor VCursor, vindexKeys []interface{}, colVindex *vindexes.ColumnVindex, bv map[string]interface{}) (keyspaceIDs [][]byte, err error) {
for _, vindexkey := range vindexKeys {
if vindexkey == nil {
return nil, fmt.Errorf("value must be supplied for column %v", colVindex.Column)
}
}
mapper := colVindex.Vindex.(vindexes.Unique)
ksids, err := mapper.Map(vcursor, []interface{}{vindexKey})
keyspaceIDs, err = mapper.Map(vcursor, vindexKeys)
if err != nil {
return nil, err
}
ksid = ksids[0]
if len(ksid) == 0 {
return nil, fmt.Errorf("could not map %v to a keyspace id", vindexKey)
if len(keyspaceIDs) != len(vindexKeys) {
return nil, fmt.Errorf("could not map %v to a keyspaceids", vindexKeys)
}
for rowNum, vindexKey := range vindexKeys {
if len(keyspaceIDs[rowNum]) == 0 {
return nil, fmt.Errorf("could not map %v to a keyspace id", vindexKey)
}
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
}
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
return ksid, nil
return keyspaceIDs, nil
}

func (route *Route) handleNonPrimary(vcursor VCursor, queryConstruct *queryinfo.QueryConstruct, vindexKey interface{}, colVindex *vindexes.ColumnVindex, bv map[string]interface{}, ksid []byte, rowNum int) error {
func (route *Route) handleNonPrimary(vcursor VCursor, vindexKeys []interface{}, colVindex *vindexes.ColumnVindex, bv map[string]interface{}, ksids [][]byte) error {
if colVindex.Owned {
if vindexKey == nil {
return fmt.Errorf("value must be supplied for column %v", colVindex.Column)
for rowNum, vindexKey := range vindexKeys {
if vindexKey == nil {
return fmt.Errorf("value must be supplied for column %v", colVindex.Column)
}
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
}
err := colVindex.Vindex.(vindexes.Lookup).Create(vcursor, vindexKey, ksid)
err := colVindex.Vindex.(vindexes.Lookup).Create(vcursor, vindexKeys, ksids)
if err != nil {
return err
}
} else {
if vindexKey == nil {
var reverseKsids [][]byte
var verifyKsids [][]byte
for rowNum, vindexKey := range vindexKeys {
if vindexKey == nil {
reverseKsids = append(reverseKsids, ksids[rowNum])
} else {
verifyKsids = append(verifyKsids, ksids[rowNum])
}
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
}
var err error
if reverseKsids != nil {
reversible, ok := colVindex.Vindex.(vindexes.Reversible)
if !ok {
return fmt.Errorf("value must be supplied for column %v", colVindex.Column)
}
var err error
vindexKey, err = reversible.ReverseMap(vcursor, ksid)
vindexKeys, err = reversible.ReverseMap(vcursor, reverseKsids)
if err != nil {
return err
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test case for this. Is this possible?

}
if vindexKey == nil {
return fmt.Errorf("could not compute value for column %v", colVindex.Column)
for rowNum, vindexKey := range vindexKeys {
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
}
} else {
ok, err := colVindex.Vindex.Verify(vcursor, vindexKey, ksid)
}

if verifyKsids != nil {
ok, err := colVindex.Vindex.Verify(vcursor, vindexKeys, verifyKsids)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("value %v for column %v does not map to keyspace id %v", vindexKey, colVindex.Column, hex.EncodeToString(ksid))
return fmt.Errorf("values %v for column %v does not map to keyspaceids", vindexKeys, colVindex.Column)
}
}
}
bv["_"+colVindex.Column.CompliantName()+strconv.Itoa(rowNum)] = vindexKey
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type hashIndex struct{ name string }

func (v *hashIndex) String() string { return v.name }
func (*hashIndex) Cost() int { return 1 }
func (*hashIndex) Verify(vindexes.VCursor, interface{}, []byte) (bool, error) {
func (*hashIndex) Verify(vindexes.VCursor, []interface{}, [][]byte) (bool, error) {
return false, nil
}
func (*hashIndex) Map(vindexes.VCursor, []interface{}) ([][]byte, error) { return nil, nil }
func (*hashIndex) Create(vindexes.VCursor, interface{}) error { return nil }
func (*hashIndex) Create(vindexes.VCursor, []interface{}) error { return nil }
func (*hashIndex) Delete(vindexes.VCursor, []interface{}, []byte) error { return nil }

func newHashIndex(name string, _ map[string]string) (vindexes.Vindex, error) {
Expand All @@ -41,12 +41,12 @@ type lookupIndex struct{ name string }

func (v *lookupIndex) String() string { return v.name }
func (*lookupIndex) Cost() int { return 2 }
func (*lookupIndex) Verify(vindexes.VCursor, interface{}, []byte) (bool, error) {
func (*lookupIndex) Verify(vindexes.VCursor, []interface{}, [][]byte) (bool, error) {
return false, nil
}
func (*lookupIndex) Map(vindexes.VCursor, []interface{}) ([][]byte, error) { return nil, nil }
func (*lookupIndex) Create(vindexes.VCursor, interface{}, []byte) error { return nil }
func (*lookupIndex) Delete(vindexes.VCursor, []interface{}, []byte) error { return nil }
func (*lookupIndex) Map(vindexes.VCursor, []interface{}) ([][]byte, error) { return nil, nil }
func (*lookupIndex) Create(vindexes.VCursor, []interface{}, [][]byte) error { return nil }
func (*lookupIndex) Delete(vindexes.VCursor, []interface{}, []byte) error { return nil }

func newLookupIndex(name string, _ map[string]string) (vindexes.Vindex, error) {
return &lookupIndex{name: name}, nil
Expand All @@ -57,11 +57,11 @@ type multiIndex struct{ name string }

func (v *multiIndex) String() string { return v.name }
func (*multiIndex) Cost() int { return 3 }
func (*multiIndex) Verify(vindexes.VCursor, interface{}, []byte) (bool, error) {
func (*multiIndex) Verify(vindexes.VCursor, []interface{}, [][]byte) (bool, error) {
return false, nil
}
func (*multiIndex) Map(vindexes.VCursor, []interface{}) ([][][]byte, error) { return nil, nil }
func (*multiIndex) Create(vindexes.VCursor, interface{}, []byte) error { return nil }
func (*multiIndex) Create(vindexes.VCursor, []interface{}, [][]byte) error { return nil }
func (*multiIndex) Delete(vindexes.VCursor, []interface{}, []byte) error { return nil }

func newMultiIndex(name string, _ map[string]string) (vindexes.Vindex, error) {
Expand All @@ -73,11 +73,11 @@ type costlyIndex struct{ name string }

func (v *costlyIndex) String() string { return v.name }
func (*costlyIndex) Cost() int { return 10 }
func (*costlyIndex) Verify(vindexes.VCursor, interface{}, []byte) (bool, error) {
func (*costlyIndex) Verify(vindexes.VCursor, []interface{}, [][]byte) (bool, error) {
return false, nil
}
func (*costlyIndex) Map(vindexes.VCursor, []interface{}) ([][][]byte, error) { return nil, nil }
func (*costlyIndex) Create(vindexes.VCursor, interface{}, []byte) error { return nil }
func (*costlyIndex) Create(vindexes.VCursor, []interface{}, [][]byte) error { return nil }
func (*costlyIndex) Delete(vindexes.VCursor, []interface{}, []byte) error { return nil }

func newCostlyIndex(name string, _ map[string]string) (vindexes.Vindex, error) {
Expand Down
Loading