diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 99048ec1dd1..82d3b04ad5b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -19,11 +19,11 @@ package vstreamer import ( "context" "fmt" - "sync" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -172,15 +172,10 @@ func (rs *rowStreamer) buildSelect() (string, error) { } func (rs *rowStreamer) streamQuery(conn *mysql.Conn, send func(*binlogdatapb.VStreamRowsResponse) error) error { - unlock, gtid, err := rs.lockTable() + gtid, err := rs.startStreaming(conn) if err != nil { return err } - defer unlock() - - if err := conn.ExecuteStreamFetch(rs.sendQuery); err != nil { - return err - } // first call the callback with the fields flds, err := conn.Fields() @@ -203,9 +198,6 @@ func (rs *rowStreamer) streamQuery(conn *mysql.Conn, send func(*binlogdatapb.VSt if err != nil { return fmt.Errorf("stream send error: %v", err) } - if err := unlock(); err != nil { - return err - } response := &binlogdatapb.VStreamRowsResponse{} lastpk := make([]sqltypes.Value, len(rs.pkColumns)) @@ -262,33 +254,40 @@ func (rs *rowStreamer) streamQuery(conn *mysql.Conn, send func(*binlogdatapb.VSt return nil } -func (rs *rowStreamer) lockTable() (unlock func() error, gtid string, err error) { - conn, err := rs.mysqlConnect() +func (rs *rowStreamer) startStreaming(conn *mysql.Conn) (string, error) { + lockConn, err := rs.mysqlConnect() if err != nil { - return nil, "", err + return "", err } + // To be safe, always unlock tables, even if lock tables might fail. + defer func() { + _, err := lockConn.ExecuteFetch("unlock tables", 0, false) + if err != nil { + log.Warning("Unlock tables failed: %v", err) + } else { + log.Infof("Tables unlocked", rs.plan.Table.Name) + } + lockConn.Close() + }() + + log.Infof("Locking table %s for copying", rs.plan.Table.Name) // mysql recommends this before locking tables. - if _, err := conn.ExecuteFetch("set autocommit=0", 0, false); err != nil { - return nil, "", err + if _, err := lockConn.ExecuteFetch("set autocommit=0", 0, false); err != nil { + return "", err } - if _, err := conn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(sqlparser.NewTableIdent(rs.plan.Table.Name))), 0, false); err != nil { - return nil, "", err - } - var once sync.Once - unlock = func() error { - var err error - once.Do(func() { - _, err = conn.ExecuteFetch("unlock tables", 0, false) - conn.Close() - }) - return err + if _, err := lockConn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(sqlparser.NewTableIdent(rs.plan.Table.Name))), 0, false); err != nil { + return "", err } - pos, err := conn.MasterPosition() + pos, err := lockConn.MasterPosition() if err != nil { - unlock() - return nil, "", err + return "", err } - return unlock, mysql.EncodePosition(pos), nil + + if err := conn.ExecuteStreamFetch(rs.sendQuery); err != nil { + return "", err + } + + return mysql.EncodePosition(pos), nil } func (rs *rowStreamer) mysqlConnect() (*mysql.Conn, error) {