diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index 29ae625e8736..55b5298bcdfe 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -70,6 +70,7 @@ go_test( "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/leaktest", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index 1b3f39a36f71..35581c09b5aa 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -33,6 +33,8 @@ import ( "github.com/cockroachdb/redact" ) +const virtualColOrd = 1<<31 - 1 + // Metadata describes event metadata. type Metadata struct { TableID descpb.ID // Table ID. @@ -129,18 +131,29 @@ func (r Row) HasValues() bool { // forEachColumn is a helper which invokes fn for reach column in the ordColumn list. func (r Row) forEachDatum(fn DatumFn, colIndexes []int) error { + numVirtualCols := 0 for _, colIdx := range colIndexes { col := r.cols[colIdx] - if col.ord >= len(r.datums) { - return errors.AssertionFailedf("index [%d] out of range for column %q", col.ord, col.Name) - } - encDatum := r.datums[col.ord] - if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil { - return errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String()) - } + // A datum row will never contain virtual columns. If we encounter a column that is virtual, + // then we need to offset each subsequent col.ord by 1. This offset is tracked by numVirtualCols. + physicalOrd := col.ord - numVirtualCols + if physicalOrd < len(r.datums) { + encDatum := r.datums[physicalOrd] + if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil { + return errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String()) + } - if err := fn(encDatum.Datum, col); err != nil { - return iterutil.Map(err) + if err := fn(encDatum.Datum, col); err != nil { + return iterutil.Map(err) + } + } else if col.ord == virtualColOrd { + // Insert null values as placeholders for virtual columns. + if err := fn(tree.DNull, col); err != nil { + return iterutil.Map(err) + } + numVirtualCols++ + } else { + return errors.AssertionFailedf("index [%d] out of range for column %q", physicalOrd, col.Name) } } return nil @@ -248,22 +261,29 @@ func NewEventDescriptor( primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), i) } - // Remaining columns go in same order as public columns. + // Remaining columns go in same order as public columns, + // with the exception that virtual columns are reordered + // to be at the end. inFamily := catalog.MakeTableColSet(family.ColumnIDs...) - for ord, col := range desc.PublicColumns() { + ord := 0 + for _, col := range desc.PublicColumns() { isInFamily := inFamily.Contains(col.GetID()) virtual := col.IsVirtual() && includeVirtualColumns - isValueCol := isInFamily || virtual pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID()) - if isValueCol || isPKey { + if isInFamily || isPKey { colIdx := addColumn(col, ord) - if isValueCol { + if isInFamily { sd.valueCols = append(sd.valueCols, colIdx) } if isPKey { sd.keyCols[pKeyOrd] = colIdx } + ord++ + } else if virtual { + colIdx := addColumn(col, virtualColOrd) + sd.valueCols = append(sd.valueCols, colIdx) + ord++ } } @@ -633,3 +653,32 @@ func TestingMakeEventRowFromEncDatums( alloc: &alloc, } } + +// getRelevantColumnsForFamily returns an array of column ids for public columns +// including only primary key columns and columns in the specified familyDesc, +// If includeVirtual is true, virtual columns, which may be outside the specified +// family, will be included. +func getRelevantColumnsForFamily( + tableDesc catalog.TableDescriptor, familyDesc *descpb.ColumnFamilyDescriptor, +) ([]descpb.ColumnID, error) { + cols := tableDesc.GetPrimaryIndex().CollectKeyColumnIDs() + for _, colID := range familyDesc.ColumnIDs { + cols.Add(colID) + } + + // Maintain the ordering of tableDesc.PublicColumns(), which is + // matches the order of columns in the SQL table. + idx := 0 + result := make([]descpb.ColumnID, cols.Len()) + for _, colID := range tableDesc.PublicColumnIDs() { + if cols.Contains(colID) { + result[idx] = colID + idx++ + } + } + + // Some columns in familyDesc.ColumnIDs may not be public, so + // result may contain fewer columns than cols. + result = result[:idx] + return result, nil +} diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index b2dbbbd08956..4821c5b34a23 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -61,35 +62,35 @@ CREATE TABLE foo ( for _, tc := range []struct { family *descpb.ColumnFamilyDescriptor includeVirtual bool - expectedKeyCols []ResultColumn - expectedColumns []ResultColumn - expectedUDTCols []ResultColumn + expectedKeyCols []string + expectedColumns []string + expectedUDTCols []string }{ { family: mainFamily, includeVirtual: false, - expectedKeyCols: expectResultColumns(t, tableDesc, "b", "a"), - expectedColumns: expectResultColumns(t, tableDesc, "a", "b", "e"), - expectedUDTCols: expectResultColumns(t, tableDesc, "e"), + expectedKeyCols: []string{"b", "a"}, + expectedColumns: []string{"a", "b", "e"}, + expectedUDTCols: []string{"e"}, }, { family: mainFamily, includeVirtual: true, - expectedKeyCols: expectResultColumns(t, tableDesc, "b", "a"), - expectedColumns: expectResultColumns(t, tableDesc, "a", "b", "d", "e"), - expectedUDTCols: expectResultColumns(t, tableDesc, "e"), + expectedKeyCols: []string{"b", "a"}, + expectedColumns: []string{"a", "b", "d", "e"}, + expectedUDTCols: []string{"e"}, }, { family: cFamily, includeVirtual: false, - expectedKeyCols: expectResultColumns(t, tableDesc, "b", "a"), - expectedColumns: expectResultColumns(t, tableDesc, "c"), + expectedKeyCols: []string{"b", "a"}, + expectedColumns: []string{"c"}, }, { family: cFamily, includeVirtual: true, - expectedKeyCols: expectResultColumns(t, tableDesc, "b", "a"), - expectedColumns: expectResultColumns(t, tableDesc, "c", "d"), + expectedKeyCols: []string{"b", "a"}, + expectedColumns: []string{"c", "d"}, }, } { t.Run(fmt.Sprintf("%s/includeVirtual=%t", tc.family.Name, tc.includeVirtual), func(t *testing.T) { @@ -104,9 +105,9 @@ CREATE TABLE foo ( // Verify primary key and family columns are as expected. r := Row{EventDescriptor: ed} - require.Equal(t, tc.expectedKeyCols, slurpColumns(t, r.ForEachKeyColumn())) - require.Equal(t, tc.expectedColumns, slurpColumns(t, r.ForEachColumn())) - require.Equal(t, tc.expectedUDTCols, slurpColumns(t, r.ForEachUDTColumn())) + require.Equal(t, expectResultColumns(t, tableDesc, tc.includeVirtual, tc.expectedKeyCols), slurpColumns(t, r.ForEachKeyColumn())) + require.Equal(t, expectResultColumns(t, tableDesc, tc.includeVirtual, tc.expectedColumns), slurpColumns(t, r.ForEachColumn())) + require.Equal(t, expectResultColumns(t, tableDesc, tc.includeVirtual, tc.expectedUDTCols), slurpColumns(t, r.ForEachUDTColumn())) }) } } @@ -351,6 +352,207 @@ CREATE TABLE foo ( } +func TestEventColumnOrderingWithSchemaChanges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderRace(t) + skip.UnderStress(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + // Use alter column type to force column reordering. + sqlDB.Exec(t, `SET enable_experimental_alter_column_type_general = true`) + + type decodeExpectation struct { + expectUnwatchedErr bool + + keyValues []string + allValues []string + + refreshDescriptor bool + } + + for _, tc := range []struct { + testName string + familyName string // Must be set if targetType ChangefeedTargetSpecification_COLUMN_FAMILY + includeVirtual bool + actions []string + expectMainFamily []decodeExpectation + expectECFamily []decodeExpectation + }{ + { + testName: "main/main_cols", + familyName: "main", + actions: []string{ + "INSERT INTO foo (i,j,a,b) VALUES (0,1,'a0','b0')", + "ALTER TABLE foo ALTER COLUMN a SET DATA TYPE VARCHAR(16)", + "INSERT INTO foo (i,j,a,b) VALUES (1,2,'a1','b1')", + }, + expectMainFamily: []decodeExpectation{ + { + keyValues: []string{"1", "0"}, + allValues: []string{"0", "1", "a0", "b0"}, + }, + { + keyValues: []string{"1", "0"}, + allValues: []string{"0", "1", "a0", "b0"}, + }, + { + keyValues: []string{"1", "0"}, + allValues: []string{"0", "1", "a0", "b0"}, + }, + { + keyValues: []string{"2", "1"}, + allValues: []string{"1", "2", "a1", "b1"}, + }, + }, + }, + { + testName: "ec/ec_cols", + familyName: "ec", + actions: []string{ + "INSERT INTO foo (i,j,e,c) VALUES (2,3,'e2','c2')", + "ALTER TABLE foo ALTER COLUMN c SET DATA TYPE VARCHAR(16)", + "INSERT INTO foo (i,j,e,c) VALUES (3,4,'e3','c3')", + }, + expectMainFamily: []decodeExpectation{ + { + expectUnwatchedErr: true, + }, + { + expectUnwatchedErr: true, + }, + }, + expectECFamily: []decodeExpectation{ + { + keyValues: []string{"3", "2"}, + allValues: []string{"c2", "e2"}, + }, + { + keyValues: []string{"3", "2"}, + allValues: []string{"c2", "e2"}, + }, + { + keyValues: []string{"3", "2"}, + allValues: []string{"c2", "e2"}, + }, + { + keyValues: []string{"4", "3"}, + allValues: []string{"c3", "e3"}, + }, + }, + }, + { + testName: "ec/ec_cols_with_virtual", + familyName: "ec", + actions: []string{ + "INSERT INTO foo (i,j,e,c) VALUES (4,5,'e4','c4')", + "ALTER TABLE foo ALTER COLUMN c SET DATA TYPE VARCHAR(16)", + "INSERT INTO foo (i,j,e,c) VALUES (5,6,'e5','c5')", + }, + includeVirtual: true, + expectMainFamily: []decodeExpectation{ + { + expectUnwatchedErr: true, + }, + { + expectUnwatchedErr: true, + }, + }, + expectECFamily: []decodeExpectation{ + { + keyValues: []string{"5", "4"}, + allValues: []string{"c4", "NULL", "e4"}, + }, + { + keyValues: []string{"5", "4"}, + allValues: []string{"c4", "NULL", "e4"}, + refreshDescriptor: true, + }, + { + keyValues: []string{"5", "4"}, + allValues: []string{"c4", "NULL", "e4"}, + }, + { + keyValues: []string{"6", "5"}, + allValues: []string{"c5", "NULL", "e5"}, + }, + }, + }, + } { + t.Run(tc.testName, func(t *testing.T) { + sqlDB.Exec(t, ` + CREATE TABLE foo ( + i INT, + j INT, + a STRING, + b STRING, + c STRING, + d STRING AS (concat(e, c)) VIRTUAL, + e STRING, + PRIMARY KEY(j,i), + FAMILY main (i,j,a,b), + FAMILY ec (e,c) + )`) + + tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") + popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) + defer cleanup() + + targetType := jobspb.ChangefeedTargetSpecification_EACH_FAMILY + if tc.familyName != "" { + targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY + } + + for _, action := range tc.actions { + sqlDB.Exec(t, action) + } + + targets := changefeedbase.Targets{} + targets.Add(changefeedbase.Target{ + Type: targetType, + TableID: tableDesc.GetID(), + FamilyName: tc.familyName, + }) + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig + ctx := context.Background() + decoder, err := NewEventDecoder(ctx, &serverCfg, targets, tc.includeVirtual) + require.NoError(t, err) + + expectedEvents := len(tc.expectMainFamily) + len(tc.expectECFamily) + for i := 0; i < expectedEvents; i++ { + v := popRow(t) + + eventFamilyID, err := TestingGetFamilyIDFromKey(decoder, v.Key, v.Timestamp()) + require.NoError(t, err) + + var expect decodeExpectation + if eventFamilyID == 0 { + expect, tc.expectMainFamily = tc.expectMainFamily[0], tc.expectMainFamily[1:] + } else { + expect, tc.expectECFamily = tc.expectECFamily[0], tc.expectECFamily[1:] + } + updatedRow, err := decoder.DecodeKV( + ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, v.Timestamp()) + + if expect.expectUnwatchedErr { + require.ErrorIs(t, err, ErrUnwatchedFamily) + continue + } + + require.NoError(t, err) + require.True(t, updatedRow.IsInitialized()) + + require.Equal(t, expect.keyValues, slurpDatums(t, updatedRow.ForEachKeyColumn())) + require.Equal(t, expect.allValues, slurpDatums(t, updatedRow.ForEachColumn())) + } + sqlDB.Exec(t, `DROP TABLE foo`) + }) + } +} + func mustGetFamily( t *testing.T, desc catalog.TableDescriptor, familyID descpb.FamilyID, ) *descpb.ColumnFamilyDescriptor { @@ -361,9 +563,44 @@ func mustGetFamily( } func expectResultColumns( - t *testing.T, desc catalog.TableDescriptor, colNames ...string, + t *testing.T, desc catalog.TableDescriptor, includeVirtual bool, colNames []string, ) (res []ResultColumn) { t.Helper() + + // Map the column names to the expected ordinality. + // + // The ordinality values in EventDescriptor.keyCols, + // EventDescriptor.valueCols, and EventDescriptor.udtCols (which are indexes + // into a rowenc.EncDatumRow) are calculated in the following manner: Start + // with catalog.TableDescriptor.PublicColumns() and keep (i) the primary key + // columns, (ii) columns in a specified family, and (iii) virtual columns ( + // which may be outside the specified family). The + // remaining columns get filtered out. The position of a particular column in + // this array determines its ordinality. + // + // This function generates ordinality in the same manner, except it uses + // colNames instead of a column family descriptor when filtering columns. + colNamesSet := make(map[string]int) + for _, colName := range colNames { + colNamesSet[colName] = -1 + } + ord := 0 + for _, col := range desc.PublicColumns() { + colName := string(col.ColName()) + if _, ok := colNamesSet[colName]; ok { + if col.IsVirtual() { + colNamesSet[colName] = virtualColOrd + } else { + colNamesSet[colName] = ord + } + ord++ + } else if desc.GetPrimaryIndex().CollectKeyColumnIDs().Contains(col.GetID()) { + ord++ + } else if col.IsVirtual() && includeVirtual { + ord++ + } + } + for _, colName := range colNames { col, err := desc.FindColumnWithName(tree.Name(colName)) require.NoError(t, err) @@ -374,7 +611,7 @@ func expectResultColumns( TableID: desc.GetID(), PGAttributeNum: uint32(col.GetPGAttributeNum()), }, - ord: col.Ordinal(), + ord: colNamesSet[colName], sqlString: col.ColumnDesc().SQLStringNotHumanReadable(), }) } diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 7d29b5af9c91..50636c18d448 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -217,10 +217,13 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily( var spec descpb.IndexFetchSpec - // TODO (zinger): Make fetchColumnIDs only the family and the primary key. - // This seems to cause an error without further work but would be more efficient. + relevantColumns, err := getRelevantColumnsForFamily(tableDesc, familyDesc) + if err != nil { + return nil, nil, err + } + if err := rowenc.InitIndexFetchSpec( - &spec, c.codec, tableDesc, tableDesc.GetPrimaryIndex(), tableDesc.PublicColumnIDs(), + &spec, c.codec, tableDesc, tableDesc.GetPrimaryIndex(), relevantColumns, ); err != nil { return nil, nil, err } @@ -236,10 +239,6 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily( return nil, nil, err } - // Necessary because virtual columns are not populated. - // TODO(radu): should we stop requesting those columns from the fetcher? - rf.IgnoreUnexpectedNulls = true - c.fetchers.Add(idVer, f) return rf, familyDesc, nil }