From 491eb58c7496267b1fb1ea5b664c3c70f198eec4 Mon Sep 17 00:00:00 2001 From: Holly Grimm Date: Mon, 24 Feb 2025 16:26:37 -0700 Subject: [PATCH 1/4] feat: Enhance job offer subscription with filtering and unsubscribe support - Add JobOfferSubscription struct to support optional job ID filtering - Modify SubscribeToJobOfferUpdates to return an unsubscribe function - Introduce new SubscribeToJobOfferUpdatesWithFilter method - Update job offer update mechanism to respect job ID filters - Refactor JobCreator to use an interface for better extensibility --- pkg/jobcreator/controller.go | 45 +++++++++++++++++++++++++++++++----- pkg/jobcreator/jobcreator.go | 38 +++++++++++++++++++----------- 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pkg/jobcreator/controller.go b/pkg/jobcreator/controller.go index ea6b08cc..6a4fb8d0 100644 --- a/pkg/jobcreator/controller.go +++ b/pkg/jobcreator/controller.go @@ -19,8 +19,17 @@ import ( "go.opentelemetry.io/otel/trace" ) +// JobOfferSubscriber is a function that gets called when a job offer is updated type JobOfferSubscriber func(offer data.JobOfferContainer) +// JobOfferSubscription represents a subscription to job offer updates +type JobOfferSubscription struct { + // The callback function to call when a job offer is updated + Callback JobOfferSubscriber + // Optional job ID filter - if specified, only updates for this job will be sent + JobID string +} + type JobCreatorController struct { solverClient *solver.SolverClient options JobCreatorOptions @@ -28,7 +37,8 @@ type JobCreatorController struct { web3Events *web3.EventChannels loop *system.ControlLoop log *system.ServiceLogger - jobOfferSubscriptions []JobOfferSubscriber + jobOfferSubscriptions map[uint64]JobOfferSubscription + nextSubID uint64 tracer trace.Tracer } @@ -68,7 +78,8 @@ func NewJobCreatorController( web3SDK: web3SDK, web3Events: web3.NewEventChannels(), log: system.NewServiceLogger(system.JobCreatorService), - jobOfferSubscriptions: []JobOfferSubscriber{}, + jobOfferSubscriptions: make(map[uint64]JobOfferSubscription), + nextSubID: 0, tracer: tracer, } return controller, nil @@ -91,8 +102,26 @@ func (controller *JobCreatorController) AddJobOffer(offer data.JobOffer) (data.J return controller.solverClient.AddJobOffer(offer) } -func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) { - controller.jobOfferSubscriptions = append(controller.jobOfferSubscriptions, sub) +// SubscribeToJobOfferUpdates subscribes to all job offer updates +// Returns a function that can be called to unsubscribe +func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() { + return controller.SubscribeToJobOfferUpdatesWithFilter(sub, "") +} + +// SubscribeToJobOfferUpdatesWithFilter subscribes to job offer updates with an optional job ID filter +// If jobID is not empty, only updates for that job will be sent to the subscriber +// Returns a function that can be called to unsubscribe +func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() { + id := controller.nextSubID + controller.nextSubID++ + controller.jobOfferSubscriptions[id] = JobOfferSubscription{ + Callback: sub, + JobID: jobID, + } + + return func() { + delete(controller.jobOfferSubscriptions, id) + } } /* @@ -138,8 +167,13 @@ func (controller *JobCreatorController) subscribeToSolver() error { return } metricsDashboard.TrackJobOfferUpdate(*ev.JobOffer) + + // Make a copy of the subscriptions to avoid modification during iteration for _, sub := range controller.jobOfferSubscriptions { - go sub(*ev.JobOffer) + // If JobID filter is set, only send updates for that job + if sub.JobID == "" || sub.JobID == ev.JobOffer.JobOffer.ID { + sub.Callback(*ev.JobOffer) + } } } }) @@ -398,7 +432,6 @@ func (controller *JobCreatorController) acceptResult(deal data.DealContainer) er return fmt.Errorf("error adding AcceptResult tx hash for deal: %s", err.Error()) } - return nil } diff --git a/pkg/jobcreator/jobcreator.go b/pkg/jobcreator/jobcreator.go index 94354880..7c906b00 100644 --- a/pkg/jobcreator/jobcreator.go +++ b/pkg/jobcreator/jobcreator.go @@ -43,46 +43,56 @@ type JobCreatorOptions struct { Telemetry system.TelemetryOptions } -type JobCreator struct { - web3SDK *web3.Web3SDK - options JobCreatorOptions +type JobCreator interface { + Start(ctx context.Context, cm *system.CleanupManager) chan error + GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) + AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) + SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() + SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() + GetResult(dealId string) (data.Result, error) +} + +type BasicJobCreator struct { controller *JobCreatorController + web3SDK *web3.Web3SDK } func NewJobCreator( options JobCreatorOptions, web3SDK *web3.Web3SDK, tracer trace.Tracer, -) (*JobCreator, error) { +) (*BasicJobCreator, error) { controller, err := NewJobCreatorController(options, web3SDK, tracer) if err != nil { return nil, err } - jc := &JobCreator{ + return &BasicJobCreator{ controller: controller, - options: options, web3SDK: web3SDK, - } - return jc, nil + }, nil } -func (jobCreator *JobCreator) Start(ctx context.Context, cm *system.CleanupManager) chan error { +func (jobCreator *BasicJobCreator) Start(ctx context.Context, cm *system.CleanupManager) chan error { return jobCreator.controller.Start(ctx, cm) } -func (jobCreator *JobCreator) GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) { +func (jobCreator *BasicJobCreator) GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) { return getJobOfferFromOptions(options, jobCreator.web3SDK.GetAddress().String()) } // adds the job offer to the solver -func (jobCreator *JobCreator) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) { +func (jobCreator *BasicJobCreator) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) { return jobCreator.controller.AddJobOffer(offer) } -func (jobCreator *JobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) { - jobCreator.controller.SubscribeToJobOfferUpdates(sub) +func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() { + return jobCreator.controller.SubscribeToJobOfferUpdates(sub) +} + +func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() { + return jobCreator.controller.SubscribeToJobOfferUpdatesWithFilter(sub, jobID) } -func (jobCreator *JobCreator) GetResult(dealId string) (data.Result, error) { +func (jobCreator *BasicJobCreator) GetResult(dealId string) (data.Result, error) { return jobCreator.controller.solverClient.GetResult(dealId) } From 941352cc5afc0e0fd46186a7e7952d604216afcf Mon Sep 17 00:00:00 2001 From: Holly Grimm Date: Mon, 24 Feb 2025 18:30:54 -0700 Subject: [PATCH 2/4] feat: Improve job offer update handling and logging - Modify job offer update subscription to use filtered subscription - Add support for optional event subscriber - Enhance logging with job container ID - Print data ID immediately upon job result --- pkg/jobcreator/run.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index 30044d7e..a4c6528e 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -35,8 +35,6 @@ func RunJob( return nil, err } - jobCreatorService.SubscribeToJobOfferUpdates(eventSub) - jobCreatorErrors := jobCreatorService.Start(ctx.Ctx, ctx.Cm) // let's process our options into an actual job offer @@ -68,7 +66,7 @@ func RunJob( span.RecordError(err) return nil, err } - jobCreatorService.controller.log.Debug("job offer ID", jobOfferContainer.ID) + jobCreatorService.controller.log.Debug("job offer container ID", jobOfferContainer.ID) span.AddEvent("add_job_offer.done", trace.WithAttributes( attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID), @@ -79,14 +77,20 @@ func RunJob( updateChan := make(chan data.JobOfferContainer) - jobCreatorService.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) { - if evOffer.JobOffer.ID != jobOfferContainer.ID { - return - } + // Now we use the filtered subscriber to only get events for this specific job + cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdatesWithFilter(func(evOffer data.JobOfferContainer) { span.AddEvent("job_offer_update", trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State)))) updateChan <- evOffer - }) + + // Additionally call the provided eventSub if one was passed in + if eventSub != nil { + eventSub(evOffer) + } + }, jobOfferContainer.JobOffer.ID) + + // Ensure we clean up the subscription when we're done + defer cleanup() var finalJobOffer data.JobOfferContainer @@ -132,6 +136,9 @@ waitloop: } span.AddEvent("get_result.done", trace.WithAttributes(attribute.String("result.deal_id", result.DealID))) + // Print the result immediately when we get it + fmt.Printf("🆔 Data ID for job %s: %s\n", finalJobOffer.JobOffer.ID, result.DataID) + return &RunJobResults{ JobOffer: finalJobOffer, Result: result, From dcf62f353cd764a4f1a212c81e9e94dea12012d7 Mon Sep 17 00:00:00 2001 From: Holly Grimm Date: Wed, 5 Mar 2025 09:43:36 -0700 Subject: [PATCH 3/4] refactor: Subscribe to job offer updates using the offer's ID before calling AddJobOffer, improving the reliability of job tracking. --- pkg/jobcreator/run.go | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index a4c6528e..9ee3eee8 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -59,25 +59,10 @@ func RunJob( defer span.End() span.AddEvent("add_job_offer.start") - jobOfferContainer, err := jobCreatorService.AddJobOffer(offer) - if err != nil { - jobCreatorService.controller.log.Error("failed to add job offer", err) - span.SetStatus(codes.Error, "failed to add job offer") - span.RecordError(err) - return nil, err - } - jobCreatorService.controller.log.Debug("job offer container ID", jobOfferContainer.ID) - span.AddEvent("add_job_offer.done", - trace.WithAttributes( - attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID), - attribute.String("job_offer_container.state", data.GetAgreementStateString(jobOfferContainer.State)), - )) - span.SetAttributes(attribute.String("job_offer.id", jobOfferContainer.JobOffer.ID), - attribute.String("deal.id", jobOfferContainer.DealID)) updateChan := make(chan data.JobOfferContainer) - // Now we use the filtered subscriber to only get events for this specific job + // Set up the subscription BEFORE adding the job offer cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdatesWithFilter(func(evOffer data.JobOfferContainer) { span.AddEvent("job_offer_update", trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State)))) @@ -87,11 +72,29 @@ func RunJob( if eventSub != nil { eventSub(evOffer) } - }, jobOfferContainer.JobOffer.ID) + }, offer.ID) // Ensure we clean up the subscription when we're done defer cleanup() + // Add the job offer + span.AddEvent("add_job_offer.start") + jobOfferContainer, err := jobCreatorService.AddJobOffer(offer) + if err != nil { + jobCreatorService.controller.log.Error("failed to add job offer", err) + span.SetStatus(codes.Error, "failed to add job offer") + span.RecordError(err) + return nil, err + } + jobCreatorService.controller.log.Debug("job offer container ID", jobOfferContainer.ID) + span.AddEvent("add_job_offer.done", + trace.WithAttributes( + attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID), + attribute.String("job_offer_container.state", data.GetAgreementStateString(jobOfferContainer.State)), + )) + span.SetAttributes(attribute.String("job_offer.id", jobOfferContainer.JobOffer.ID), + attribute.String("deal.id", jobOfferContainer.DealID)) + var finalJobOffer data.JobOfferContainer // now we wait on the state of the job @@ -136,9 +139,6 @@ waitloop: } span.AddEvent("get_result.done", trace.WithAttributes(attribute.String("result.deal_id", result.DealID))) - // Print the result immediately when we get it - fmt.Printf("🆔 Data ID for job %s: %s\n", finalJobOffer.JobOffer.ID, result.DataID) - return &RunJobResults{ JobOffer: finalJobOffer, Result: result, From 76d98079735a73d788f0764cf3c30425c02ddd38 Mon Sep 17 00:00:00 2001 From: Holly Grimm Date: Wed, 5 Mar 2025 13:30:57 -0700 Subject: [PATCH 4/4] refactor: Simplify job offer subscription management by using job ID as key --- pkg/jobcreator/controller.go | 18 +++++++----------- pkg/jobcreator/run.go | 2 -- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/pkg/jobcreator/controller.go b/pkg/jobcreator/controller.go index 6a4fb8d0..80fdb594 100644 --- a/pkg/jobcreator/controller.go +++ b/pkg/jobcreator/controller.go @@ -37,8 +37,7 @@ type JobCreatorController struct { web3Events *web3.EventChannels loop *system.ControlLoop log *system.ServiceLogger - jobOfferSubscriptions map[uint64]JobOfferSubscription - nextSubID uint64 + jobOfferSubscriptions map[string]JobOfferSubscription tracer trace.Tracer } @@ -78,8 +77,7 @@ func NewJobCreatorController( web3SDK: web3SDK, web3Events: web3.NewEventChannels(), log: system.NewServiceLogger(system.JobCreatorService), - jobOfferSubscriptions: make(map[uint64]JobOfferSubscription), - nextSubID: 0, + jobOfferSubscriptions: make(map[string]JobOfferSubscription), tracer: tracer, } return controller, nil @@ -112,15 +110,13 @@ func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferS // If jobID is not empty, only updates for that job will be sent to the subscriber // Returns a function that can be called to unsubscribe func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() { - id := controller.nextSubID - controller.nextSubID++ - controller.jobOfferSubscriptions[id] = JobOfferSubscription{ + controller.jobOfferSubscriptions[jobID] = JobOfferSubscription{ Callback: sub, JobID: jobID, } return func() { - delete(controller.jobOfferSubscriptions, id) + delete(controller.jobOfferSubscriptions, jobID) } } @@ -169,9 +165,9 @@ func (controller *JobCreatorController) subscribeToSolver() error { metricsDashboard.TrackJobOfferUpdate(*ev.JobOffer) // Make a copy of the subscriptions to avoid modification during iteration - for _, sub := range controller.jobOfferSubscriptions { - // If JobID filter is set, only send updates for that job - if sub.JobID == "" || sub.JobID == ev.JobOffer.JobOffer.ID { + for jobID, sub := range controller.jobOfferSubscriptions { + // If JobID is empty (global subscription) or matches the job offer ID, send the update + if jobID == "" || jobID == ev.JobOffer.JobOffer.ID { sub.Callback(*ev.JobOffer) } } diff --git a/pkg/jobcreator/run.go b/pkg/jobcreator/run.go index 9ee3eee8..784af279 100644 --- a/pkg/jobcreator/run.go +++ b/pkg/jobcreator/run.go @@ -58,8 +58,6 @@ func RunJob( ctx.Ctx = c defer span.End() - span.AddEvent("add_job_offer.start") - updateChan := make(chan data.JobOfferContainer) // Set up the subscription BEFORE adding the job offer