Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/sidecardb"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -164,11 +166,12 @@ func waitForSourcePort(ctx context.Context, t *testing.T, tablet cluster.Vttable
return fmt.Errorf("time out before source port became %v for %v", expectedPort, tablet.Alias)
}

func getSidecarDbDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) {
func getSidecarDBDDLQueryCount(tablet *cluster.VttabletProcess) (int64, error) {
vars := tablet.GetVars()
val, ok := vars["SidecarDbDDLQueryCount"]
key := sidecardb.StatsKeyQueryCount
val, ok := vars[key]
if !ok {
return 0, fmt.Errorf("SidecarDbDDLQueryCount not found in debug/vars")
return 0, fmt.Errorf("%s not found in debug/vars", key)
}
return int64(val.(float64)), nil
}
Expand All @@ -178,7 +181,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
err := waitForSourcePort(ctx, t, replicaTablet, int32(primaryTablet.MySQLPort))
require.NoError(t, err)

sidecarDDLCount, err := getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess)
sidecarDDLCount, err := getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess)
require.NoError(t, err)
// sidecar db should create all _vt tables when vttablet started
require.Greater(t, sidecarDDLCount, int64(0))
Expand All @@ -197,7 +200,7 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
err = waitForSourcePort(ctx, t, replicaTablet, int32(newMysqlPort))
require.NoError(t, err)

sidecarDDLCount, err = getSidecarDbDDLQueryCount(primaryTablet.VttabletProcess)
sidecarDDLCount, err = getSidecarDBDDLQueryCount(primaryTablet.VttabletProcess)
require.NoError(t, err)
// sidecardb should find the desired _vt schema and not apply any new creates or upgrades when the tablet comes up again
require.Equal(t, sidecarDDLCount, int64(0))
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func getDebugVar(t *testing.T, port int, varPath []string) (string, error) {
var val []byte
var err error
url := fmt.Sprintf("http://localhost:%d/debug/vars", port)
log.Infof("url: %s, varPath: %s", url, strings.Join(varPath, ":"))
body := getHTTPBody(url)
val, _, _, err = jsonparser.Get([]byte(body), varPath...)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/sidecardb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"strconv"
"testing"

"vitess.io/vitess/go/vt/sidecardb"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
)
Expand Down Expand Up @@ -130,7 +132,7 @@ func modifySidecarDBSchema(t *testing.T, vc *VitessCluster, tabletID string, ddl
}

func getNumExecutedDDLQueries(t *testing.T, port int) int {
val, err := getDebugVar(t, port, []string{"SidecarDbDDLQueryCount"})
val, err := getDebugVar(t, port, []string{sidecardb.StatsKeyQueryCount})
require.NoError(t, err)
i, err := strconv.Atoi(val)
require.NoError(t, err)
Expand Down
95 changes: 84 additions & 11 deletions go/vt/sidecardb/sidecardb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"regexp"
"runtime"
"strings"
"sync"

"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/mysql/fakesqldb"
Expand Down Expand Up @@ -71,10 +73,50 @@ func (t *sidecarTable) String() string {

var sidecarTables []*sidecarTable
var ddlCount *stats.Counter
var ddlErrorCount *stats.Counter
var ddlErrorHistory *history.History
var mu sync.Mutex

// ddlError pairs a sidecar table name with an error recorded for that table.
// recordDDLError stores values of this type in ddlErrorHistory.
type ddlError struct {
// tableName is the name of the table the error was recorded against.
tableName string
// err is the recorded error.
err error
}

const maxDDLErrorHistoryLength = 100

// failOnSchemaInitError decides whether we fail the schema init process when we encounter an error while
// applying a table schema upgrade DDL or continue with the next table.
// If true, tablets will not launch. The cluster will not come up until the issue is resolved.
// If false, the init process will continue trying to upgrade other tables. So some functionality might be broken
// due to an incorrect schema, but the cluster should come up and serve queries.
// This is an operational trade-off: if we always fail it could cause a major incident since the entire cluster will be down.
// If we are more permissive, it could cause hard-to-detect errors, because a module
// doesn't load or behaves incorrectly due to an incomplete upgrade. Errors however will be reported and if the
// related stats endpoints are monitored we should be able to diagnose/get alerted in a timely fashion.
const failOnSchemaInitError = false

// StatsKeyPrefix is the common prefix of the sidecar database DDL stats keys declared below.
const StatsKeyPrefix = "SidecarDBDDL"
// StatsKeyQueryCount is the stats name under which init registers the ddlCount counter.
const StatsKeyQueryCount = StatsKeyPrefix + "QueryCount"
// StatsKeyErrorCount is the stats name under which init registers the ddlErrorCount counter.
const StatsKeyErrorCount = StatsKeyPrefix + "ErrorCount"
// StatsKeyErrors is the stats name under which init publishes the map of table name to DDL error message.
const StatsKeyErrors = StatsKeyPrefix + "Errors"

// init runs initSchemaFiles and then sets up the sidecar DDL stats: the query
// counter, the error counter, the bounded error history, and a published
// string map derived from that history.
func init() {
initSchemaFiles()
// NOTE(review): ddlCount is assigned twice in a row here under two different
// stat names; this looks like the before/after pair of a diff — confirm that
// only one of these assignments is meant to remain.
ddlCount = stats.NewCounter("SidecarDbDDLQueryCount", "Number of create/upgrade queries executed")
ddlCount = stats.NewCounter(StatsKeyQueryCount, "Number of queries executed")
ddlErrorCount = stats.NewCounter(StatsKeyErrorCount, "Number of errors during sidecar schema upgrade")
ddlErrorHistory = history.New(maxDDLErrorHistoryLength)
// The published map is keyed by table name, so when several history records
// share a table name only one of their messages ends up in the result.
stats.Publish(StatsKeyErrors, stats.StringMapFunc(func() map[string]string {
// NOTE(review): mu is taken here, but recordDDLError adds to ddlErrorHistory
// without holding it — confirm what mu is meant to guard.
mu.Lock()
defer mu.Unlock()
result := make(map[string]string, len(ddlErrorHistory.Records()))
for _, e := range ddlErrorHistory.Records() {
// Records that are not *ddlError values are skipped.
d, ok := e.(*ddlError)
if ok {
result[d.tableName] = d.err.Error()
}
}
return result
}))
}

func validateSchemaDefinition(name, schema string) (string, error) {
Expand All @@ -90,14 +132,14 @@ func validateSchemaDefinition(name, schema string) (string, error) {
tableName := createTable.Table.Name.String()
qualifier := createTable.Table.Qualifier.String()
if qualifier != SidecarDBName {
return "", fmt.Errorf("database qualifier specified for the %s table is %s rather than the expected value of %s",
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "database qualifier specified for the %s table is %s rather than the expected value of %s",
name, qualifier, SidecarDBName)
}
if !strings.EqualFold(tableName, name) {
return "", fmt.Errorf("table name of %s does not match the table name specified within the file: %s", name, tableName)
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "table name of %s does not match the table name specified within the file: %s", name, tableName)
}
if !createTable.IfNotExists {
return "", fmt.Errorf("%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName)
return "", vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "%s file did not include the required IF NOT EXISTS clause in the CREATE TABLE statement for the %s table", name, tableName)
}
normalizedSchema := sqlparser.CanonicalString(createTable)
return normalizedSchema, nil
Expand All @@ -123,7 +165,7 @@ func initSchemaFiles() {
case 2:
module = fmt.Sprintf("%s/%s", dirparts[0], dirparts[1])
default:
return fmt.Errorf("unexpected path value of %s specified for sidecar schema table; expected structure is <module>[/<submodule>]/<tablename>.sql", dir)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected path value of %s specified for sidecar schema table; expected structure is <module>[/<submodule>]/<tablename>.sql", dir)
}

name := strings.Split(fname, ".")[0]
Expand Down Expand Up @@ -163,11 +205,28 @@ type schemaInit struct {
// Exec is a callback that has to be passed to Init() to execute the specified query in the database.
type Exec func(ctx context.Context, query string, maxRows int, useDB bool) (*sqltypes.Result, error)

// GetDDLCount metric returns the count of sidecardb ddls that have been run as part of this vttablet's init process.
// GetDDLCount returns the count of sidecardb DDLs that have been run as part of this vttablet's init process.
func GetDDLCount() int64 {
return ddlCount.Get()
}

// GetDDLErrorCount returns the number of sidecardb DDLs that failed as part of this vttablet's init process,
// i.e. the current value of the ddlErrorCount counter that recordDDLError increments.
func GetDDLErrorCount() int64 {
return ddlErrorCount.Get()
}

// GetDDLErrorHistory returns the errors encountered as part of this vttablet's init process.
// History records that are not *ddlError values are left out; the result is nil when
// no matching records exist.
func GetDDLErrorHistory() []*ddlError {
	var recorded []*ddlError
	for _, rec := range ddlErrorHistory.Records() {
		if de, isDDLErr := rec.(*ddlError); isDDLErr {
			recorded = append(recorded, de)
		}
	}
	return recorded
}

// Init creates or upgrades the sidecar database based on declarative schema for all tables in the schema.
func Init(ctx context.Context, exec Exec) error {
printCallerDetails() // for debug purposes only, remove in v17
Expand Down Expand Up @@ -249,7 +308,7 @@ func (si *schemaInit) doesSidecarDBExist() (bool, error) {
return true, nil
default:
log.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
return false, fmt.Errorf("found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "found too many rows for sidecarDB %s: %d", SidecarDBName, len(rs.Rows))
}
}

Expand Down Expand Up @@ -322,7 +381,7 @@ func (si *schemaInit) findTableSchemaDiff(tableName, current, desired string) (s
if ddl == "" {
log.Infof("No changes needed for table %s", tableName)
} else {
log.Infof("Applying ddl for table %s:\n%s", tableName, ddl)
log.Infof("Applying DDL for table %s:\n%s", tableName, ddl)
}
}

Expand Down Expand Up @@ -358,17 +417,31 @@ func (si *schemaInit) ensureSchema(table *sidecarTable) error {
}
_, err := si.exec(ctx, ddl, 1, true)
if err != nil {
log.Errorf("Error running ddl %s for table %s during sidecar database initialization %s: %+v", ddl, table, err)
return err
ddlErr := vterrors.Wrapf(err,
"Error running DDL %s for table %s during sidecar database initialization", ddl, table)
recordDDLError(table.name, ddlErr)
if failOnSchemaInitError {
return ddlErr
}
return nil
}
log.Infof("Applied ddl %s for table %s during sidecar database initialization %s", ddl, table)
log.Infof("Applied DDL %s for table %s during sidecar database initialization", ddl, table)
ddlCount.Add(1)
return nil
}
log.Infof("Table schema was already up to date for the %s table in the %s sidecar database", table.name, SidecarDBName)
return nil
}

// recordDDLError logs err, increments the DDL error counter, and appends a
// ddlError for tableName to the DDL error history.
func recordDDLError(tableName string, err error) {
	log.Error(err)
	ddlErrorCount.Add(1)
	entry := &ddlError{tableName: tableName, err: err}
	ddlErrorHistory.Add(entry)
}

// region unit-test-only
// This section uses helpers used in tests, but also in the go/vt/vtexplain/vtexplain_vttablet.go.
// Hence, it is here and not in the _test.go file.
Expand Down
Loading