Skip to content

Commit 10ed8cc

Browse files
committed
feat: Delivery timeout
1 parent 69992e3 commit 10ed8cc

File tree

6 files changed

+99
-5
lines changed

6 files changed

+99
-5
lines changed

cmd/e2e/configs/basic.go

+1
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func Basic(t *testing.T) *config.Config {
6767
LogMaxConcurrency: 3,
6868
RetryIntervalSeconds: 1,
6969
RetryMaxCount: 3,
70+
DeliveryTimeoutSeconds: 5,
7071
LogBatcherDelayThresholdSeconds: 1,
7172
LogBatcherItemCountThreshold: 100,
7273
MaxDestinationsPerTenant: 20,

internal/config/config.go

+3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Config struct {
4343
LogMaxConcurrency int
4444
RetryIntervalSeconds int
4545
RetryMaxCount int
46+
DeliveryTimeoutSeconds int
4647
LogBatcherDelayThresholdSeconds int
4748
LogBatcherItemCountThreshold int
4849
DestinationMetadataPath string
@@ -83,6 +84,7 @@ var defaultConfig = map[string]any{
8384
"LOGMQ_MAX_CONCURRENCY": 1,
8485
"RETRY_INTERVAL_SECONDS": 30,
8586
"MAX_RETRY_COUNT": 10,
87+
"DELIVERY_TIMEOUT_SECONDS": 5,
8688
"LOG_BATCHER_DELAY_THRESHOLD_SECONDS": 5,
8789
"LOG_BATCHER_ITEM_COUNT_THRESHOLD": 100,
8890
"MAX_DESTINATIONS_PER_TENANT": 20,
@@ -198,6 +200,7 @@ func Parse(flags Flags) (*Config, error) {
198200
LogMaxConcurrency: mustInt(viper, "LOGMQ_MAX_CONCURRENCY"),
199201
RetryIntervalSeconds: mustInt(viper, "RETRY_INTERVAL_SECONDS"),
200202
RetryMaxCount: mustInt(viper, "MAX_RETRY_COUNT"),
203+
DeliveryTimeoutSeconds: mustInt(viper, "DELIVERY_TIMEOUT_SECONDS"),
201204
LogBatcherDelayThresholdSeconds: mustInt(viper, "LOG_BATCHER_DELAY_THRESHOLD_SECONDS"),
202205
LogBatcherItemCountThreshold: mustInt(viper, "LOG_BATCHER_ITEM_COUNT_THRESHOLD"),
203206
DestinationMetadataPath: viper.GetString("DESTINATION_METADATA_PATH"),

internal/destregistry/registry.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ type registry struct {
5555
metadata map[string]*metadata.ProviderMetadata
5656
providers map[string]Provider
5757
publishers *lru.Cache[string, Publisher]
58+
config Config
5859
}
5960

6061
type Config struct {
6162
DestinationMetadataPath string
6263
PublisherCacheSize int
6364
PublisherTTL time.Duration
65+
DeliveryTimeout time.Duration
6466
}
6567

6668
func NewRegistry(cfg *Config, logger *otelzap.Logger) *registry {
@@ -70,6 +72,9 @@ func NewRegistry(cfg *Config, logger *otelzap.Logger) *registry {
7072
if cfg.PublisherTTL == 0 {
7173
cfg.PublisherTTL = defaultPublisherTTL
7274
}
75+
if cfg.DeliveryTimeout == 0 {
76+
cfg.DeliveryTimeout = defaultDeliveryTimeout
77+
}
7378

7479
onEvict := func(key string, p Publisher) {
7580
if err := p.Close(); err != nil {
@@ -87,6 +92,7 @@ func NewRegistry(cfg *Config, logger *otelzap.Logger) *registry {
8792
metadata: make(map[string]*metadata.ProviderMetadata),
8893
providers: make(map[string]Provider),
8994
publishers: cache,
95+
config: *cfg,
9096
}
9197
}
9298

@@ -115,11 +121,26 @@ func (r *registry) PublishEvent(ctx context.Context, destination *models.Destina
115121
if err != nil {
116122
return err
117123
}
118-
if err := publisher.Publish(ctx, event); err != nil {
124+
125+
// Create a new context with timeout
126+
timeoutCtx, cancel := context.WithTimeout(ctx, r.config.DeliveryTimeout)
127+
defer cancel()
128+
129+
if err := publisher.Publish(timeoutCtx, event); err != nil {
119130
var publishErr *ErrDestinationPublishAttempt
120131
if errors.As(err, &publishErr) {
121132
return publishErr
122133
}
134+
if errors.Is(err, context.DeadlineExceeded) {
135+
return &ErrDestinationPublishAttempt{
136+
Err: err,
137+
Provider: destination.Type,
138+
Data: map[string]interface{}{
139+
"error": "timeout",
140+
"timeout": r.config.DeliveryTimeout.String(),
141+
},
142+
}
143+
}
123144
return &ErrUnexpectedPublishError{Err: err}
124145
}
125146
return nil
@@ -203,4 +224,5 @@ func (r *registry) ObfuscateDestination(destination *models.Destination) (*model
203224
var (
204225
defaultPublisherCacheSize = 10000
205226
defaultPublisherTTL = time.Minute
227+
defaultDeliveryTimeout = 5 * time.Second
206228
)

internal/destregistry/registry_test.go

+70-4
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ import (
1818
type mockProvider struct {
1919
createCount int32
2020
*destregistry.BaseProvider
21+
publishDelay time.Duration
2122
}
2223

2324
type mockPublisher struct {
24-
id int64
25-
closed bool
25+
id int64
26+
closed bool
27+
publishDelay time.Duration
2628
}
2729

2830
var mockPublisherID int64
@@ -90,10 +92,20 @@ func (p *mockProvider) Validate(ctx context.Context, dest *models.Destination) e
9092

9193
func (p *mockProvider) CreatePublisher(ctx context.Context, dest *models.Destination) (destregistry.Publisher, error) {
9294
atomic.AddInt32(&p.createCount, 1)
93-
return newMockPublisher(), nil
95+
pub := newMockPublisher()
96+
pub.publishDelay = p.publishDelay
97+
return pub, nil
98+
}
99+
100+
func (p *mockPublisher) Publish(ctx context.Context, event *models.Event) error {
101+
select {
102+
case <-time.After(p.publishDelay):
103+
return nil
104+
case <-ctx.Done():
105+
return ctx.Err()
106+
}
94107
}
95108

96-
func (p *mockPublisher) Publish(ctx context.Context, event *models.Event) error { return nil }
97109
func (p *mockPublisher) Close() error {
98110
p.closed = true
99111
return nil
@@ -576,3 +588,57 @@ func TestObfuscateDestination(t *testing.T) {
576588
assert.Equal(t, "****", obfuscated.Credentials["token"]) // 3 chars
577589
assert.Equal(t, "****", obfuscated.Credentials["code"]) // 4 chars
578590
}
591+
592+
func TestPublishEventTimeout(t *testing.T) {
593+
timeout := 100 * time.Millisecond
594+
logger := testutil.CreateTestLogger(t)
595+
596+
t.Run("should not return timeout error when publish completes within timeout", func(t *testing.T) {
597+
registry := destregistry.NewRegistry(&destregistry.Config{
598+
DeliveryTimeout: timeout,
599+
}, logger)
600+
601+
provider, err := newMockProvider()
602+
require.NoError(t, err)
603+
provider.publishDelay = timeout / 2
604+
err = registry.RegisterProvider("test", provider)
605+
require.NoError(t, err)
606+
607+
destination := &models.Destination{
608+
Type: "test",
609+
}
610+
event := &models.Event{}
611+
612+
err = registry.PublishEvent(context.Background(), destination, event)
613+
assert.NoError(t, err)
614+
})
615+
616+
t.Run("should return timeout error when publish exceeds timeout", func(t *testing.T) {
617+
registry := destregistry.NewRegistry(&destregistry.Config{
618+
DeliveryTimeout: timeout,
619+
}, logger)
620+
621+
provider, err := newMockProvider()
622+
require.NoError(t, err)
623+
provider.publishDelay = timeout * 2
624+
err = registry.RegisterProvider("test", provider)
625+
require.NoError(t, err)
626+
627+
destination := &models.Destination{
628+
Type: "test",
629+
}
630+
event := &models.Event{}
631+
632+
err = registry.PublishEvent(context.Background(), destination, event)
633+
assert.Error(t, err)
634+
635+
var publishErr *destregistry.ErrDestinationPublishAttempt
636+
assert.ErrorAs(t, err, &publishErr)
637+
assert.Equal(t, "test", publishErr.Provider)
638+
639+
data, ok := publishErr.Data.(map[string]interface{})
640+
assert.True(t, ok, "Expected Data to be map[string]interface{}")
641+
assert.Equal(t, "timeout", data["error"])
642+
assert.Equal(t, timeout.String(), data["timeout"])
643+
})
644+
}

internal/services/api/api.go

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log
4646

4747
registry := destregistry.NewRegistry(&destregistry.Config{
4848
DestinationMetadataPath: cfg.DestinationMetadataPath,
49+
DeliveryTimeout: time.Duration(cfg.DeliveryTimeoutSeconds) * time.Second,
4950
}, logger)
5051
if err := destregistrydefault.RegisterDefault(registry, destregistrydefault.RegisterDefaultDestinationOptions{
5152
Webhook: &destregistrydefault.DestWebhookConfig{

internal/services/delivery/delivery.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func NewService(ctx context.Context,
6464
if handler == nil {
6565
registry := destregistry.NewRegistry(&destregistry.Config{
6666
DestinationMetadataPath: cfg.DestinationMetadataPath,
67+
DeliveryTimeout: time.Duration(cfg.DeliveryTimeoutSeconds) * time.Second,
6768
}, logger)
6869
if err := destregistrydefault.RegisterDefault(registry, destregistrydefault.RegisterDefaultDestinationOptions{
6970
Webhook: &destregistrydefault.DestWebhookConfig{

0 commit comments

Comments
 (0)