From 8126b7ef5d1a7faa0d4f576065971900329465c4 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Fri, 23 Sep 2022 18:15:07 -0400 Subject: [PATCH] schemachanger/rel: fix race due to failure to clone constraint slots The fundamental race here is that while the slots themselves were being copied by value, the "any" clauses which are a slice were not. The second bug here is that the "inline" values were not being properly reset. That bug could lead to problems when the query was run again in the context of a different element set. We need to reset those inline values too. Fixes #88628 Release note: None --- pkg/sql/schemachanger/rel/BUILD.bazel | 3 + pkg/sql/schemachanger/rel/query_build.go | 2 +- pkg/sql/schemachanger/rel/query_data.go | 25 ++++- pkg/sql/schemachanger/rel/query_eval.go | 29 ++++- pkg/sql/schemachanger/rel/rel_test.go | 106 +++++++++++++++++- pkg/sql/tests/BUILD.bazel | 1 + .../tests/schema_changes_in_parallel_test.go | 77 +++++++++++++ 7 files changed, 233 insertions(+), 10 deletions(-) create mode 100644 pkg/sql/tests/schema_changes_in_parallel_test.go diff --git a/pkg/sql/schemachanger/rel/BUILD.bazel b/pkg/sql/schemachanger/rel/BUILD.bazel index 7c7b803dbc39..300f5b5118c3 100644 --- a/pkg/sql/schemachanger/rel/BUILD.bazel +++ b/pkg/sql/schemachanger/rel/BUILD.bazel @@ -62,7 +62,10 @@ go_test( "//pkg/sql/schemachanger/rel/internal/cyclegraphtest", "//pkg/sql/schemachanger/rel/internal/entitynodetest", "//pkg/sql/schemachanger/rel/reltest", + "//pkg/util/leaktest", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", + "@org_golang_x_sync//errgroup", ], ) diff --git a/pkg/sql/schemachanger/rel/query_build.go b/pkg/sql/schemachanger/rel/query_build.go index f4484810d738..cd975b6b6b15 100644 --- a/pkg/sql/schemachanger/rel/query_build.go +++ b/pkg/sql/schemachanger/rel/query_build.go @@ -303,7 +303,7 @@ func (p *queryBuilder) processValueExpr(rawValue expr) slotIdx { if err != nil { panic(err) } - return 
p.fillSlot(slot{not: tv}, false /* isEntity */) + return p.fillSlot(slot{not: &tv}, false /* isEntity */) case containsExpr: return p.processValueExpr(v.v) default: diff --git a/pkg/sql/schemachanger/rel/query_data.go b/pkg/sql/schemachanger/rel/query_data.go index 968cb0441bb8..385afb823b8b 100644 --- a/pkg/sql/schemachanger/rel/query_data.go +++ b/pkg/sql/schemachanger/rel/query_data.go @@ -41,7 +41,7 @@ type slot struct { // not holds a value which this slot must not be equal to. Additionally, // the value which fills this slot must have the same type as the value // in the not container. - not typedValue + not *typedValue } // typedValue is a value in its comparable form, which is to say, it is a @@ -75,6 +75,10 @@ func (tv typedValue) toInterface() interface{} { return tv.toValue().Interface() } +// inlineValue populates the inline value for the typedValue. The inline +// value is a single scalar which can be used to efficiently compare +// values, but it only has meaning in the context of the current entitySet. +// It must be cleared when moving to a new entity set. func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) { if tv.inlineSet { return tv.inline, nil @@ -87,6 +91,11 @@ func (tv *typedValue) inlineValue(es *entitySet, attr ordinal) (uintptr, error) return tv.inline, nil } +// resetInline clears the inline value. +func (tv *typedValue) resetInline() { + tv.inlineSet, tv.inline = false, 0 +} + func (s *slot) eq(other slot) bool { // TODO(ajwerner): Deal with types. We may have two slots which both have // nil values but they differ in terms of types. 
@@ -107,6 +116,18 @@ func (s *slot) empty() bool { return s.value == nil } +func (s *slot) reset() { + s.typedValue = typedValue{} + if s.any != nil { + for i := 0; i < len(s.any); i++ { + s.any[i].resetInline() + } + } + if s.not != nil { + s.not.resetInline() + } +} + func maybeSet( slots []slot, idx slotIdx, tv typedValue, set *util.FastIntSet, ) (foundContradiction bool) { @@ -131,7 +152,7 @@ func maybeSet( } return false, false } - if s.not.typ != nil { + if s.not != nil { if tv.typ != s.not.typ || eqNotNil(s.not.value, tv.value) { return false, true } diff --git a/pkg/sql/schemachanger/rel/query_eval.go b/pkg/sql/schemachanger/rel/query_eval.go index 51c92cb7597c..6a4aedac3338 100644 --- a/pkg/sql/schemachanger/rel/query_eval.go +++ b/pkg/sql/schemachanger/rel/query_eval.go @@ -38,11 +38,32 @@ func newEvalContext(q *Query) *evalContext { return &evalContext{ q: q, depth: queryDepth(len(q.entities)), - slots: append(make([]slot, 0, len(q.slots)), q.slots...), + slots: cloneSlots(q.slots), facts: q.facts, } } +// cloneSlots clones the slots of a query for use in an evalContext. +func cloneSlots(slots []slot) []slot { + clone := append(make([]slot, 0, len(slots)), slots...) + for i := range clone { + // If there are any slots which map to a set of allowed values, we need + // to clone those values because during query evaluation, we'll fill in + // inline values in the context of the current entity set. This matters + // in particular for constraints related to entities or strings; their + // inline values depend on the entitySet. + if clone[i].any != nil { + vals := clone[i].any + clone[i].any = append(make([]typedValue, 0, len(vals)), vals...) 
+ } + if clone[i].not != nil { + cloned := *clone[i].not + clone[i].not = &cloned + } + } + return clone +} + type evalResult evalContext func (ec *evalResult) Var(name Var) interface{} { @@ -135,9 +156,7 @@ func (ec *evalContext) visit(e entity) error { // evaluation and then unset them when we pop out of this stack frame. var slotsFilled util.FastIntSet defer func() { - slotsFilled.ForEach(func(i int) { - ec.slots[i].typedValue = typedValue{} - }) + slotsFilled.ForEach(func(i int) { ec.slots[i].reset() }) }() // Fill in the slot corresponding to this entity. It should not be filled @@ -408,7 +427,7 @@ func (ec *evalContext) visitSubquery(query int) (done bool, _ error) { defer sub.query.putEvalContext(sec) defer func() { // reset the slots populated to run the subquery sub.inputSlotMappings.ForEach(func(_, subSlot int) { - sec.slots[subSlot].typedValue = typedValue{} + sec.slots[subSlot].reset() }) }() if err := ec.bindSubQuerySlots(sub.inputSlotMappings, sec); err != nil { diff --git a/pkg/sql/schemachanger/rel/rel_test.go b/pkg/sql/schemachanger/rel/rel_test.go index 023cc5eeb9a2..ce88cbf18873 100644 --- a/pkg/sql/schemachanger/rel/rel_test.go +++ b/pkg/sql/schemachanger/rel/rel_test.go @@ -12,6 +12,7 @@ package rel_test import ( "fmt" + "math/rand" "reflect" "testing" @@ -20,7 +21,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/cyclegraphtest" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/internal/entitynodetest" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/rel/reltest" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestRel(t *testing.T) { @@ -337,12 +341,12 @@ func TestTooManyAttributesInValues(t *testing.T) { } func TestRuleValidation(t *testing.T) { - type tooManyAttrs struct { + type entity struct { F1, F2 *uint32 } a1, a2 := stringAttr("a1"), stringAttr("a2") sc := 
rel.MustSchema("rules", - rel.EntityMapping(reflect.TypeOf((*tooManyAttrs)(nil)), + rel.EntityMapping(reflect.TypeOf((*entity)(nil)), rel.EntityAttr(a1, "F1"), rel.EntityAttr(a2, "F2"), ), @@ -439,3 +443,101 @@ func TestEmbeddedFieldsWork(t *testing.T) { "embedded pointer outer") }) } + +// TestConcurrentQueryInDifferentDatabases stresses some logic of the +// evalContext pooling to ensure that the state is properly reset between +// queries. An important property of this test is that it uses an any +// clause over entity pointers. When these pointers are inlined in the +// context of different databases, they will have different values. By +// randomizing the insertion order, we ensure that the inline values for +// the different entities differ. +// +// This test exercises the code which resets the inline values of slots in +// the evalContext corresponding to the "not" and "any" constraints on that +// slot. These fields are pointers and need to be reset explicitly. The bug +// which motivated this test was that the slots were only being reset by value. 
+func TestConcurrentQueryInDifferentDatabases(t *testing.T) { + defer leaktest.AfterTest(t)() + + type entity struct { + Str string + Other *entity + } + var str, other stringAttr = "str", "other" + schema := rel.MustSchema("test", + rel.EntityMapping( + reflect.TypeOf((*entity)(nil)), + rel.EntityAttr(str, "Str"), + rel.EntityAttr(other, "Other"), + ), + ) + newDB := func() *rel.Database { + db, err := rel.NewDatabase(schema, rel.Index{Attrs: []rel.Attr{other}}) + require.NoError(t, err) + return db + } + const ( + numDBs = 3 + numEntities = 5 + numContainsVals = 3 + ) + makeEntities := func() (ret []*entity) { + for i := 0; i < numEntities; i++ { + ret = append(ret, &entity{Str: fmt.Sprintf("s%d", i)}) + } + for i := 0; i < numEntities; i++ { + ret[i].Other = ret[(i+1)%numEntities] + } + return ret + } + makeDBs := func() (ret []*rel.Database) { + for i := 0; i < numDBs; i++ { + ret = append(ret, newDB()) + } + return ret + } + addEntitiesToDB := func(db *rel.Database, entities []*entity) { + for _, i := range rand.Perm(len(entities)) { + require.NoError(t, db.Insert(entities[i])) + } + } + addEntitiesToDBs := func(dbs []*rel.Database, entities []*entity) { + for _, db := range dbs { + addEntitiesToDB(db, entities) + } + } + + dbs, entities := makeDBs(), makeEntities() + addEntitiesToDBs(dbs, entities) + assert.Less(t, numContainsVals, numEntities) + makeContainsVals := func(entities []*entity) (ret []interface{}) { + for i := 0; i < numContainsVals; i++ { + ret = append(ret, entities[i+1]) + } + return ret + } + type v = rel.Var + q, err := rel.NewQuery(schema, + v("e").AttrIn(other, makeContainsVals(entities)...), + v("e").AttrNeq(rel.Self, entities[0]), // exclude the first entity + ) + require.NoError(t, err) + var N = 8 + exp := entities[1:numContainsVals] // the first entity is excluded + run := func(i int) func() error { + return func() error { + var got []*entity + assert.NoError(t, q.Iterate(dbs[i%len(dbs)], func(r rel.Result) error { + got = append(got, 
r.Var("e").(*entity)) + return nil + })) + assert.EqualValues(t, exp, got) + return nil + } + } + var g errgroup.Group + for i := 0; i < N; i++ { + g.Go(run(i)) + } + require.NoError(t, g.Wait()) +} diff --git a/pkg/sql/tests/BUILD.bazel b/pkg/sql/tests/BUILD.bazel index d31aef3a43d1..5cf34956a2d2 100644 --- a/pkg/sql/tests/BUILD.bazel +++ b/pkg/sql/tests/BUILD.bazel @@ -42,6 +42,7 @@ go_test( "rename_column_test.go", "repair_test.go", "rsg_test.go", + "schema_changes_in_parallel_test.go", "split_test.go", "system_table_test.go", "table_split_test.go", diff --git a/pkg/sql/tests/schema_changes_in_parallel_test.go b/pkg/sql/tests/schema_changes_in_parallel_test.go new file mode 100644 index 000000000000..4b96f7e86556 --- /dev/null +++ b/pkg/sql/tests/schema_changes_in_parallel_test.go @@ -0,0 +1,77 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestSchemaChangesInParallel exists to try to shake out races in the +// declarative schema changer infrastructure. At its time of writing, it +// effectively reproduced a race in the rules engine's object pooling. 
+func TestSchemaChangesInParallel(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + GCJob: &sql.GCJobTestingKnobs{ + SkipWaitingForMVCCGC: true, + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + defer s.Stopper().Stop(ctx) + + const N = 4 + run := func(i int) func() (retErr error) { + return func() (retErr error) { + conn, err := sqlDB.Conn(ctx) + if err != nil { + return err + } + defer func() { + retErr = errors.CombineErrors(retErr, conn.Close()) + }() + for _, stmt := range []string{ + fmt.Sprintf("CREATE DATABASE db%d", i), + fmt.Sprintf("USE db%d", i), + "CREATE TABLE t (i INT PRIMARY KEY, k INT)", + "ALTER TABLE t ADD COLUMN j INT DEFAULT 42", + "ALTER TABLE t DROP COLUMN k", + "CREATE SEQUENCE s", + "ALTER TABLE t ADD COLUMN l INT DEFAULT nextval('s')", + fmt.Sprintf("DROP DATABASE db%d", i), + } { + if _, err := conn.ExecContext(ctx, stmt); err != nil { + return err + } + } + return nil + } + } + var g errgroup.Group + for i := 0; i < N; i++ { + g.Go(run(i)) + } + require.NoError(t, g.Wait()) +}