Skip to content

Commit

Permalink
stats: use available type metadata when hydrating UDTs
Browse files Browse the repository at this point in the history
This commit updates the table stats cache to use the correct metadata when
hydrating the UDTs for the table stats read from disk. Previously, we
would always use the `DescsTxn` helper to run a separate txn to create
the type resolver, which I think meant that we would use the latest
_committed_ metadata; however, if the UDT modification happened within
the current not-yet-committed txn, then we would use stale metadata
which could lead to failing an assertion later (that we used enums of
different versions). In particular, this would happen if we added a new
value to the UDT and then would use the previously-existing value in
a filter. Note that we correctly determined that the stats cache entry
was stale; we simply used stale type metadata to hydrate the UDTs.

To fix the problem this commit plumbs the type resolver all the way from
the caller who is requesting the table stats. The previous behavior is
acceptable in some cases (in backups), so the type resolver is optional.

Release note (bug fix): Previously, CockroachDB could encounter an
internal error `comparison of two different versions of enum` in some
cases when a user-defined type was modified within a transaction and
following statements read the column of that user-defined type. The bug
was introduced in version 24.2 and is now fixed.
  • Loading branch information
yuzefovich committed Dec 24, 2024
1 parent d58f071 commit 032e1be
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 51 deletions.
4 changes: 3 additions & 1 deletion pkg/backup/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,9 @@ func getTableStatsForBackup(
for i := range descs {
if tbl, _, _, _, _ := descpb.GetDescriptors(&descs[i]); tbl != nil {
tableDesc := tabledesc.NewBuilder(tbl).BuildImmutableTable()
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc)
// nil typeResolver means that we'll use the latest committed type
// metadata which is acceptable.
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc, nil /* typeResolver */)
if err != nil {
log.Warningf(ctx, "failed to collect stats for table: %s, "+
"table ID: %d during a backup: %s", tableDesc.GetName(), tableDesc.GetID(),
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -300,8 +301,13 @@ func (dsp *DistSQLPlanner) createPartialStatsPlan(
return nil, pgerror.Newf(pgcode.FeatureNotSupported, "multi-column partial statistics are not currently supported")
}

var typeResolver *descs.DistSQLTypeResolver
if p := planCtx.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
// Fetch all stats for the table that matches the given table descriptor.
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc)
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -690,7 +696,12 @@ func (dsp *DistSQLPlanner) createStatsPlan(
}
}

tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc)
var typeResolver *descs.DistSQLTypeResolver
if p := planCtx.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
return nil, err
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/stats
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,17 @@ CREATE TABLE t122312 (s STRING, g greeting AS (s::greeting) STORED);

statement ok
ANALYZE t122312;

# Regression for not using the latest type metadata after the UDT modification
# within the same txn (#129623).
statement ok
INSERT INTO t122312 VALUES ('hi');

statement ok
ANALYZE t122312;

statement ok
BEGIN;
ALTER TYPE greeting ADD VALUE 'hey';
SELECT * FROM t122312 WHERE g = 'hi';
COMMIT;
8 changes: 7 additions & 1 deletion pkg/sql/opt_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
Expand Down Expand Up @@ -567,8 +568,13 @@ func (oc *optCatalog) dataSourceForTable(
// statistics and the zone config haven't changed.
var tableStats []*stats.TableStatistic
if !flags.NoTableStats {
var typeResolver *descs.DistSQLTypeResolver
if p := oc.planner; p != nil {
r := descs.NewDistSQLTypeResolver(p.Descriptors(), p.Txn())
typeResolver = &r
}
var err error
tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(ctx, desc)
tableStats, err = oc.planner.execCfg.TableStatsCache.GetTableStats(ctx, desc, typeResolver)
if err != nil {
// Ignore any error. We still want to be able to run queries even if we lose
// access to the statistics table.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sem/tree/datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -5363,7 +5363,7 @@ func (d *DEnum) Compare(ctx context.Context, cmpCtx CompareContext, other Datum)
if v.EnumTyp.TypeMeta.Version != d.EnumTyp.TypeMeta.Version {
panic(errors.AssertionFailedf(
"comparison of two different versions of enum %s oid %d: versions %d and %d",
errors.Safe(d.EnumTyp.SQLString), d.EnumTyp.Oid(), d.EnumTyp.TypeMeta.Version,
d.EnumTyp.SQLStringForError(), errors.Safe(d.EnumTyp.Oid()), d.EnumTyp.TypeMeta.Version,
v.EnumTyp.TypeMeta.Version,
))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/automatic_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ func (r *Refresher) maybeRefreshStats(
asOf time.Duration,
maybeRefreshPartialStats bool,
) {
tableStats, err := r.cache.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */)
tableStats, err := r.cache.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
log.Errorf(ctx, "failed to get table statistics: %v", err)
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func TestAverageRefreshTime(t *testing.T) {

checkAverageRefreshTime := func(expected time.Duration) error {
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand All @@ -425,7 +425,7 @@ func TestAverageRefreshTime(t *testing.T) {
// expectedAge time ago if lessThan is true (false).
checkMostRecentStat := func(expectedAge time.Duration, lessThan bool) error {
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand Down Expand Up @@ -913,7 +913,7 @@ func checkStatsCount(
return testutils.SucceedsSoonError(func() error {
cache.InvalidateTableStats(ctx, table.GetID())

stats, err := cache.GetTableStats(ctx, table)
stats, err := cache.GetTableStats(ctx, table, nil /* typeResolver */)
if err != nil {
return err
}
Expand Down Expand Up @@ -946,7 +946,7 @@ func compareStatsCountWithZero(
desc :=
desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "system", tableName)
return testutils.SucceedsSoonError(func() error {
stats, err := cache.GetTableStats(ctx, desc)
stats, err := cache.GetTableStats(ctx, desc, nil /* typeResolver */)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/stats/delete_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) {

return testutils.SucceedsSoonError(func() error {
tableStats, err := cache.getTableStatsFromCache(
ctx, tableID, nil /* forecast */, nil, /* udtCols */
ctx, tableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand All @@ -270,7 +270,7 @@ func TestDeleteOldStatsForColumns(t *testing.T) {
stat := &testData[i]
if stat.TableID != tableID {
stats, err := cache.getTableStatsFromCache(
ctx, stat.TableID, nil /* forecast */, nil, /* udtCols */
ctx, stat.TableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestDeleteOldStatsForOtherColumns(t *testing.T) {

return testutils.SucceedsSoonError(func() error {
tableStats, err := cache.getTableStatsFromCache(
ctx, tableID, nil /* forecast */, nil, /* udtCols */
ctx, tableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand All @@ -568,7 +568,7 @@ func TestDeleteOldStatsForOtherColumns(t *testing.T) {
stat := &testData[i]
if stat.TableID != tableID {
stats, err := cache.getTableStatsFromCache(
ctx, stat.TableID, nil /* forecast */, nil, /* udtCols */
ctx, stat.TableID, nil /* forecast */, nil /* udtCols */, nil, /* typeResolver */
)
if err != nil {
return err
Expand Down
76 changes: 44 additions & 32 deletions pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,24 @@ func decodeTableStatisticsKV(
// and if the stats are not present in the cache, it looks them up in
// system.table_statistics.
//
// typeResolver argument is optional and will be used to hydrate all
// user-defined types. If the resolver is not provided, then the latest
// committed type metadata will be used.
//
// The function returns an error if we could not query the system table. It
// silently ignores any statistics that can't be decoded (e.g. because
// user-defined types don't exist).
//
// The statistics are ordered by their CreatedAt time (newest-to-oldest).
func (sc *TableStatisticsCache) GetTableStats(
ctx context.Context, table catalog.TableDescriptor,
ctx context.Context, table catalog.TableDescriptor, typeResolver *descs.DistSQLTypeResolver,
) (stats []*TableStatistic, err error) {
if !statsUsageAllowed(table, sc.settings) {
return nil, nil
}
forecast := forecastAllowed(table, sc.settings)
return sc.getTableStatsFromCache(
ctx, table.GetID(), &forecast, table.UserDefinedTypeColumns(),
ctx, table.GetID(), &forecast, table.UserDefinedTypeColumns(), typeResolver,
)
}

Expand Down Expand Up @@ -313,7 +317,11 @@ func forecastAllowed(table catalog.TableDescriptor, clusterSettings *cluster.Set
// getTableStatsFromCache is like GetTableStats but assumes that the table ID
// is safe to fetch statistics for: non-system, non-virtual, non-view, etc.
func (sc *TableStatisticsCache) getTableStatsFromCache(
ctx context.Context, tableID descpb.ID, forecast *bool, udtCols []catalog.Column,
ctx context.Context,
tableID descpb.ID,
forecast *bool,
udtCols []catalog.Column,
typeResolver *descs.DistSQLTypeResolver,
) ([]*TableStatistic, error) {
sc.mu.Lock()
defer sc.mu.Unlock()
Expand All @@ -327,7 +335,7 @@ func (sc *TableStatisticsCache) getTableStatsFromCache(
}
}

return sc.addCacheEntryLocked(ctx, tableID, forecast != nil && *forecast)
return sc.addCacheEntryLocked(ctx, tableID, forecast != nil && *forecast, typeResolver)
}

// isStale checks whether we need to evict and re-load the cache entry.
Expand Down Expand Up @@ -403,7 +411,7 @@ func (sc *TableStatisticsCache) lookupStatsLocked(
// - stats are retrieved from database:
// - mutex is locked again and the entry is updated.
func (sc *TableStatisticsCache) addCacheEntryLocked(
ctx context.Context, tableID descpb.ID, forecast bool,
ctx context.Context, tableID descpb.ID, forecast bool, typeResolver *descs.DistSQLTypeResolver,
) (stats []*TableStatistic, err error) {
// Add a cache entry that other queries can find and wait on until we have the
// stats.
Expand All @@ -420,7 +428,7 @@ func (sc *TableStatisticsCache) addCacheEntryLocked(
defer sc.mu.Lock()

log.VEventf(ctx, 1, "reading statistics for table %d", tableID)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings, typeResolver)
log.VEventf(ctx, 1, "finished reading statistics for table %d", tableID)
}()

Expand Down Expand Up @@ -486,7 +494,7 @@ func (sc *TableStatisticsCache) refreshCacheEntry(

log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID)
// TODO(radu): pass the timestamp and use AS OF SYSTEM TIME.
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings)
stats, udts, err = sc.getTableStatsFromDB(ctx, tableID, forecast, sc.settings, nil /* typeResolver */)
log.VEventf(ctx, 1, "done refreshing statistics for table %d", tableID)
}()
if e.lastRefreshTimestamp.Equal(ts) {
Expand Down Expand Up @@ -625,7 +633,7 @@ func NewTableStatisticProto(datums tree.Datums) (*TableStatisticProto, error) {
// parseStats converts the given datums to a TableStatistic object. It might
// need to run a query to get user defined type metadata.
func (sc *TableStatisticsCache) parseStats(
ctx context.Context, datums tree.Datums,
ctx context.Context, datums tree.Datums, typeResolver *descs.DistSQLTypeResolver,
) (_ *TableStatistic, _ *types.T, err error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -651,33 +659,33 @@ func (sc *TableStatisticsCache) parseStats(
res := &TableStatistic{TableStatisticProto: *tsp}
var udt *types.T
if res.HistogramData != nil && (len(res.HistogramData.Buckets) > 0 || res.RowCount == res.NullCount) {
// hydrate the type in case any user defined types are present.
// Hydrate the type in case any user defined types are present.
// There are cases where typ is nil, so don't do anything if so.
if typ := res.HistogramData.ColumnType; typ != nil && typ.UserDefined() {
// The metadata accessed here is never older than the metadata used when
// collecting the stats. Changes to types are backwards compatible across
// versions, so using a newer version of the type metadata here is safe.
// Given that we never delete members from enum types, a descriptor we
// get from the lease manager will be able to be used to decode these stats,
// even if it wasn't the descriptor that was used to collect the stats.
// If we have types that are not backwards compatible in this way, then we
// will need to start writing a timestamp on the stats objects and request
// TypeDescriptor's with the timestamp that the stats were recorded with.
//
// TODO(ajwerner): We now do delete members from enum types. See #67050.
if err := sc.db.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
resolver := descs.NewDistSQLTypeResolver(txn.Descriptors(), txn.KV())
var err error
udt, err = resolver.ResolveTypeByOID(ctx, typ.Oid())
if typeResolver != nil {
udt, err = typeResolver.ResolveTypeByOID(ctx, typ.Oid())
if err != nil {
return nil, nil, err
}
res.HistogramData.ColumnType = udt
return err
}); err != nil {
return nil, nil, err
} else {
// The metadata accessed here is never older than the metadata
// used when collecting the stats. Changes to types are
// backwards compatible across versions, so using a newer
// version of the type metadata here is safe.
if err = sc.db.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
resolver := descs.NewDistSQLTypeResolver(txn.Descriptors(), txn.KV())
udt, err = resolver.ResolveTypeByOID(ctx, typ.Oid())
res.HistogramData.ColumnType = udt
return err
}); err != nil {
return nil, nil, err
}
}
}
if err := DecodeHistogramBuckets(res); err != nil {
if err = DecodeHistogramBuckets(res); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -791,7 +799,11 @@ func (tsp *TableStatisticProto) IsAuto() bool {
// It ignores any statistics that cannot be decoded (e.g. because a user-defined
// type that doesn't exist) and returns the rest (with no error).
func (sc *TableStatisticsCache) getTableStatsFromDB(
ctx context.Context, tableID descpb.ID, forecast bool, st *cluster.Settings,
ctx context.Context,
tableID descpb.ID,
forecast bool,
st *cluster.Settings,
typeResolver *descs.DistSQLTypeResolver,
) (_ []*TableStatistic, _ map[descpb.ColumnID]*types.T, err error) {
getTableStatisticsStmt := `
SELECT
Expand Down Expand Up @@ -842,7 +854,7 @@ ORDER BY "createdAt" DESC, "columnIDs" DESC, "statisticID" DESC
var udts map[descpb.ColumnID]*types.T
var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
stats, udt, err := sc.parseStats(ctx, it.Cur())
stats, udt, err := sc.parseStats(ctx, it.Cur(), typeResolver)
if err != nil {
log.Warningf(ctx, "could not decode statistic for table %d: %v", tableID, err)
continue
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/stats/stats_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func checkStatsForTable(

// Perform the lookup and refresh, and confirm the
// returned stats match the expected values.
statsList, err := sc.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */)
statsList, err := sc.getTableStatsFromCache(ctx, tableID, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
t.Fatalf("error retrieving stats: %s", err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func TestCacheUserDefinedTypes(t *testing.T) {
tbl := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "tt")
// Get stats for our table. We are ensuring here that the access to the stats
// for tt properly hydrates the user defined type t before access.
stats, err := sc.GetTableStats(ctx, tbl)
stats, err := sc.GetTableStats(ctx, tbl, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand All @@ -353,7 +353,7 @@ func TestCacheUserDefinedTypes(t *testing.T) {
sc.InvalidateTableStats(ctx, tbl.GetID())
// Verify that GetTableStats ignores the statistic on the now unknown type and
// returns the rest.
stats, err = sc.GetTableStats(ctx, tbl)
stats, err = sc.GetTableStats(ctx, tbl, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestCacheWait(t *testing.T) {
for n := 0; n < 10; n++ {
wg.Add(1)
go func() {
stats, err := sc.getTableStatsFromCache(ctx, id, nil /* forecast */, nil /* udtCols */)
stats, err := sc.getTableStatsFromCache(ctx, id, nil /* forecast */, nil /* udtCols */, nil /* typeResolver */)
if err != nil {
t.Error(err)
} else if !checkStats(stats, expectedStats[id]) {
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestCacheAutoRefresh(t *testing.T) {
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "test", "t")

expectNStats := func(n int) error {
stats, err := sc.GetTableStats(ctx, tableDesc)
stats, err := sc.GetTableStats(ctx, tableDesc, nil /* typeResolver */)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 032e1be

Please sign in to comment.