Skip to content

Commit

Permalink
dumpling: check table-list types before dumping (#53683)
Browse files Browse the repository at this point in the history
close #53682
  • Loading branch information
tangenta authored Jun 3, 2024
1 parent fc3132c commit 073f746
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 12 deletions.
28 changes: 16 additions & 12 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,9 +1145,24 @@ func getListTableTypeByConf(conf *Config) listTableType {
}

func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) error {
if conf.SpecifiedTables || conf.SQL != "" {
if conf.SQL != "" {
return nil
}

ifSeqExists, err := CheckIfSeqExists(db)
if err != nil {
return err
}
var listType listTableType
if ifSeqExists {
listType = listTableByShowFullTables
} else {
listType = getListTableTypeByConf(conf)
}

if conf.SpecifiedTables {
return updateSpecifiedTablesMeta(tctx, db, conf.Tables, listType)
}
databases, err := prepareDumpingDatabases(tctx, conf, db)
if err != nil {
return err
Expand All @@ -1161,17 +1176,6 @@ func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn)
tableTypes = append(tableTypes, TableTypeSequence)
}

ifSeqExists, err := CheckIfSeqExists(db)
if err != nil {
return err
}
var listType listTableType
if ifSeqExists {
listType = listTableByShowFullTables
} else {
listType = getListTableTypeByConf(conf)
}

conf.Tables, err = ListAllDatabasesTables(tctx, db, databases, listType, tableTypes...)
if err != nil {
return err
Expand Down
113 changes: 113 additions & 0 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,119 @@ func RestoreCharset(w io.StringWriter) {
_, _ = w.WriteString("SET collation_connection = @PREV_COLLATION_CONNECTION;\n")
}

// updateSpecifiedTablesMeta updates DatabaseTables with correct table type and avg row size.
func updateSpecifiedTablesMeta(tctx *tcontext.Context, db *sql.Conn, dbTables DatabaseTables, listType listTableType) error {
var (
schema, table, tableTypeStr string
tableType TableType
avgRowLength uint64
err error
)
switch listType {
case listTableByInfoSchema:
dbNames := make([]string, 0, len(dbTables))
for db := range dbTables {
dbNames = append(dbNames, fmt.Sprintf("'%s'", db))
}
query := fmt.Sprintf("SELECT TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,AVG_ROW_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA IN (%s)", strings.Join(dbNames, ","))
if err := simpleQueryWithArgs(tctx, db, func(rows *sql.Rows) error {
var (
sqlAvgRowLength sql.NullInt64
err2 error
)
if err2 = rows.Scan(&schema, &table, &tableTypeStr, &sqlAvgRowLength); err != nil {
return errors.Trace(err2)
}

tbls, ok := dbTables[schema]
if !ok {
return nil
}
for _, tbl := range tbls {
if tbl.Name == table {
tableType, err2 = ParseTableType(tableTypeStr)
if err2 != nil {
return errors.Trace(err2)
}
if sqlAvgRowLength.Valid {
avgRowLength = uint64(sqlAvgRowLength.Int64)
} else {
avgRowLength = 0
}
tbl.Type = tableType
tbl.AvgRowLength = avgRowLength
}
}
return nil
}, query); err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
return nil
case listTableByShowFullTables:
for schema, tbls := range dbTables {
query := fmt.Sprintf("SHOW FULL TABLES FROM `%s`",
escapeString(schema))
if err := simpleQueryWithArgs(tctx, db, func(rows *sql.Rows) error {
var err2 error
if err2 = rows.Scan(&table, &tableTypeStr); err != nil {
return errors.Trace(err2)
}
for _, tbl := range tbls {
if tbl.Name == table {
tableType, err2 = ParseTableType(tableTypeStr)
if err2 != nil {
return errors.Trace(err2)
}
tbl.Type = tableType
}
}
return nil
}, query); err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
}
return nil
default:
const queryTemplate = "SHOW TABLE STATUS FROM `%s`"
for schema, tbls := range dbTables {
query := fmt.Sprintf(queryTemplate, escapeString(schema))
rows, err := db.QueryContext(tctx, query)
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
results, err := GetSpecifiedColumnValuesAndClose(rows, "NAME", "ENGINE", "AVG_ROW_LENGTH", "COMMENT")
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
for _, oneRow := range results {
table, engine, avgRowLengthStr, comment := oneRow[0], oneRow[1], oneRow[2], oneRow[3]
for _, tbl := range tbls {
if tbl.Name == table {
if avgRowLengthStr != "" {
avgRowLength, err = strconv.ParseUint(avgRowLengthStr, 10, 64)
if err != nil {
return errors.Annotatef(err, "sql: %s", query)
}
} else {
avgRowLength = 0
}
tbl.AvgRowLength = avgRowLength
tableType = TableTypeBase
if engine == "" && (comment == "" || comment == TableTypeViewStr) {
tableType = TableTypeView
} else if engine == "" {
tctx.L().Warn("invalid table without engine found", zap.String("database", schema), zap.String("table", table))
continue
}
tbl.Type = tableType
}
}
}
}
return nil
}
}

// ListAllDatabasesTables lists all the databases and tables from the database
// listTableByInfoSchema list tables by table information_schema in MySQL
// listTableByShowTableStatus has better performance than listTableByInfoSchema
Expand Down
31 changes: 31 additions & 0 deletions dumpling/tests/specified_table_view/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/sh
#
# Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

set -eu
cur=$(cd `dirname $0`; pwd)

DB_NAME="specified_table_view"
TABLE_NAME="t"
VIEW_NAME="v"

run_sql "drop database if exists \`$DB_NAME\`;"
run_sql "create database \`$DB_NAME\`;"
run_sql "create table \`$DB_NAME\`.\`$TABLE_NAME\` (a int);"
run_sql "create view \`$DB_NAME\`.\`$VIEW_NAME\` as select * from \`$DB_NAME\`.\`$TABLE_NAME\`;"

set +e
rm -rf "$DUMPLING_OUTPUT_DIR"
run_dumpling --consistency=lock -T="$DB_NAME.$TABLE_NAME,$DB_NAME.$VIEW_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e

file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$TABLE_NAME-schema.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$VIEW_NAME-schema-view.sql"

set +e
rm -rf "$DUMPLING_OUTPUT_DIR"
run_dumpling --consistency=lock -T="$DB_NAME.$TABLE_NAME,$DB_NAME.$VIEW_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log
set -e

file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$TABLE_NAME-schema.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/$DB_NAME.$VIEW_NAME-schema-view.sql"

0 comments on commit 073f746

Please sign in to comment.