From ba36d72416ee1fe6894ab7754452167dc8586d64 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 13 Aug 2024 16:55:15 +0200 Subject: [PATCH] feat: add notification handler --- cmd/notification-service/main.go | 35 +- internal/event/models/subject.go | 16 + .../consumer/balancethreshold_test.go | 162 ++++++ .../notification/consumer/balancetreshold.go | 345 ++++++++++++ internal/notification/consumer/consumer.go | 32 +- internal/notification/event.go | 6 + .../notification/repository/repository.go | 10 + internal/notification/rule.go | 11 + internal/productcatalog/driver/feature.go | 2 +- internal/productcatalog/driver/parser.go | 2 +- test/notification/consumer_balance.go | 511 ++++++++++++++++++ test/notification/notification_test.go | 15 + 12 files changed, 1129 insertions(+), 18 deletions(-) create mode 100644 internal/notification/consumer/balancethreshold_test.go create mode 100644 internal/notification/consumer/balancetreshold.go create mode 100644 test/notification/consumer_balance.go diff --git a/cmd/notification-service/main.go b/cmd/notification-service/main.go index e0d1b08b6..823e1f56f 100644 --- a/cmd/notification-service/main.go +++ b/cmd/notification-service/main.go @@ -30,7 +30,10 @@ import ( "github.com/openmeterio/openmeter/config" "github.com/openmeterio/openmeter/internal/meter" + "github.com/openmeterio/openmeter/internal/notification" "github.com/openmeterio/openmeter/internal/notification/consumer" + notificationrepository "github.com/openmeterio/openmeter/internal/notification/repository" + notificationwebhook "github.com/openmeterio/openmeter/internal/notification/webhook" "github.com/openmeterio/openmeter/internal/registry" registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder" "github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector" @@ -283,7 +286,7 @@ func main() { } // Dependencies: entitlement - entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ + entitlementConnRegistry := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{ DatabaseClient: entClient, StreamingConnector: clickhouseStreamingConnector, MeterRepository: meterRepository, @@ -291,6 +294,34 @@ func main() { Publisher: eventPublisher, }) + // Dependencies: notification + notificationRepo, err := notificationrepository.New(notificationrepository.Config{ + Client: entClient, + Logger: logger.WithGroup("notification.postgres"), + }) + if err != nil { + logger.Error("failed to initialize notification repository", "error", err) + os.Exit(1) + } + + notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{ + SvixConfig: conf.Svix, + }) + if err != nil { + logger.Error("failed to initialize notification repository", "error", err) + os.Exit(1) + } + + notificationService, err := notification.New(notification.Config{ + Repository: notificationRepo, + Webhook: notificationWebhook, + FeatureConnector: entitlementConnRegistry.Feature, + }) + if err != nil { + logger.Error("failed to initialize notification service", "error", err) + os.Exit(1) + } + // Initialize consumer consumerOptions := consumer.Options{ SystemEventsTopic: conf.Events.SystemEvents.Topic, @@ -303,7 +334,7 @@ func main() { }, Marshaler: eventPublisher.Marshaler(), - Entitlement: entitlementConnectors, + Notification: notificationService, Logger: logger, } diff --git a/internal/event/models/subject.go b/internal/event/models/subject.go index 510a0c64e..32c50daf0 100644 --- a/internal/event/models/subject.go +++ b/internal/event/models/subject.go @@ -2,7 +2,11 @@ package models import ( "errors" + "maps" "time" + + "github.com/openmeterio/openmeter/api" + "github.com/openmeterio/openmeter/pkg/convert" ) type SubjectKeyAndID struct { @@ -35,3 +39,15 @@ func (s Subject) Validate() error { return nil } + +func (s Subject) ToAPIModel() api.Subject { + return api.Subject{ + Id: s.Id, + Key: s.Key, + DisplayName: s.DisplayName, + Metadata: convert.ToPointer(maps.Clone(s.Metadata)), + CurrentPeriodStart: s.CurrentPeriodStart, + CurrentPeriodEnd: s.CurrentPeriodEnd, + StripeCustomerId: s.StripeCustomerId, + } +} diff --git a/internal/notification/consumer/balancethreshold_test.go b/internal/notification/consumer/balancethreshold_test.go new file mode 100644 index 000000000..f38246109 --- /dev/null +++ b/internal/notification/consumer/balancethreshold_test.go @@ -0,0 +1,162 @@ +package consumer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/openmeterio/openmeter/api" + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/notification" + "github.com/openmeterio/openmeter/pkg/convert" +) + +func newNumericThreshold(v float64) notification.BalanceThreshold { + return notification.BalanceThreshold{ + Value: v, + Type: api.NUMBER, + } +} + +func newPercentThreshold(v float64) notification.BalanceThreshold { + return notification.BalanceThreshold{ + Value: v, + Type: api.PERCENT, + } +} + +func TestGetHighestMatchingBalanceThreshold(t *testing.T) { + tcs := []struct { + Name string + BalanceThresholds []notification.BalanceThreshold + EntitlementValue snapshot.EntitlementValue + Expect *notification.BalanceThreshold + }{ + { + Name: "Numerical values only", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + newNumericThreshold(10), + newNumericThreshold(30), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(10.0), + Usage: convert.ToPointer(20.0), + }, + // Already used 20, so the matching threshold is the 20 + Expect: convert.ToPointer(newNumericThreshold(20)), + }, + { + Name: "Numerical values only - 100%", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + newNumericThreshold(10), + newNumericThreshold(30), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(0.0), + Usage: convert.ToPointer(30.0), + }, + Expect: convert.ToPointer(newNumericThreshold(30)), + }, + { + Name: "Numerical values only - 100%+ with overage", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + newNumericThreshold(10), + newNumericThreshold(30), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(0.0), + Usage: convert.ToPointer(30.0), + Overage: convert.ToPointer(10.0), + }, + Expect: convert.ToPointer(newNumericThreshold(30)), + }, + { + Name: "Percentages with overage", + BalanceThresholds: []notification.BalanceThreshold{ + newPercentThreshold(50), + newPercentThreshold(100), + newPercentThreshold(110), + newPercentThreshold(120), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(0.0), + Usage: convert.ToPointer(110.0), + Overage: convert.ToPointer(10.0), + }, + Expect: convert.ToPointer(newPercentThreshold(110)), + }, + { + Name: "Mixed values", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + newNumericThreshold(10), + newNumericThreshold(30), + newPercentThreshold(50), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(14.0), + Usage: convert.ToPointer(16.0), + }, + Expect: convert.ToPointer(newPercentThreshold(50)), + }, + // Corner cases + { + Name: "No grants", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + newPercentThreshold(100), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(0.0), + Usage: convert.ToPointer(0.0), + }, + Expect: nil, + }, + { + Name: "Last threshold is ", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(20), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(0.0), + Usage: convert.ToPointer(30.0), + }, + Expect: convert.ToPointer(newNumericThreshold(20)), + }, + { + Name: "Same threshold in percentage and number", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(15), + newPercentThreshold(50), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(14.0), + Usage: convert.ToPointer(16.0), + }, + Expect: convert.ToPointer(newPercentThreshold(50)), + }, + { + Name: "Exact threshold match", + BalanceThresholds: []notification.BalanceThreshold{ + newNumericThreshold(15), + newPercentThreshold(50), + }, + EntitlementValue: snapshot.EntitlementValue{ + Balance: convert.ToPointer(15.0), + Usage: convert.ToPointer(15.0), + }, + Expect: convert.ToPointer(newPercentThreshold(50)), + }, + } + + for _, tc := range tcs { + t.Run(tc.Name, func(t *testing.T) { + got, err := getHighestMatchingThreshold(tc.BalanceThresholds, tc.EntitlementValue) + assert.NoError(t, err) + assert.Equal(t, tc.Expect, got) + }) + } +} diff --git a/internal/notification/consumer/balancetreshold.go b/internal/notification/consumer/balancetreshold.go new file mode 100644 index 000000000..34cd036e9 --- /dev/null +++ b/internal/notification/consumer/balancetreshold.go @@ -0,0 +1,345 @@ +package consumer + +import ( + "cmp" + "context" + "crypto/sha256" + "errors" + "fmt" + "log/slog" + "strings" + "time" + + "golang.org/x/exp/slices" + + "github.com/openmeterio/openmeter/api" + "github.com/openmeterio/openmeter/internal/entitlement" + entitlementdriver "github.com/openmeterio/openmeter/internal/entitlement/driver" + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + "github.com/openmeterio/openmeter/internal/notification" + productcatalogdriver "github.com/openmeterio/openmeter/internal/productcatalog/driver" + "github.com/openmeterio/openmeter/pkg/defaultx" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/pagination" + "github.com/openmeterio/openmeter/pkg/recurrence" + "github.com/openmeterio/openmeter/pkg/slicesx" + "github.com/openmeterio/openmeter/pkg/sortx" +) + +type BalanceThresholdEventHandler struct { + Notification notification.Service + Logger *slog.Logger +} + +type BalanceThresholdEventHandlerState struct { + TotalGrants float64 `json:"totalGrants"` +} + +var ErrNoBalanceAvailable = errors.New("no balance available") + +func (b *BalanceThresholdEventHandler) Handle(ctx context.Context, event snapshot.SnapshotEvent) error { + if !b.isBalanceThresholdEvent(event) { + return nil + } + + // TODO[issue-1364]: this must be cached to prevent going to the DB for each balance.snapshot event + affectedRulesPaged, err := b.Notification.ListRules(ctx, notification.ListRulesInput{ + Namespaces: []string{event.Entitlement.Namespace}, + Types: []notification.RuleType{notification.RuleTypeBalanceThreshold}, + }) + if err != nil { + return fmt.Errorf("failed to list notification rules: %w", err) + } + + affectedRules := slicesx.Filter(affectedRulesPaged.Items, func(rule notification.Rule) bool { + if len(rule.Config.BalanceThreshold.Thresholds) == 0 { + return false + } + + if len(rule.Config.BalanceThreshold.Features) == 0 { + return true + } + + return slices.Contains(rule.Config.BalanceThreshold.Features, event.Entitlement.FeatureID) || + slices.Contains(rule.Config.BalanceThreshold.Features, event.Entitlement.FeatureKey) + }) + + var errs error + for _, rule := range affectedRules { + if !rule.HasEnabledChannels() { + break + } + + if err := b.handleRule(ctx, event, rule); err != nil { + errs = errors.Join(errs, err) + } + } + + return errs +} + +func (b *BalanceThresholdEventHandler) handleRule(ctx context.Context, balSnapshot snapshot.SnapshotEvent, rule notification.Rule) error { + // Check 1: do we have a threshold we should create an event for? + + threshold, err := getHighestMatchingThreshold(rule.Config.BalanceThreshold.Thresholds, *balSnapshot.Value) + if err != nil { + return fmt.Errorf("failed to get highest matching threshold: %w", err) + } + + if threshold == nil { + // No matching threshold found => nothing to create an event on + return nil + } + + // Check 2: fetch the last event for the same period and validate if we need to send a new notification + + periodDedupeHash := b.getPeriodsDeduplicationHash(balSnapshot, rule.ID) + + // TODO[issue-1364]: this must be cached to prevent going to the DB for each balance.snapshot event + lastEvents, err := b.Notification.ListEvents(ctx, notification.ListEventsInput{ + Page: pagination.Page{ + PageSize: 1, + PageNumber: 1, + }, + Namespaces: []string{balSnapshot.Entitlement.Namespace}, + + From: balSnapshot.Entitlement.CurrentUsagePeriod.From, + To: balSnapshot.Entitlement.CurrentUsagePeriod.To, + + DeduplicationHashes: []string{periodDedupeHash}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + if err != nil { + return fmt.Errorf("failed to list events: %w", err) + } + + createEventInput := createBalanceThresholdEventInput{ + Snapshot: balSnapshot, + DedupeHash: periodDedupeHash, + Threshold: *threshold, + RuleID: rule.ID, + } + + if len(lastEvents.Items) == 0 { + // we need to trigger the event, as we have hit a threshold, and have no previous event + return b.createEvent(ctx, createEventInput) + } + + lastEvent := lastEvents.Items[0] + + if lastEvent.Payload.Type != notification.EventTypeBalanceThreshold { + // This should never happen, but let's log it and trigger the event, so that we have a better reference point + // in place + b.Logger.Error("last event is not a balance threshold event", slog.String("event_id", lastEvent.ID)) + return b.createEvent(ctx, createEventInput) + } + + lastEventActualValue, err := getBalanceThreshold( + lastEvent.Payload.BalanceThreshold.Threshold, + lastEvent.Payload.BalanceThreshold.Value) + if err != nil { + if err == ErrNoBalanceAvailable { + // In case there are no grants, percentage all percentage rules would match, so let's instead + // wait until we have some credits to calculate the actual value + b.Logger.Warn("no balance available skipping event creation", "last_event_id", lastEvent.ID) + return nil + } + return fmt.Errorf("failed to calculate actual value from last event: %w", err) + } + + if lastEventActualValue.BalanceThreshold != *threshold { + // The last event was triggered by a different threshold, so we need to trigger a new event + return b.createEvent(ctx, createEventInput) + } + + return nil +} + +type createBalanceThresholdEventInput struct { + Snapshot snapshot.SnapshotEvent + DedupeHash string + Threshold notification.BalanceThreshold + RuleID string +} + +func (b *BalanceThresholdEventHandler) createEvent(ctx context.Context, in createBalanceThresholdEventInput) error { + entitlementAPIEntity, err := entitlementdriver.Parser.ToMetered(&in.Snapshot.Entitlement) + if err != nil { + return fmt.Errorf("failed to map entitlement value to API: %w", err) + } + + annotations := notification.Annotations{ + notification.AnnotationEventSubjectKey: in.Snapshot.Subject.Key, + notification.AnnotationEventFeatureKey: in.Snapshot.Feature.Key, + notification.AnnotationEventDedupeHash: in.DedupeHash, + } + + if in.Snapshot.Subject.Id != nil && *in.Snapshot.Subject.Id != "" { + annotations[notification.AnnotationEventSubjectID] = in.Snapshot + } + + if in.Snapshot.Feature.ID != "" { + annotations[notification.AnnotationEventFeatureID] = in.Snapshot.Feature.ID + } + + _, err = b.Notification.CreateEvent(ctx, notification.CreateEventInput{ + NamespacedModel: models.NamespacedModel{ + Namespace: in.Snapshot.Entitlement.Namespace, + }, + Annotations: annotations, + Type: notification.EventTypeBalanceThreshold, + Payload: notification.EventPayload{ + EventPayloadMeta: notification.EventPayloadMeta{ + Type: notification.EventTypeBalanceThreshold, + }, + BalanceThreshold: notification.BalanceThresholdPayload{ + Entitlement: *entitlementAPIEntity, + Feature: productcatalogdriver.MapFeatureToResponse(in.Snapshot.Feature), + Subject: in.Snapshot.Subject.ToAPIModel(), + Value: (api.EntitlementValue)(*in.Snapshot.Value), + Threshold: in.Threshold, + }, + }, + RuleID: in.RuleID, + HandlerDeduplicationHash: in.DedupeHash, + }) + + return err +} + +func (b *BalanceThresholdEventHandler) isBalanceThresholdEvent(event snapshot.SnapshotEvent) bool { + if event.Entitlement.EntitlementType != entitlement.EntitlementTypeMetered { + return false + } + + // We don't care about delete events + if event.Operation != snapshot.ValueOperationUpdate { + return false + } + + // Let's validate the event value contains all the necessary fields for calculations + if event.Value == nil || event.Value.Balance == nil || event.Value.Usage == nil { + return false + } + + return true +} + +// getPeriodsDeduplicationHash generates a hash that the handler can use to deduplicate the events. Right now the hash is unique +// for a single entitlement usage period. We can use this to fetch the previous events for the same period and validate +// if we need to send a new notification. +func (b *BalanceThresholdEventHandler) getPeriodsDeduplicationHash(snapshot snapshot.SnapshotEvent, ruleID string) string { + // Note: this should not happen, but let's be safe here + currentUsagePeriod := defaultx.WithDefault( + snapshot.Entitlement.CurrentUsagePeriod, recurrence.Period{ + From: time.Time{}, + To: time.Time{}, + }) + + source := strings.Join([]string{ + ruleID, + snapshot.Namespace.ID, + currentUsagePeriod.From.UTC().Format(time.RFC3339), + currentUsagePeriod.To.UTC().Format(time.RFC3339), + snapshot.Subject.Key, + snapshot.Entitlement.ID, + snapshot.Feature.ID, + defaultx.WithDefault(snapshot.Entitlement.MeasureUsageFrom, time.Time{}).UTC().Format(time.RFC3339), + }, "/") + + h := sha256.New() + + h.Write([]byte(source)) + + bs := h.Sum(nil) + + // bsnap == balance.snapshot + // v1 == version 1 (in case we need to change the hashing strategy) + return fmt.Sprintf("bsnap_v1_%x", bs) +} + +type balanceThreshold struct { + notification.BalanceThreshold + + // NumericThreshold always contains the credit value of the threshold regardless if it's a percentage + // or a number threshold + NumericThreshold float64 +} + +func getTotalGrantsFromValue(value api.EntitlementValue) float64 { + return *value.Balance + *value.Usage - defaultx.WithDefault(value.Overage, 0) +} + +func getBalanceThreshold(threshold notification.BalanceThreshold, eValue api.EntitlementValue) (balanceThreshold, error) { + switch threshold.Type { + case api.NUMBER: + return balanceThreshold{ + BalanceThreshold: threshold, + NumericThreshold: threshold.Value, + }, nil + case api.PERCENT: + totalGrants := getTotalGrantsFromValue(eValue) + + // In case there are no grants yet, we can't calculate the actual value, we are filtering out the + // thresholds to prevent event triggering in the following scenario: + // + // - A new entitlement is created (and there are balance threshold rules active) + // - Then the granting is done as a separate step + // + // As this would mean that we would trigger a notification for the first activity for 100% + if totalGrants == 0 { + return balanceThreshold{}, ErrNoBalanceAvailable + } + + return balanceThreshold{ + BalanceThreshold: threshold, + NumericThreshold: totalGrants * threshold.Value / 100, + }, nil + + default: + return balanceThreshold{}, errors.New("unknown threshold type") + } +} + +func getHighestMatchingThreshold(thresholds []notification.BalanceThreshold, eValue snapshot.EntitlementValue) (*notification.BalanceThreshold, error) { + // Let's normalize the thresholds in a single slice with percentages already calculated + actualValues := make([]balanceThreshold, 0, len(thresholds)) + + for _, threshold := range thresholds { + actualValue, err := getBalanceThreshold(threshold, api.EntitlementValue(eValue)) + if err != nil { + if err == ErrNoBalanceAvailable { + continue + } + + return nil, err + } + + actualValues = append(actualValues, actualValue) + } + + // Now we have the actual values, let's sort by the thresholds ensuring that we have stable storing between percentages + // and numbers + + slices.SortFunc(actualValues, func(b1, b2 balanceThreshold) int { + result := cmp.Compare(b1.NumericThreshold, b2.NumericThreshold) + if result != 0 { + return result + } + + // If the actual values are the same, let's sort by the underlying representation (percentage ends up being the "bigger" one) + return cmp.Compare(b1.Type, b2.Type) + }) + + var highest *notification.BalanceThreshold + for _, threshold := range actualValues { + if threshold.NumericThreshold > *eValue.Usage { + break + } + + highest = &threshold.BalanceThreshold + } + + return highest, nil +} diff --git a/internal/notification/consumer/consumer.go b/internal/notification/consumer/consumer.go index ff07b56ed..cd27dff51 100644 --- a/internal/notification/consumer/consumer.go +++ b/internal/notification/consumer/consumer.go @@ -7,7 +7,7 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/openmeterio/openmeter/internal/entitlement/snapshot" - "github.com/openmeterio/openmeter/internal/registry" + "github.com/openmeterio/openmeter/internal/notification" "github.com/openmeterio/openmeter/internal/watermill/grouphandler" "github.com/openmeterio/openmeter/internal/watermill/router" "github.com/openmeterio/openmeter/openmeter/watermill/marshaler" @@ -16,8 +16,9 @@ import ( type Options struct { SystemEventsTopic string - Entitlement *registry.Entitlement - Router router.Options + Router router.Options + + Notification notification.Service Marshaler marshaler.Marshaler @@ -27,11 +28,20 @@ type Options struct { type Consumer struct { opts Options router *message.Router + + balanceThresholdHandler *BalanceThresholdEventHandler } func New(opts Options) (*Consumer, error) { + balanceThresholdEventHandler := &BalanceThresholdEventHandler{ + Notification: opts.Notification, + Logger: opts.Logger.WithGroup("balance_threshold_event_handler"), + } + consumer := &Consumer{ opts: opts, + + balanceThresholdHandler: balanceThresholdEventHandler, } router, err := router.NewDefaultRouter(opts.Router) @@ -49,7 +59,7 @@ func New(opts Options) (*Consumer, error) { return nil } - return consumer.handleSnapshotEvent(ctx, *event) + return consumer.balanceThresholdHandler.Handle(ctx, *event) }), ), ) @@ -60,16 +70,10 @@ func New(opts Options) (*Consumer, error) { }, nil } -func (w *Consumer) Run(ctx context.Context) error { - return w.router.Run(ctx) -} - -func (w *Consumer) Close() error { - return w.router.Close() +func (c *Consumer) Run(ctx context.Context) error { + return c.router.Run(ctx) } -func (w *Consumer) handleSnapshotEvent(_ context.Context, payload snapshot.SnapshotEvent) error { - w.opts.Logger.Info("handling entitlement snapshot event", slog.String("entitlement_id", payload.Entitlement.ID)) - - return nil +func (c *Consumer) Close() error { + return c.router.Close() } diff --git a/internal/notification/event.go b/internal/notification/event.go index 36d188812..69e295663 100644 --- a/internal/notification/event.go +++ b/internal/notification/event.go @@ -34,6 +34,8 @@ type Event struct { Payload EventPayload `json:"payload"` // Rule defines the notification Rule that generated this Event. Rule Rule `json:"rule"` + // DeduplicationHash is a hash that the handler can use to deduplicate events if needed + HandlerDeduplicationHash string `json:"-"` } func (e Event) AsNotificationEvent() (api.NotificationEvent, error) { @@ -233,6 +235,8 @@ type ListEventsInput struct { Subjects []string `json:"subjects,omitempty"` Features []string `json:"features,omitempty"` + DeduplicationHashes []string `json:"deduplicationHashes,omitempty"` + DeliveryStatusStates []EventDeliveryStatusState `json:"deliveryStatusStates,omitempty"` OrderBy api.ListNotificationEventsParamsOrderBy @@ -295,6 +299,8 @@ type CreateEventInput struct { Payload EventPayload `json:"payload"` // RuleID defines the notification Rule that generated this Event. RuleID string `json:"ruleId"` + // HandlerDeduplicationHash is a hash that the handler can use to deduplicate events if needed + HandlerDeduplicationHash string `json:"handlerDeduplicationHash"` } func (i CreateEventInput) Validate(ctx context.Context, service Service) error { diff --git a/internal/notification/repository/repository.go b/internal/notification/repository/repository.go index f87345d58..8f467e954 100644 --- a/internal/notification/repository/repository.go +++ b/internal/notification/repository/repository.go @@ -281,6 +281,10 @@ func (r repository) ListRules(ctx context.Context, params notification.ListRules query = query.Where(ruledb.Disabled(false)) } + if len(params.Types) > 0 { + query = query.Where(ruledb.TypeIn(params.Types...)) + } + query = query.WithChannels() order := entutils.GetOrdering(sortx.OrderDefault) @@ -477,6 +481,12 @@ func (r repository) ListEvents(ctx context.Context, params notification.ListEven query = query.Where(eventdb.CreatedAtLTE(params.To.UTC())) } + if len(params.DeduplicationHashes) > 0 { + query = query.Where( + entutils.JSONBIn(eventdb.FieldAnnotations, notification.AnnotationEventDedupeHash, params.DeduplicationHashes), + ) + } + // Eager load DeliveryStatus, Rules (including Channels) if len(params.DeliveryStatusStates) > 0 { query = query.WithDeliveryStatuses(func(query *entdb.NotificationEventDeliveryStatusQuery) { diff --git a/internal/notification/rule.go b/internal/notification/rule.go index 123bf6e1a..b5c1268fb 100644 --- a/internal/notification/rule.go +++ b/internal/notification/rule.go @@ -131,6 +131,16 @@ func (r Rule) Validate(ctx context.Context, service Service) error { return nil } +func (r Rule) HasEnabledChannels() bool { + for _, channel := range r.Channels { + if !channel.Disabled { + return true + } + } + + return false +} + const ( RuleTypeBalanceThreshold = RuleType(api.EntitlementsBalanceThreshold) ) @@ -263,6 +273,7 @@ type ListRulesInput struct { Namespaces []string Rules []string IncludeDisabled bool + Types []RuleType OrderBy api.ListNotificationRulesParamsOrderBy Order sortx.Order diff --git a/internal/productcatalog/driver/feature.go b/internal/productcatalog/driver/feature.go index e8237391f..2aa5e15f2 100644 --- a/internal/productcatalog/driver/feature.go +++ b/internal/productcatalog/driver/feature.go @@ -187,7 +187,7 @@ func (h *featureHandlers) ListFeatures() ListFeaturesHandler { mapped := make([]api.Feature, 0, len(paged.Items)) for _, f := range paged.Items { - mapped = append(mapped, MaptFeatureToResponse(f)) + mapped = append(mapped, MapFeatureToResponse(f)) } if params.Page.IsZero() { diff --git a/internal/productcatalog/driver/parser.go b/internal/productcatalog/driver/parser.go index efeeae94d..827cfe151 100644 --- a/internal/productcatalog/driver/parser.go +++ b/internal/productcatalog/driver/parser.go @@ -6,7 +6,7 @@ import ( "github.com/openmeterio/openmeter/pkg/convert" ) -func MaptFeatureToResponse(f productcatalog.Feature) api.Feature { +func MapFeatureToResponse(f productcatalog.Feature) api.Feature { return api.Feature{ CreatedAt: &f.CreatedAt, DeletedAt: nil, diff --git a/test/notification/consumer_balance.go b/test/notification/consumer_balance.go new file mode 100644 index 000000000..88cf4c853 --- /dev/null +++ b/test/notification/consumer_balance.go @@ -0,0 +1,511 @@ +package notification + +import ( + "context" + "log/slog" + "testing" + "time" + + "github.com/oklog/ulid/v2" + "github.com/stretchr/testify/require" + + "github.com/openmeterio/openmeter/api" + "github.com/openmeterio/openmeter/internal/entitlement" + "github.com/openmeterio/openmeter/internal/entitlement/snapshot" + eventmodels "github.com/openmeterio/openmeter/internal/event/models" + "github.com/openmeterio/openmeter/internal/notification" + "github.com/openmeterio/openmeter/internal/notification/consumer" + "github.com/openmeterio/openmeter/internal/productcatalog" + "github.com/openmeterio/openmeter/pkg/convert" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/openmeterio/openmeter/pkg/recurrence" + "github.com/openmeterio/openmeter/pkg/sortx" +) + +type BalanceNotificaiontHandlerTestSuite struct { + Env TestEnv + + channel notification.Channel + rule notification.Rule + feature productcatalog.Feature + handler consumer.BalanceThresholdEventHandler + namespace string +} + +var ( + TestEntitlementCurrentUsagePeriod = recurrence.Period{ + From: time.Now().Add(-time.Hour), + To: time.Now().Add(24 * time.Hour), + } + TestEntitlementUsagePeriod = entitlement.UsagePeriod{ + Interval: recurrence.RecurrencePeriodDaily, + Anchor: TestEntitlementCurrentUsagePeriod.From, + } + TestEntitlementID = "test-entitlement-id" +) + +type BalanceSnapshotEventInput struct { + Feature productcatalog.Feature + Value snapshot.EntitlementValue + Namespace string +} + +func NewBalanceSnapshotEvent(in BalanceSnapshotEventInput) snapshot.SnapshotEvent { + return snapshot.SnapshotEvent{ + Entitlement: entitlement.Entitlement{ + GenericProperties: entitlement.GenericProperties{ + NamespacedModel: models.NamespacedModel{ + Namespace: in.Namespace, + }, + ID: TestEntitlementID, + FeatureID: in.Feature.ID, + FeatureKey: in.Feature.Key, + SubjectKey: TestSubjectKey, + EntitlementType: entitlement.EntitlementTypeMetered, + + UsagePeriod: &TestEntitlementUsagePeriod, + CurrentUsagePeriod: &TestEntitlementCurrentUsagePeriod, + }, + MeasureUsageFrom: &TestEntitlementCurrentUsagePeriod.From, + IsSoftLimit: convert.ToPointer(true), + LastReset: &TestEntitlementCurrentUsagePeriod.From, + }, + Namespace: eventmodels.NamespaceID{ + ID: in.Namespace, + }, + Subject: eventmodels.Subject{ + Key: TestSubjectKey, + }, + Feature: in.Feature, + Operation: snapshot.ValueOperationUpdate, + CalculatedAt: convert.ToPointer(time.Now()), + Value: &in.Value, + CurrentUsagePeriod: &TestEntitlementCurrentUsagePeriod, + } +} + +// setupNamespace can be used to set up an independent namespace for testing, it contains a single +// feature and rule with a channel. For more complex scenarios, additional setup might be required. +func (s *BalanceNotificaiontHandlerTestSuite) setupNamespace(ctx context.Context, t *testing.T) { + t.Helper() + + s.namespace = ulid.Make().String() + + service := s.Env.Notification() + + meter, err := s.Env.Meter().GetMeterByIDOrSlug(ctx, s.namespace, TestMeterSlug) + require.NoError(t, err, "Getting meter must not return error") + + s.feature, err = s.Env.Feature().CreateFeature(ctx, productcatalog.CreateFeatureInputs{ + Name: TestFeatureName, + Key: TestFeatureKey, + Namespace: s.namespace, + MeterSlug: convert.ToPointer(meter.Slug), + MeterGroupByFilters: meter.GroupBy, + }) + require.NoError(t, err, "Creating feature must not return error") + + input := NewCreateChannelInput("NotificationRuleTest") + input.Namespace = s.namespace + + channel, err := service.CreateChannel(ctx, input) + require.NoError(t, err, "Creating channel must not return error") + require.NotNil(t, channel, "Channel must not be nil") + + s.channel = *channel + + ruleInput := NewCreateRuleInput("TestRuleForNotificationWorker", s.channel.ID) + ruleInput.Namespace = s.namespace + + rule, err := service.CreateRule(ctx, ruleInput) + require.NoError(t, err, "Creating rule must not return error") + require.NotNil(t, rule, "Rule must not be nil") + + s.rule = *rule + s.rule.CreatedAt = s.rule.CreatedAt.Truncate(time.Microsecond) + s.rule.UpdatedAt = s.rule.UpdatedAt.Truncate(time.Microsecond) + + s.handler = consumer.BalanceThresholdEventHandler{ + Notification: service, + Logger: slog.Default(), + } +} + +func (s *BalanceNotificaiontHandlerTestSuite) TestGrantingFlow(ctx context.Context, t *testing.T) { + s.setupNamespace(ctx, t) + + service := s.Env.Notification() + + // Step 1: The current usage is less than the thresholds + snapshotEvent := NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(100.0), + Usage: convert.ToPointer(50.0), + }, + Namespace: s.namespace, + }) + + err := s.handler.Handle(ctx, snapshotEvent) + require.NoError(t, err) + + events, err := service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + }) + require.NoError(t, err, "Listing events must not return error") + require.Empty(t, events.Items, "No events should be created") + + // The rule has the following thresholds: + // - 95% of the balance + // - 1000 units + + // Step 2: The current usage is greater than the balance threshold 95% (balance = 4, usage = 96) + + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(4.0), + Usage: convert.ToPointer(96.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + }) + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 1, "One event should be created") + + // Let's sanity check the resulting event + event := events.Items[0] + require.Equal(t, s.rule.ID, event.Rule.ID, "Event must be associated with the rule") + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.Equal(t, TestEntitlementID, *event.Payload.BalanceThreshold.Entitlement.Id, "Event must be associated with the entitlement") + require.NotEmpty(t, event.Annotations[notification.AnnotationEventDedupeHash], "Event must have a deduplication hash") + require.NoError(t, event.Payload.BalanceThreshold.Validate(), "Event must be valid") + require.Equal(t, api.NotificationRuleBalanceThresholdValue{ + Value: 95, + Type: api.PERCENT, + }, event.Payload.BalanceThreshold.Threshold) + + // Step 3: Additional events hitting the same 95% threshold should not create new events + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(3.0), + Usage: convert.ToPointer(97.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + }) + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 1, "One event should be created") + + // Step 4: The user receives +2000 credits, given that current usage doesn't exceed any threshold + // we are not creating additional notifications + + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(2004.0), + Usage: convert.ToPointer(96.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + }) + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 1, "One event should be created") + + // Step 5: The user spends 1000 credits, hitting the 1000 units threshold + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(1004.0), + Usage: convert.ToPointer(1096.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 2, "Two events should be created") + + // Let's sanity check the resulting event + event = events.Items[0] + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.NotEmpty(t, event.Annotations[notification.AnnotationEventDedupeHash], "Event must have a deduplication hash") + require.Equal(t, api.NotificationRuleBalanceThresholdValue{ + Value: 1000, + Type: api.NUMBER, + }, event.Payload.BalanceThreshold.Threshold) + + // Step 6: The user hits the 95% threshold again + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(4.0), + Usage: convert.ToPointer(2096.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 3, "Three events should be created") + + // Let's sanity check the resulting event + event = events.Items[0] + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.Equal(t, api.NotificationRuleBalanceThresholdValue{ + Value: 95, + Type: api.PERCENT, + }, event.Payload.BalanceThreshold.Threshold) + + // Step 7: The user gets +1000 credits, given that the 95% threshold is no longer valid + // a new event should not be created for the 1000 units threshold + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(1004.0), + Usage: convert.ToPointer(2096.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 4, "Four events should be created") + + // Let's sanity check the resulting event + event = events.Items[0] + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.Equal(t, api.NotificationRuleBalanceThresholdValue{ + Value: 1000, + Type: api.NUMBER, + }, event.Payload.BalanceThreshold.Threshold) + + // Step 8: The entitlement gets reset, no events should be created + + newUsagePeriod := recurrence.Period{ + From: TestEntitlementCurrentUsagePeriod.To, + To: TestEntitlementCurrentUsagePeriod.To.Add(24 * time.Hour), + } + + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(100.0), + Usage: convert.ToPointer(0.0), + }, + Namespace: s.namespace, + }) + snapshotEvent.Entitlement.CurrentUsagePeriod = &newUsagePeriod + snapshotEvent.Entitlement.LastReset = &newUsagePeriod.From + snapshotEvent.CurrentUsagePeriod = &newUsagePeriod + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 4, "Four events should be created") + + // Step 9: The user hits the 95% threshold again after the reset, new event should be created + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: s.feature, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(1.0), + Usage: convert.ToPointer(99.0), + }, + Namespace: s.namespace, + }) + snapshotEvent.Entitlement.CurrentUsagePeriod = &newUsagePeriod + snapshotEvent.Entitlement.LastReset = &newUsagePeriod.From + snapshotEvent.CurrentUsagePeriod = &newUsagePeriod + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 5, "Five events should be created") + + // Let's sanity check the resulting event + event = events.Items[0] + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.Equal(t, api.NotificationRuleBalanceThresholdValue{ + Value: 95, + Type: api.PERCENT, + }, event.Payload.BalanceThreshold.Threshold) +} + +const ( + TestFeature2Name = "TestFeature2" + TestFeature2Key = "test-feature-2" + + TestFeature3Name = "TestFeature3" + TestFeature3Key = "test-feature-3" +) + +func (s *BalanceNotificaiontHandlerTestSuite) TestFeatureFiltering(ctx context.Context, t *testing.T) { + s.setupNamespace(ctx, t) + + service := s.Env.Notification() + + meter, err := s.Env.Meter().GetMeterByIDOrSlug(ctx, s.namespace, TestMeterSlug) + require.NoError(t, err, "Getting meter must not return error") + + // let's setup two more features (we should use different meters but for the sake of simplicity we are using the same one) + feature1 := s.feature + require.NotNil(t, feature1, "Feature must not be nil") + + feature2, err := s.Env.Feature().CreateFeature(ctx, productcatalog.CreateFeatureInputs{ + Name: TestFeature2Name, + Key: TestFeature2Key, + Namespace: s.namespace, + MeterSlug: convert.ToPointer(meter.Slug), + MeterGroupByFilters: meter.GroupBy, + }) + require.NoError(t, err, "Creating feature must not return error") + + feature3, err := s.Env.Feature().CreateFeature(ctx, productcatalog.CreateFeatureInputs{ + Name: TestFeature3Name, + Key: TestFeature3Key, + Namespace: s.namespace, + MeterSlug: convert.ToPointer(meter.Slug), + MeterGroupByFilters: meter.GroupBy, + }) + require.NoError(t, err, "Creating feature must not return error") + require.NotNil(t, feature3, "Feature must not be nil") + + // Let's create a few rules to test feature filtering + // wildcard rule without feature filtering + rule1 := s.rule + require.NotNil(t, rule1, "Rule must not be nil") + rule1.CreatedAt = rule1.CreatedAt.Truncate(time.Microsecond) + + // rule with feature filtering using feature key + ruleInput := NewCreateRuleInput("TestRule2ForNotificationWorker", s.channel.ID) + ruleInput.Namespace = s.namespace + ruleInput.Config.BalanceThreshold.Features = []string{feature2.Key} + + rule2, err := service.CreateRule(ctx, ruleInput) + require.NoError(t, err, "Creating rule must not return error") + require.NotNil(t, rule2, "Rule must not be nil") + rule2.CreatedAt = rule2.CreatedAt.Truncate(time.Microsecond) + rule2.UpdatedAt = rule2.UpdatedAt.Truncate(time.Microsecond) + + // rule with feature filtering using feature key + ruleInput = NewCreateRuleInput("TestRule3ForNotificationWorker", s.channel.ID) + ruleInput.Namespace = s.namespace + ruleInput.Config.BalanceThreshold.Features = []string{feature2.ID} + + rule3, err := service.CreateRule(ctx, ruleInput) + require.NoError(t, err, "Creating rule must not return error") + require.NotNil(t, rule3, "Rule must not be nil") + rule3.CreatedAt = rule3.CreatedAt.Truncate(time.Microsecond) + rule3.UpdatedAt = rule3.UpdatedAt.Truncate(time.Microsecond) + + // Step 1: A new event is created for feature 3, which should be only matched by + // rule 1 (wildcard rule) + snapshotEvent := NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: feature3, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(1.0), + Usage: convert.ToPointer(10001.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err := service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 1, "Event is created") + + // Let's sanity check the resulting event + event := events.Items[0] + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + require.Equal(t, rule1, event.Rule, "Event must be associated with the rule") + + // Step 2: A new event is created for feature 2, which should be matched by all rules: + // - rule 1 (wildcard rule) + // - rule 2 (feature key filtering) + // - rule 3 (feature ID filtering) + + snapshotEvent = NewBalanceSnapshotEvent(BalanceSnapshotEventInput{ + Feature: feature2, + Value: snapshot.EntitlementValue{ + Balance: convert.ToPointer(1.0), + Usage: convert.ToPointer(10001.0), + }, + Namespace: s.namespace, + }) + + require.NoError(t, s.handler.Handle(ctx, snapshotEvent), "Handling event must not return error") + + events, err = service.ListEvents(ctx, notification.ListEventsInput{ + Namespaces: []string{s.namespace}, + OrderBy: notification.EventOrderByCreatedAt, + Order: sortx.OrderDesc, + }) + + require.NoError(t, err, "Listing events must not return error") + require.Len(t, events.Items, 4, "Events are created") + + // Let's sanity check the resulting events + eventsCreated := events.Items[0:3] + affectedRules := []notification.Rule{} + for _, event := range eventsCreated { + require.Equal(t, notification.EventTypeBalanceThreshold, event.Payload.Type, "Event must be of type balance threshold") + + affectedRules = append(affectedRules, event.Rule) + } + + require.Contains(t, affectedRules, rule1, "Event must be associated with the rule1") + require.Contains(t, affectedRules, *rule2, "Event must be associated with the rule2") + require.Contains(t, affectedRules, *rule3, "Event must be associated with the rule3") +} diff --git a/test/notification/notification_test.go b/test/notification/notification_test.go index 6c88940e9..cc5a6a3be 100644 --- a/test/notification/notification_test.go +++ b/test/notification/notification_test.go @@ -153,4 +153,19 @@ func TestNotification(t *testing.T) { testSuite.TestFilterEventBySubject(t) }) }) + + // Test suite covering the consumer that listens to events (balance) + t.Run("Consumer", func(t *testing.T) { + testSuite := BalanceNotificaiontHandlerTestSuite{ + Env: env, + } + + t.Run("TestGrantingFlow", func(t *testing.T) { + testSuite.TestGrantingFlow(ctx, t) + }) + + t.Run("TestFeatureFiltering", func(t *testing.T) { + testSuite.TestFeatureFiltering(ctx, t) + }) + }) }