@@ -29,7 +29,7 @@ type kvJobManager struct {
29
29
// TODO(0x5459): Consider putting `txn` into context?
30
30
func (tm * kvJobManager ) filter (ctx context.Context , txn kvstore.TxnExt , state core.WdPoStJobState , limit uint32 , f func (* core.WdPoStJob ) bool ) (jobs []* core.WdPoStJob , err error ) {
31
31
var it kvstore.Iter
32
- it , err = txn .Scan ([] byte (makeWdPoStPrefix (state )))
32
+ it , err = txn .Scan (kvstore . Prefix (makeWdPoStPrefix (state )))
33
33
if err != nil {
34
34
return
35
35
}
@@ -134,7 +134,7 @@ func (tm *kvJobManager) Create(ctx context.Context, deadlineIdx uint64, partitio
134
134
CreatedAt : uint64 (now ),
135
135
UpdatedAt : uint64 (now ),
136
136
}
137
- return txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )), & job )
137
+ return txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )), & job )
138
138
})
139
139
140
140
if err == nil {
@@ -163,7 +163,7 @@ func (tm *kvJobManager) AllocateJobs(ctx context.Context, spec core.AllocateWdPo
163
163
now := uint64 (time .Now ().Unix ())
164
164
for _ , job := range readyToRun {
165
165
// Moving ready to run jobs to running jobs
166
- if err := txn .Del ([] byte (makeWdPoStKey (core .WdPoStJobReadyToRun , job .ID ))); err != nil {
166
+ if err := txn .Del (kvstore . Key (makeWdPoStKey (core .WdPoStJobReadyToRun , job .ID ))); err != nil {
167
167
return err
168
168
}
169
169
job .State = string (core .WdPoStJobRunning )
@@ -172,7 +172,7 @@ func (tm *kvJobManager) AllocateJobs(ctx context.Context, spec core.AllocateWdPo
172
172
job .WorkerName = workerName
173
173
job .HeartbeatAt = now
174
174
job .UpdatedAt = now
175
- if err := txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobRunning , job .ID )), job ); err != nil {
175
+ if err := txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobRunning , job .ID )), job ); err != nil {
176
176
return err
177
177
}
178
178
allocatedJobs = append (allocatedJobs , & core.WdPoStAllocatedJob {
@@ -196,16 +196,28 @@ func (tm *kvJobManager) Heartbeat(ctx context.Context, jobIDs []string, workerNa
196
196
err := tm .kv .UpdateMustNoConflict (ctx , func (txn kvstore.TxnExt ) error {
197
197
for _ , jobID := range jobIDs {
198
198
var job core.WdPoStJob
199
- if err := txn .Peek ([]byte (makeWdPoStKey (core .WdPoStJobRunning , jobID )), kvstore .LoadJSON (& job )); err != nil {
199
+ key , err := txn .PeekAny (
200
+ kvstore .LoadJSON (& job ),
201
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobRunning , jobID )),
202
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )),
203
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobFinished , jobID )),
204
+ )
205
+ if err != nil {
206
+ return err
207
+ }
208
+
209
+ if err := txn .Del (key ); err != nil {
200
210
return err
201
211
}
212
+
202
213
if job .StartedAt == 0 {
203
214
job .StartedAt = now
204
215
}
205
216
job .HeartbeatAt = now
206
217
job .WorkerName = workerName
218
+ job .State = string (core .WdPoStJobRunning )
207
219
job .UpdatedAt = now
208
- if err := txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobRunning , jobID )), & job ); err != nil {
220
+ if err := txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobRunning , jobID )), & job ); err != nil {
209
221
return err
210
222
}
211
223
}
@@ -219,12 +231,17 @@ func (tm *kvJobManager) Heartbeat(ctx context.Context, jobIDs []string, workerNa
219
231
220
232
func (tm * kvJobManager ) Finish (ctx context.Context , jobID string , output * stage.WindowPoStOutput , errorReason string ) error {
221
233
err := tm .kv .UpdateMustNoConflict (ctx , func (txn kvstore.TxnExt ) error {
222
- runningKey := []byte (makeWdPoStKey (core .WdPoStJobRunning , jobID ))
223
234
var job core.WdPoStJob
224
- if err := txn .Peek (runningKey , kvstore .LoadJSON (& job )); err != nil {
235
+ key , err := txn .PeekAny (
236
+ kvstore .LoadJSON (& job ),
237
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobRunning , jobID )),
238
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )),
239
+ kvstore .Key (makeWdPoStKey (core .WdPoStJobFinished , jobID )),
240
+ )
241
+ if err != nil {
225
242
return err
226
243
}
227
- if err := txn .Del (runningKey ); err != nil {
244
+ if err := txn .Del (key ); err != nil {
228
245
return err
229
246
}
230
247
now := uint64 (time .Now ().Unix ())
@@ -233,7 +250,7 @@ func (tm *kvJobManager) Finish(ctx context.Context, jobID string, output *stage.
233
250
job .ErrorReason = errorReason
234
251
job .FinishedAt = now
235
252
job .UpdatedAt = now
236
- return txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobFinished , jobID )), & job )
253
+ return txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobFinished , jobID )), & job )
237
254
})
238
255
239
256
if err == nil {
@@ -260,15 +277,15 @@ func (tm *kvJobManager) MakeJobsDie(ctx context.Context, heartbeatTimeout time.D
260
277
}
261
278
now := uint64 (time .Now ().Unix ())
262
279
for _ , job := range shouldDead {
263
- if err := txn .Del ([] byte (makeWdPoStKey (core .WdPoStJobRunning , job .ID ))); err != nil {
280
+ if err := txn .Del (kvstore . Key (makeWdPoStKey (core .WdPoStJobRunning , job .ID ))); err != nil {
264
281
return err
265
282
}
266
283
job .State = string (core .WdPoStJobFinished )
267
284
job .FinishedAt = now
268
285
job .Output = nil
269
286
job .ErrorReason = "heartbeat timeout"
270
287
job .UpdatedAt = now
271
- if err := txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobFinished , job .ID )), job ); err != nil {
288
+ if err := txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobFinished , job .ID )), job ); err != nil {
272
289
return err
273
290
}
274
291
}
@@ -297,7 +314,7 @@ func (tm *kvJobManager) CleanupExpiredJobs(ctx context.Context, jobLifetime time
297
314
return err
298
315
}
299
316
for _ , job := range shouldClean {
300
- if err := txn .Del ([] byte (makeWdPoStKey (core .WdPoStJobFinished , job .ID ))); err != nil {
317
+ if err := txn .Del (kvstore . Key (makeWdPoStKey (core .WdPoStJobFinished , job .ID ))); err != nil {
301
318
return err
302
319
}
303
320
}
@@ -324,7 +341,7 @@ func (tm *kvJobManager) RetryFailedJobs(ctx context.Context, maxTry, limit uint3
324
341
}
325
342
now := uint64 (time .Now ().Unix ())
326
343
for _ , job := range shouldRetry {
327
- err := txn .Del ([] byte (makeWdPoStKey (core .WdPoStJobFinished , job .ID )))
344
+ err := txn .Del (kvstore . Key (makeWdPoStKey (core .WdPoStJobFinished , job .ID )))
328
345
if err != nil {
329
346
return err
330
347
}
@@ -334,7 +351,7 @@ func (tm *kvJobManager) RetryFailedJobs(ctx context.Context, maxTry, limit uint3
334
351
job .StartedAt = 0
335
352
job .FinishedAt = 0
336
353
job .UpdatedAt = now
337
- if err := txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobReadyToRun , job .ID )), job ); err != nil {
354
+ if err := txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobReadyToRun , job .ID )), job ); err != nil {
338
355
return err
339
356
}
340
357
}
@@ -379,7 +396,7 @@ func (tm *kvJobManager) Reset(ctx context.Context, jobID string) error {
379
396
if err := txn .Del (key ); err != nil {
380
397
return err
381
398
}
382
- return txn .PutJson ([] byte (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )), & job )
399
+ return txn .PutJson (kvstore . Key (makeWdPoStKey (core .WdPoStJobReadyToRun , jobID )), & job )
383
400
})
384
401
385
402
if err == nil {
0 commit comments