Skip to content

Commit

Permalink
feat: Enhance JobCreatorController to manage single job lifecycle (#536)
Browse files Browse the repository at this point in the history
refactor: Enhance JobCreatorController to manage single job lifecycle

- Modify JobCreatorController to track a single job via jobID
- Update job offer and deal event handling to filter by specific job
- Improve result checking and processing with more robust error handling
- Remove deprecated job offer subscription filter method
- Simplify job tracking and event subscription logic
  • Loading branch information
hollygrimm authored Mar 7, 2025
1 parent 56b2789 commit cb6761a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 74 deletions.
186 changes: 121 additions & 65 deletions pkg/jobcreator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package jobcreator

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"time"

"github.com/lilypad-tech/lilypad/pkg/data"
Expand All @@ -26,10 +23,11 @@ type JobOfferSubscriber func(offer data.JobOfferContainer)
type JobOfferSubscription struct {
// The callback function to call when a job offer is updated
Callback JobOfferSubscriber
// Optional job ID filter - if specified, only updates for this job will be sent
JobID string
}

// JobCreatorController manages a single job identified by jobID.
// It handles all aspects of job processing including subscriptions,
// deal management, and result handling for this specific job.
type JobCreatorController struct {
solverClient *solver.SolverClient
options JobCreatorOptions
Expand All @@ -39,6 +37,7 @@ type JobCreatorController struct {
log *system.ServiceLogger
jobOfferSubscriptions map[string]JobOfferSubscription
tracer trace.Tracer
jobID string // The single job this controller instance manages
}

// the background "even if we have not heard of an event" loop
Expand All @@ -48,6 +47,7 @@ type JobCreatorController struct {
const CONTROL_LOOP_INTERVAL = 10 * time.Second

func NewJobCreatorController(
jobID string,
options JobCreatorOptions,
web3SDK *web3.Web3SDK,
tracer trace.Tracer,
Expand Down Expand Up @@ -79,6 +79,7 @@ func NewJobCreatorController(
log: system.NewServiceLogger(system.JobCreatorService),
jobOfferSubscriptions: make(map[string]JobOfferSubscription),
tracer: tracer,
jobID: jobID,
}
return controller, nil
}
Expand All @@ -97,26 +98,29 @@ func NewJobCreatorController(

func (controller *JobCreatorController) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) {
controller.log.Debug("add job offer", offer)
return controller.solverClient.AddJobOffer(offer)
}
container, err := controller.solverClient.AddJobOffer(offer)
if err != nil {
return container, err
}

// SubscribeToJobOfferUpdates subscribes to all job offer updates
// Returns a function that can be called to unsubscribe
func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() {
return controller.SubscribeToJobOfferUpdatesWithFilter(sub, "")
// Set the controller's jobID to track this specific job
controller.jobID = container.JobOffer.ID
controller.log.Debug("Set controller jobID", map[string]interface{}{
"jobID": controller.jobID,
})

return container, nil
}

// SubscribeToJobOfferUpdatesWithFilter subscribes to job offer updates with an optional job ID filter
// If jobID is not empty, only updates for that job will be sent to the subscriber
// SubscribeToJobOfferUpdates subscribes to job offer updates for this controller's job
// Returns a function that can be called to unsubscribe
func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() {
controller.jobOfferSubscriptions[jobID] = JobOfferSubscription{
func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() {
controller.jobOfferSubscriptions[controller.jobID] = JobOfferSubscription{
Callback: sub,
JobID: jobID,
}

return func() {
delete(controller.jobOfferSubscriptions, jobID)
delete(controller.jobOfferSubscriptions, controller.jobID)
}
}

Expand All @@ -133,6 +137,18 @@ func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub
*/
func (controller *JobCreatorController) subscribeToSolver() error {
controller.solverClient.SubscribeEvents(func(ev solver.SolverEvent) {
// First check if this event is for our specific job
if controller.jobID != "" {
if ev.Deal != nil && ev.Deal.JobOffer != controller.jobID {
// Skip events for other jobs
return
}
if ev.JobOffer != nil && ev.JobOffer.JobOffer.ID != controller.jobID {
// Skip events for other jobs
return
}
}

if ev.EventType == "DealStateUpdated" {
metricsDashboard.TrackDeal(metricsDashboard.DealPayload{
ID: ev.Deal.ID,
Expand All @@ -141,6 +157,7 @@ func (controller *JobCreatorController) subscribeToSolver() error {
ResourceProvider: ev.Deal.ResourceProvider,
})
}

switch ev.EventType {
case solver.DealAdded:
if ev.Deal == nil {
Expand All @@ -153,10 +170,14 @@ func (controller *JobCreatorController) subscribeToSolver() error {
return
}

solver.ServiceLogSolverEvent(system.JobCreatorService, ev)
controller.log.Debug("Received deal event", map[string]interface{}{
"dealID": ev.Deal.ID,
"jobID": controller.jobID,
})

// trigger the solver
solver.ServiceLogSolverEvent(system.JobCreatorService, ev)
controller.loop.Trigger()

case solver.JobOfferStateUpdated:
if ev.JobOffer == nil {
controller.log.Error("solver event", fmt.Errorf("RP received nil job offer"))
Expand Down Expand Up @@ -279,15 +300,26 @@ func (controller *JobCreatorController) solve() error {

// list the deals we have been assigned to that we have not yet posted and agree tx to the contract for
func (controller *JobCreatorController) agreeToDeals() error {
// load the deals that are in DealNegotiating
// and do not have a TransactionsResourceProvider.Agree tx
// If no jobID is set, we shouldn't process any deals
if controller.jobID == "" {
return nil
}

matchedDeals, err := controller.solverClient.GetDealsWithFilter(
store.GetDealsQuery{
JobCreator: controller.web3SDK.GetAddress().String(),
State: "DealNegotiating",
},
// this is where the solver has found us a match and we need to agree to it
func(dealContainer data.DealContainer) bool {
// Only agree to deals for our specific job
if dealContainer.Deal.JobOffer.ID != controller.jobID {
controller.log.Debug("Skipping deal - wrong jobID", map[string]interface{}{
"dealJobID": dealContainer.Deal.JobOffer.ID,
"ourJobID": controller.jobID,
})
return false
}

return dealContainer.Transactions.JobCreator.Agree == ""
},
)
Expand Down Expand Up @@ -328,15 +360,40 @@ func (controller *JobCreatorController) agreeToDeals() error {
// we do this synchronously to prevent us racing with large result sets
// also we are the client so have a lower chance of there being a chunky backlog
func (controller *JobCreatorController) checkResults() error {
// load all deals in ResultsSubmitted state and don't have either results checked or accepted txs
controller.log.Debug("Checking results for jobID", controller.jobID)

// If no jobID is set, we shouldn't process any results
if controller.jobID == "" {
return nil
}

completedDeals, err := controller.solverClient.GetDealsWithFilter(
store.GetDealsQuery{
JobCreator: controller.web3SDK.GetAddress().String(),
State: "ResultsSubmitted",
},
// this is where the solver has found us a match and we need to agree to it
func(dealContainer data.DealContainer) bool {
return dealContainer.Transactions.JobCreator.AcceptResult == "" && dealContainer.Transactions.JobCreator.CheckResult == ""
// First check if this deal belongs to our job
if dealContainer.Deal.JobOffer.ID != controller.jobID {
controller.log.Debug("Skipping deal - wrong jobID", map[string]interface{}{
"dealJobID": dealContainer.Deal.JobOffer.ID,
"ourJobID": controller.jobID,
})
return false
}

// Then check if we haven't processed it yet
match := dealContainer.Transactions.JobCreator.AcceptResult == "" &&
dealContainer.Transactions.JobCreator.CheckResult == ""

controller.log.Debug("Filtering deal", map[string]interface{}{
"dealID": dealContainer.ID,
"dealJobOfferID": dealContainer.Deal.JobOffer.ID,
"controllerJobID": controller.jobID,
"match": match,
})

return match
},
)
if err != nil {
Expand All @@ -346,33 +403,50 @@ func (controller *JobCreatorController) checkResults() error {
return nil
}

// Process each deal
for _, dealContainer := range completedDeals {

result, err := controller.solverClient.GetResult(dealContainer.ID)
if err != nil || result.Error != "" {
// there is an error with the job
// accept anyway
// TODO: trigger mediation here
err := controller.acceptResult(dealContainer)
if err != nil {
controller.log.Error("failed to accept results", err)
return err
}
} else {
// We check for all completed deals, including deals whose results
// we have already downloaded. Check the download path and download
// if results do not exist.
downloadPath := solver.GetDownloadsFilePath(dealContainer.ID)
if _, err := os.Stat(downloadPath); errors.Is(err, fs.ErrNotExist) {
err := controller.downloadResult(dealContainer)
if err != nil {
controller.log.Error("failed to download results", err)
return err
}
}
if err != nil {
controller.log.Debug("failed to get result metadata", map[string]interface{}{
"dealID": dealContainer.ID,
})
controller.log.Error("failed to get result", err)
continue // Continue to next deal instead of returning
}

if result.Error != "" {
err := fmt.Errorf("result contains error: %s", result.Error)
controller.log.Debug("result error metadata", map[string]interface{}{
"dealID": dealContainer.ID,
})
controller.log.Error("result contains error", err)
continue // Continue to next deal
}

if result.DataID == "" {
err := fmt.Errorf("result missing DataID for deal %s", dealContainer.ID)
controller.log.Debug("missing DataID metadata", map[string]interface{}{
"dealID": dealContainer.ID,
})
controller.log.Error("result missing DataID", err)
continue // Continue to next deal
}

err = controller.downloadResult(dealContainer)
if err != nil {
controller.log.Error("failed to download results", err)
continue // Continue to next deal
}

err = controller.acceptResult(dealContainer)
if err != nil {
controller.log.Error("failed to accept results", err)
continue // Continue to next deal
}
}

return err
return nil
}

func (controller *JobCreatorController) downloadResult(dealContainer data.DealContainer) error {
Expand Down Expand Up @@ -430,21 +504,3 @@ func (controller *JobCreatorController) acceptResult(deal data.DealContainer) er

return nil
}

func (controller *JobCreatorController) checkResult(deal data.DealContainer) error {
controller.log.Debug("Checking results for job", deal.ID)
txHash, err := controller.web3SDK.CheckResult(deal.ID)
if err != nil {
return fmt.Errorf("error calling check result tx for deal: %s", err.Error())
}
controller.log.Debug("check result tx", txHash)

// we have agreed to the deal so we need to update the tx in the solver
_, err = controller.solverClient.UpdateTransactionsJobCreator(deal.ID, data.DealTransactionsJobCreator{
CheckResult: txHash,
})
if err != nil {
return fmt.Errorf("error adding CheckResult tx hash for deal: %s", err.Error())
}
return nil
}
7 changes: 1 addition & 6 deletions pkg/jobcreator/jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type JobCreator interface {
GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error)
AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error)
SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func()
SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func()
GetResult(dealId string) (data.Result, error)
}

Expand All @@ -62,7 +61,7 @@ func NewJobCreator(
web3SDK *web3.Web3SDK,
tracer trace.Tracer,
) (*BasicJobCreator, error) {
controller, err := NewJobCreatorController(options, web3SDK, tracer)
controller, err := NewJobCreatorController("", options, web3SDK, tracer)
if err != nil {
return nil, err
}
Expand All @@ -89,10 +88,6 @@ func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscr
return jobCreator.controller.SubscribeToJobOfferUpdates(sub)
}

func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() {
return jobCreator.controller.SubscribeToJobOfferUpdatesWithFilter(sub, jobID)
}

func (jobCreator *BasicJobCreator) GetResult(dealId string) (data.Result, error) {
return jobCreator.controller.solverClient.GetResult(dealId)
}
2 changes: 1 addition & 1 deletion pkg/jobcreator/onchain_jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewOnChainJobCreator(
web3SDK *web3.Web3SDK,
tracer trace.Tracer,
) (*OnChainJobCreator, error) {
controller, err := NewJobCreatorController(options, web3SDK, tracer)
controller, err := NewJobCreatorController("", options, web3SDK, tracer)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobcreator/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func RunJob(
updateChan := make(chan data.JobOfferContainer)

// Set up the subscription BEFORE adding the job offer
cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdatesWithFilter(func(evOffer data.JobOfferContainer) {
cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) {
span.AddEvent("job_offer_update",
trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State))))
updateChan <- evOffer
Expand All @@ -70,7 +70,7 @@ func RunJob(
if eventSub != nil {
eventSub(evOffer)
}
}, offer.ID)
})

// Ensure we clean up the subscription when we're done
defer cleanup()
Expand Down

0 comments on commit cb6761a

Please sign in to comment.