Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions go/vt/vtgate/engine/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,10 @@ func TestInsertShardedOwnedWithNull(t *testing.T) {
"onecol": {
Type: "lookup",
Params: map[string]string{
"table": "lkp1",
"from": "from",
"to": "toc",
"table": "lkp1",
"from": "from",
"to": "toc",
"ignore_nulls": "true",
},
Owner: "t1",
},
Expand Down Expand Up @@ -636,8 +637,6 @@ func TestInsertShardedOwnedWithNull(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: toc0: type:VARBINARY ` +
`value:"\026k@\264J\272K\326" true`,
`ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.20-: prefix mid1 suffix ` +
`{_c30: _id0: type:INT64 value:"1" } true true`,
Expand Down Expand Up @@ -949,9 +948,10 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) {
"onecol": {
Type: "lookup",
Params: map[string]string{
"table": "lkp1",
"from": "from",
"to": "toc",
"table": "lkp1",
"from": "from",
"to": "toc",
"ignore_nulls": "true",
},
Owner: "t1",
},
Expand Down Expand Up @@ -1023,8 +1023,6 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) {
t.Fatal(err)
}
vc.ExpectLog(t, []string{
`Execute insert ignore into lkp1(from, toc) values(:from0, :toc0) from0: toc0: type:VARBINARY ` +
`value:"\026k@\264J\272K\326" true`,
`Execute select from from lkp1 where from = :from and toc = :toc from: toc: type:VARBINARY value:"\026k@\264J\272K\326" false`,
`ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
`ExecuteMultiShard sharded.-20: prefix mid1 suffix ` +
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtgate/vindexes/lookup_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,22 @@ func TestLookupHashCreate(t *testing.T) {
t.Errorf("vc.queries length: %v, want %v", got, want)
}

err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NULL}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */)
want := "lookup.Create: input has null values: row: 0, col: 0"
if err == nil || err.Error() != want {
t.Errorf("lookuphash.Create(NULL) err: %v, want %s", err, want)
}

vc.queries = nil
lookuphash.(*LookupHash).lkp.IgnoreNulls = true
err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NULL}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */)
require.NoError(t, err)
if got, want := len(vc.queries), 1; got != want {
if got, want := len(vc.queries), 0; got != want {
t.Errorf("vc.queries length: %v, want %v", got, want)
}

err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */)
want := "lookup.Create.vunhash: invalid keyspace id: 626f677573"
want = "lookup.Create.vunhash: invalid keyspace id: 626f677573"
if err == nil || err.Error() != want {
t.Errorf("lookuphash.Create(bogus) err: %v, want %s", err, want)
}
Expand Down
44 changes: 32 additions & 12 deletions go/vt/vtgate/vindexes/lookup_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type lookupInternal struct {
To string `json:"to"`
Autocommit bool `json:"autocommit,omitempty"`
Upsert bool `json:"upsert,omitempty"`
IgnoreNulls bool `json:"ignore_nulls,omitempty"`
sel, ver, del string
}

Expand All @@ -48,6 +49,12 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit,
}
lkp.FromColumns = fromColumns

var err error
lkp.IgnoreNulls, err = boolFromMap(lookupQueryParams, "ignore_nulls")
if err != nil {
return err
}

lkp.Autocommit = autocommit
lkp.Upsert = upsert

Expand Down Expand Up @@ -158,15 +165,32 @@ func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Va
}

func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error {
if len(rowsColValues) == 0 {
// This code is unreachable. It's just a failsafe.
// Trim rows with null values
trimmedRowsCols := make([][]sqltypes.Value, 0, len(rowsColValues))
trimmedToValues := make([]sqltypes.Value, 0, len(toValues))
nextRow:
for i, row := range rowsColValues {
for j, col := range row {
if col.IsNull() {
if !lkp.IgnoreNulls {
return fmt.Errorf("lookup.Create: input has null values: row: %d, col: %d", i, j)
}
continue nextRow
}
}
trimmedRowsCols = append(trimmedRowsCols, row)
trimmedToValues = append(trimmedToValues, toValues[i])
}
if len(trimmedRowsCols) == 0 {
return nil
}
// We only need to check the first row. Number of cols per row
// is guaranteed by the engine to be uniform.
if len(rowsColValues[0]) != len(lkp.FromColumns) {
return fmt.Errorf("lookup.Create: column vindex count does not match the columns in the lookup: %d vs %v", len(rowsColValues[0]), lkp.FromColumns)
if len(trimmedRowsCols[0]) != len(lkp.FromColumns) {
return fmt.Errorf("lookup.Create: column vindex count does not match the columns in the lookup: %d vs %v", len(trimmedRowsCols[0]), lkp.FromColumns)
}
sort.Sort(&sorter{rowsColValues: trimmedRowsCols, toValues: trimmedToValues})

buf := new(bytes.Buffer)
if ignoreMode {
fmt.Fprintf(buf, "insert ignore into %s(", lkp.Table)
Expand All @@ -178,13 +202,9 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty
}
fmt.Fprintf(buf, "%s) values(", lkp.To)

bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues))
// Make a copy before sorting.
rowsColValues = append([][]sqltypes.Value(nil), rowsColValues...)
toValues = append([]sqltypes.Value(nil), toValues...)
sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues})
for rowIdx := range toValues {
colIds := rowsColValues[rowIdx]
bindVars := make(map[string]*querypb.BindVariable, 2*len(trimmedRowsCols))
for rowIdx := range trimmedToValues {
colIds := trimmedRowsCols[rowIdx]
if rowIdx != 0 {
buf.WriteString(", (")
}
Expand All @@ -195,7 +215,7 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty
}
toStr := lkp.To + strconv.Itoa(rowIdx)
buf.WriteString(":" + toStr + ")")
bindVars[toStr] = sqltypes.ValueBindVariable(toValues[rowIdx])
bindVars[toStr] = sqltypes.ValueBindVariable(trimmedToValues[rowIdx])
}

if lkp.Upsert {
Expand Down
26 changes: 24 additions & 2 deletions go/vt/vtgate/vindexes/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,38 @@ func TestLookupNonUniqueCreate(t *testing.T) {
vc.queries = nil
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
require.NoError(t, err)

wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)"
if !reflect.DeepEqual(vc.queries, wantqueries) {
t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries)
}

// With ignore_nulls off
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NULL}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
want := "lookup.Create: input has null values: row: 1, col: 0"
if err == nil || err.Error() != want {
t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want)
}

// With ignore_nulls on
vc.queries = nil
lookupNonUnique.(*LookupNonUnique).lkp.IgnoreNulls = true
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NULL}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */)
require.NoError(t, err)
wantqueries = []*querypb.BoundQuery{{
Sql: "insert ignore into t(fromc, toc) values(:fromc0, :toc0)",
BindVariables: map[string]*querypb.BindVariable{
"fromc0": sqltypes.Int64BindVariable(2),
"toc0": sqltypes.BytesBindVariable([]byte("test2")),
},
}}
if !reflect.DeepEqual(vc.queries, wantqueries) {
t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries)
}

// Test query fail.
vc.mustFail = true
err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */)
want := "lookup.Create: execute failed"
want = "lookup.Create: execute failed"
if err == nil || err.Error() != want {
t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want)
}
Expand Down