Skip to content

Commit

Permalink
ccl/changeedccl: add changefeed options into nemesis tests
Browse files Browse the repository at this point in the history
This work makes sure our nemesis tests for changefeeds randomize
over the options we use upon changefeed creation. This randomly adds
the key_in_value option (see below) and full_table_name option half
of the time and checks that the changefeed messages respect them in
the beforeAfter validator.

Note the following limitations: the full_table_name option, when on,
asserts that the topic in the output will be d.public.{table_name}
instead of checking for the actual name of the database/schema.

This change also does not add the key_in_value option when for the
webhook and cloudstorage sinks. Even before this change, since
key_in_value is on by default for those sinks, we remove the key
from the value in those testfeed messages for ease of testing.
Unfortunately, this makes these cases hard to test, so we leave them
out for now.

See also: cockroachdb#134119

Epic: CRDB-42866

Release note: None
  • Loading branch information
aerfrei committed Jan 7, 2025
1 parent 70c83cc commit c70abab
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 104 deletions.
75 changes: 65 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/nemeses.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,52 @@ import (
"github.com/cockroachdb/errors"
)

type ChangefeedOption struct {
FullTableName bool
Format string
KeyInValue bool
}

func newChangefeedOption(testName string) ChangefeedOption {
isCloudstorage := strings.Contains(testName, "cloudstorage")
isWebhook := strings.Contains(testName, "webhook")
cfo := ChangefeedOption{
FullTableName: rand.Intn(2) < 1,

// Because key_in_value is on by default for cloudstorage and webhook sinks,
// the key in the value is extracted and removed from the test feed
// messages (see extractKeyFromJSONValue function).
// TODO: enable testing key_in_value for cloudstorage and webhook sinks
KeyInValue: !isCloudstorage && !isWebhook && rand.Intn(2) < 1,
Format: "json",
}

if isCloudstorage && rand.Intn(2) < 1 {
cfo.Format = "parquet"
}

return cfo
}

func (co ChangefeedOption) String() string {
return fmt.Sprintf("full_table_name=%t,key_in_value=%t,format=%s",
co.FullTableName, co.KeyInValue, co.Format)
}

func (cfo ChangefeedOption) OptionString() string {
options := ""
if cfo.Format == "parquet" {
options = ", format=parquet"
}
if cfo.FullTableName {
options = options + ", full_table_name"
}
if cfo.KeyInValue {
options = options + ", key_in_value"
}
return options
}

type NemesesOption struct {
EnableFpValidator bool
EnableSQLSmith bool
Expand All @@ -33,10 +79,15 @@ var NemesesOptions = []NemesesOption{
EnableFpValidator: false,
EnableSQLSmith: true,
},
{
EnableFpValidator: false,
EnableSQLSmith: false,
},
}

func (no NemesesOption) String() string {
return fmt.Sprintf("fp_validator=%t,sql_smith=%t", no.EnableFpValidator, no.EnableSQLSmith)
return fmt.Sprintf("fp_validator=%t,sql_smith=%t",
no.EnableFpValidator, no.EnableSQLSmith)
}

// RunNemesis runs a jepsen-style validation of whether a changefeed meets our
Expand All @@ -50,8 +101,7 @@ func (no NemesesOption) String() string {
func RunNemesis(
f TestFeedFactory,
db *gosql.DB,
isSinkless bool,
isCloudstorage bool,
testName string,
withLegacySchemaChanger bool,
rng *rand.Rand,
nOp NemesesOption,
Expand All @@ -69,6 +119,9 @@ func RunNemesis(
ctx := context.Background()

eventPauseCount := 10

// TODO(dan): Ugly hack to disable `eventPause` in sinkless feeds.
isSinkless := strings.Contains(testName, "sinkless")
if isSinkless {
// Disable eventPause for sinkless changefeeds because we currently do not
// have "correct" pause and unpause mechanisms for changefeeds that aren't
Expand Down Expand Up @@ -199,11 +252,12 @@ func RunNemesis(
}
}

withFormatParquet := ""
if isCloudstorage && rand.Intn(2) < 1 {
withFormatParquet = ", format=parquet"
}
foo, err := f.Feed(fmt.Sprintf(`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff %s`, withFormatParquet))
cfo := newChangefeedOption(testName)
log.Infof(ctx, "Using changefeed options: %s", cfo.String())
foo, err := f.Feed(fmt.Sprintf(
`CREATE CHANGEFEED FOR foo WITH updated, resolved, diff%s`,
cfo.OptionString(),
))
if err != nil {
return nil, err
}
Expand All @@ -218,7 +272,8 @@ func RunNemesis(
if _, err := db.Exec(createFprintStmtBuf.String()); err != nil {
return nil, err
}
baV, err := NewBeforeAfterValidator(db, `foo`)

baV, err := NewBeforeAfterValidator(db, `foo`, cfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -817,7 +872,7 @@ func noteFeedMessage(a fsm.Args) error {
}
ns.availableRows--
log.Infof(a.Ctx, "%s->%s", m.Key, m.Value)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts)
return ns.v.NoteRow(m.Partition, string(m.Key), string(m.Value), ts, m.Topic)
}
}
}
Expand Down
66 changes: 56 additions & 10 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// guarantees in a single table.
type Validator interface {
// NoteRow accepts a changed row entry.
NoteRow(partition string, key, value string, updated hlc.Timestamp) error
NoteRow(partition, key, value string, updated hlc.Timestamp, topic string) error
// NoteResolved accepts a resolved timestamp entry.
NoteResolved(partition string, resolved hlc.Timestamp) error
// Failures returns any violations seen so far.
Expand Down Expand Up @@ -64,7 +64,7 @@ var _ StreamValidator = &orderValidator{}
type noOpValidator struct{}

// NoteRow accepts a changed row entry.
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp) error { return nil }
func (v *noOpValidator) NoteRow(string, string, string, hlc.Timestamp, string) error { return nil }

// NoteResolved accepts a resolved timestamp entry.
func (v *noOpValidator) NoteResolved(string, hlc.Timestamp) error { return nil }
Expand Down Expand Up @@ -125,7 +125,9 @@ func (v *orderValidator) GetValuesForKeyBelowTimestamp(
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *orderValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
v.failures = append(v.failures, fmt.Sprintf(
`key [%s] received on two partitions: %s and %s`, key, prev, partition,
Expand Down Expand Up @@ -189,14 +191,18 @@ type beforeAfterValidator struct {
table string
primaryKeyCols []string
resolved map[string]hlc.Timestamp
fullTableName bool
keyInValue bool

failures []string
}

// NewBeforeAfterValidator returns a Validator verifies that the "before" and
// "after" fields in each row agree with the source table when performing AS OF
// SYSTEM TIME lookups before and at the row's timestamp.
func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
func NewBeforeAfterValidator(
sqlDB *gosql.DB, table string, option ChangefeedOption,
) (Validator, error) {
primaryKeyCols, err := fetchPrimaryKeyCols(sqlDB, table)
if err != nil {
return nil, errors.Wrap(err, "fetchPrimaryKeyCols failed")
Expand All @@ -205,15 +211,31 @@ func NewBeforeAfterValidator(sqlDB *gosql.DB, table string) (Validator, error) {
return &beforeAfterValidator{
sqlDB: sqlDB,
table: table,
fullTableName: option.FullTableName,
keyInValue: option.KeyInValue,
primaryKeyCols: primaryKeyCols,
resolved: make(map[string]hlc.Timestamp),
}, nil
}

// NoteRow implements the Validator interface.
func (v *beforeAfterValidator) NoteRow(
partition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.fullTableName {
// TODO: fetch the actual database and schema name for the full table name
if topic != fmt.Sprintf(`d.public.%s`, v.table) {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table d.public.%s", topic, v.table,
))
}
} else {
if topic != v.table {
v.failures = append(v.failures, fmt.Sprintf(
"topic %s does not match expected table %s", topic, v.table,
))
}
}
keyJSON, err := json.ParseJSON(key)
if err != nil {
return err
Expand All @@ -230,6 +252,26 @@ func (v *beforeAfterValidator) NoteRow(
return err
}

if v.keyInValue {
keyString := keyJSON.String()
keyInValueJSON, err := valueJSON.FetchValKey("key")
if err != nil {
return err
}

if keyInValueJSON == nil {
v.failures = append(v.failures, fmt.Sprintf(
"no key in value, expected key value %s", keyString))
} else {
keyInValueString := keyInValueJSON.String()
if keyInValueString != keyString {
v.failures = append(v.failures, fmt.Sprintf(
"key in value %s does not match expected key value %s",
keyInValueString, keyString))
}
}
}

afterJSON, err := valueJSON.FetchValKey("after")
if err != nil {
return err
Expand Down Expand Up @@ -451,7 +493,7 @@ func (v *FingerprintValidator) DBFunc(

// NoteRow implements the Validator interface.
func (v *FingerprintValidator) NoteRow(
ignoredPartition string, key, value string, updated hlc.Timestamp,
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
if v.firstRowTimestamp.IsEmpty() || updated.Less(v.firstRowTimestamp) {
v.firstRowTimestamp = updated
Expand Down Expand Up @@ -663,9 +705,11 @@ func (v *FingerprintValidator) Failures() []string {
type Validators []Validator

// NoteRow implements the Validator interface.
func (vs Validators) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (vs Validators) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
for _, v := range vs {
if err := v.NoteRow(partition, key, value, updated); err != nil {
if err := v.NoteRow(partition, key, value, updated, topic); err != nil {
return err
}
}
Expand Down Expand Up @@ -707,10 +751,12 @@ func NewCountValidator(v Validator) *CountValidator {
}

// NoteRow implements the Validator interface.
func (v *CountValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
func (v *CountValidator) NoteRow(
partition, key, value string, updated hlc.Timestamp, topic string,
) error {
v.NumRows++
v.rowsSinceResolved++
return v.v.NoteRow(partition, key, value, updated)
return v.v.NoteRow(partition, key, value, updated, topic)
}

// NoteResolved implements the Validator interface.
Expand Down
Loading

0 comments on commit c70abab

Please sign in to comment.