@@ -40,6 +40,9 @@ type InfoCache struct {
40
40
mu sync.RWMutex
41
41
// cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order
42
42
cache []schemaAndTimestamp
43
+
44
+ // emptySchemaVersions stores schema version which has no schema_diff.
45
+ emptySchemaVersions map [int64 ]struct {}
43
46
}
44
47
45
48
type schemaAndTimestamp struct {
@@ -50,7 +53,8 @@ type schemaAndTimestamp struct {
50
53
// NewCache creates a new InfoCache.
51
54
func NewCache (capacity int ) * InfoCache {
52
55
return & InfoCache {
53
- cache : make ([]schemaAndTimestamp , 0 , capacity ),
56
+ cache : make ([]schemaAndTimestamp , 0 , capacity ),
57
+ emptySchemaVersions : make (map [int64 ]struct {}),
54
58
}
55
59
}
56
60
@@ -85,6 +89,11 @@ func (h *InfoCache) Len() int {
85
89
return len (h .cache )
86
90
}
87
91
92
+ // GetEmptySchemaVersions returns emptySchemaVersions, exports for testing.
93
+ func (h * InfoCache ) GetEmptySchemaVersions () map [int64 ]struct {} {
94
+ return h .emptySchemaVersions
95
+ }
96
+
88
97
func (h * InfoCache ) getSchemaByTimestampNoLock (ts uint64 ) (InfoSchema , bool ) {
89
98
logutil .BgLogger ().Debug ("SCHEMA CACHE get schema" , zap .Uint64 ("timestamp" , ts ))
90
99
// search one by one instead of binary search, because the timestamp of a schema could be 0
@@ -102,11 +111,32 @@ func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, bool) {
102
111
// the first element is the latest schema, so we can return it directly.
103
112
return is .infoschema , true
104
113
}
105
- if h .cache [i - 1 ].infoschema .SchemaMetaVersion () == is .infoschema .SchemaMetaVersion ()+ 1 && uint64 (h .cache [i - 1 ].timestamp ) > ts {
106
- // This first condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
107
- // but current(cache[i]) schema-version is not 9, then current schema is not suitable for ts.
108
- // The second condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
109
- return is .infoschema , true
114
+
115
+ if uint64 (h .cache [i - 1 ].timestamp ) > ts {
116
+ // The first condition is to make sure the cache[i-1].timestamp > ts >= cache[i].timestamp, then the current schema is suitable for ts.
117
+ lastVersion := h .cache [i - 1 ].infoschema .SchemaMetaVersion ()
118
+ currentVersion := is .infoschema .SchemaMetaVersion ()
119
+ if lastVersion == currentVersion + 1 {
120
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10,
121
+ // but current(cache[i]) schema-version is not 9, then current schema may not suitable for ts.
122
+ return is .infoschema , true
123
+ }
124
+ if lastVersion > currentVersion {
125
+ found := true
126
+ for ver := currentVersion + 1 ; ver < lastVersion ; ver ++ {
127
+ _ , ok := h .emptySchemaVersions [ver ]
128
+ if ! ok {
129
+ found = false
130
+ break
131
+ }
132
+ }
133
+ if found {
134
+ // This condition is to make sure the schema version is continuous. If last(cache[i-1]) schema-version is 10, and
135
+ // current(cache[i]) schema-version is 8, then there is a gap exist, and if all the gap version can be found in cache.emptySchemaVersions
136
+ // which means those gap versions don't have schema info, then current schema is also suitable for ts.
137
+ return is .infoschema , true
138
+ }
139
+ }
110
140
}
111
141
// current schema is not suitable for ts, then break the loop to avoid the unnecessary search.
112
142
break
@@ -216,3 +246,25 @@ func (h *InfoCache) Insert(is InfoSchema, schemaTS uint64) bool {
216
246
217
247
return true
218
248
}
249
+
250
+ // InsertEmptySchemaVersion inserts empty schema version into a map. If exceeded the cache capacity, remove the oldest version.
251
+ func (h * InfoCache ) InsertEmptySchemaVersion (version int64 ) {
252
+ h .mu .Lock ()
253
+ defer h .mu .Unlock ()
254
+
255
+ h .emptySchemaVersions [version ] = struct {}{}
256
+ if len (h .emptySchemaVersions ) > cap (h .cache ) {
257
+ // remove oldest version.
258
+ versions := make ([]int64 , 0 , len (h .emptySchemaVersions ))
259
+ for ver := range h .emptySchemaVersions {
260
+ versions = append (versions , ver )
261
+ }
262
+ sort .Slice (versions , func (i , j int ) bool { return versions [i ] < versions [j ] })
263
+ for _ , ver := range versions {
264
+ delete (h .emptySchemaVersions , ver )
265
+ if len (h .emptySchemaVersions ) <= cap (h .cache ) {
266
+ break
267
+ }
268
+ }
269
+ }
270
+ }
0 commit comments