Skip to content

Commit bd705a8

Browse files
authored
chore: Configure log batcher via config (#109)
1 parent 78551e0 commit bd705a8

File tree

2 files changed

+54
-39
lines changed

2 files changed

+54
-39
lines changed

internal/config/config.go

+41-35
Original file line numberDiff line numberDiff line change
@@ -31,34 +31,38 @@ type Config struct {
3131
PortalProxyURL string
3232
Topics []string
3333

34-
Redis *redis.RedisConfig
35-
ClickHouse *clickhouse.ClickHouseConfig
36-
OpenTelemetry *otel.OpenTelemetryConfig
37-
PublishQueueConfig *mqs.QueueConfig
38-
DeliveryQueueConfig *mqs.QueueConfig
39-
LogQueueConfig *mqs.QueueConfig
40-
PublishMaxConcurrency int
41-
DeliveryMaxConcurrency int
42-
LogMaxConcurrency int
43-
RetryIntervalSeconds int
44-
RetryMaxCount int
34+
Redis *redis.RedisConfig
35+
ClickHouse *clickhouse.ClickHouseConfig
36+
OpenTelemetry *otel.OpenTelemetryConfig
37+
PublishQueueConfig *mqs.QueueConfig
38+
DeliveryQueueConfig *mqs.QueueConfig
39+
LogQueueConfig *mqs.QueueConfig
40+
PublishMaxConcurrency int
41+
DeliveryMaxConcurrency int
42+
LogMaxConcurrency int
43+
RetryIntervalSeconds int
44+
RetryMaxCount int
45+
LogBatcherDelayThresholdSeconds int
46+
LogBatcherItemCountThreshold int
4547
}
4648

4749
var defaultConfig = map[string]any{
48-
"PORT": 3333,
49-
"REDIS_HOST": "127.0.0.1",
50-
"REDIS_PORT": 6379,
51-
"REDIS_PASSWORD": "",
52-
"REDIS_DATABASE": 0,
53-
"DELIVERY_RABBITMQ_EXCHANGE": "outpost",
54-
"DELIVERY_RABBITMQ_QUEUE": "outpost.delivery",
55-
"LOG_RABBITMQ_EXCHANGE": "outpost_logs",
56-
"LOG_RABBITMQ_QUEUE": "outpost_logs.log",
57-
"PUBLISHMQ_MAX_CONCURRENCY": 1,
58-
"DELIVERYMQ_MAX_CONCURRENCY": 1,
59-
"LOGMQ_MAX_CONCURRENCY": 1,
60-
"RETRY_INTERVAL_SECONDS": 30,
61-
"MAX_RETRY_COUNT": 10,
50+
"PORT": 3333,
51+
"REDIS_HOST": "127.0.0.1",
52+
"REDIS_PORT": 6379,
53+
"REDIS_PASSWORD": "",
54+
"REDIS_DATABASE": 0,
55+
"DELIVERY_RABBITMQ_EXCHANGE": "outpost",
56+
"DELIVERY_RABBITMQ_QUEUE": "outpost.delivery",
57+
"LOG_RABBITMQ_EXCHANGE": "outpost_logs",
58+
"LOG_RABBITMQ_QUEUE": "outpost_logs.log",
59+
"PUBLISHMQ_MAX_CONCURRENCY": 1,
60+
"DELIVERYMQ_MAX_CONCURRENCY": 1,
61+
"LOGMQ_MAX_CONCURRENCY": 1,
62+
"RETRY_INTERVAL_SECONDS": 30,
63+
"MAX_RETRY_COUNT": 10,
64+
"LOG_BATCHER_DELAY_THRESHOLD_SECONDS": 5,
65+
"LOG_BATCHER_ITEM_COUNT_THRESHOLD": 100,
6266
}
6367

6468
var (
@@ -161,16 +165,18 @@ func Parse(flags Flags) (*Config, error) {
161165
Password: viper.GetString("REDIS_PASSWORD"),
162166
Database: mustInt(viper, "REDIS_DATABASE"),
163167
},
164-
ClickHouse: clickHouseConfig,
165-
OpenTelemetry: openTelemetry,
166-
PublishQueueConfig: publishQueueConfig,
167-
DeliveryQueueConfig: deliveryQueueConfig,
168-
LogQueueConfig: logQueueConfig,
169-
PublishMaxConcurrency: mustInt(viper, "PUBLISHMQ_MAX_CONCURRENCY"),
170-
DeliveryMaxConcurrency: mustInt(viper, "DELIVERYMQ_MAX_CONCURRENCY"),
171-
LogMaxConcurrency: mustInt(viper, "LOGMQ_MAX_CONCURRENCY"),
172-
RetryIntervalSeconds: mustInt(viper, "RETRY_INTERVAL_SECONDS"),
173-
RetryMaxCount: mustInt(viper, "MAX_RETRY_COUNT"),
168+
ClickHouse: clickHouseConfig,
169+
OpenTelemetry: openTelemetry,
170+
PublishQueueConfig: publishQueueConfig,
171+
DeliveryQueueConfig: deliveryQueueConfig,
172+
LogQueueConfig: logQueueConfig,
173+
PublishMaxConcurrency: mustInt(viper, "PUBLISHMQ_MAX_CONCURRENCY"),
174+
DeliveryMaxConcurrency: mustInt(viper, "DELIVERYMQ_MAX_CONCURRENCY"),
175+
LogMaxConcurrency: mustInt(viper, "LOGMQ_MAX_CONCURRENCY"),
176+
RetryIntervalSeconds: mustInt(viper, "RETRY_INTERVAL_SECONDS"),
177+
RetryMaxCount: mustInt(viper, "MAX_RETRY_COUNT"),
178+
LogBatcherDelayThresholdSeconds: mustInt(viper, "LOG_BATCHER_DELAY_THRESHOLD_SECONDS"),
179+
LogBatcherItemCountThreshold: mustInt(viper, "LOG_BATCHER_ITEM_COUNT_THRESHOLD"),
174180
}
175181

176182
return config, nil

internal/services/log/log.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,11 @@ func NewService(ctx context.Context,
5656
var eventBatcher *batcher.Batcher[*models.Event]
5757
var deliveryBatcher *batcher.Batcher[*models.Delivery]
5858
if handler == nil {
59-
batcher, err := makeBatcher(ctx, logger, models.NewLogStore(chDB))
59+
batcherCfg := batcherConfig{
60+
ItemCountThreshold: cfg.LogBatcherItemCountThreshold,
61+
DelayThreshold: time.Duration(cfg.LogBatcherDelayThresholdSeconds) * time.Second,
62+
}
63+
batcher, err := makeBatcher(ctx, logger, models.NewLogStore(chDB), batcherCfg)
6064
if err != nil {
6165
return nil, err
6266
}
@@ -108,11 +112,16 @@ func (s *LogService) Run(ctx context.Context) error {
108112
return nil
109113
}
110114

111-
func makeBatcher(ctx context.Context, logger *otelzap.Logger, logStore models.LogStore) (*batcher.Batcher[*mqs.Message], error) {
115+
type batcherConfig struct {
116+
ItemCountThreshold int
117+
DelayThreshold time.Duration
118+
}
119+
120+
func makeBatcher(ctx context.Context, logger *otelzap.Logger, logStore models.LogStore, batcherCfg batcherConfig) (*batcher.Batcher[*mqs.Message], error) {
112121
b, err := batcher.NewBatcher(batcher.Config[*mqs.Message]{
113122
GroupCountThreshold: 2,
114-
ItemCountThreshold: 100,
115-
DelayThreshold: 5 * time.Second,
123+
ItemCountThreshold: batcherCfg.ItemCountThreshold,
124+
DelayThreshold: batcherCfg.DelayThreshold,
116125
NumGoroutines: 1,
117126
Processor: func(_ string, msgs []*mqs.Message) {
118127
logger.Ctx(ctx).Info("log batcher processor", zap.Int("msgs", len(msgs)))

0 commit comments

Comments
 (0)