@@ -40,6 +40,9 @@ type InfoCache struct {
40
40
mu sync.RWMutex
41
41
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
42
42
cache []schemaAndTimestamp
43
+
44
+ // emptySchemaVersions stores schema version which has no schema_diff.
45
+ emptySchemaVersions map [int64 ]struct {}
43
46
}
44
47
45
48
type schemaAndTimestamp struct {
@@ -50,7 +53,8 @@ type schemaAndTimestamp struct {
50
53
// NewCache creates a new InfoCache.
51
54
func NewCache (capacity int ) * InfoCache {
52
55
return & InfoCache {
53
- cache : make ([]schemaAndTimestamp , 0 , capacity ),
56
+ cache : make ([]schemaAndTimestamp , 0 , capacity ),
57
+ emptySchemaVersions : make (map [int64 ]struct {}),
54
58
}
55
59
}
56
60
@@ -102,6 +106,11 @@ func (h *InfoCache) Len() int {
102
106
return len (h .cache )
103
107
}
104
108
109
+ // GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
110
+ func (h * InfoCache ) GetEmptySchemaVersions () map [int64 ]struct {} {
111
+ return h .emptySchemaVersions
112
+ }
113
+
105
114
func (h * InfoCache ) getSchemaByTimestampNoLock (ts uint64 ) (InfoSchema , bool ) {
106
115
logutil .BgLogger ().Debug ("SCHEMA CACHE get schema" , zap .Uint64 ("timestamp" , ts ))
107
116
// search one by one instead of binary search, because the timestamp of a schema could be 0
@@ -115,8 +124,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
115
124
if i == 0 {
116
125
return is .infoschema , true
117
126
}
118
- if h .cache [i - 1 ].infoschema .SchemaMetaVersion () == is .infoschema .SchemaMetaVersion ()+ 1 && uint64 (h .cache [i - 1 ].timestamp ) > ts {
119
- return is .infoschema , true
127
+
128
+ if uint64 (h .cache [i - 1 ].timestamp ) > ts {
129
+ // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
130
+ lastVersion := h .cache [i - 1 ].infoschema .SchemaMetaVersion ()
131
+ currentVersion := is .infoschema .SchemaMetaVersion ()
132
+ if lastVersion == currentVersion + 1 {
133
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
134
+ // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
135
+ return is .infoschema , true
136
+ }
137
+ if lastVersion > currentVersion {
138
+ found := true
139
+ for ver := currentVersion + 1 ; ver < lastVersion ; ver ++ {
140
+ _ , ok := h .emptySchemaVersions [ver ]
141
+ if ! ok {
142
+ found = false
143
+ break
144
+ }
145
+ }
146
+ if found {
147
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
148
+ // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
149
+ // which means those gap versions don't have schema info, then current schema is also suitable for ts.
150
+ return is .infoschema , true
151
+ }
152
+ }
120
153
}
121
154
break
122
155
}
@@ -225,3 +258,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
225
258
226
259
return true
227
260
}
261
+
262
+ // InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
263
+ func (h * InfoCache ) InsertEmptySchemaVersion (version int64 ) {
264
+ h .mu .Lock ()
265
+ defer h .mu .Unlock ()
266
+
267
+ h .emptySchemaVersions [version ] = struct {}{}
268
+ if len (h .emptySchemaVersions ) > cap (h .cache ) {
269
+ // remove oldest version.
270
+ versions := make ([]int64 , 0 , len (h .emptySchemaVersions ))
271
+ for ver := range h .emptySchemaVersions {
272
+ versions = append (versions , ver )
273
+ }
274
+ sort .Slice (versions , func (i , j int ) bool { return versions [i ] < versions [j ] })
275
+ for _ , ver := range versions {
276
+ delete (h .emptySchemaVersions , ver )
277
+ if len (h .emptySchemaVersions ) <= cap (h .cache ) {
278
+ break
279
+ }
280
+ }
281
+ }
282
+ }
0 commit comments