Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions pkg/cloudevents/generic/baseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type baseClient struct {
}

func (c *baseClient) connect(ctx context.Context) error {
logger := klog.FromContext(ctx)

var err error
c.cloudEventsClient, err = c.newCloudEventsClient(ctx)
if err != nil {
Expand All @@ -58,7 +60,7 @@ func (c *baseClient) connect(ctx context.Context) error {
go func() {
for {
if !c.isClientReady() {
klog.V(4).Infof("reconnecting the cloudevents client")
logger.V(2).Info("reconnecting the cloudevents client")

c.cloudEventsClient, err = c.newCloudEventsClient(ctx)
// TODO enhance the cloudevents SKD to avoid wrapping the error type to distinguish the net connection
Expand All @@ -70,7 +72,7 @@ func (c *baseClient) connect(ctx context.Context) error {
continue
}
// the cloudevents network connection is back, mark the client ready and send the receiver restart signal
klog.V(4).Infof("the cloudevents client is reconnected")
logger.V(2).Info("the cloudevents client is reconnected")
increaseClientReconnectedCounter(c.clientID)
c.setClientReady(true)
c.sendReceiverSignal(restartReceiverSignal)
Expand Down Expand Up @@ -108,6 +110,7 @@ func (c *baseClient) connect(ctx context.Context) error {
}

func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
logger := klog.FromContext(ctx)
now := time.Now()

if err := c.cloudEventsRateLimiter.Wait(ctx); err != nil {
Expand All @@ -116,8 +119,11 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {

latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt.Context)
logger.V(3).Info(
"Client-side throttling delay (not priority and fairness)",
"latency", latency,
"request", evt.Context.GetID(),
)
}

sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
Expand All @@ -129,8 +135,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
return fmt.Errorf("the cloudevents client is not ready")
}

klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt.Context)
klog.V(5).Infof("Sending event: evt=%s", evt)
logger.V(2).Info("Sending event", "context", sendingCtx, "event", evt.Context)
logger.V(5).Info("Sending event", "event", func() any { return evt.String() })
if err := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(err) {
return err
}
Expand All @@ -142,9 +148,10 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
c.Lock()
defer c.Unlock()

logger := klog.FromContext(ctx)
// make sure there is only one subscription go routine starting for one client.
if c.receiverChan != nil {
klog.Warningf("the subscription has already started")
logger.V(2).Info("the subscription has already started")
return
}

Expand All @@ -159,8 +166,8 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
if startReceiving {
go func() {
if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event: %s", evt.Context)
klog.V(5).Infof("Received event: evt=%s", evt)
logger.V(2).Info("Received event", "event", evt.Context)
logger.V(5).Info("Received event", "event", func() any { return evt.String() })

receive(receiverCtx, evt)
}); err != nil {
Expand All @@ -183,12 +190,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {

switch signal {
case restartReceiverSignal:
klog.V(4).Infof("restart the cloudevents receiver")
logger.V(2).Info("restart the cloudevents receiver")
// rebuild the receiver context and restart receiving
receiverCtx, receiverCancel = context.WithCancel(context.TODO())
startReceiving = true
case stopReceiverSignal:
klog.V(4).Infof("stop the cloudevents receiver")
logger.V(2).Info("stop the cloudevents receiver")
receiverCancel()
default:
runtime.HandleError(fmt.Errorf("unknown receiver signal %d", signal))
Expand Down