Skip to content

Commit

Permalink
feat: support override consumer group config and refine/fix some code (
Browse files Browse the repository at this point in the history
…#197)

* feat: support override consumer group config and refine/fix some code
    1. override old consumer group rather than skip
    2. fix UpdateConsumerGroup always failed
    3. refine codes

* refine: modify consumer group Stringer impl and log

* fix: no need to update when there's no difference
  • Loading branch information
wxybear authored Mar 10, 2023
1 parent 9e39fa5 commit e8f9ed9
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 21 deletions.
19 changes: 17 additions & 2 deletions client_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ type ConsumerGroup struct {
InOrder bool `json:"order"`
}

func (cg *ConsumerGroup) String() string {
return fmt.Sprintf("%+v", cg)
}

// ConsumerGroupCheckPoint type define
type ConsumerGroupCheckPoint struct {
ShardID int `json:"shard"`
CheckPoint string `json:"checkpoint"`
UpdateTime int64 `json:"updateTime"`
UpdateTime int64 `json:"updateTime"`
Consumer string `json:"consumer"`
}

Expand All @@ -35,6 +39,9 @@ func (c *Client) CreateConsumerGroup(project, logstore string, cg ConsumerGroup)
}

body, err := json.Marshal(cg)
if err != nil {
return err
}
uri := fmt.Sprintf("/logstores/%v/consumergroups", logstore)
_, err = c.request(project, "POST", uri, h, body)
if err != nil {
Expand All @@ -50,7 +57,15 @@ func (c *Client) UpdateConsumerGroup(project, logstore string, cg ConsumerGroup)
"Content-Type": "application/json",
}

body, err := json.Marshal(cg)
updates := make(map[string]interface{})
updates["order"] = cg.InOrder
if cg.Timeout > 0 {
updates["timeout"] = cg.Timeout
}
body, err := json.Marshal(updates)
if err != nil {
return err
}
uri := fmt.Sprintf("/logstores/%v/consumergroups/%v", logstore, cg.ConsumerGroupName)
_, err = c.request(project, "PUT", uri, h, body)
if err != nil {
Expand Down
42 changes: 31 additions & 11 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumerLibrary

import (
"fmt"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
Expand Down Expand Up @@ -31,15 +32,15 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
AccessKeyID: option.AccessKeyID,
AccessKeySecret: option.AccessKeySecret,
SecurityToken: option.SecurityToken,
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
UserAgent: option.ConsumerGroupName + "_" + option.ConsumerName,
}
if option.HTTPClient != nil {
client.SetHTTPClient(option.HTTPClient)
}
consumerGroup := sls.ConsumerGroup{
option.ConsumerGroupName,
option.HeartbeatIntervalInSecond * 3,
option.InOrder,
ConsumerGroupName: option.ConsumerGroupName,
Timeout: option.HeartbeatIntervalInSecond * 3,
InOrder: option.InOrder,
}
consumerClient := &ConsumerClient{
option,
Expand All @@ -51,18 +52,37 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
return consumerClient
}

func (consumer *ConsumerClient) createConsumerGroup() {
err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup)
func (consumer *ConsumerClient) createConsumerGroup() error {
consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore)
if err != nil {
if slsError, ok := err.(*sls.Error); ok {
if slsError.Code == "ConsumerGroupAlreadyExist" {
level.Info(consumer.logger).Log("msg", "New consumer join the consumer group", "consumer name", consumer.option.ConsumerName, "group name", consumer.option.ConsumerGroupName)
return fmt.Errorf("list consumer group failed: %w", err)
}
alreadyExist := false
for _, cg := range consumerGroups {
if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName {
alreadyExist = true
if (*cg) != consumer.consumerGroup {
level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg)
} else {
level.Error(consumer.logger).Log("msg", "create consumer group error", "error", err)

level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName,
"group name", consumer.option.ConsumerGroupName)
return nil
}
}
}
if alreadyExist {
if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
return fmt.Errorf("update consumer group failed: %w", err)
}
} else {
if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" {
return fmt.Errorf("create consumer group failed: %w", err)
}
}
}

return nil
}

func (consumer *ConsumerClient) heartBeat(heart []int) ([]int, error) {
Expand Down
45 changes: 43 additions & 2 deletions consumer/consumer_client_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package consumerLibrary

import (
"github.com/aliyun/aliyun-log-go-sdk"
"fmt"
"testing"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/assert"
)

func InitOption() LogHubConfig {
Expand Down Expand Up @@ -36,7 +40,6 @@ func consumerGroup() sls.ConsumerGroup {
}

func TestConsumerClient_createConsumerGroup(t *testing.T) {

type fields struct {
option LogHubConfig
client *sls.Client
Expand All @@ -57,3 +60,41 @@ func TestConsumerClient_createConsumerGroup(t *testing.T) {
consumer.createConsumerGroup()
}
}

func internalGetConsumerGroup(client *sls.Client, project, logstore, groupName string) (sls.ConsumerGroup, error) {
cgs, err := client.ListConsumerGroup(project, logstore)
if err != nil {
return sls.ConsumerGroup{}, err
}
for _, cg := range cgs {
if cg.ConsumerGroupName == groupName {
return *cg, nil
}
}

return sls.ConsumerGroup{}, fmt.Errorf("consumer group not found")
}

func TestConsumerClient_updateConsumerGroup(t *testing.T) {
logger := log.NewNopLogger()
oldOption := InitOption()
newOption := oldOption
newOption.HeartbeatIntervalInSecond += 20
oldClient := initConsumerClient(oldOption, logger)
newClient := initConsumerClient(newOption, logger)
// ready
_ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
assert.NotEqual(t, newClient.consumerGroup, oldClient.consumerGroup)
// old config
assert.Nil(t, oldClient.createConsumerGroup())
cg, err := internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
assert.Nil(t, err)
assert.Equal(t, cg, oldClient.consumerGroup)
// new config
assert.Nil(t, newClient.createConsumerGroup())
cg, err = internalGetConsumerGroup(oldClient.client, oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
assert.Nil(t, err)
assert.Equal(t, cg, newClient.consumerGroup)
// clean
_ = oldClient.client.DeleteConsumerGroup(oldOption.Project, oldOption.Logstore, oldOption.ConsumerGroupName)
}
2 changes: 1 addition & 1 deletion consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (consumer *ShardConsumerWorker) consume() {
func (consumer *ShardConsumerWorker) consumerShutDown() {
consumer.consumerShutDownFlag = true
if !consumer.isShutDownComplete() {
if consumer.getIsFlushCheckpointDoneStatus() == true {
if consumer.getIsFlushCheckpointDoneStatus() {
consumer.consume()
} else {
return
Expand Down
8 changes: 6 additions & 2 deletions consumer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
do: do,
Logger: logger,
}
consumerClient.createConsumerGroup()
if err := consumerClient.createConsumerGroup(); err != nil {
level.Error(consumerWorker.Logger).Log(
"msg", "possibly failed to create or update consumer group, please check worker run log",
"err", err)
}
return consumerWorker
}

Expand Down Expand Up @@ -65,7 +69,7 @@ func (consumerWorker *ConsumerWorker) run() {
break
}
shardConsumer := consumerWorker.getShardConsumer(shard)
if shardConsumer.getConsumerIsCurrentDoneStatus() == true {
if shardConsumer.getConsumerIsCurrentDoneStatus() {
shardConsumer.consume()
} else {
continue
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,12 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
2 changes: 1 addition & 1 deletion log_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,6 @@ type LogConfig struct {
OutputType string `json:"outputType"`
OutputDetail OutputDetail `json:"outputDetail"`

CreateTime uint32 `json:"createTime,omitempty`
CreateTime uint32 `json:"createTime,omitempty"`
LastModifyTime uint32 `json:"lastModifyTime,omitempty"`
}

0 comments on commit e8f9ed9

Please sign in to comment.