diff --git a/pkg/cloudevents/generic/baseclient.go b/pkg/cloudevents/generic/baseclient.go index 88524bcc..6efab986 100644 --- a/pkg/cloudevents/generic/baseclient.go +++ b/pkg/cloudevents/generic/baseclient.go @@ -48,6 +48,8 @@ type baseClient struct { } func (c *baseClient) connect(ctx context.Context) error { + logger := klog.FromContext(ctx) + var err error c.cloudEventsClient, err = c.newCloudEventsClient(ctx) if err != nil { @@ -58,7 +60,7 @@ func (c *baseClient) connect(ctx context.Context) error { go func() { for { if !c.isClientReady() { - klog.V(4).Infof("reconnecting the cloudevents client") + logger.V(2).Info("reconnecting the cloudevents client") c.cloudEventsClient, err = c.newCloudEventsClient(ctx) // TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection @@ -70,7 +72,7 @@ func (c *baseClient) connect(ctx context.Context) error { continue } // the cloudevents network connection is back, mark the client ready and send the receiver restart signal - klog.V(4).Infof("the cloudevents client is reconnected") + logger.V(2).Info("the cloudevents client is reconnected") increaseClientReconnectedCounter(c.clientID) c.setClientReady(true) c.sendReceiverSignal(restartReceiverSignal) @@ -108,6 +110,7 @@ func (c *baseClient) connect(ctx context.Context) error { } func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { + logger := klog.FromContext(ctx) now := time.Now() if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil { @@ -116,8 +119,11 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { latency := time.Since(now) if latency > longThrottleLatency { - klog.Warningf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", - latency, evt.Context) + logger.V(3).Info( + "Client-side throttling delay (not priority and fairness)", + "latency", latency, + "request", evt.Context.GetID(), + ) } sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) @@ -129,8 +135,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { return fmt.Errorf("the cloudevents client is not ready") } - klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt.Context) - klog.V(5).Infof("Sending event: evt=%s", evt) + logger.V(2).Info("Sending event", "context", sendingCtx, "event", evt.Context) + logger.V(5).Info("Sending event", "event", func() any { return evt.String() }) if err := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(err) { return err } @@ -142,9 +148,10 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { c.Lock() defer c.Unlock() + logger := klog.FromContext(ctx) // make sure there is only one subscription go routine starting for one client. if c.receiverChan != nil { - klog.Warningf("the subscription has already started") + logger.V(2).Info("the subscription has already started") return } @@ -159,8 +166,8 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { if startReceiving { go func() { if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) { - klog.V(4).Infof("Received event: %s", evt.Context) - klog.V(5).Infof("Received event: evt=%s", evt) + logger.V(2).Info("Received event", "event", evt.Context) + logger.V(5).Info("Received event", "event", func() any { return evt.String() }) receive(receiverCtx, evt) }); err != nil { @@ -183,12 +190,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { switch signal { case restartReceiverSignal: - klog.V(4).Infof("restart the cloudevents receiver") + logger.V(2).Info("restart the cloudevents receiver") // rebuild the receiver context and restart receiving receiverCtx, receiverCancel = context.WithCancel(context.TODO()) startReceiving = true case stopReceiverSignal: - klog.V(4).Infof("stop the cloudevents receiver") + logger.V(2).Info("stop the cloudevents receiver") receiverCancel() default: runtime.HandleError(fmt.Errorf("unknown receiver signal %d", signal))