@@ -113,7 +113,6 @@ func (s *SQLLog) compactor(interval time.Duration) {
113
113
targetCompactRev , _ := s .CurrentRevision (s .ctx )
114
114
logrus .Tracef ("COMPACT starting compactRev=%d targetCompactRev=%d" , compactRev , targetCompactRev )
115
115
116
- outer:
117
116
for {
118
117
select {
119
118
case <- s .ctx .Done ():
@@ -127,12 +126,14 @@ outer:
127
126
// (several hundred ms) just for the database to execute the subquery to select the revisions to delete.
128
127
129
128
var (
129
+ resultLabel string
130
130
iterCompactRev int64
131
131
compactedRev int64
132
132
currentRev int64
133
133
err error
134
134
)
135
135
136
+ resultLabel = metrics .ResultSuccess
136
137
iterCompactRev = compactRev
137
138
compactedRev = compactRev
138
139
@@ -145,54 +146,69 @@ outer:
145
146
iterCompactRev = targetCompactRev
146
147
}
147
148
148
- compactedRev , currentRev , err = s . compact ( compactedRev , iterCompactRev )
149
- if err != nil {
150
- // ErrCompacted indicates that no further work is necessary - either compactRev changed since the
151
- // last iteration because another client has compacted, or the requested revision has already been compacted.
152
- if err == server . ErrCompacted {
153
- break
154
- }
155
- logrus . Errorf ( "Compact failed: %v" , err )
156
- metrics . CompactTotal . WithLabelValues ( metrics . ResultError ). Inc ()
157
- continue outer
149
+ // only update the compacted and current revisions if they are valid,
150
+ // but break out of the inner loop on any error.
151
+ compacted , current , cerr := s . compact ( compactedRev , iterCompactRev )
152
+ if compacted != 0 && current != 0 {
153
+ compactedRev = compacted
154
+ currentRev = current
155
+ }
156
+ if cerr != nil {
157
+ err = cerr
158
+ break
158
159
}
159
160
}
160
161
161
- if err := s .postCompact (); err != nil {
162
- logrus .Errorf ("Post-compact operations failed: %v" , err )
162
+ // post-compact operation errors are not critical, but should be reported
163
+ if perr := s .postCompact (); perr != nil {
164
+ logrus .Errorf ("Post-compact operations failed: %v" , perr )
163
165
}
164
166
165
- // Record the final results for the outer loop
167
+ // Store the final results for this compact interval.
168
+ // Note that one or more of the small-batch compact transactions
169
+ // may have succeeded and moved the compact revision forward, even if err is non-nil.
166
170
compactRev = compactedRev
167
171
targetCompactRev = currentRev
168
172
169
- metrics .CompactTotal .WithLabelValues (metrics .ResultSuccess ).Inc ()
173
+ // ErrCompacted indicates that no further work is necessary - either compactRev changed since the
174
+ // last iteration because another client has compacted, or the requested revision has already been compacted.
175
+ if err != nil && err != server .ErrCompacted {
176
+ logrus .Errorf ("Compact failed: %v" , err )
177
+ resultLabel = metrics .ResultError
178
+ }
179
+ metrics .CompactTotal .WithLabelValues (resultLabel ).Inc ()
170
180
}
171
181
}
172
182
173
- // compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to.
174
- // If this changes between compactions, we know that someone else has compacted and we don't need to do it.
175
- // targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision
176
- // compacted to, and the revision that we should try to compact to next time (the current revision).
177
- // This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
183
+ // compact removes deleted or replaced rows from the database, and updates the compact rev key.
184
+ // compactRev is the current compact revision; targetCompactRev is the revision to compact to.
185
+ // If compactRev does not match what's in the database, we know that someone else has compacted and we don't need to do it.
186
+ // Deletion of rows and update of the compact rev key is done within a single transaction. The transaction is rolled back on any error.
187
+ //
188
+ // On success, the function returns the revision compacted to, and the revision that we should try to compact to next time (the current revision).
189
+ // ErrCompacted is returned if the current revision is stale, or the target revision has already been compacted.
190
+ // In this case the compact and current revisions from the database are returned.
191
+ // On any other error, the returned compact and current revisions should not be used.
192
+ //
193
+ // This logic is cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
178
194
func (s * SQLLog ) compact (compactRev int64 , targetCompactRev int64 ) (int64 , int64 , error ) {
179
195
ctx , cancel := context .WithTimeout (s .ctx , compactTimeout )
180
196
defer cancel ()
181
197
182
198
t , err := s .d .BeginTx (ctx , & sql.TxOptions {Isolation : sql .LevelSerializable })
183
199
if err != nil {
184
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to begin transaction" )
200
+ return 0 , 0 , errors .Wrap (err , "failed to begin transaction" )
185
201
}
186
202
defer t .MustRollback ()
187
203
188
204
currentRev , err := t .CurrentRevision (s .ctx )
189
205
if err != nil {
190
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to get current revision" )
206
+ return 0 , 0 , errors .Wrap (err , "failed to get current revision" )
191
207
}
192
208
193
209
dbCompactRev , err := t .GetCompactRevision (s .ctx )
194
210
if err != nil {
195
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to get compact revision" )
211
+ return 0 , 0 , errors .Wrap (err , "failed to get compact revision" )
196
212
}
197
213
198
214
// Check to see if another node already compacted. This is normal on a multi-server cluster.
@@ -206,7 +222,7 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64
206
222
207
223
// Don't bother compacting to a revision that has already been compacted
208
224
if targetCompactRev <= compactRev {
209
- logrus .Infof ("COMPACT revision %d has already been compacted" , targetCompactRev )
225
+ logrus .Tracef ("COMPACT revision %d has already been compacted" , targetCompactRev )
210
226
return dbCompactRev , currentRev , server .ErrCompacted
211
227
}
212
228
@@ -215,13 +231,16 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64
215
231
start := time .Now ()
216
232
deletedRows , err := t .Compact (s .ctx , targetCompactRev )
217
233
if err != nil {
218
- return compactRev , targetCompactRev , errors .Wrapf (err , "failed to compact to revision %d" , targetCompactRev )
234
+ return 0 , 0 , errors .Wrapf (err , "failed to compact to revision %d" , targetCompactRev )
219
235
}
220
236
221
237
if err := t .SetCompactRevision (s .ctx , targetCompactRev ); err != nil {
222
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to record compact revision" )
238
+ return 0 , 0 , errors .Wrap (err , "failed to record compact revision" )
223
239
}
224
240
241
+ // only commit the transaction if we make it all the way through deleting and
242
+ // updating the compact revision without any errors. The deferred rollback
243
+ // becomes a no-op if the transaction is committed.
225
244
t .MustCommit ()
226
245
logrus .Infof ("COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d" , deletedRows , (targetCompactRev - compactRev ), time .Since (start ), targetCompactRev , currentRev )
227
246
0 commit comments