diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 82d3b04ad5b..c4114d451c0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -147,20 +147,20 @@ func (rs *rowStreamer) buildSelect() (string, error) { if len(rs.lastpk) != len(rs.pkColumns) { return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns) } - buf.WriteString(" where (") + buf.WriteString(" where ") prefix := "" - for _, pk := range rs.pkColumns { - buf.Myprintf("%s%v", prefix, rs.plan.Table.Columns[pk].Name) - prefix = "," - } - buf.WriteString(") > (") - prefix = "" - for _, val := range rs.lastpk { - buf.WriteString(prefix) - prefix = "," - val.EncodeSQL(buf) + for lastcol := len(rs.pkColumns) - 1; lastcol >= 0; lastcol-- { + buf.Myprintf("%s(", prefix) + prefix = " or " + for i, pk := range rs.pkColumns[:lastcol] { + buf.Myprintf("%v = ", rs.plan.Table.Columns[pk].Name) + rs.lastpk[i].EncodeSQL(buf) + buf.Myprintf(" and ") + } + buf.Myprintf("%v > ", rs.plan.Table.Columns[rs.pkColumns[lastcol]].Name) + rs.lastpk[lastcol].EncodeSQL(buf) + buf.Myprintf(")") } - buf.WriteString(")") } buf.Myprintf(" order by ", sqlparser.NewTableIdent(rs.plan.Table.Name)) prefix = "" @@ -271,10 +271,6 @@ func (rs *rowStreamer) startStreaming(conn *mysql.Conn) (string, error) { }() log.Infof("Locking table %s for copying", rs.plan.Table.Name) - // mysql recommends this before locking tables. - if _, err := lockConn.ExecuteFetch("set autocommit=0", 0, false); err != nil { - return "", err - } if _, err := lockConn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(sqlparser.NewTableIdent(rs.plan.Table.Name))), 0, false); err != nil { return "", err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 5266b3fe864..c57135f4d12 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -42,11 +42,15 @@ func TestStreamRowsScan(t *testing.T) { // No PK "create table t3(id int, val varbinary(128))", "insert into t3 values (1, 'aaa'), (2, 'bbb')", + // Three-column PK + "create table t4(id1 int, id2 int, id3 int, val varbinary(128), primary key(id1, id2, id3))", + "insert into t4 values (1, 2, 3, 'aaa'), (2, 3, 4, 'bbb')", }) defer execStatements(t, []string{ "drop table t1", "drop table t2", "drop table t3", + "drop table t4", }) engine.se.Reload(context.Background()) @@ -63,7 +67,7 @@ func TestStreamRowsScan(t *testing.T) { `fields: fields: pkfields: `, `rows: lastpk: `, } - wantQuery = "select id, val from t1 where (id) > (1) order by id" + wantQuery = "select id, val from t1 where (id > 1) order by id" checkStream(t, "select * from t1", []sqltypes.Value{sqltypes.NewInt64(1)}, wantQuery, wantStream) // t1: different column ordering @@ -87,7 +91,7 @@ func TestStreamRowsScan(t *testing.T) { `fields: fields: fields: pkfields: pkfields: `, `rows: lastpk: `, } - wantQuery = "select id1, id2, val from t2 where (id1,id2) > (1,2) order by id1, id2" + wantQuery = "select id1, id2, val from t2 where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2" checkStream(t, "select * from t2", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, wantQuery, wantStream) // t3: all rows @@ -103,8 +107,24 @@ func TestStreamRowsScan(t *testing.T) { `fields: fields: pkfields: pkfields: `, `rows: lastpk: `, } - wantQuery = "select id, val from t3 where (id,val) > (1,'aaa') order by id, val" + wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val" checkStream(t, "select * from t3", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("aaa")}, wantQuery, wantStream) + + // t4: all rows + wantStream = []string{ + `fields: fields: fields: fields: pkfields: pkfields: pkfields: `, + `rows: rows: lastpk: `, + } + wantQuery = "select id1, id2, id3, val from t4 order by id1, id2, id3" + checkStream(t, "select * from t4", nil, wantQuery, wantStream) + + // t4: lastpk: 1,2,3 + wantStream = []string{ + `fields: fields: fields: fields: pkfields: pkfields: pkfields: `, + `rows: lastpk: `, + } + wantQuery = "select id1, id2, id3, val from t4 where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" + checkStream(t, "select * from t4", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2), sqltypes.NewInt64(3)}, wantQuery, wantStream) } func TestStreamRowsUnicode(t *testing.T) {