Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Isolate job output in cli #528

Merged
merged 6 commits into from
Mar 5, 2025
Merged
43 changes: 36 additions & 7 deletions pkg/jobcreator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,25 @@ import (
"go.opentelemetry.io/otel/trace"
)

// JobOfferSubscriber is a function that gets called when a job offer is updated
type JobOfferSubscriber func(offer data.JobOfferContainer)

// JobOfferSubscription represents a subscription to job offer updates
type JobOfferSubscription struct {
// The callback function to call when a job offer is updated
Callback JobOfferSubscriber
// Optional job ID filter - if specified, only updates for this job will be sent
JobID string
}

type JobCreatorController struct {
solverClient *solver.SolverClient
options JobCreatorOptions
web3SDK *web3.Web3SDK
web3Events *web3.EventChannels
loop *system.ControlLoop
log *system.ServiceLogger
jobOfferSubscriptions []JobOfferSubscriber
jobOfferSubscriptions map[string]JobOfferSubscription
tracer trace.Tracer
}

Expand Down Expand Up @@ -68,7 +77,7 @@ func NewJobCreatorController(
web3SDK: web3SDK,
web3Events: web3.NewEventChannels(),
log: system.NewServiceLogger(system.JobCreatorService),
jobOfferSubscriptions: []JobOfferSubscriber{},
jobOfferSubscriptions: make(map[string]JobOfferSubscription),
tracer: tracer,
}
return controller, nil
Expand All @@ -91,8 +100,24 @@ func (controller *JobCreatorController) AddJobOffer(offer data.JobOffer) (data.J
return controller.solverClient.AddJobOffer(offer)
}

func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) {
controller.jobOfferSubscriptions = append(controller.jobOfferSubscriptions, sub)
// SubscribeToJobOfferUpdates subscribes to all job offer updates
// Returns a function that can be called to unsubscribe
func (controller *JobCreatorController) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() {
return controller.SubscribeToJobOfferUpdatesWithFilter(sub, "")
}

// SubscribeToJobOfferUpdatesWithFilter subscribes to job offer updates with an optional job ID filter
// If jobID is not empty, only updates for that job will be sent to the subscriber
// Returns a function that can be called to unsubscribe
func (controller *JobCreatorController) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() {
controller.jobOfferSubscriptions[jobID] = JobOfferSubscription{
Callback: sub,
JobID: jobID,
}

return func() {
delete(controller.jobOfferSubscriptions, jobID)
}
}

/*
Expand Down Expand Up @@ -138,8 +163,13 @@ func (controller *JobCreatorController) subscribeToSolver() error {
return
}
metricsDashboard.TrackJobOfferUpdate(*ev.JobOffer)
for _, sub := range controller.jobOfferSubscriptions {
go sub(*ev.JobOffer)

// Make a copy of the subscriptions to avoid modification during iteration
for jobID, sub := range controller.jobOfferSubscriptions {
// If JobID is empty (global subscription) or matches the job offer ID, send the update
if jobID == "" || jobID == ev.JobOffer.JobOffer.ID {
sub.Callback(*ev.JobOffer)
}
}
}
})
Expand Down Expand Up @@ -398,7 +428,6 @@ func (controller *JobCreatorController) acceptResult(deal data.DealContainer) er
return fmt.Errorf("error adding AcceptResult tx hash for deal: %s", err.Error())
}


return nil
}

Expand Down
38 changes: 24 additions & 14 deletions pkg/jobcreator/jobcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,56 @@ type JobCreatorOptions struct {
Telemetry system.TelemetryOptions
}

type JobCreator struct {
web3SDK *web3.Web3SDK
options JobCreatorOptions
type JobCreator interface {
Start(ctx context.Context, cm *system.CleanupManager) chan error
GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error)
AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error)
SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func()
SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func()
GetResult(dealId string) (data.Result, error)
}

type BasicJobCreator struct {
controller *JobCreatorController
web3SDK *web3.Web3SDK
}

func NewJobCreator(
options JobCreatorOptions,
web3SDK *web3.Web3SDK,
tracer trace.Tracer,
) (*JobCreator, error) {
) (*BasicJobCreator, error) {
controller, err := NewJobCreatorController(options, web3SDK, tracer)
if err != nil {
return nil, err
}
jc := &JobCreator{
return &BasicJobCreator{
controller: controller,
options: options,
web3SDK: web3SDK,
}
return jc, nil
}, nil
}

func (jobCreator *JobCreator) Start(ctx context.Context, cm *system.CleanupManager) chan error {
func (jobCreator *BasicJobCreator) Start(ctx context.Context, cm *system.CleanupManager) chan error {
return jobCreator.controller.Start(ctx, cm)
}

func (jobCreator *JobCreator) GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) {
func (jobCreator *BasicJobCreator) GetJobOfferFromOptions(options JobCreatorOfferOptions) (data.JobOffer, error) {
return getJobOfferFromOptions(options, jobCreator.web3SDK.GetAddress().String())
}

// adds the job offer to the solver
func (jobCreator *JobCreator) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) {
func (jobCreator *BasicJobCreator) AddJobOffer(offer data.JobOffer) (data.JobOfferContainer, error) {
return jobCreator.controller.AddJobOffer(offer)
}

func (jobCreator *JobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) {
jobCreator.controller.SubscribeToJobOfferUpdates(sub)
func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdates(sub JobOfferSubscriber) func() {
return jobCreator.controller.SubscribeToJobOfferUpdates(sub)
}

func (jobCreator *BasicJobCreator) SubscribeToJobOfferUpdatesWithFilter(sub JobOfferSubscriber, jobID string) func() {
return jobCreator.controller.SubscribeToJobOfferUpdatesWithFilter(sub, jobID)
}

func (jobCreator *JobCreator) GetResult(dealId string) (data.Result, error) {
func (jobCreator *BasicJobCreator) GetResult(dealId string) (data.Result, error) {
return jobCreator.controller.solverClient.GetResult(dealId)
}
33 changes: 19 additions & 14 deletions pkg/jobcreator/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ func RunJob(
return nil, err
}

jobCreatorService.SubscribeToJobOfferUpdates(eventSub)

jobCreatorErrors := jobCreatorService.Start(ctx.Ctx, ctx.Cm)

// let's process our options into an actual job offer
Expand All @@ -60,6 +58,24 @@ func RunJob(
ctx.Ctx = c
defer span.End()

updateChan := make(chan data.JobOfferContainer)

// Set up the subscription BEFORE adding the job offer
cleanup := jobCreatorService.controller.SubscribeToJobOfferUpdatesWithFilter(func(evOffer data.JobOfferContainer) {
span.AddEvent("job_offer_update",
trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State))))
updateChan <- evOffer

// Additionally call the provided eventSub if one was passed in
if eventSub != nil {
eventSub(evOffer)
}
}, offer.ID)

// Ensure we clean up the subscription when we're done
defer cleanup()

// Add the job offer
span.AddEvent("add_job_offer.start")
jobOfferContainer, err := jobCreatorService.AddJobOffer(offer)
if err != nil {
Expand All @@ -68,7 +84,7 @@ func RunJob(
span.RecordError(err)
return nil, err
}
jobCreatorService.controller.log.Debug("job offer ID", jobOfferContainer.ID)
jobCreatorService.controller.log.Debug("job offer container ID", jobOfferContainer.ID)
span.AddEvent("add_job_offer.done",
trace.WithAttributes(
attribute.String("job_offer_container.deal_id", jobOfferContainer.DealID),
Expand All @@ -77,17 +93,6 @@ func RunJob(
span.SetAttributes(attribute.String("job_offer.id", jobOfferContainer.JobOffer.ID),
attribute.String("deal.id", jobOfferContainer.DealID))

updateChan := make(chan data.JobOfferContainer)

jobCreatorService.SubscribeToJobOfferUpdates(func(evOffer data.JobOfferContainer) {
if evOffer.JobOffer.ID != jobOfferContainer.ID {
return
}
span.AddEvent("job_offer_update",
trace.WithAttributes(attribute.String("job_offer_container.state", data.GetAgreementStateString(evOffer.State))))
updateChan <- evOffer
})

var finalJobOffer data.JobOfferContainer

// now we wait on the state of the job
Expand Down