@@ -30,6 +30,9 @@ type InfoCache struct {
30
30
mu sync.RWMutex
31
31
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
32
32
cache []schemaAndTimestamp
33
+
34
+ // emptySchemaVersions stores schema version which has no schema_diff.
35
+ emptySchemaVersions map [int64 ]struct {}
33
36
}
34
37
35
38
type schemaAndTimestamp struct {
@@ -40,10 +43,18 @@ type schemaAndTimestamp struct {
40
43
// NewCache creates a new InfoCache.
41
44
func NewCache (capacity int ) * InfoCache {
42
45
return & InfoCache {
43
- cache : make ([]schemaAndTimestamp , 0 , capacity ),
46
+ cache : make ([]schemaAndTimestamp , 0 , capacity ),
47
+ emptySchemaVersions : make (map [int64 ]struct {}),
44
48
}
45
49
}
46
50
51
+ // Size returns the size of the cache, export for test.
52
+ func (h * InfoCache ) Size () int {
53
+ h .mu .Lock ()
54
+ defer h .mu .Unlock ()
55
+ return len (h .cache )
56
+ }
57
+
47
58
// Reset resets the cache.
48
59
func (h * InfoCache ) Reset (capacity int ) {
49
60
h .mu .Lock ()
@@ -63,6 +74,11 @@ func (h *InfoCache) GetLatest() InfoSchema {
63
74
return nil
64
75
}
65
76
77
+ // GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
78
+ func (h * InfoCache ) GetEmptySchemaVersions () map [int64 ]struct {} {
79
+ return h .emptySchemaVersions
80
+ }
81
+
66
82
func (h * InfoCache ) getSchemaByTimestampNoLock (ts uint64 ) (InfoSchema , bool ) {
67
83
logutil .BgLogger ().Debug ("SCHEMA CACHE get schema" , zap .Uint64 ("timestamp" , ts ))
68
84
// search one by one instead of binary search, because the timestamp of a schema could be 0
@@ -80,11 +96,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
80
96
// the first element is the latest schema, so we can return it directly.
81
97
return is .infoschema , true
82
98
}
83
- if h .cache [i - 1 ].infoschema .SchemaMetaVersion () == is .infoschema .SchemaMetaVersion ()+ 1 && uint64 (h .cache [i - 1 ].timestamp ) > ts {
84
- // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
85
- // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts.
86
- // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
87
- return is .infoschema , true
99
+
100
+ if uint64 (h .cache [i - 1 ].timestamp ) > ts {
101
+ // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
102
+ lastVersion := h .cache [i - 1 ].infoschema .SchemaMetaVersion ()
103
+ currentVersion := is .infoschema .SchemaMetaVersion ()
104
+ if lastVersion == currentVersion + 1 {
105
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
106
+ // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
107
+ return is .infoschema , true
108
+ }
109
+ if lastVersion > currentVersion {
110
+ found := true
111
+ for ver := currentVersion + 1 ; ver < lastVersion ; ver ++ {
112
+ _ , ok := h .emptySchemaVersions [ver ]
113
+ if ! ok {
114
+ found = false
115
+ break
116
+ }
117
+ }
118
+ if found {
119
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
120
+ // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
121
+ // which means those gap versions don't have schema info, then current schema is also suitable for ts.
122
+ return is .infoschema , true
123
+ }
124
+ }
88
125
}
89
126
// current schema is not suitable for ts, then break the loop to avoid the unnecessary search.
90
127
break
@@ -194,3 +231,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
194
231
195
232
return true
196
233
}
234
+
235
+ // InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
236
+ func (h * InfoCache ) InsertEmptySchemaVersion (version int64 ) {
237
+ h .mu .Lock ()
238
+ defer h .mu .Unlock ()
239
+
240
+ h .emptySchemaVersions [version ] = struct {}{}
241
+ if len (h .emptySchemaVersions ) > cap (h .cache ) {
242
+ // remove oldest version.
243
+ versions := make ([]int64 , 0 , len (h .emptySchemaVersions ))
244
+ for ver := range h .emptySchemaVersions {
245
+ versions = append (versions , ver )
246
+ }
247
+ sort .Slice (versions , func (i , j int ) bool { return versions [i ] < versions [j ] })
248
+ for _ , ver := range versions {
249
+ delete (h .emptySchemaVersions , ver )
250
+ if len (h .emptySchemaVersions ) <= cap (h .cache ) {
251
+ break
252
+ }
253
+ }
254
+ }
255
+ }
0 commit comments