Skip to content

Commit

Permalink
feat: add trigger config deadletter and order event (#643)
Browse files Browse the repository at this point in the history
* feat: add trigger config deadletter and order event

* feat: add trigger config deadletter and order event
  • Loading branch information
xdlbdy authored Oct 25, 2023
1 parent 404fc79 commit 3267c49
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 5 deletions.
4 changes: 3 additions & 1 deletion server/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ type Config struct {
// var client read event from segment batch size.
PullEventBatchSize int `yaml:"pull_event_batch_size"`
// max uack event number
MaxUACKEventNumber int `yaml:"max_uack_event_number"`
MaxUACKEventNumber int `yaml:"max_uack_event_number"`
DisableDeadLetter *bool `yaml:"disable_dead_letter"`
OrderEvent *bool `yaml:"order_event"`
}
8 changes: 6 additions & 2 deletions server/trigger/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,9 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
for {
writeAttempt++
startTime := time.Now()
_, err := api.AppendOne(ctx, t.timerEventWriter, e)
timeoutCtx, cancel := context.WithTimeout(ctx, t.getConfig().DeliveryTimeout)
_, err := api.AppendOne(timeoutCtx, t.timerEventWriter, e)
cancel()
metrics.TriggerRetryEventAppendSecond.WithLabelValues(t.subscriptionIDStr).
Observe(time.Since(startTime).Seconds())
if err != nil {
Expand Down Expand Up @@ -534,7 +536,9 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
for {
writeAttempt++
startTime := time.Now()
_, err := api.AppendOne(ctx, t.dlEventWriter, e)
timeoutCtx, cancel := context.WithTimeout(ctx, t.getConfig().DeliveryTimeout)
_, err := api.AppendOne(timeoutCtx, t.dlEventWriter, e)
cancel()
metrics.TriggerDeadLetterEventAppendSecond.WithLabelValues(t.subscriptionIDStr).
Observe(time.Since(startTime).Seconds())
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions server/trigger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,20 @@ func (w *worker) getAllSubscriptionInfo(ctx context.Context) []*metapb.Subscript
func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigger.Option {
opts := []trigger.Option{trigger.WithControllers(w.config.ControllerAddr)}
config := subscription.Config
// todo use subscription config replace global config
disableDeadLetter := config.DisableDeadLetter
if w.config.DisableDeadLetter != nil {
disableDeadLetter = *w.config.DisableDeadLetter
}
orderEvent := config.OrderedEvent
if w.config.OrderEvent != nil {
orderEvent = *w.config.OrderEvent
}
opts = append(opts, trigger.WithRateLimit(config.RateLimit),
trigger.WithDeliveryTimeout(config.DeliveryTimeout),
trigger.WithMaxRetryAttempts(config.GetMaxRetryAttempts()),
trigger.WithDisableDeadLetter(config.DisableDeadLetter),
trigger.WithOrdered(config.OrderedEvent),
trigger.WithDisableDeadLetter(disableDeadLetter),
trigger.WithOrdered(orderEvent),
trigger.WithGoroutineSize(w.config.SendEventGoroutineSize),
trigger.WithSendBatchSize(w.config.SendEventBatchSize),
trigger.WithPullBatchSize(w.config.PullEventBatchSize),
Expand Down

0 comments on commit 3267c49

Please sign in to comment.