From e8f9ed9bb3c591afda38ecb5d80c41ced9d191b3 Mon Sep 17 00:00:00 2001 From: FFish Date: Fri, 10 Mar 2023 11:21:08 +0800 Subject: [PATCH] feat: support override consumer group config and refine/fix some code (#197) * feat: support override consumer group config and refine/fix some code 1. override old consumer group rather than skip 2. fix UpdateConsumerGroup always failed 3. refine codes * refine: modify consumer group Stringer impl and log * fix: no need to update when there's no difference --- client_consumer.go | 19 ++++++++++++-- consumer/consumer_client.go | 42 +++++++++++++++++++++-------- consumer/consumer_client_test.go | 45 ++++++++++++++++++++++++++++++-- consumer/shard_worker.go | 2 +- consumer/worker.go | 8 ++++-- go.sum | 2 -- log_config.go | 2 +- 7 files changed, 99 insertions(+), 21 deletions(-) diff --git a/client_consumer.go b/client_consumer.go index 4a127c11..792cfe81 100644 --- a/client_consumer.go +++ b/client_consumer.go @@ -19,11 +19,15 @@ type ConsumerGroup struct { InOrder bool `json:"order"` } +func (cg *ConsumerGroup) String() string { + return fmt.Sprintf("%+v", cg) +} + // ConsumerGroupCheckPoint type define type ConsumerGroupCheckPoint struct { ShardID int `json:"shard"` CheckPoint string `json:"checkpoint"` - UpdateTime int64 `json:"updateTime"` + UpdateTime int64 `json:"updateTime"` Consumer string `json:"consumer"` } @@ -35,6 +39,9 @@ func (c *Client) CreateConsumerGroup(project, logstore string, cg ConsumerGroup) } body, err := json.Marshal(cg) + if err != nil { + return err + } uri := fmt.Sprintf("/logstores/%v/consumergroups", logstore) _, err = c.request(project, "POST", uri, h, body) if err != nil { @@ -50,7 +57,15 @@ func (c *Client) UpdateConsumerGroup(project, logstore string, cg ConsumerGroup) "Content-Type": "application/json", } - body, err := json.Marshal(cg) + updates := make(map[string]interface{}) + updates["order"] = cg.InOrder + if cg.Timeout > 0 { + updates["timeout"] = cg.Timeout + } + body, err := json.Marshal(updates) + if err != nil { + return err + } uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cg.ConsumerGroupName) _, err = c.request(project, "PUT", uri, h, body) if err != nil { diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 0b871c86..3f474c15 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -1,6 +1,7 @@ package consumerLibrary import ( + "fmt" "time" sls "github.com/aliyun/aliyun-log-go-sdk" @@ -31,15 +32,15 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient AccessKeyID: option.AccessKeyID, AccessKeySecret: option.AccessKeySecret, SecurityToken: option.SecurityToken, - UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName, + UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName, } if option.HTTPClient != nil { client.SetHTTPClient(option.HTTPClient) } consumerGroup := sls.ConsumerGroup{ - option.ConsumerGroupName, - option.HeartbeatIntervalInSecond * 3, - option.InOrder, + ConsumerGroupName: option.ConsumerGroupName, + Timeout: option.HeartbeatIntervalInSecond * 3, + InOrder: option.InOrder, } consumerClient := &ConsumerClient{ option, @@ -51,18 +52,37 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient return consumerClient } -func (consumer *ConsumerClient) createConsumerGroup() { - err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup) +func (consumer *ConsumerClient) createConsumerGroup() error { + consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore) if err != nil { - if slsError, ok := err.(*sls.Error); ok { - if slsError.Code == "ConsumerGroupAlreadyExist" { - level.Info(consumer.logger).Log("msg", "New consumer join the consumer group", "consumer name", consumer.option.ConsumerName, "group name", consumer.option.ConsumerGroupName) + return fmt.Errorf("list consumer group failed: %w", err) + } + alreadyExist := false + for _, cg := range consumerGroups { + if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName { + alreadyExist = true + if (*cg) != consumer.consumerGroup { + level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg) } else { - level.Error(consumer.logger).Log("msg", "create consumer group error", "error", err) - + level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName, + "group name", consumer.option.ConsumerGroupName) + return nil } } } + if alreadyExist { + if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil { + return fmt.Errorf("update consumer group failed: %w", err) + } + } else { + if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil { + if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" { + return fmt.Errorf("create consumer group failed: %w", err) + } + } + } + + return nil } func (consumer *ConsumerClient) heartBeat(heart []int) ([]int, error) { diff --git a/consumer/consumer_client_test.go b/consumer/consumer_client_test.go index 160e66e7..407dd5d5 100644 --- a/consumer/consumer_client_test.go +++ b/consumer/consumer_client_test.go @@ -1,8 +1,12 @@ package consumerLibrary import ( - "github.com/aliyun/aliyun-log-go-sdk" + "fmt" "testing" + + sls "github.com/aliyun/aliyun-log-go-sdk" + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/assert" ) func InitOption() LogHubConfig { @@ -36,7 +40,6 @@ func consumerGroup() sls.ConsumerGroup { } func TestConsumerClient_createConsumerGroup(t *testing.T) { - type fields struct { option LogHubConfig client *sls.Client @@ -57,3 +60,41 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) { consumer.createConsumerGroup() } } + +func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) { + cgs, err := client.ListConsumerGroup(project, logstore) + if err != nil { + return sls.ConsumerGroup{}, err + } + for _, cg := range cgs { + if cg.ConsumerGroupName == groupName { + return *cg, nil + } + } + + return sls.ConsumerGroup{}, fmt.Errorf("consumer group not found") +} + +func TestConsumerClient_updateConsumerGroup(t *testing.T) { + logger := log.NewNopLogger() + oldOption := InitOption() + newOption := oldOption + newOption.HeartbeatIntervalInSecond += 20 + oldClient := initConsumerClient(oldOption, logger) + newClient := initConsumerClient(newOption, logger) + // ready + _ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName) + assert.NotEqual(t, newClient.consumerGroup, oldClient.consumerGroup) + // old config + assert.Nil(t, oldClient.createConsumerGroup()) + cg, err := internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName) + assert.Nil(t, err) + assert.Equal(t, cg, oldClient.consumerGroup) + // new config + assert.Nil(t, newClient.createConsumerGroup()) + cg, err = internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName) + assert.Nil(t, err) + assert.Equal(t, cg, newClient.consumerGroup) + // clean + _ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName) +} diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 00f2103b..278b19d7 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -164,7 +164,7 @@ func (consumer *ShardConsumerWorker) consume() { func (consumer *ShardConsumerWorker) consumerShutDown() { consumer.consumerShutDownFlag = true if !consumer.isShutDownComplete() { - if consumer.getIsFlushCheckpointDoneStatus() == true { + if consumer.getIsFlushCheckpointDoneStatus() { consumer.consume() } else { return diff --git a/consumer/worker.go b/consumer/worker.go index 8167a789..5cfa59f3 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -34,7 +34,11 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str do: do, Logger: logger, } - consumerClient.createConsumerGroup() + if err := consumerClient.createConsumerGroup(); err != nil { + level.Error(consumerWorker.Logger).Log( + "msg", "possibly failed to create or update consumer group, please check worker run log", + "err", err) + } return consumerWorker } @@ -65,7 +69,7 @@ func (consumerWorker *ConsumerWorker) run() { break } shardConsumer := consumerWorker.getShardConsumer(shard) - if shardConsumer.getConsumerIsCurrentDoneStatus() == true { + if shardConsumer.getConsumerIsCurrentDoneStatus() { shardConsumer.consume() } else { continue diff --git a/go.sum b/go.sum index df8127ac..638b3a73 100644 --- a/go.sum +++ b/go.sum @@ -354,14 +354,12 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/log_config.go b/log_config.go index 793a98d1..02626a1a 100644 --- a/log_config.go +++ b/log_config.go @@ -490,6 +490,6 @@ type LogConfig struct { OutputType string `json:"outputType"` OutputDetail OutputDetail `json:"outputDetail"` - CreateTime uint32 `json:"createTime,omitempty` + CreateTime uint32 `json:"createTime,omitempty"` LastModifyTime uint32 `json:"lastModifyTime,omitempty"` }