diff --git a/go/vt/vtgate/engine/insert_test.go b/go/vt/vtgate/engine/insert_test.go index 3c46e5510af..c51d4f37a61 100644 --- a/go/vt/vtgate/engine/insert_test.go +++ b/go/vt/vtgate/engine/insert_test.go @@ -574,9 +574,10 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { "onecol": { Type: "lookup", Params: map[string]string{ - "table": "lkp1", - "from": "from", - "to": "toc", + "table": "lkp1", + "from": "from", + "to": "toc", + "ignore_nulls": "true", }, Owner: "t1", }, @@ -636,8 +637,6 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { t.Fatal(err) } vc.ExpectLog(t, []string{ - `Execute insert into lkp1(from, toc) values(:from0, :toc0) from0: toc0: type:VARBINARY ` + - `value:"\026k@\264J\272K\326" true`, `ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, `ExecuteMultiShard sharded.20-: prefix mid1 suffix ` + `{_c30: _id0: type:INT64 value:"1" } true true`, @@ -949,9 +948,10 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { "onecol": { Type: "lookup", Params: map[string]string{ - "table": "lkp1", - "from": "from", - "to": "toc", + "table": "lkp1", + "from": "from", + "to": "toc", + "ignore_nulls": "true", }, Owner: "t1", }, @@ -1023,8 +1023,6 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { t.Fatal(err) } vc.ExpectLog(t, []string{ - `Execute insert ignore into lkp1(from, toc) values(:from0, :toc0) from0: toc0: type:VARBINARY ` + - `value:"\026k@\264J\272K\326" true`, `Execute select from from lkp1 where from = :from and toc = :toc from: toc: type:VARBINARY value:"\026k@\264J\272K\326" false`, `ResolveDestinations sharded [value:"0" ] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, `ExecuteMultiShard sharded.-20: prefix mid1 suffix ` + diff --git a/go/vt/vtgate/vindexes/lookup_hash_test.go b/go/vt/vtgate/vindexes/lookup_hash_test.go index ba6ff9223ed..f94204a1a4d 100644 --- a/go/vt/vtgate/vindexes/lookup_hash_test.go +++ b/go/vt/vtgate/vindexes/lookup_hash_test.go @@ -220,15 +220,22 @@ func TestLookupHashCreate(t *testing.T) { t.Errorf("vc.queries length: %v, want %v", got, want) } + err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NULL}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) + want := "lookup.Create: input has null values: row: 0, col: 0" + if err == nil || err.Error() != want { + t.Errorf("lookuphash.Create(NULL) err: %v, want %s", err, want) + } + vc.queries = nil + lookuphash.(*LookupHash).lkp.IgnoreNulls = true err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NULL}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) require.NoError(t, err) - if got, want := len(vc.queries), 1; got != want { + if got, want := len(vc.queries), 0; got != want { t.Errorf("vc.queries length: %v, want %v", got, want) } err = lookuphash.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("bogus")}, false /* ignoreMode */) - want := "lookup.Create.vunhash: invalid keyspace id: 626f677573" + want = "lookup.Create.vunhash: invalid keyspace id: 626f677573" if err == nil || err.Error() != want { t.Errorf("lookuphash.Create(bogus) err: %v, want %s", err, want) } diff --git a/go/vt/vtgate/vindexes/lookup_internal.go b/go/vt/vtgate/vindexes/lookup_internal.go index 2ea8a77e455..178922675a5 100644 --- a/go/vt/vtgate/vindexes/lookup_internal.go +++ b/go/vt/vtgate/vindexes/lookup_internal.go @@ -36,6 +36,7 @@ type lookupInternal struct { To string `json:"to"` Autocommit bool `json:"autocommit,omitempty"` Upsert bool `json:"upsert,omitempty"` + IgnoreNulls bool `json:"ignore_nulls,omitempty"` sel, ver, del string } @@ -48,6 +49,12 @@ func (lkp *lookupInternal) Init(lookupQueryParams map[string]string, autocommit, } lkp.FromColumns = fromColumns + var err error + lkp.IgnoreNulls, err = boolFromMap(lookupQueryParams, "ignore_nulls") + if err != nil { + return err + } + lkp.Autocommit = autocommit lkp.Upsert = upsert @@ -158,15 +165,32 @@ func (lkp *lookupInternal) Create(vcursor VCursor, rowsColValues [][]sqltypes.Va } func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqltypes.Value, toValues []sqltypes.Value, ignoreMode bool, co vtgatepb.CommitOrder) error { - if len(rowsColValues) == 0 { - // This code is unreachable. It's just a failsafe. + // Trim rows with null values + trimmedRowsCols := make([][]sqltypes.Value, 0, len(rowsColValues)) + trimmedToValues := make([]sqltypes.Value, 0, len(toValues)) +nextRow: + for i, row := range rowsColValues { + for j, col := range row { + if col.IsNull() { + if !lkp.IgnoreNulls { + return fmt.Errorf("lookup.Create: input has null values: row: %d, col: %d", i, j) + } + continue nextRow + } + } + trimmedRowsCols = append(trimmedRowsCols, row) + trimmedToValues = append(trimmedToValues, toValues[i]) + } + if len(trimmedRowsCols) == 0 { return nil } // We only need to check the first row. Number of cols per row // is guaranteed by the engine to be uniform. - if len(rowsColValues[0]) != len(lkp.FromColumns) { - return fmt.Errorf("lookup.Create: column vindex count does not match the columns in the lookup: %d vs %v", len(rowsColValues[0]), lkp.FromColumns) + if len(trimmedRowsCols[0]) != len(lkp.FromColumns) { + return fmt.Errorf("lookup.Create: column vindex count does not match the columns in the lookup: %d vs %v", len(trimmedRowsCols[0]), lkp.FromColumns) } + sort.Sort(&sorter{rowsColValues: trimmedRowsCols, toValues: trimmedToValues}) + buf := new(bytes.Buffer) if ignoreMode { fmt.Fprintf(buf, "insert ignore into %s(", lkp.Table) @@ -178,13 +202,9 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty } fmt.Fprintf(buf, "%s) values(", lkp.To) - bindVars := make(map[string]*querypb.BindVariable, 2*len(rowsColValues)) - // Make a copy before sorting. - rowsColValues = append([][]sqltypes.Value(nil), rowsColValues...) - toValues = append([]sqltypes.Value(nil), toValues...) - sort.Sort(&sorter{rowsColValues: rowsColValues, toValues: toValues}) - for rowIdx := range toValues { - colIds := rowsColValues[rowIdx] + bindVars := make(map[string]*querypb.BindVariable, 2*len(trimmedRowsCols)) + for rowIdx := range trimmedToValues { + colIds := trimmedRowsCols[rowIdx] if rowIdx != 0 { buf.WriteString(", (") } @@ -195,7 +215,7 @@ func (lkp *lookupInternal) createCustom(vcursor VCursor, rowsColValues [][]sqlty } toStr := lkp.To + strconv.Itoa(rowIdx) buf.WriteString(":" + toStr + ")") - bindVars[toStr] = sqltypes.ValueBindVariable(toValues[rowIdx]) + bindVars[toStr] = sqltypes.ValueBindVariable(trimmedToValues[rowIdx]) } if lkp.Upsert { diff --git a/go/vt/vtgate/vindexes/lookup_test.go b/go/vt/vtgate/vindexes/lookup_test.go index 827e0708dda..6784dcac5a6 100644 --- a/go/vt/vtgate/vindexes/lookup_test.go +++ b/go/vt/vtgate/vindexes/lookup_test.go @@ -371,16 +371,38 @@ func TestLookupNonUniqueCreate(t *testing.T) { vc.queries = nil err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NewInt64(1)}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */) require.NoError(t, err) - wantqueries[0].Sql = "insert ignore into t(fromc, toc) values(:fromc0, :toc0), (:fromc1, :toc1)" if !reflect.DeepEqual(vc.queries, wantqueries) { t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries) } + // With ignore_nulls off + err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NULL}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */) + want := "lookup.Create: input has null values: row: 1, col: 0" + if err == nil || err.Error() != want { + t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want) + } + + // With ignore_nulls on + vc.queries = nil + lookupNonUnique.(*LookupNonUnique).lkp.IgnoreNulls = true + err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(2)}, {sqltypes.NULL}}, [][]byte{[]byte("test2"), []byte("test1")}, true /* ignoreMode */) + require.NoError(t, err) + wantqueries = []*querypb.BoundQuery{{ + Sql: "insert ignore into t(fromc, toc) values(:fromc0, :toc0)", + BindVariables: map[string]*querypb.BindVariable{ + "fromc0": sqltypes.Int64BindVariable(2), + "toc0": sqltypes.BytesBindVariable([]byte("test2")), + }, + }} + if !reflect.DeepEqual(vc.queries, wantqueries) { + t.Errorf("lookup.Create queries:\n%v, want\n%v", vc.queries, wantqueries) + } + // Test query fail. vc.mustFail = true err = lookupNonUnique.(Lookup).Create(vc, [][]sqltypes.Value{{sqltypes.NewInt64(1)}}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")}, false /* ignoreMode */) - want := "lookup.Create: execute failed" + want = "lookup.Create: execute failed" if err == nil || err.Error() != want { t.Errorf("lookupNonUnique(query fail) err: %v, want %s", err, want) }