@@ -113,7 +113,6 @@ func (s *SQLLog) compactor(interval time.Duration) {
113
113
targetCompactRev , _ := s .CurrentRevision (s .ctx )
114
114
logrus .Tracef ("COMPACT starting compactRev=%d targetCompactRev=%d" , compactRev , targetCompactRev )
115
115
116
- outer:
117
116
for {
118
117
select {
119
118
case <- s .ctx .Done ():
@@ -127,14 +126,20 @@ outer:
127
126
// (several hundred ms) just for the database to execute the subquery to select the revisions to delete.
128
127
129
128
var (
129
+ resultLabel string
130
130
iterCompactRev int64
131
+ iterStart time.Time
132
+ iterCount int64
131
133
compactedRev int64
132
134
currentRev int64
133
135
err error
134
136
)
135
137
138
+ resultLabel = metrics .ResultSuccess
136
139
iterCompactRev = compactRev
137
140
compactedRev = compactRev
141
+ iterStart = time .Now ()
142
+ iterCount = 0
138
143
139
144
for iterCompactRev < targetCompactRev {
140
145
// Set move iteration target compactBatchSize revisions forward, or
@@ -145,54 +150,74 @@ outer:
145
150
iterCompactRev = targetCompactRev
146
151
}
147
152
148
- compactedRev , currentRev , err = s .compact (compactedRev , iterCompactRev )
149
- if err != nil {
150
- // ErrCompacted indicates that no further work is necessary - either compactRev changed since the
151
- // last iteration because another client has compacted, or the requested revision has already been compacted.
152
- if err == server .ErrCompacted {
153
- break
154
- }
155
- logrus .Errorf ("Compact failed: %v" , err )
156
- metrics .CompactTotal .WithLabelValues (metrics .ResultError ).Inc ()
157
- continue outer
153
+ // only update the compacted and current revisions if they are valid,
154
+ // but break out of the inner loop on any error.
155
+ compacted , current , cerr := s .compact (compactedRev , iterCompactRev )
156
+ if compacted != 0 && current != 0 {
157
+ compactedRev = compacted
158
+ currentRev = current
158
159
}
160
+ if cerr != nil {
161
+ err = cerr
162
+ break
163
+ }
164
+ iterCount ++
159
165
}
160
166
161
- if err := s .postCompact (); err != nil {
162
- logrus .Errorf ("Post-compact operations failed: %v" , err )
167
+ if iterCount > 0 {
168
+ logrus .Infof ("COMPACT compacted from %d to %d in %d transactions over %s" , compactRev , compactedRev , iterCount , time .Now ().Sub (iterStart ).Round (time .Millisecond ))
169
+
170
+ // post-compact operation errors are not critical, but should be reported
171
+ if perr := s .postCompact (); perr != nil {
172
+ logrus .Errorf ("Post-compact operations failed: %v" , perr )
173
+ }
163
174
}
164
175
165
- // Record the final results for the outer loop
176
+ // Store the final results for this compact interval.
177
+ // Note that one or more of the small-batch compact transactions
178
+ // may have succeeded and moved the compact revision forward, even if err is non-nil.
166
179
compactRev = compactedRev
167
180
targetCompactRev = currentRev
168
181
169
- metrics .CompactTotal .WithLabelValues (metrics .ResultSuccess ).Inc ()
182
+ // ErrCompacted indicates that no further work is necessary - either compactRev changed since the
183
+ // last iteration because another client has compacted, or the requested revision has already been compacted.
184
+ if err != nil && err != server .ErrCompacted {
185
+ logrus .Errorf ("Compact failed: %v" , err )
186
+ resultLabel = metrics .ResultError
187
+ }
188
+ metrics .CompactTotal .WithLabelValues (resultLabel ).Inc ()
170
189
}
171
190
}
172
191
173
- // compact removes deleted or replaced rows from the database. compactRev is the revision that was last compacted to.
174
- // If this changes between compactions, we know that someone else has compacted and we don't need to do it.
175
- // targetCompactRev is the revision that we should try to compact to. Upon success, the function returns the revision
176
- // compacted to, and the revision that we should try to compact to next time (the current revision).
177
- // This logic is directly cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
192
+ // compact removes deleted or replaced rows from the database, and updates the compact rev key.
193
+ // compactRev is the current compact revision; targetCompactRev is the revision to compact to.
194
+ // If compactRev does not match what's in the database, we know that someone else has compacted and we don't need to do it.
195
+ // Deletion of rows and update of the compact rev key is done within a single transaction. The transaction is rolled back on any error.
196
+ //
197
+ // On success, the function returns the revision compacted to, and the revision that we should try to compact to next time (the current revision).
198
+ // ErrCompacted is returned if the current revision is stale, or the target revision has already been compacted.
199
+ // In this case the compact and current revisions from the database are returned.
200
+ // On any other error, the returned compact and current revisions should not be used.
201
+ //
202
+ // This logic is cribbed from k8s.io/apiserver/pkg/storage/etcd3/compact.go
178
203
func (s * SQLLog ) compact (compactRev int64 , targetCompactRev int64 ) (int64 , int64 , error ) {
179
204
ctx , cancel := context .WithTimeout (s .ctx , compactTimeout )
180
205
defer cancel ()
181
206
182
207
t , err := s .d .BeginTx (ctx , & sql.TxOptions {Isolation : sql .LevelSerializable })
183
208
if err != nil {
184
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to begin transaction" )
209
+ return 0 , 0 , errors .Wrap (err , "failed to begin transaction" )
185
210
}
186
211
defer t .MustRollback ()
187
212
188
213
currentRev , err := t .CurrentRevision (s .ctx )
189
214
if err != nil {
190
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to get current revision" )
215
+ return 0 , 0 , errors .Wrap (err , "failed to get current revision" )
191
216
}
192
217
193
218
dbCompactRev , err := t .GetCompactRevision (s .ctx )
194
219
if err != nil {
195
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to get compact revision" )
220
+ return 0 , 0 , errors .Wrap (err , "failed to get compact revision" )
196
221
}
197
222
198
223
// Check to see if another node already compacted. This is normal on a multi-server cluster.
@@ -206,7 +231,7 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64
206
231
207
232
// Don't bother compacting to a revision that has already been compacted
208
233
if targetCompactRev <= compactRev {
209
- logrus .Infof ("COMPACT revision %d has already been compacted" , targetCompactRev )
234
+ logrus .Tracef ("COMPACT revision %d has already been compacted" , targetCompactRev )
210
235
return dbCompactRev , currentRev , server .ErrCompacted
211
236
}
212
237
@@ -215,13 +240,16 @@ func (s *SQLLog) compact(compactRev int64, targetCompactRev int64) (int64, int64
215
240
start := time .Now ()
216
241
deletedRows , err := t .Compact (s .ctx , targetCompactRev )
217
242
if err != nil {
218
- return compactRev , targetCompactRev , errors .Wrapf (err , "failed to compact to revision %d" , targetCompactRev )
243
+ return 0 , 0 , errors .Wrapf (err , "failed to compact to revision %d" , targetCompactRev )
219
244
}
220
245
221
246
if err := t .SetCompactRevision (s .ctx , targetCompactRev ); err != nil {
222
- return compactRev , targetCompactRev , errors .Wrap (err , "failed to record compact revision" )
247
+ return 0 , 0 , errors .Wrap (err , "failed to record compact revision" )
223
248
}
224
249
250
+ // only commit the transaction if we make it all the way through deleting and
251
+ // updating the compact revision without any errors. The deferred rollback
252
+ // becomes a no-op if the transaction is committed.
225
253
t .MustCommit ()
226
254
logrus .Infof ("COMPACT deleted %d rows from %d revisions in %s - compacted to %d/%d" , deletedRows , (targetCompactRev - compactRev ), time .Since (start ), targetCompactRev , currentRev )
227
255
0 commit comments