-
Notifications
You must be signed in to change notification settings - Fork 629
feat(prover): support multi tasks #1585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
791fcaf
71acdb3
8ce5121
3e0589e
df92616
a75075d
1c5d88d
e4c0779
3da7567
37da7b8
4215b6b
3e97105
9e63761
707267a
0d1c303
6b5cf47
f957b0f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/gin-gonic/gin" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -63,29 +64,59 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat | |||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession | ||||||||||||||||||||||||||||||||||||||||||||||
| maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts | ||||||||||||||||||||||||||||||||||||||||||||||
| if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { | ||||||||||||||||||||||||||||||||||||||||||||||
| unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getCountError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| // Assign external prover if unassigned task number exceeds threshold | ||||||||||||||||||||||||||||||||||||||||||||||
| if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold { | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+67
to
+77
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix logging variable mismatch. - log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
+ log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError)📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| var bundleTask *orm.Bundle | ||||||||||||||||||||||||||||||||||||||||||||||
| for i := 0; i < 5; i++ { | ||||||||||||||||||||||||||||||||||||||||||||||
| var getTaskError error | ||||||||||||||||||||||||||||||||||||||||||||||
| var tmpBundleTask *orm.Bundle | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) | ||||||||||||||||||||||||||||||||||||||||||||||
| var assignedOffset, unassignedOffset = 0, 0 | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` | ||||||||||||||||||||||||||||||||||||||||||||||
| // bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. | ||||||||||||||||||||||||||||||||||||||||||||||
| if tmpBundleTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts) | ||||||||||||||||||||||||||||||||||||||||||||||
| // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpBundleTask = nil | ||||||||||||||||||||||||||||||||||||||||||||||
| if assignedOffset < len(tmpAssignedBundleTasks) { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpBundleTask = tmpAssignedBundleTasks[assignedOffset] | ||||||||||||||||||||||||||||||||||||||||||||||
| assignedOffset++ | ||||||||||||||||||||||||||||||||||||||||||||||
| } else if unassignedOffset < len(tmpUnassignedBundleTask) { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset] | ||||||||||||||||||||||||||||||||||||||||||||||
| unassignedOffset++ | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| if tmpBundleTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Don't dispatch the same failing job to the same prover | ||||||||||||||||||||||||||||||||||||||||||||||
| proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| if tmpBundleTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { | ||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| "github.com/gin-gonic/gin" | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -61,29 +62,59 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato | |||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession | ||||||||||||||||||||||||||||||||||||||||||||||
| maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts | ||||||||||||||||||||||||||||||||||||||||||||||
| if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) { | ||||||||||||||||||||||||||||||||||||||||||||||
| unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getCountError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| // Assign external prover if unassigned task number exceeds threshold | ||||||||||||||||||||||||||||||||||||||||||||||
| if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold { | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+65
to
+75
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix logging variable mismatch. - log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
+ log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", getCountError)📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| var chunkTask *orm.Chunk | ||||||||||||||||||||||||||||||||||||||||||||||
| for i := 0; i < 5; i++ { | ||||||||||||||||||||||||||||||||||||||||||||||
| var getTaskError error | ||||||||||||||||||||||||||||||||||||||||||||||
| var tmpChunkTask *orm.Chunk | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| var assignedOffset, unassignedOffset = 0, 0 | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned` | ||||||||||||||||||||||||||||||||||||||||||||||
| // chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql. | ||||||||||||||||||||||||||||||||||||||||||||||
| if tmpChunkTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpChunkTask = nil | ||||||||||||||||||||||||||||||||||||||||||||||
| if assignedOffset < len(tmpAssignedChunkTasks) { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpChunkTask = tmpAssignedChunkTasks[assignedOffset] | ||||||||||||||||||||||||||||||||||||||||||||||
| assignedOffset++ | ||||||||||||||||||||||||||||||||||||||||||||||
| } else if unassignedOffset < len(tmpUnassignedChunkTask) { | ||||||||||||||||||||||||||||||||||||||||||||||
| tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset] | ||||||||||||||||||||||||||||||||||||||||||||||
| unassignedOffset++ | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| if tmpChunkTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // Don't dispatch the same failing job to the same prover | ||||||||||||||||||||||||||||||||||||||||||||||
| proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion) | ||||||||||||||||||||||||||||||||||||||||||||||
| if getTaskError != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, ErrCoordinatorInternalFailure | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| if tmpChunkTask == nil { | ||||||||||||||||||||||||||||||||||||||||||||||
| log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||||||||||||||||||||||||||||
| if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid { | ||||||||||||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix logging variable mismatch.
At line 70, you log
errinstead ofgetCountError, potentially hiding the real error.📝 Committable suggestion