Skip to content

Commit

Permalink
feat: add notification handler
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Aug 21, 2024
1 parent 738c1d8 commit ba36d72
Show file tree
Hide file tree
Showing 12 changed files with 1,129 additions and 18 deletions.
35 changes: 33 additions & 2 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/notification"
"github.com/openmeterio/openmeter/internal/notification/consumer"
notificationrepository "github.com/openmeterio/openmeter/internal/notification/repository"
notificationwebhook "github.com/openmeterio/openmeter/internal/notification/webhook"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
Expand Down Expand Up @@ -283,14 +286,42 @@ func main() {
}

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
entitlementConnRegistry := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: entClient,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Publisher: eventPublisher,
})

// Dependencies: notification
notificationRepo, err := notificationrepository.New(notificationrepository.Config{
Client: entClient,
Logger: logger.WithGroup("notification.postgres"),
})
if err != nil {
logger.Error("failed to initialize notification repository", "error", err)
os.Exit(1)
}

notificationWebhook, err := notificationwebhook.New(notificationwebhook.Config{
SvixConfig: conf.Svix,
})
if err != nil {
logger.Error("failed to initialize notification repository", "error", err)
os.Exit(1)
}

notificationService, err := notification.New(notification.Config{
Repository: notificationRepo,
Webhook: notificationWebhook,
FeatureConnector: entitlementConnRegistry.Feature,
})
if err != nil {
logger.Error("failed to initialize notification service", "error", err)
os.Exit(1)
}

// Initialize consumer
consumerOptions := consumer.Options{
SystemEventsTopic: conf.Events.SystemEvents.Topic,
Expand All @@ -303,7 +334,7 @@ func main() {
},
Marshaler: eventPublisher.Marshaler(),

Entitlement: entitlementConnectors,
Notification: notificationService,

Logger: logger,
}
Expand Down
16 changes: 16 additions & 0 deletions internal/event/models/subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package models

import (
"errors"
"maps"
"time"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/pkg/convert"
)

type SubjectKeyAndID struct {
Expand Down Expand Up @@ -35,3 +39,15 @@ func (s Subject) Validate() error {

return nil
}

func (s Subject) ToAPIModel() api.Subject {
return api.Subject{
Id: s.Id,
Key: s.Key,
DisplayName: s.DisplayName,
Metadata: convert.ToPointer(maps.Clone(s.Metadata)),
CurrentPeriodStart: s.CurrentPeriodStart,
CurrentPeriodEnd: s.CurrentPeriodEnd,
StripeCustomerId: s.StripeCustomerId,
}
}
162 changes: 162 additions & 0 deletions internal/notification/consumer/balancethreshold_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package consumer

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/internal/entitlement/snapshot"
"github.com/openmeterio/openmeter/internal/notification"
"github.com/openmeterio/openmeter/pkg/convert"
)

func newNumericThreshold(v float64) notification.BalanceThreshold {
return notification.BalanceThreshold{
Value: v,
Type: api.NUMBER,
}
}

func newPercentThreshold(v float64) notification.BalanceThreshold {
return notification.BalanceThreshold{
Value: v,
Type: api.PERCENT,
}
}

func TestGetHighestMatchingBalanceThreshold(t *testing.T) {
tcs := []struct {
Name string
BalanceThresholds []notification.BalanceThreshold
EntitlementValue snapshot.EntitlementValue
Expect *notification.BalanceThreshold
}{
{
Name: "Numerical values only",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
newNumericThreshold(10),
newNumericThreshold(30),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(10.0),
Usage: convert.ToPointer(20.0),
},
// Already used 20, so the matching threshold is the 20
Expect: convert.ToPointer(newNumericThreshold(20)),
},
{
Name: "Numerical values only - 100%",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
newNumericThreshold(10),
newNumericThreshold(30),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(0.0),
Usage: convert.ToPointer(30.0),
},
Expect: convert.ToPointer(newNumericThreshold(30)),
},
{
Name: "Numerical values only - 100%+ with overage",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
newNumericThreshold(10),
newNumericThreshold(30),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(0.0),
Usage: convert.ToPointer(30.0),
Overage: convert.ToPointer(10.0),
},
Expect: convert.ToPointer(newNumericThreshold(30)),
},
{
Name: "Percentages with overage",
BalanceThresholds: []notification.BalanceThreshold{
newPercentThreshold(50),
newPercentThreshold(100),
newPercentThreshold(110),
newPercentThreshold(120),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(0.0),
Usage: convert.ToPointer(110.0),
Overage: convert.ToPointer(10.0),
},
Expect: convert.ToPointer(newPercentThreshold(110)),
},
{
Name: "Mixed values",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
newNumericThreshold(10),
newNumericThreshold(30),
newPercentThreshold(50),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(14.0),
Usage: convert.ToPointer(16.0),
},
Expect: convert.ToPointer(newPercentThreshold(50)),
},
// Corner cases
{
Name: "No grants",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
newPercentThreshold(100),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(0.0),
Usage: convert.ToPointer(0.0),
},
Expect: nil,
},
{
Name: "Last threshold is ",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(20),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(0.0),
Usage: convert.ToPointer(30.0),
},
Expect: convert.ToPointer(newNumericThreshold(20)),
},
{
Name: "Same threshold in percentage and number",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(15),
newPercentThreshold(50),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(14.0),
Usage: convert.ToPointer(16.0),
},
Expect: convert.ToPointer(newPercentThreshold(50)),
},
{
Name: "Exact threshold match",
BalanceThresholds: []notification.BalanceThreshold{
newNumericThreshold(15),
newPercentThreshold(50),
},
EntitlementValue: snapshot.EntitlementValue{
Balance: convert.ToPointer(15.0),
Usage: convert.ToPointer(15.0),
},
Expect: convert.ToPointer(newPercentThreshold(50)),
},
}

for _, tc := range tcs {
t.Run(tc.Name, func(t *testing.T) {
got, err := getHighestMatchingThreshold(tc.BalanceThresholds, tc.EntitlementValue)
assert.NoError(t, err)
assert.Equal(t, tc.Expect, got)
})
}
}
Loading

0 comments on commit ba36d72

Please sign in to comment.