diff --git a/go/test/endtoend/tabletmanager/replication_manager/tablet_test.go b/go/test/endtoend/tabletmanager/replication_manager/tablet_test.go index 34f3b272e61..86b02244762 100644 --- a/go/test/endtoend/tabletmanager/replication_manager/tablet_test.go +++ b/go/test/endtoend/tabletmanager/replication_manager/tablet_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/sidecardb" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/test/endtoend/cluster" @@ -164,11 +166,12 @@ func waitForSourcePort(ctx context.Context, t *testing.T, tablet cluster.Vttable return fmt.Errorf("time out before source port became %v for %v", expectedPort, tablet.Alias) } -func getSidecarDbDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) { +func getSidecarDBDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) { vars := tablet.GetVars() - val, ok := vars["SidecarDbDDLQueryCount"] + key := sidecardb.StatsKeyQueryCount + val, ok := vars[key] if !ok { - return 0, fmt.Errorf("SidecarDbDDLQueryCount not found in debug/vars") + return 0, fmt.Errorf("%s not found in debug/vars", key) } return int64(val.(float64)), nil } @@ -178,7 +181,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) { err := waitForSourcePort(ctx, t, replicaTablet, int32(primaryTablet.MySQLPort)) require.NoError(t, err) - sidecarDDLCount, err := getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess) + sidecarDDLCount, err := getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess) require.NoError(t, err) // sidecar db should create all _vt tables when vttablet started require.Greater(t, sidecarDDLCount, int64(0)) @@ -197,7 +200,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) { err = waitForSourcePort(ctx, t, replicaTablet, int32(newMysqlPort)) require.NoError(t, err) - sidecarDDLCount, err = getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess) + sidecarDDLCount, err = getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess) require.NoError(t, err) // sidecardb should find the desired _vt schema and not apply any new creates or upgrades when the tablet comes up again require.Equal(t, sidecarDDLCount, int64(0)) diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index cda35f42cfc..bc3ace3f064 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -526,6 +526,7 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) { var val []byte var err error url := fmt.Sprintf("http://localhost:%d/debug/vars", port) + log.Infof("url: %s, varPath: %s", url, strings.Join(varPath, ":")) body := getHTTPBody(url) val, _, _, err = jsonparser.Get([]byte(body), varPath...) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/sidecardb_test.go b/go/test/endtoend/vreplication/sidecardb_test.go index a40a80736af..56ca2d08acd 100644 --- a/go/test/endtoend/vreplication/sidecardb_test.go +++ b/go/test/endtoend/vreplication/sidecardb_test.go @@ -5,6 +5,8 @@ import ( "strconv" "testing" + "vitess.io/vitess/go/vt/sidecardb" + "github.com/stretchr/testify/require" "github.com/tidwall/gjson" ) @@ -130,7 +132,7 @@ func modifySidecarDBSchema(t *testing.T, vc *VitessCluster, tabletID string, ddl } func getNumExecutedDDLQueries(t *testing.T, port int) int { - val, err := getDebugVar(t, port, []string{"SidecarDbDDLQueryCount"}) + val, err := getDebugVar(t, port, []string{sidecardb.StatsKeyQueryCount}) require.NoError(t, err) i, err := strconv.Atoi(val) require.NoError(t, err) diff --git a/go/vt/sidecardb/sidecardb.go b/go/vt/sidecardb/sidecardb.go index 840fb16b849..3d955995a6a 100644 --- a/go/vt/sidecardb/sidecardb.go +++ b/go/vt/sidecardb/sidecardb.go @@ -25,7 +25,9 @@ import ( "regexp" "runtime" "strings" + "sync" + "vitess.io/vitess/go/history" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/fakesqldb" @@ -71,10 +73,50 @@ func (t *sidecarTable) String() string { var sidecarTables []*sidecarTable var ddlCount *stats.Counter +var ddlErrorCount *stats.Counter +var ddlErrorHistory *history.History +var mu sync.Mutex + +type ddlError struct { + tableName string + err error +} + +const maxDDLErrorHistoryLength = 100 + +// failOnSchemaInitError decides whether we fail the schema init process when we encounter an error while +// applying a table schema upgrade DDL or continue with the next table. +// If true, tablets will not launch. The cluster will not come up until the issue is resolved. +// If false, the init process will continue trying to upgrade other tables. So some functionality might be broken +// due to an incorrect schema, but the cluster should come up and serve queries. +// This is an operational trade-off: if we always fail it could cause a major incident since the entire cluster will be down. +// If we are more permissive, it could cause hard-to-detect errors, because a module +// doesn't load or behaves incorrectly due to an incomplete upgrade. Errors however will be reported and if the +// related stats endpoints are monitored we should be able to diagnose/get alerted in a timely fashion. +const failOnSchemaInitError = false + +const StatsKeyPrefix = "SidecarDBDDL" +const StatsKeyQueryCount = StatsKeyPrefix + "QueryCount" +const StatsKeyErrorCount = StatsKeyPrefix + "ErrorCount" +const StatsKeyErrors = StatsKeyPrefix + "Errors" func init() { initSchemaFiles() - ddlCount = stats.NewCounter("SidecarDbDDLQueryCount", "Number of create/upgrade queries executed") + ddlCount = stats.NewCounter(StatsKeyQueryCount, "Number of queries executed") + ddlErrorCount = stats.NewCounter(StatsKeyErrorCount, "Number of errors during sidecar schema upgrade") + ddlErrorHistory = history.New(maxDDLErrorHistoryLength) + stats.Publish(StatsKeyErrors, stats.StringMapFunc(func() map[string]string { + mu.Lock() + defer mu.Unlock() + result := make(map[string]string, len(ddlErrorHistory.Records())) + for _, e := range ddlErrorHistory.Records() { + d, ok := e.(*ddlError) + if ok { + result[d.tableName] = d.err.Error() + } + } + return result + })) } func validateSchemaDefinition(name, schema string) (string, error) { @@ -90,14 +132,14 @@ func validateSchemaDefinition(name, schema string) (string, error) { tableName := createTable.Table.Name.String() qualifier := createTable.Table.Qualifier.String() if qualifier != SidecarDBName { - return "", fmt.Errorf("database qualifier specified for the %s table is %s rather than the expected value of %s", + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "database qualifier specified for the %s table is %s rather than the expected value of %s", name, qualifier, SidecarDBName) } if !strings.EqualFold(tableName, name) { - return "", fmt.Errorf("table name of %s does not match the table name specified within the file: %s", name, tableName) + return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table name of %s does not match the table name specified within the file: %s", name, tableName) } if !createTable.IfNotExists { - return "", fmt.Errorf("%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName) + return "", vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName) } normalizedSchema := sqlparser.CanonicalString(createTable) return normalizedSchema, nil @@ -123,7 +165,7 @@ func initSchemaFiles() { case 2: module = fmt.Sprintf("%s/%s", dirparts[0], dirparts[1]) default: - return fmt.Errorf("unexpected path value of %s specified for sidecar schema table; expected structure is [/]/.sql", dir) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected path value of %s specified for sidecar schema table; expected structure is [/]/.sql", dir) } name := strings.Split(fname, ".")[0] @@ -163,11 +205,28 @@ type schemaInit struct { // Exec is a callback that has to be passed to Init() to execute the specified query in the database. type Exec func(ctx context.Context, query string, maxRows int, useDB bool) (*sqltypes.Result, error) -// GetDDLCount metric returns the count of sidecardb ddls that have been run as part of this vttablet's init process. +// GetDDLCount returns the count of sidecardb DDLs that have been run as part of this vttablet's init process. func GetDDLCount() int64 { return ddlCount.Get() } +// GetDDLErrorCount returns the count of sidecardb DDLs that have been errored out as part of this vttablet's init process. +func GetDDLErrorCount() int64 { + return ddlErrorCount.Get() +} + +// GetDDLErrorHistory returns the errors encountered as part of this vttablet's init process.. +func GetDDLErrorHistory() []*ddlError { + var errors []*ddlError + for _, e := range ddlErrorHistory.Records() { + ddle, ok := e.(*ddlError) + if ok { + errors = append(errors, ddle) + } + } + return errors +} + // Init creates or upgrades the sidecar database based on declarative schema for all tables in the schema. func Init(ctx context.Context, exec Exec) error { printCallerDetails() // for debug purposes only, remove in v17 @@ -249,7 +308,7 @@ func (si *schemaInit) doesSidecarDBExist() (bool, error) { return true, nil default: log.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows)) - return false, fmt.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows)) + return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows)) } } @@ -322,7 +381,7 @@ func (si *schemaInit) findTableSchemaDiff(tableName, current, desired string) (s if ddl == "" { log.Infof("No changes needed for table %s", tableName) } else { - log.Infof("Applying ddl for table %s:\n%s", tableName, ddl) + log.Infof("Applying DDL for table %s:\n%s", tableName, ddl) } } @@ -358,10 +417,15 @@ func (si *schemaInit) ensureSchema(table *sidecarTable) error { } _, err := si.exec(ctx, ddl, 1, true) if err != nil { - log.Errorf("Error running ddl %s for table %s during sidecar database initialization %s: %+v", ddl, table, err) - return err + ddlErr := vterrors.Wrapf(err, + "Error running DDL %s for table %s during sidecar database initialization", ddl, table) + recordDDLError(table.name, ddlErr) + if failOnSchemaInitError { + return ddlErr + } + return nil } - log.Infof("Applied ddl %s for table %s during sidecar database initialization %s", ddl, table) + log.Infof("Applied DDL %s for table %s during sidecar database initialization", ddl, table) ddlCount.Add(1) return nil } @@ -369,6 +433,15 @@ func (si *schemaInit) ensureSchema(table *sidecarTable) error { return nil } +func recordDDLError(tableName string, err error) { + log.Error(err) + ddlErrorCount.Add(1) + ddlErrorHistory.Add(&ddlError{ + tableName: tableName, + err: err, + }) +} + // region unit-test-only // This section uses helpers used in tests, but also in the go/vt/vtexplain/vtexplain_vttablet.go. // Hence, it is here and not in the _test.go file. diff --git a/go/vt/sidecardb/sidecardb_test.go b/go/vt/sidecardb/sidecardb_test.go index 1f6b58ac493..1ca8f2f63a4 100644 --- a/go/vt/sidecardb/sidecardb_test.go +++ b/go/vt/sidecardb/sidecardb_test.go @@ -18,18 +18,26 @@ package sidecardb import ( "context" + "expvar" + "fmt" + "sort" + "strings" "testing" "vitess.io/vitess/go/vt/sqlparser" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" ) -// Tests all non-error code paths in sidecardb -func TestAllSidecarDB(t *testing.T) { +// TestInitErrors validates that the schema init error stats are being correctly set +func TestInitErrors(t *testing.T) { + ctx := context.Background() + db := fakesqldb.New(t) defer db.Close() AddSchemaInitQueries(db, false) @@ -42,50 +50,79 @@ func TestAllSidecarDB(t *testing.T) { db.AddQuery("select @@session.sql_mode as sql_mode", sqlMode) db.AddQueryPattern("set @@session.sql_mode=.*", &sqltypes.Result{}) - ctx := context.Background() + ddlErrorCount.Set(0) + ddlCount.Set(0) + cp := db.ConnParams() conn, err := cp.Connect(ctx) require.NoError(t, err) + + type schemaError struct { + tableName string + errorValue string + } + + // simulate two errors during table creation to validate error stats + schemaErrors := []schemaError{ + {"vreplication_log", "vreplication_log error"}, + {"copy_state", "copy_state error"}, + } + exec := func(ctx context.Context, query string, maxRows int, useDB bool) (*sqltypes.Result, error) { if useDB { if _, err := conn.ExecuteFetch(UseSidecarDatabaseQuery, maxRows, true); err != nil { return nil, err } } + + // simulate errors for the table creation DDLs applied for tables specified in schemaErrors + stmt, err := sqlparser.Parse(query) + if err != nil { + return nil, err + } + createTable, ok := stmt.(*sqlparser.CreateTable) + if ok { + for _, e := range schemaErrors { + if strings.EqualFold(e.tableName, createTable.Table.Name.String()) { + return nil, fmt.Errorf(e.errorValue) + } + } + } return conn.ExecuteFetch(query, maxRows, true) } - // tests init on empty db require.Equal(t, int64(0), GetDDLCount()) err = Init(ctx, exec) require.NoError(t, err) - require.Equal(t, int64(len(sidecarTables)), GetDDLCount()) - - // tests init on already inited db - AddSchemaInitQueries(db, true) - err = Init(ctx, exec) - require.NoError(t, err) - require.Equal(t, int64(len(sidecarTables)), GetDDLCount()) + require.Equal(t, int64(len(sidecarTables)-len(schemaErrors)), GetDDLCount()) + require.Equal(t, int64(len(schemaErrors)), GetDDLErrorCount()) - // tests misc paths not covered above - si := &schemaInit{ - ctx: ctx, - exec: exec, + var want []string + for _, e := range schemaErrors { + want = append(want, e.errorValue) } - result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "Database", - "varchar"), - "currentDB", - ) - db.AddQuery(SelectCurrentDatabaseQuery, result) - - currentDB, err := si.setCurrentDatabase("dbname") - require.NoError(t, err) - require.Equal(t, "currentDB", currentDB) + // sort expected and reported errors for easy comparison + sort.Strings(want) + got := GetDDLErrorHistory() + sort.Slice(got, func(i, j int) bool { + return got[i].tableName < got[j].tableName + }) + var gotErrors string + stats.Register(func(name string, v expvar.Var) { + if name == StatsKeyErrors { + gotErrors = v.String() + } + }) - require.False(t, MatchesInitQuery("abc")) - require.True(t, MatchesInitQuery(SelectCurrentDatabaseQuery)) - require.True(t, MatchesInitQuery("CREATE TABLE IF NOT EXISTS `_vt`.vreplication")) + // for DDL errors, validate both the internal data structure and the stats endpoint + for i := range want { + if !strings.Contains(got[i].err.Error(), want[i]) { + require.FailNowf(t, "incorrect schema error", "got %s, want %s", got[i], want[i]) + } + if !strings.Contains(gotErrors, want[i]) { + require.FailNowf(t, "schema error not published", "got %s, want %s", gotErrors, want[i]) + } + } } // test the logic that confirms that the user defined schema's table name and qualifier are valid @@ -149,3 +186,60 @@ func TestAlterTableAlgorithm(t *testing.T) { }) } } + +// Tests various non-error code paths in sidecardb +func TestMiscSidecarDB(t *testing.T) { + ctx := context.Background() + + db := fakesqldb.New(t) + defer db.Close() + AddSchemaInitQueries(db, false) + db.AddQuery("use dbname", &sqltypes.Result{}) + db.AddQueryPattern("set @@session.sql_mode=.*", &sqltypes.Result{}) + + cp := db.ConnParams() + conn, err := cp.Connect(ctx) + require.NoError(t, err) + exec := func(ctx context.Context, query string, maxRows int, useDB bool) (*sqltypes.Result, error) { + if useDB { + if _, err := conn.ExecuteFetch(UseSidecarDatabaseQuery, maxRows, true); err != nil { + return nil, err + } + } + return conn.ExecuteFetch(query, maxRows, true) + } + + // tests init on empty db + ddlErrorCount.Set(0) + ddlCount.Set(0) + require.Equal(t, int64(0), GetDDLCount()) + err = Init(ctx, exec) + require.NoError(t, err) + require.Equal(t, int64(len(sidecarTables)), GetDDLCount()) + + // tests init on already inited db + AddSchemaInitQueries(db, true) + err = Init(ctx, exec) + require.NoError(t, err) + require.Equal(t, int64(len(sidecarTables)), GetDDLCount()) + + // tests misc paths not covered above + si := &schemaInit{ + ctx: ctx, + exec: exec, + } + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "Database", + "varchar"), + "currentDB", + ) + db.AddQuery(SelectCurrentDatabaseQuery, result) + + currentDB, err := si.setCurrentDatabase("dbname") + require.NoError(t, err) + require.Equal(t, "currentDB", currentDB) + + require.False(t, MatchesInitQuery("abc")) + require.True(t, MatchesInitQuery(SelectCurrentDatabaseQuery)) + require.True(t, MatchesInitQuery("CREATE TABLE IF NOT EXISTS `_vt`.vreplication")) +}