Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ccl/changefeedccl: Add changefeed options into nemesis tests #137947

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,54 @@ import (
"github.com/cockroachdb/errors"
)

// ChangefeedOption holds the changefeed settings chosen for a nemesis run.
// Values are produced by newChangefeedOption and rendered into the
// CREATE CHANGEFEED statement by OptionString.
type ChangefeedOption struct {
// FullTableName, when true, makes OptionString emit the full_table_name
// option.
FullTableName bool
// Format is the changefeed output format. newChangefeedOption only ever sets
// it to "json" or "parquet"; OptionString emits an explicit format option
// only for "parquet".
Format string
// KeyInValue, when true, makes OptionString emit the key_in_value option.
KeyInValue bool
// SinkType is the sink type string the options were generated for. It is
// matched by substring (for example "cloudstorage" or "webhook") when
// deciding which options may be enabled.
SinkType string
}

// newChangefeedOption builds a randomized ChangefeedOption for the given sink
// type. Each optional setting is decided by an independent coin flip drawn
// from the global math/rand source, subject to per-sink restrictions:
// key_in_value is never enabled for cloudstorage or webhook sinks, and the
// parquet format is only ever tried on cloudstorage sinks. The format is
// "json" in every other case.
func newChangefeedOption(sinkType string) ChangefeedOption {
	coinFlip := func() bool { return rand.Intn(2) < 1 }

	cloudStorageSink := strings.Contains(sinkType, "cloudstorage")
	webhookSink := strings.Contains(sinkType, "webhook")

	opts := ChangefeedOption{
		Format:   "json",
		SinkType: sinkType,
	}
	opts.FullTableName = coinFlip()

	// key_in_value is on by default for cloudstorage and webhook sinks, and the
	// test feed extracts and strips the key from the value for them (see the
	// extractKeyFromJSONValue function), so the option is only randomized for
	// the remaining sinks.
	// TODO: enable testing key_in_value for cloudstorage and webhook sinks
	if !cloudStorageSink && !webhookSink {
		opts.KeyInValue = coinFlip()
	}

	if cloudStorageSink && coinFlip() {
		opts.Format = "parquet"
	}

	return opts
}

// String returns a compact, comma-separated key=value description of every
// field, intended for logging. The result is a human-readable summary and is
// not the option syntax used in a CREATE CHANGEFEED statement; use
// OptionString for that.
func (cfo ChangefeedOption) String() string {
	return fmt.Sprintf(
		"full_table_name=%t,key_in_value=%t,format=%s,sink_type=%s",
		cfo.FullTableName, cfo.KeyInValue, cfo.Format, cfo.SinkType,
	)
}

// OptionString renders the enabled options as a fragment of a CREATE
// CHANGEFEED ... WITH clause. Every emitted option is prefixed with ", " so
// the result can be appended directly after an existing option list; it is
// the empty string when nothing is enabled. Only the "parquet" format is
// written out explicitly; any other Format value produces no format option.
func (cfo ChangefeedOption) OptionString() string {
	clauses := [...]struct {
		enabled bool
		text    string
	}{
		{cfo.Format == "parquet", ", format=parquet"},
		{cfo.FullTableName, ", full_table_name"},
		{cfo.KeyInValue, ", key_in_value"},
	}

	var rendered string
	for _, c := range clauses {
		if c.enabled {
			rendered += c.text
		}
	}
	return rendered
}

type NemesesOption struct {
EnableFpValidator bool
EnableSQLSmith bool
Expand All @@ -36,7 +84,8 @@ var NemesesOptions = []NemesesOption{
}

func (no NemesesOption) String() string {
return fmt.Sprintf("fp_validator=%t,sql_smith=%t", no.EnableFpValidator, no.EnableSQLSmith)
return fmt.Sprintf("fp_validator=%t,sql_smith=%t",
no.EnableFpValidator, no.EnableSQLSmith)
}

// RunNemesis runs a jepsen-style validation of whether a changefeed meets our
Expand All @@ -50,8 +99,7 @@ func (no NemesesOption) String() string {
func RunNemesis(
f TestFeedFactory,
db *gosql.DB,
isSinkless bool,
isCloudstorage bool,
sinkType string,
withLegacySchemaChanger bool,
rng *rand.Rand,
nOp NemesesOption,
Expand All @@ -69,6 +117,9 @@ func RunNemesis(
ctx := context.Background()

eventPauseCount := 10

// TODO(dan): Ugly hack to disable `eventPause` in sinkless feeds.
isSinkless := strings.Contains(sinkType, "sinkless")
if isSinkless {
// Disable eventPause for sinkless changefeeds because we currently do not
// have "correct" pause and unpause mechanisms for changefeeds that aren't
Expand Down Expand Up @@ -199,11 +250,12 @@ func RunNemesis(
}
}

withFormatParquet := ""
if isCloudstorage && rand.Intn(2) < 1 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
cfo := newChangefeedOption(sinkType)
log.Infof(ctx, "Using changefeed options: %s", cfo.String())
foo, err := f.Feed(fmt.Sprintf(
`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`,
cfo.OptionString(),
))
if err != nil {
return nil, err
}
Expand All @@ -218,7 +270,8 @@ func RunNemesis(
if _, err := db.Exec(createFprintStmtBuf.String()); err != nil {
return nil, err
}
baV, err := NewBeforeAfterValidator(db, `foo`)

baV, err := NewBeforeAfterValidator(db, `foo`, cfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -817,7 +870,7 @@ func noteFeedMessage(a fsm.Args) error {
}
ns.availableRows--
log.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts, m.Topic)
}
}
}
Expand Down
66 changes: 56 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// guarantees in a single table.
type Validator interface {
// NoteRow accepts a changed row entry.
NoteRow(partition string, key, value string, updated hlc.Timestamp) error
NoteRow(partition, key, value string, updated hlc.Timestamp, topic string) error
// NoteResolved accepts a resolved timestamp entry.
NoteResolved(partition string, resolved hlc.Timestamp) error
// Failures returns any violations seen so far.
Expand Down Expand Up @@ -64,7 +64,7 @@ var _ StreamValidator = &orderValidator{}
type noOpValidator struct{}

// NoteRow accepts a changed row entry.
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp) error { return nil }
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp, string) error { return nil }

// NoteResolved accepts a resolved timestamp entry.
func (v *noOpValidator) NoteResolved(string, hlc.Timestamp) error { return nil }
Expand Down Expand Up @@ -125,7 +125,9 @@ func (v *orderValidator) GetValuesForKeyBelowTimestamp(
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *orderValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
v.failures = append(v.failures, fmt.Sprintf(
`key [%s] received on two partitions: %s and %s`, key, prev, partition,
Expand Down Expand Up @@ -189,14 +191,18 @@ type beforeAfterValidator struct {
table string
primaryKeyCols []string
resolved map[string]hlc.Timestamp
fullTableName bool
keyInValue bool

failures []string
}

// NewBeforeAfterValidator returns a Validator that verifies that the "before"
// and "after" fields in each row agree with the source table when performing
// AS OF SYSTEM TIME lookups before and at the row's timestamp.
func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
func NewBeforeAfterValidator(
sqlDB *gosql.DB, table string, option ChangefeedOption,
) (Validator, error) {
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table)
if err != nil {
return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed")
Expand All @@ -205,15 +211,31 @@ func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
return &beforeAfterValidator{
sqlDB: sqlDB,
table: table,
fullTableName: option.FullTableName,
keyInValue: option.KeyInValue,
primaryKeyCols: primaryKeyCols,
resolved: make(map[string]hlc.Timestamp),
}, nil
}

// NoteRow implements the Validator interface.
func (v *beforeAfterValidator) NoteRow(
partition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.fullTableName {
// TODO: fetch the actual database and schema name for the full table name
if topic != fmt.Sprintf(`d.public.%s`, v.table) {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table,
))
}
} else {
if topic != v.table {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table %s", topic, v.table,
))
}
}
keyJSON, err := json.ParseJSON(key)
if err != nil {
return err
Expand All @@ -230,6 +252,26 @@ func (v *beforeAfterValidator) NoteRow(
return err
}

if v.keyInValue {
keyString := keyJSON.String()
keyInValueJSON, err := valueJSON.FetchValKey("key")
if err != nil {
return err
}

if keyInValueJSON == nil {
v.failures = append(v.failures, fmt.Sprintf(
"no key in value, expected key value %s", keyString))
} else {
keyInValueString := keyInValueJSON.String()
if keyInValueString != keyString {
v.failures = append(v.failures, fmt.Sprintf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString))
}
}
}

afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
Expand Down Expand Up @@ -451,7 +493,7 @@ func (v *FingerprintValidator) DBFunc(

// NoteRow implements the Validator interface.
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
v.firstRowTimestamp = updated
Expand Down Expand Up @@ -663,9 +705,11 @@ func (v *FingerprintValidator) Failures() []string {
type Validators []Validator

// NoteRow implements the Validator interface.
func (vs Validators) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (vs Validators) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
for _, v := range vs {
if err := v.NoteRow(partition, key, value, updated); err != nil {
if err := v.NoteRow(partition, key, value, updated, topic); err != nil {
return err
}
}
Expand Down Expand Up @@ -707,10 +751,12 @@ func NewCountValidator(v Validator) *CountValidator {
}

// NoteRow implements the Validator interface.
func (v *CountValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *CountValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
v.NumRows++
v.rowsSinceResolved++
return v.v.NoteRow(partition, key, value, updated)
return v.v.NoteRow(partition, key, value, updated, topic)
}

// NoteResolved implements the Validator interface.
Expand Down
Loading
Loading