Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add contextual logger #527

Merged
merged 12 commits into from
Mar 5, 2025
9 changes: 5 additions & 4 deletions cmd/lilypad/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ func runSolver(cmd *cobra.Command, options solver.SolverOptions, network string,
commandCtx := system.NewCommandContext(cmd)
defer commandCtx.Cleanup()

if lilynext {
log.Info().Msg("🍃 Running the new lilypad protocol")
}

telemetry, err := configureTelemetry(commandCtx.Ctx, system.SolverService, network, options.Telemetry, &options.Metrics, options.Web3)
if err != nil {
log.Warn().Msgf("failed to setup opentelemetry: %s", err)
}
commandCtx.Cm.RegisterCallbackWithContext(telemetry.Shutdown)
tracer := telemetry.TracerProvider.Tracer(system.GetOTelServiceName(system.SolverService))
meter := telemetry.MeterProvider.Meter(system.GetOTelServiceName(system.SolverService))
log := system.GetLogger(system.SolverService)

if lilynext {
log.Info().Msg("🍃 Running the new lilypad protocol")
}

unregisterMetrics, err := system.NewMetrics(meter)
if err != nil {
Expand Down
107 changes: 66 additions & 41 deletions pkg/solver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/lilypad-tech/lilypad/pkg/web3"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/mediation"
"github.com/lilypad-tech/lilypad/pkg/web3/bindings/storage"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -51,7 +51,7 @@ type SolverController struct {
loop *system.ControlLoop
solverEventSubs []func(SolverEvent)
options SolverOptions
log *system.ServiceLogger
log *zerolog.Logger
tracer trace.Tracer
meter metric.Meter
}
Expand All @@ -75,7 +75,7 @@ func NewSolverController(
web3Events: web3.NewEventChannels(),
store: store,
options: options,
log: system.NewServiceLogger(system.SolverService),
log: system.GetLogger(system.SolverService),
tracer: tracer,
meter: meter,
}
Expand All @@ -92,7 +92,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu
}

// activate the web3 event listeners
log.Debug().Msgf("controller.web3Events.Start")
controller.log.Debug().Msgf("controller.web3Events.Start")
err = controller.web3Events.Start(ctx, cm, controller.web3SDK)
if err != nil {
errorChan <- err
Expand All @@ -101,7 +101,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu

// make sure we are registered as a solver
// so that users can lookup our URL
log.Debug().Msgf("controller.registerAsSolver")
controller.log.Debug().Msgf("controller.registerAsSolver")
err = controller.registerAsSolver()
if err != nil {
errorChan <- err
Expand All @@ -120,7 +120,7 @@ func (controller *SolverController) Start(ctx context.Context, cm *system.Cleanu
return err
},
)
log.Debug().Msgf("controller.loop.Start")
controller.log.Debug().Msgf("controller.loop.Start")
err = controller.loop.Start(true)
if err != nil {
errorChan <- err
Expand Down Expand Up @@ -151,22 +151,22 @@ func (controller *SolverController) subscribeToWeb3() error {
controller.web3Events.Storage.SubscribeDealStateChange(func(ev storage.StorageDealStateChange) {
_, err := controller.updateDealState(ev.DealId, ev.State)
if err != nil {
controller.log.Error("error updating deal state", err)
controller.log.Error().Err(err).Msg("error updating deal state")
return
}
controller.log.Info("StorageDealStateChange", data.GetAgreementStateString(ev.State))
controller.log.Info().Str("StorageDealStateChange", data.GetAgreementStateString(ev.State)).Msg("")
system.DumpObjectDebug(ev)
// update the store with the state change
controller.loop.Trigger()
})

// update the mediator
controller.web3Events.Mediation.SubscribeMediationRequested(func(ev mediation.MediationMediationRequested) {
controller.log.Info("MediationMediationRequested", "")
controller.log.Info().Msg("MediationMediationRequested")
system.DumpObjectDebug(ev)
_, err := controller.updateDealMediator(ev.DealId, ev.Mediator.String())
if err != nil {
controller.log.Error("error updating deal state", err)
controller.log.Error().Err(err).Msg("error updating deal state")
return
}

Expand Down Expand Up @@ -217,17 +217,22 @@ func (controller *SolverController) registerAsSolver() error {
return err
}

log.Debug().Msgf("GetUser with selfAddress: %s", selfAddress.String())
controller.log.Debug().Msgf("GetUser with selfAddress: %s", selfAddress.String())
selfUser, err := controller.web3SDK.GetUser(selfAddress)
if err != nil {
return err
}

// TODO: check the other props and call update if they have changed
log.Debug().Msgf("selfUser.Url: %s", selfUser.Url)
log.Debug().Msgf("controller.options.Server.URL: %s", controller.options.Server.URL)
controller.log.Debug().Msgf("selfUser.Url: %s", selfUser.Url)
controller.log.Debug().Msgf("controller.options.Server.URL: %s", controller.options.Server.URL)
if selfUser.Url != controller.options.Server.URL {
controller.log.Info("url change", fmt.Sprintf("solver will be updated because URL has changed: %s %s != %s", selfAddress.String(), selfUser.Url, controller.options.Server.URL))
controller.log.Info().
Str("address", selfAddress.String()).
Str("oldURL", selfUser.Url).
Str("newURL", controller.options.Server.URL).
Msg("solver will be updated because URL has changed")

err = controller.web3SDK.UpdateUser(
"",
controller.options.Server.URL,
Expand All @@ -237,7 +242,10 @@ func (controller *SolverController) registerAsSolver() error {
return err
}
} else {
controller.log.Info("url same", fmt.Sprintf("solver url already correct: %s %s", selfAddress.String(), controller.options.Server.URL))
controller.log.Info().
Str("address", selfAddress.String()).
Str("url", controller.options.Server.URL).
Msg("solver url already correct")
}

existingSolvers, err := controller.web3SDK.GetSolverAddresses()
Expand All @@ -247,21 +255,21 @@ func (controller *SolverController) registerAsSolver() error {
foundSolver := false
for _, existingSolver := range existingSolvers {
if existingSolver.String() == selfAddress.String() {
controller.log.Info("solver exists", selfAddress.String())
controller.log.Info().Str("address", selfAddress.String()).Msg("solver exists")
foundSolver = true
break
}
}
if !foundSolver {
controller.log.Info("solver registering", "")
controller.log.Info().Msg("solver registering")
// add the solver to the storage contract
err = controller.web3SDK.AddUserToList(
solverType,
)
if err != nil {
return err
}
controller.log.Info("solver registered", selfAddress.String())
controller.log.Info().Str("address", selfAddress.String()).Msg("solver registered")
}
return nil
}
Expand Down Expand Up @@ -291,7 +299,7 @@ func (controller *SolverController) solve(ctx context.Context) error {
}

// Match job offers with resource offers to make deals
deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.log, controller.tracer, controller.meter)
deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.tracer, controller.meter)
if err != nil {
span.SetStatus(codes.Error, "get matching deals failed")
span.RecordError(err)
Expand Down Expand Up @@ -346,7 +354,7 @@ func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error
Active: true,
})
if err != nil {
controller.log.Error("get job offers failed", err)
controller.log.Error().Err(err).Msg("get job offers failed")
span.SetStatus(codes.Error, "get job offers failed")
span.RecordError(err)
return err
Expand All @@ -365,15 +373,15 @@ func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error
// Cancel expired job offers
_, err := controller.updateJobOfferState(jobOffer.ID, jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut"))
if err != nil {
controller.log.Error("update expired job offer state failed", err)
controller.log.Error().Err(err).Msg("update expired job offer state failed")
span.SetStatus(codes.Error, "update expired job offer state failed")
span.RecordError(err)
}
} else {
// Cancel expired job offers, resource offers, and deals
_, err := controller.updateDealState(jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut"))
if err != nil {
controller.log.Error("update expired deal state failed", err)
controller.log.Error().Err(err).Msg("update expired deal state failed")
span.SetStatus(codes.Error, "update expired deal state failed")
span.RecordError(err)
}
Expand Down Expand Up @@ -413,7 +421,10 @@ func (controller *SolverController) addJobOffer(jobOffer data.JobOffer) (*data.J
}
jobOffer.ID = id

controller.log.Info("add job offer", jobOffer)
controller.log.Info().Str("cid", jobOffer.ID).
Str("address", jobOffer.JobCreator).
Any("jobOffer", jobOffer).
Msg("add job offer")

ret, err := controller.store.AddJobOffer(data.GetJobOfferContainer(jobOffer))
if err != nil {
Expand Down Expand Up @@ -443,26 +454,34 @@ func (controller *SolverController) addResourceOffer(resourceOffer data.Resource

// If the balance is less than the required balance, don't add the resource offer
if balance.Cmp(requiredBalanceWei) < 0 {
err := fmt.Errorf("address %s doesn't have enough ETH balance. The required balance is %s but current balance is %s", resourceOffer.ResourceProvider, requiredBalanceWei, balance)
controller.log.Error("ETH balance check failed", err)
controller.log.Error().Err(err).
Str("addresss", resourceOffer.ResourceProvider).
Str("balance", balance.String()).
Str("requiredBalance", requiredBalanceWei.String()).
Msg("resource provider does not have enough ETH to post offer")
return nil, nil
}

// required LP balance
requiredBalanceLp := web3.EtherToWei(float64(resourceOffer.DefaultPricing.InstructionPrice)) // based on the required LP balance for a job
balanceLp, err := controller.web3SDK.GetLPBalance(resourceOffer.ResourceProvider)
if err != nil {
err := fmt.Errorf("failed to retrieve LP balance for resource provider: %v", err)
controller.log.Error("LP Balance error", err)
controller.log.Error().Err(err).Msg("failed to retrieve LP balance for resource provider")
return nil, nil
}
if balanceLp.Cmp(requiredBalanceLp) < 0 {
err := fmt.Errorf("address %s doesn't have enough LP balance. The required balance is %s but current balance is %s", resourceOffer.ResourceProvider, requiredBalanceLp, balanceLp)
controller.log.Error("LP balance check failed", err)
controller.log.Error().Err(err).
Str("addresss", resourceOffer.ResourceProvider).
Str("balance", balanceLp.String()).
Str("requiredBalance", requiredBalanceLp.String()).
Msg("resource provider does not have enough LP to post offer")
return nil, nil
}

controller.log.Info("add resource offer", resourceOffer)
controller.log.Info().Str("cid", resourceOffer.ID).
Str("address", resourceOffer.ResourceProvider).
Any("resourceOffer", resourceOffer).
Msg("add resource offer")

metricsDashboard.TrackNodeInfo(resourceOffer)

Expand All @@ -480,7 +499,7 @@ func (controller *SolverController) addResourceOffer(resourceOffer data.Resource

// Remove resource offers in an unmatched DealNegotiating[0] state
func (controller *SolverController) removeUnmatchedResourceOffers(resourceProviderID string) error {
controller.log.Info("remove resource offer", resourceProviderID)
controller.log.Info().Str("address", resourceProviderID).Msg("remove unmatched resource offers")
resourceOffers, err := controller.store.GetResourceOffers(store.GetResourceOffersQuery{
ResourceProvider: resourceProviderID,
})
Expand All @@ -492,8 +511,10 @@ func (controller *SolverController) removeUnmatchedResourceOffers(resourceProvid
if offer.State == 0 {
err = controller.store.RemoveResourceOffer(offer.ID)
if err != nil {
controller.log.Error("remove resource offer failed",
fmt.Errorf("resource provider: %s, offer ID: %s, error: %s", resourceProviderID, offer.ID, err))
controller.log.Error().Err(err).
Str("address", resourceProviderID).
Str("cid", offer.ID).
Msg("remove resource offer failed")
}
}
}
Expand All @@ -520,7 +541,11 @@ func (controller *SolverController) addDeal(ctx context.Context, deal data.Deal)
span.SetAttributes(attribute.String("deal.id", deal.ID))
span.AddEvent("data.get_deal_id.done")

controller.log.Info("add deal", deal)
controller.log.Info().Str("cid", deal.ID).
Str("resourceProvider", deal.Members.ResourceProvider).
Str("jobCreator", deal.Members.JobCreator).
Any("deal", deal).
Msg("add deal")

//creates deal container and sets state to agreed
dealContainer := data.GetDealContainer(deal)
Expand Down Expand Up @@ -575,7 +600,7 @@ func (controller *SolverController) addDeal(ctx context.Context, deal data.Deal)
*
*/
func (controller *SolverController) updateJobOfferState(id string, dealID string, state uint8) (*data.JobOfferContainer, error) {
controller.log.Info("update job offer", fmt.Sprintf("%s %s", id, data.GetAgreementStateString(state)))
controller.log.Info().Str("cid", id).Str("state", data.GetAgreementStateString(state)).Msg("update job offer")

ret, err := controller.store.UpdateJobOfferState(id, dealID, state)
if err != nil {
Expand All @@ -589,7 +614,7 @@ func (controller *SolverController) updateJobOfferState(id string, dealID string
}

func (controller *SolverController) updateResourceOfferState(id string, dealID string, state uint8) (*data.ResourceOfferContainer, error) {
controller.log.Info("update resource offer", fmt.Sprintf("%s %s", id, data.GetAgreementStateString(state)))
controller.log.Info().Str("cid", id).Str("state", data.GetAgreementStateString(state)).Msg("update resource offer")

ret, err := controller.store.UpdateResourceOfferState(id, dealID, state)
if err != nil {
Expand All @@ -604,7 +629,7 @@ func (controller *SolverController) updateResourceOfferState(id string, dealID s

// this will also update the job and resource offer states
func (controller *SolverController) updateDealState(id string, state uint8) (*data.DealContainer, error) {
controller.log.Info("update deal", fmt.Sprintf("%s %s", id, data.GetAgreementStateString(state)))
controller.log.Info().Str("cid", id).Str("state", data.GetAgreementStateString(state)).Msg("update deal")

dealContainer, err := controller.store.UpdateDealState(id, state)
if err != nil {
Expand All @@ -628,7 +653,7 @@ func (controller *SolverController) updateDealState(id string, state uint8) (*da

// this will also update the job and resource offer states
func (controller *SolverController) updateDealMediator(id string, mediator string) (*data.DealContainer, error) {
controller.log.Info("update mediator", fmt.Sprintf("%s %s", id, mediator))
controller.log.Info().Str("cid", id).Str("address", mediator).Msg("update deal mediator")
dealContainer, err := controller.store.UpdateDealMediator(id, mediator)
if err != nil {
return nil, err
Expand All @@ -652,7 +677,7 @@ func (controller *SolverController) updateDealMediator(id string, mediator strin
*
*/
func (controller *SolverController) updateDealTransactionsResourceProvider(id string, payload data.DealTransactionsResourceProvider) (*data.DealContainer, error) {
controller.log.Info("update resource provider txs", payload)
controller.log.Info().Any("payload", payload).Msg("update resource provider txs")
dealContainer, err := controller.store.UpdateDealTransactionsResourceProvider(id, payload)
if err != nil {
return nil, err
Expand All @@ -665,7 +690,7 @@ func (controller *SolverController) updateDealTransactionsResourceProvider(id st
}

func (controller *SolverController) updateDealTransactionsJobCreator(id string, payload data.DealTransactionsJobCreator) (*data.DealContainer, error) {
controller.log.Info("update job creator txs", payload)
controller.log.Info().Any("payload", payload).Msg("update job creator txs")
dealContainer, err := controller.store.UpdateDealTransactionsJobCreator(id, payload)
if err != nil {
return nil, err
Expand All @@ -681,7 +706,7 @@ func (controller *SolverController) updateDealTransactionsJobCreator(id string,
}

func (controller *SolverController) updateDealTransactionsMediator(id string, payload data.DealTransactionsMediator) (*data.DealContainer, error) {
controller.log.Info("update mediator txs", payload)
controller.log.Info().Any("payload", payload).Msg("update mediator txs")
dealContainer, err := controller.store.UpdateDealTransactionsMediator(id, payload)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/solver/matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"

"github.com/lilypad-tech/lilypad/pkg/data"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -379,7 +379,7 @@ func getLargestVRAM(gpus []data.GPUSpec) int {
return largestVRAM
}

func logMatch(result matchResult) {
func logMatch(result matchResult, log *zerolog.Logger) {
switch r := result.(type) {
case offersMatched:
log.Trace().
Expand Down
Loading