Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
Expand Down
77 changes: 63 additions & 14 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/cockroachdb/redact"
)

const virtualColOrd = 1<<31 - 1

// Metadata describes event metadata.
type Metadata struct {
TableID descpb.ID // Table ID.
Expand Down Expand Up @@ -129,18 +131,29 @@ func (r Row) HasValues() bool {

// forEachColumn is a helper which invokes fn for reach column in the ordColumn list.
func (r Row) forEachDatum(fn DatumFn, colIndexes []int) error {
numVirtualCols := 0
for _, colIdx := range colIndexes {
col := r.cols[colIdx]
if col.ord >= len(r.datums) {
return errors.AssertionFailedf("index [%d] out of range for column %q", col.ord, col.Name)
}
encDatum := r.datums[col.ord]
if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil {
return errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String())
}
// A datum row will never contain virtual columns. If we encounter a column that is virtual,
// then we need to offset each subsequent col.ord by 1. This offset is tracked by numVirtualCols.
physicalOrd := col.ord - numVirtualCols
if physicalOrd < len(r.datums) {
encDatum := r.datums[physicalOrd]
if err := encDatum.EnsureDecoded(col.Typ, r.alloc); err != nil {
return errors.Wrapf(err, "error decoding column %q as type %s", col.Name, col.Typ.String())
}

if err := fn(encDatum.Datum, col); err != nil {
return iterutil.Map(err)
if err := fn(encDatum.Datum, col); err != nil {
return iterutil.Map(err)
}
} else if col.ord == virtualColOrd {
// Insert null values as placeholders for virtual columns.
if err := fn(tree.DNull, col); err != nil {
return iterutil.Map(err)
}
numVirtualCols++
} else {
return errors.AssertionFailedf("index [%d] out of range for column %q", physicalOrd, col.Name)
}
}
return nil
Expand Down Expand Up @@ -248,22 +261,29 @@ func NewEventDescriptor(
primaryKeyOrdinal.Set(desc.PublicColumns()[ord].GetID(), i)
}

// Remaining columns go in same order as public columns.
// Remaining columns go in same order as public columns,
// with the exception that virtual columns are reordered
// to be at the end.
inFamily := catalog.MakeTableColSet(family.ColumnIDs...)
for ord, col := range desc.PublicColumns() {
ord := 0
for _, col := range desc.PublicColumns() {
isInFamily := inFamily.Contains(col.GetID())
virtual := col.IsVirtual() && includeVirtualColumns
isValueCol := isInFamily || virtual
pKeyOrd, isPKey := primaryKeyOrdinal.Get(col.GetID())
if isValueCol || isPKey {
if isInFamily || isPKey {
colIdx := addColumn(col, ord)
if isValueCol {
if isInFamily {
sd.valueCols = append(sd.valueCols, colIdx)
}

if isPKey {
sd.keyCols[pKeyOrd] = colIdx
}
ord++
} else if virtual {
colIdx := addColumn(col, virtualColOrd)
sd.valueCols = append(sd.valueCols, colIdx)
ord++
}
}

Expand Down Expand Up @@ -633,3 +653,32 @@ func TestingMakeEventRowFromEncDatums(
alloc: &alloc,
}
}

// getRelevantColumnsForFamily returns an array of column ids for public columns
// including only primary key columns and columns in the specified familyDesc,
// If includeVirtual is true, virtual columns, which may be outside the specified
// family, will be included.
func getRelevantColumnsForFamily(
tableDesc catalog.TableDescriptor, familyDesc *descpb.ColumnFamilyDescriptor,
) ([]descpb.ColumnID, error) {
cols := tableDesc.GetPrimaryIndex().CollectKeyColumnIDs()
for _, colID := range familyDesc.ColumnIDs {
cols.Add(colID)
}

// Maintain the ordering of tableDesc.PublicColumns(), which is
// matches the order of columns in the SQL table.
idx := 0
result := make([]descpb.ColumnID, cols.Len())
for _, colID := range tableDesc.PublicColumnIDs() {
if cols.Contains(colID) {
result[idx] = colID
idx++
}
}

// Some columns in familyDesc.ColumnIDs may not be public, so
// result may contain fewer columns than cols.
result = result[:idx]
return result, nil
}
Loading