Skip to content

Commit 727a968

Browse files
Rishabh Mittalti-chi-bot
authored and
GitHub Enterprise
committed
infoschema: fix issue of information schema cache miss cause by schema version gap (pingcap#53445) (pingcap#53583) (pingcap#97)
close pingcap#53428 Co-authored-by: Ti Chi Robot <[email protected]>
1 parent 1c01241 commit 727a968

File tree

3 files changed

+191
-3
lines changed

3 files changed

+191
-3
lines changed

domain/domain.go

+2
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,8 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
380380
if diff == nil {
381381
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
382382
// It is safe to skip the empty diff because the infoschema is new enough and consistent.
383+
logutil.BgLogger().Info("diff load InfoSchema get empty schema diff", zap.Int64("version", usedVersion))
384+
do.infoCache.InsertEmptySchemaVersion(usedVersion)
383385
continue
384386
}
385387
diffs = append(diffs, diff)

infoschema/cache.go

+63-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ type InfoCache struct {
4040
mu sync.RWMutex
4141
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
4242
cache []schemaAndTimestamp
43+
44+
// emptySchemaVersions stores schema version which has no schema_diff.
45+
emptySchemaVersions map[int64]struct{}
4346
}
4447

4548
type schemaAndTimestamp struct {
@@ -50,7 +53,8 @@ type schemaAndTimestamp struct {
5053
// NewCache creates a new InfoCache.
5154
func NewCache(capacity int) *InfoCache {
5255
return &InfoCache{
53-
cache: make([]schemaAndTimestamp, 0, capacity),
56+
cache: make([]schemaAndTimestamp, 0, capacity),
57+
emptySchemaVersions: make(map[int64]struct{}),
5458
}
5559
}
5660

@@ -102,6 +106,11 @@ func (h *InfoCache) Len() int {
102106
return len(h.cache)
103107
}
104108

109+
// GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
110+
func (h *InfoCache) GetEmptySchemaVersions() map[int64]struct{} {
111+
return h.emptySchemaVersions
112+
}
113+
105114
func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
106115
logutil.BgLogger().Debug("SCHEMA CACHE get schema", zap.Uint64("timestamp", ts))
107116
// search one by one instead of binary search, because the timestamp of a schema could be 0
@@ -110,14 +119,43 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
110119
// thus it may have better performance than binary search
111120
for i, is := range h.cache {
112121
if is.timestamp == 0 || ts < uint64(is.timestamp) {
122+
// is.timestamp == 0 means the schema ts is unknown, so we can't use it, then just skip it.
123+
// ts < is.timestamp means the schema is newer than ts, so we can't use it too, just skip it to find the older one.
113124
continue
114125
}
126+
// ts >= is.timestamp must be true after the above condition.
115127
if i == 0 {
128+
// the first element is the latest schema, so we can return it directly.
116129
return is.infoschema, true
117130
}
118-
if h.cache[i-1].infoschema.SchemaMetaVersion() == is.infoschema.SchemaMetaVersion()+1 && uint64(h.cache[i-1].timestamp) > ts {
119-
return is.infoschema, true
131+
132+
if uint64(h.cache[i-1].timestamp) > ts {
133+
// The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
134+
lastVersion := h.cache[i-1].infoschema.SchemaMetaVersion()
135+
currentVersion := is.infoschema.SchemaMetaVersion()
136+
if lastVersion == currentVersion+1 {
137+
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
138+
// but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
139+
return is.infoschema, true
140+
}
141+
if lastVersion > currentVersion {
142+
found := true
143+
for ver := currentVersion + 1; ver < lastVersion; ver++ {
144+
_, ok := h.emptySchemaVersions[ver]
145+
if !ok {
146+
found = false
147+
break
148+
}
149+
}
150+
if found {
151+
// This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
152+
// current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
153+
// which means those gap versions don't have schema info, then current schema is also suitable for ts.
154+
return is.infoschema, true
155+
}
156+
}
120157
}
158+
// current schema is not suitable for ts, then break the loop to avoid the unnecessary search.
121159
break
122160
}
123161

@@ -225,3 +263,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
225263

226264
return true
227265
}
266+
267+
// InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
268+
func (h *InfoCache) InsertEmptySchemaVersion(version int64) {
269+
h.mu.Lock()
270+
defer h.mu.Unlock()
271+
272+
h.emptySchemaVersions[version] = struct{}{}
273+
if len(h.emptySchemaVersions) > cap(h.cache) {
274+
// remove oldest version.
275+
versions := make([]int64, 0, len(h.emptySchemaVersions))
276+
for ver := range h.emptySchemaVersions {
277+
versions = append(versions, ver)
278+
}
279+
sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
280+
for _, ver := range versions {
281+
delete(h.emptySchemaVersions, ver)
282+
if len(h.emptySchemaVersions) <= cap(h.cache) {
283+
break
284+
}
285+
}
286+
}
287+
}

infoschema/cache_test.go

+126
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,129 @@ func TestCacheWithSchemaTsZero(t *testing.T) {
292292
checkFn(85, 100, true)
293293
require.Equal(t, 16, ic.Size())
294294
}
295+
296+
func TestCacheWithSchemaTsZero(t *testing.T) {
297+
ic := infoschema.NewCache(16)
298+
require.NotNil(t, ic)
299+
300+
for i := 1; i <= 8; i++ {
301+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
302+
}
303+
304+
checkFn := func(start, end int64, exist bool) {
305+
require.True(t, start <= end)
306+
latestSchemaVersion := ic.GetLatest().SchemaMetaVersion()
307+
for ts := start; ts <= end; ts++ {
308+
is := ic.GetBySnapshotTS(uint64(ts))
309+
if exist {
310+
require.NotNil(t, is, fmt.Sprintf("ts %d", ts))
311+
if ts > latestSchemaVersion {
312+
require.Equal(t, latestSchemaVersion, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts))
313+
} else {
314+
require.Equal(t, ts, is.SchemaMetaVersion(), fmt.Sprintf("ts %d", ts))
315+
}
316+
} else {
317+
require.Nil(t, is, fmt.Sprintf("ts %d", ts))
318+
}
319+
}
320+
}
321+
checkFn(1, 8, true)
322+
checkFn(8, 10, true)
323+
324+
// mock for meet error There is no Write MVCC info for the schema version
325+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 0)
326+
checkFn(1, 7, true)
327+
checkFn(8, 9, false)
328+
checkFn(9, 10, false)
329+
330+
for i := 10; i <= 16; i++ {
331+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
332+
checkFn(1, 7, true)
333+
checkFn(8, 9, false)
334+
checkFn(10, 16, true)
335+
}
336+
require.Equal(t, 16, ic.Size())
337+
338+
// refill the cache
339+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 9), 9)
340+
checkFn(1, 16, true)
341+
require.Equal(t, 16, ic.Size())
342+
343+
// Test more than capacity
344+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 17), 17)
345+
checkFn(1, 1, false)
346+
checkFn(2, 17, true)
347+
checkFn(2, 20, true)
348+
require.Equal(t, 16, ic.Size())
349+
350+
// Test for there is a hole in the middle.
351+
ic = infoschema.NewCache(16)
352+
353+
// mock for restart with full load the latest version schema.
354+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 100), 100)
355+
checkFn(1, 99, false)
356+
checkFn(100, 100, true)
357+
358+
for i := 1; i <= 16; i++ {
359+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
360+
}
361+
checkFn(1, 1, false)
362+
checkFn(2, 15, true)
363+
checkFn(16, 16, false)
364+
checkFn(100, 100, true)
365+
require.Equal(t, 16, ic.Size())
366+
367+
for i := 85; i < 100; i++ {
368+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
369+
}
370+
checkFn(1, 84, false)
371+
checkFn(85, 100, true)
372+
require.Equal(t, 16, ic.Size())
373+
374+
// Test cache with schema version hole, which is cause by schema version doesn't has related schema-diff.
375+
ic = infoschema.NewCache(16)
376+
require.NotNil(t, ic)
377+
for i := 1; i <= 8; i++ {
378+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, int64(i)), uint64(i))
379+
}
380+
checkFn(1, 10, true)
381+
// mock for schema version hole, schema-version 9 is missing.
382+
ic.Insert(infoschema.MockInfoSchemaWithSchemaVer(nil, 10), 10)
383+
checkFn(1, 7, true)
384+
// without empty schema version map, get snapshot by ts 8, 9 will both failed.
385+
checkFn(8, 9, false)
386+
checkFn(10, 10, true)
387+
// add empty schema version 9.
388+
ic.InsertEmptySchemaVersion(9)
389+
// after set empty schema version, get snapshot by ts 8, 9 will both success.
390+
checkFn(1, 8, true)
391+
checkFn(10, 10, true)
392+
is := ic.GetBySnapshotTS(uint64(9))
393+
require.NotNil(t, is)
394+
// since schema version 9 is empty, so get by ts 9 will get schema which version is 8.
395+
require.Equal(t, int64(8), is.SchemaMetaVersion())
396+
}
397+
398+
func TestCacheEmptySchemaVersion(t *testing.T) {
399+
ic := infoschema.NewCache(16)
400+
require.NotNil(t, ic)
401+
require.Equal(t, 0, len(ic.GetEmptySchemaVersions()))
402+
for i := 0; i < 16; i++ {
403+
ic.InsertEmptySchemaVersion(int64(i))
404+
}
405+
emptyVersions := ic.GetEmptySchemaVersions()
406+
require.Equal(t, 16, len(emptyVersions))
407+
for i := 0; i < 16; i++ {
408+
_, ok := emptyVersions[int64(i)]
409+
require.True(t, ok)
410+
}
411+
for i := 16; i < 20; i++ {
412+
ic.InsertEmptySchemaVersion(int64(i))
413+
}
414+
emptyVersions = ic.GetEmptySchemaVersions()
415+
require.Equal(t, 16, len(emptyVersions))
416+
for i := 4; i < 20; i++ {
417+
_, ok := emptyVersions[int64(i)]
418+
require.True(t, ok)
419+
}
420+
}

0 commit comments

Comments
 (0)