From cb6761aac3addaa898a97218c55cb2e33d44908c Mon Sep 17 00:00:00 2001 From: Holly Grimm Date: Thu, 6 Mar 2025 18:11:42 -0700 Subject: [PATCH] feat: Enhance JobCreatorController to manage single job lifecycle (#536) refactor: Enhance JobCreatorController to manage single job lifecycle - Modify JobCreatorController to track a single job via jobID - Update job offer and deal event handling to filter by specific job - Improve result checking and processing with more robust error handling - Remove deprecated job offer subscription filter method - Simplify job tracking and event subscription logic --- pkg/jobcreator/controller.go | 186 +++++++++++++++++---------- pkg/jobcreator/jobcreator.go | 7 +- pkg/jobcreator/onchain_jobcreator.go | 2 +- pkg/jobcreator/run.go | 4 +- 4 files changed, 125 insertions(+), 74 deletions(-) diff --git a/pkg/jobcreator/controller.go b/pkg/jobcreator/controller.go index 80fdb594..e0bc1825 100644 --- a/pkg/jobcreator/controller.go +++ b/pkg/jobcreator/controller.go @@ -2,10 +2,7 @@ package jobcreator import ( "context" - "errors" "fmt" - "io/fs" - "os" "time" "github.com/lilypad-tech/lilypad/pkg/data" @@ -26,10 +23,11 @@ type JobOfferSubscriber func(offer data.JobOfferContainer) type JobOfferSubscription struct { // The callback function to call when a job offer is updated Callback JobOfferSubscriber - // Optional job ID filter - if specified, only updates for this job will be sent - JobID string } +// JobCreatorController manages a single job identified by jobID. +// It handles all aspects of job processing including subscriptions, +// deal management, and result handling for this specific job. type JobCreatorController struct { solverClient *solver.SolverClient options JobCreatorOptions @@ -39,6 +37,7 @@ type JobCreatorController struct { log *system.ServiceLogger jobOfferSubscriptions map[string]JobOfferSubscription tracer trace.Tracer + jobID string // The single job this controller instance manages } // the background "even if we have not heard of an event" loop @@ -48,6 +47,7 @@ type JobCreatorController struct { const CONTROL_LOOP_INTERVAL = 10 * time.Second func NewJobCreatorController( + jobID string, options JobCreatorOptions, web3SDK *web3.Web3SDK, tracer trace.Tracer, @@ -79,6 +79,7 @@ func NewJobCreatorController( log: system.NewServiceLogger(system.JobCreatorService), jobOfferSubscriptions: make(map[string]JobOfferSubscription), tracer: tracer, + jobID: jobID, } return controller, nil } @@ -97,26 +98,29 @@ func NewJobCreatorController( func (controller *JobCreatorController) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) { controller.log.Debug("add job offer", offer) - return controller.solverClient.AddJobOffer(offer) -} + container, err := controller.solverClient.AddJobOffer(offer) + if err != nil { + return container, err + } -// SubscribeToJobOfferUpdates subscribes to all job offer updates -// Returns a function that can be called to unsubscribe -func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() { - return controller.SubscribeToJobOfferUpdatesWithFilter(sub, "") + // Set the controller's jobID to track this specific job + controller.jobID = container.JobOffer.ID + controller.log.Debug("Set controller jobID", map[string]interface{}{ + "jobID": controller.jobID, + }) + + return container, nil } -// SubscribeToJobOfferUpdatesWithFilter subscribes to job offer updates with an optional job ID filter -// If jobID is not empty, only updates for that job will be sent to the subscriber +// SubscribeToJobOfferUpdates subscribes to job offer updates for this controller's job // Returns a function that can be called to unsubscribe -func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() { - controller.jobOfferSubscriptions[jobID] = JobOfferSubscription{ +func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() { + controller.jobOfferSubscriptions[controller.jobID] = JobOfferSubscription{ Callback: sub, - JobID: jobID, } return func() { - delete(controller.jobOfferSubscriptions, jobID) + delete(controller.jobOfferSubscriptions, controller.jobID) } } @@ -133,6 +137,18 @@ func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub */ func (controller *JobCreatorController) subscribeToSolver() error { controller.solverClient.SubscribeEvents(func(ev solver.SolverEvent) { + // First check if this event is for our specific job + if controller.jobID != "" { + if ev.Deal != nil && ev.Deal.JobOffer != controller.jobID { + // Skip events for other jobs + return + } + if ev.JobOffer != nil && ev.JobOffer.JobOffer.ID != controller.jobID { + // Skip events for other jobs + return + } + } + if ev.EventType == "DealStateUpdated" { metricsDashboard.TrackDeal(metricsDashboard.DealPayload{ ID: ev.Deal.ID, @@ -141,6 +157,7 @@ func (controller *JobCreatorController) subscribeToSolver() error { ResourceProvider: ev.Deal.ResourceProvider, }) } + switch ev.EventType { case solver.DealAdded: if ev.Deal == nil { @@ -153,10 +170,14 @@ func (controller *JobCreatorController) subscribeToSolver() error { return } - solver.ServiceLogSolverEvent(system.JobCreatorService, ev) + controller.log.Debug("Received deal event", map[string]interface{}{ + "dealID": ev.Deal.ID, + "jobID": controller.jobID, + }) - // trigger the solver + solver.ServiceLogSolverEvent(system.JobCreatorService, ev) controller.loop.Trigger() + case solver.JobOfferStateUpdated: if ev.JobOffer == nil { controller.log.Error("solver event", fmt.Errorf("RP received nil job offer")) @@ -279,15 +300,26 @@ func (controller *JobCreatorController) solve() error { // list the deals we have been assigned to that we have not yet posted and agree tx to the contract for func (controller *JobCreatorController) agreeToDeals() error { - // load the deals that are in DealNegotiating - // and do not have a TransactionsResourceProvider.Agree tx + // If no jobID is set, we shouldn't process any deals + if controller.jobID == "" { + return nil + } + matchedDeals, err := controller.solverClient.GetDealsWithFilter( store.GetDealsQuery{ JobCreator: controller.web3SDK.GetAddress().String(), State: "DealNegotiating", }, - // this is where the solver has found us a match and we need to agree to it func(dealContainer data.DealContainer) bool { + // Only agree to deals for our specific job + if dealContainer.Deal.JobOffer.ID != controller.jobID { + controller.log.Debug("Skipping deal - wrong jobID", map[string]interface{}{ + "dealJobID": dealContainer.Deal.JobOffer.ID, + "ourJobID": controller.jobID, + }) + return false + } + return dealContainer.Transactions.JobCreator.Agree == "" }, ) @@ -328,15 +360,40 @@ func (controller *JobCreatorController) agreeToDeals() error { // we do this synchronously to prevent us racing with large result sets // also we are the client so have a lower chance of there being a chunky backlog func (controller *JobCreatorController) checkResults() error { - // load all deals in ResultsSubmitted state and don't have either results checked or accepted txs + controller.log.Debug("Checking results for jobID", controller.jobID) + + // If no jobID is set, we shouldn't process any results + if controller.jobID == "" { + return nil + } + completedDeals, err := controller.solverClient.GetDealsWithFilter( store.GetDealsQuery{ JobCreator: controller.web3SDK.GetAddress().String(), State: "ResultsSubmitted", }, - // this is where the solver has found us a match and we need to agree to it func(dealContainer data.DealContainer) bool { - return dealContainer.Transactions.JobCreator.AcceptResult == "" && dealContainer.Transactions.JobCreator.CheckResult == "" + // First check if this deal belongs to our job + if dealContainer.Deal.JobOffer.ID != controller.jobID { + controller.log.Debug("Skipping deal - wrong jobID", map[string]interface{}{ + "dealJobID": dealContainer.Deal.JobOffer.ID, + "ourJobID": controller.jobID, + }) + return false + } + + // Then check if we haven't processed it yet + match := dealContainer.Transactions.JobCreator.AcceptResult == "" && + dealContainer.Transactions.JobCreator.CheckResult == "" + + controller.log.Debug("Filtering deal", map[string]interface{}{ + "dealID": dealContainer.ID, + "dealJobOfferID": dealContainer.Deal.JobOffer.ID, + "controllerJobID": controller.jobID, + "match": match, + }) + + return match }, ) if err != nil { @@ -346,33 +403,50 @@ func (controller *JobCreatorController) checkResults() error { return nil } + // Process each deal for _, dealContainer := range completedDeals { + result, err := controller.solverClient.GetResult(dealContainer.ID) - if err != nil || result.Error != "" { - // there is an error with the job - // accept anyway - // TODO: trigger mediation here - err := controller.acceptResult(dealContainer) - if err != nil { - controller.log.Error("failed to accept results", err) - return err - } - } else { - // We check for all completed deals, including deals whose results - // we have already downloaded. Check the download path and download - // if results do not exist. - downloadPath := solver.GetDownloadsFilePath(dealContainer.ID) - if _, err := os.Stat(downloadPath); errors.Is(err, fs.ErrNotExist) { - err := controller.downloadResult(dealContainer) - if err != nil { - controller.log.Error("failed to download results", err) - return err - } - } + if err != nil { + controller.log.Debug("failed to get result metadata", map[string]interface{}{ + "dealID": dealContainer.ID, + }) + controller.log.Error("failed to get result", err) + continue // Continue to next deal instead of returning + } + + if result.Error != "" { + err := fmt.Errorf("result contains error: %s", result.Error) + controller.log.Debug("result error metadata", map[string]interface{}{ + "dealID": dealContainer.ID, + }) + controller.log.Error("result contains error", err) + continue // Continue to next deal + } + + if result.DataID == "" { + err := fmt.Errorf("result missing DataID for deal %s", dealContainer.ID) + controller.log.Debug("missing DataID metadata", map[string]interface{}{ + "dealID": dealContainer.ID, + }) + controller.log.Error("result missing DataID", err) + continue // Continue to next deal + } + + err = controller.downloadResult(dealContainer) + if err != nil { + controller.log.Error("failed to download results", err) + continue // Continue to next deal + } + + err = controller.acceptResult(dealContainer) + if err != nil { + controller.log.Error("failed to accept results", err) + continue // Continue to next deal } } - return err + return nil } func (controller *JobCreatorController) downloadResult(dealContainer data.DealContainer) error { @@ -430,21 +504,3 @@ func (controller *JobCreatorController) acceptResult(deal data.DealContainer) er return nil } - -func (controller *JobCreatorController) checkResult(deal data.DealContainer) error { - controller.log.Debug("Checking results for job", deal.ID) - txHash, err := controller.web3SDK.CheckResult(deal.ID) - if err != nil { - return fmt.Errorf("error calling check result tx for deal: %s", err.Error()) - } - controller.log.Debug("check result tx", txHash) - - // we have agreed to the deal so we need to update the tx in the solver - _, err = controller.solverClient.UpdateTransactionsJobCreator(deal.ID, data.DealTransactionsJobCreator{ - CheckResult: txHash, - }) - if err != nil { - return fmt.Errorf("error adding CheckResult tx hash for deal: %s", err.Error()) - } - return nil -} diff --git a/pkg/jobcreator/jobcreator.go b/pkg/jobcreator/jobcreator.go index 7c906b00..dca82ddd 100644 --- a/pkg/jobcreator/jobcreator.go +++ b/pkg/jobcreator/jobcreator.go @@ -48,7 +48,6 @@ type JobCreator interface { GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() - SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() GetResult(dealId string) (data.Result, error) } @@ -62,7 +61,7 @@ func NewJobCreator( web3SDK *web3.Web3SDK, tracer trace.Tracer, ) (*BasicJobCreator, error) { - controller, err := NewJobCreatorController(options, web3SDK, tracer) + controller, err := NewJobCreatorController("", options, web3SDK, tracer) if err != nil { return nil, err } @@ -89,10 +88,6 @@ func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscr return jobCreator.controller.SubscribeToJobOfferUpdates(sub) } -func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() { - return jobCreator.controller.SubscribeToJobOfferUpdatesWithFilter(sub, jobID) -} - func (jobCreator *BasicJobCreator) GetResult(dealId string) (data.Result, error) { return jobCreator.controller.solverClient.GetResult(dealId) } diff --git a/pkg/jobcreator/onchain_jobcreator.go b/pkg/jobcreator/onchain_jobcreator.go index e4c841d3..9084c3a2 100644 --- a/pkg/jobcreator/onchain_jobcreator.go +++ b/pkg/jobcreator/onchain_jobcreator.go @@ -29,7 +29,7 @@ func NewOnChainJobCreator( web3SDK *web3.Web3SDK, tracer trace.Tracer, ) (*OnChainJobCreator, error) { - controller, err := NewJobCreatorController(options, web3SDK, tracer) + controller, err := NewJobCreatorController("", options, web3SDK, tracer) if err != nil { return nil, err } diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index 784af279..cb2879d5 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -61,7 +61,7 @@ func RunJob( updateChan := make(chan data.JobOfferContainer) // Set up the subscription BEFORE adding the job offer - cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdatesWithFilter(func(evOffer data.JobOfferContainer) { + cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) { span.AddEvent("job_offer_update", trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State)))) updateChan <- evOffer @@ -70,7 +70,7 @@ func RunJob( if eventSub != nil { eventSub(evOffer) } - }, offer.ID) + }) // Ensure we clean up the subscription when we're done defer cleanup()