Skip to content

Commit

Permalink
fix: producer init fallback to ak client on failure (#295)
Browse files Browse the repository at this point in the history
* fix: producer init fallback to ak client on failure

* feat: refine init producer

* chore: refine demo and ut
  • Loading branch information
crimson-gao authored Nov 11, 2024
1 parent cdf1129 commit 5a60a6f
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 30 deletions.
5 changes: 4 additions & 1 deletion example/producer/native_pb_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ func main() {
producerConfig.Endpoint = os.Getenv("Endpoint")
producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance := producer.InitProducer(producerConfig)
producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
5 changes: 4 additions & 1 deletion example/producer/performance_test_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func main() {
rand.Seed(time.Now().Unix())
valueList = generateValuseList()

producerInstance := producer.InitProducer(producerConfig)
producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
5 changes: 4 additions & 1 deletion example/producer/producer_simple_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ func main() {
Value: proto.String("value_2"),
},
}
producerInstance := producer.InitProducer(producerConfig)
producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
5 changes: 4 additions & 1 deletion example/producer/simple_callback_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ func main() {
producerConfig.Endpoint = os.Getenv("Endpoint")
producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance := producer.InitProducer(producerConfig)
producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
5 changes: 4 additions & 1 deletion example/producer/use_ststoken_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ func main() {
//When the producer is closed, if the StsTokenShutDown parameter is not set to nil, it will actively call the close method to close the channel.
producerConfig.StsTokenShutDown = make(chan struct{})
producerConfig.UpdateStsToken = updateStsToken
producerInstance := producer.InitProducer(producerConfig)
producerInstance, err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
5 changes: 4 additions & 1 deletion producer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ producerConfig := producer.GetDefaultProducerConfig()
producerConfig.Endpoint = os.Getenv("Endpoint")
provider := sls.NewStaticCredentailsProvider(os.Getenv("AccessKeyID"), os.Getenv("AccessKeySecret"), "")
producerConfig.CredentialsProvider = provider
producerInstance:=producer.InitProducer(producerConfig)
producerInstance,err := producer.NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start() // 启动producer实例
Expand Down
5 changes: 4 additions & 1 deletion producer/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ func TestProducer_CallBack(t *testing.T) {
producerConfig.Endpoint = ""
producerConfig.AccessKeyID = ""
producerConfig.AccessKeySecret = ""
producerInstance := InitProducer(producerConfig)
producerInstance, err := NewProducer(producerConfig)
if err != nil {
panic(err)
}
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
Expand Down
63 changes: 42 additions & 21 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,32 @@ type Producer struct {
producerLogGroupSize int64
}

func InitProducer(producerConfig *ProducerConfig) *Producer {
func NewProducer(producerConfig *ProducerConfig) (*Producer, error) {
logger := logConfig(producerConfig)
finalProducerConfig := validateProducerConfig(producerConfig, logger)

client, err := createClient(producerConfig)
client, err := createClient(finalProducerConfig, false, logger)
if err != nil {
level.Warn(logger).Log("msg", "Failed to create ststoken client, use default client without ststoken.", "error", err)
}
if producerConfig.Region != "" {
client.SetRegion(producerConfig.Region)
return nil, err
}
if producerConfig.AuthVersion != "" {
client.SetAuthVersion(producerConfig.AuthVersion)
}
if producerConfig.HTTPClient != nil {
client.SetHTTPClient(producerConfig.HTTPClient)
}
if producerConfig.UserAgent != "" {
client.SetUserAgent(producerConfig.UserAgent)
}
finalProducerConfig := validateProducerConfig(producerConfig)
return createProducerInternal(client, finalProducerConfig, logger), nil
}

// Deprecated: use NewProducer instead.
func InitProducer(producerConfig *ProducerConfig) *Producer {
logger := logConfig(producerConfig)
finalProducerConfig := validateProducerConfig(producerConfig, logger)

client, _ := createClient(finalProducerConfig, true, logger)
return createProducerInternal(client, finalProducerConfig, logger)
}

func createProducerInternal(client sls.ClientInterface, finalProducerConfig *ProducerConfig, logger log.Logger) *Producer {
configureClient(client, finalProducerConfig)
retryQueue := initRetryQueue()
errorStatusMap := func() map[int]*string {
errorCodeMap := map[int]*string{}
for _, v := range producerConfig.NoRetryStatusCodeList {
for _, v := range finalProducerConfig.NoRetryStatusCodeList {
errorCodeMap[int(v)] = nil
}
return errorCodeMap
Expand All @@ -76,22 +78,41 @@ func InitProducer(producerConfig *ProducerConfig) *Producer {
return producer
}

func createClient(producerConfig *ProducerConfig) (sls.ClientInterface, error) {
func configureClient(client sls.ClientInterface, producerConfig *ProducerConfig) {
if producerConfig.Region != "" {
client.SetRegion(producerConfig.Region)
}
if producerConfig.AuthVersion != "" {
client.SetAuthVersion(producerConfig.AuthVersion)
}
if producerConfig.HTTPClient != nil {
client.SetHTTPClient(producerConfig.HTTPClient)
}
if producerConfig.UserAgent != "" {
client.SetUserAgent(producerConfig.UserAgent)
}
}

func createClient(producerConfig *ProducerConfig, allowStsFallback bool, logger log.Logger) (sls.ClientInterface, error) {
// use CredentialsProvider
if producerConfig.CredentialsProvider != nil {
return sls.CreateNormalInterfaceV2(producerConfig.Endpoint, producerConfig.CredentialsProvider), nil
}
// use UpdateStsTokenFunc
if producerConfig.UpdateStsToken != nil && producerConfig.StsTokenShutDown != nil {
return sls.CreateTokenAutoUpdateClient(producerConfig.Endpoint, producerConfig.UpdateStsToken, producerConfig.StsTokenShutDown)
client, err := sls.CreateTokenAutoUpdateClient(producerConfig.Endpoint, producerConfig.UpdateStsToken, producerConfig.StsTokenShutDown)
if err == nil || !allowStsFallback {
return client, err
}
// for backward compatibility
level.Warn(logger).Log("msg", "Failed to create ststoken client, use default client without ststoken.", "error", err)
}
// fallback to default static long-lived AK
staticProvider := sls.NewStaticCredentialsProvider(producerConfig.AccessKeyID, producerConfig.AccessKeySecret, "")
return sls.CreateNormalInterfaceV2(producerConfig.Endpoint, staticProvider), nil
}

func validateProducerConfig(producerConfig *ProducerConfig) *ProducerConfig {
logger := logConfig(producerConfig)
func validateProducerConfig(producerConfig *ProducerConfig, logger log.Logger) *ProducerConfig {
if producerConfig.MaxReservedAttempts <= 0 {
level.Warn(logger).Log("msg", "This MaxReservedAttempts parameter must be greater than zero,program auto correction to default value")
producerConfig.MaxReservedAttempts = 11
Expand Down
62 changes: 60 additions & 2 deletions producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package producer
import (
"fmt"
"os"
"os/signal"
"testing"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/gogo/protobuf/proto"
)

func TestVsSign(t *testing.T) {
func TestV4Sign(t *testing.T) {
producerConfig := GetDefaultProducerConfig()
producerConfig.Endpoint = os.Getenv("LOG_TEST_ENDPOINT")
provider := sls.NewStaticCredentialsProvider(os.Getenv("LOG_TEST_ACCESS_KEY_ID"), os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), "")
producerConfig.CredentialsProvider = provider
producerConfig.Region = os.Getenv("LOG_TEST_REGION")
producerConfig.AuthVersion = sls.AuthV4
producerInstance := InitProducer(producerConfig)
producerInstance, err := NewProducer(producerConfig)
if err != nil {
panic(err)
}

producerInstance.Start() // 启动producer实例
for i := 0; i < 100; i++ {
Expand All @@ -30,3 +35,56 @@ func TestVsSign(t *testing.T) {
producerInstance.Close(60) // 有限关闭,传递int值,参数值需为正整数,单位为秒
producerInstance.SafeClose() // 安全关闭
}

func printShardId(shardId string) string {
config := GetDefaultProducerConfig()
newShardHash, err := AdjustHash(shardId, config.Buckets)
if err != nil {
panic(err)
}
fmt.Printf("shardId: %s -> %s\n", shardId, newShardHash)
return newShardHash
}

func TestProducer(t *testing.T) {
config := GetDefaultProducerConfig()
config.Endpoint = os.Getenv("LOG_TEST_ENDPOINT")
provider := sls.NewStaticCredentialsProvider(os.Getenv("LOG_TEST_ACCESS_KEY_ID"), os.Getenv("LOG_TEST_ACCESS_KEY_SECRET"), "")
config.CredentialsProvider = provider
producerInstance, err := NewProducer(config)
if err != nil {
panic(err)
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
producerInstance.Start()
hashs := []string{"aaa", ":bbbbb", "cccc"}
for i := 0; i < 100; i++ {
hash := hashs[i%len(hashs)]
calc := printShardId(hash)
content := []*sls.LogContent{}
content = append(content, &sls.LogContent{
Key: proto.String("hash"),
Value: proto.String(hash),
})
content = append(content, &sls.LogContent{
Key: proto.String("calc"),
Value: proto.String(calc),
})
log := &sls.Log{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: content,
}

err := producerInstance.HashSendLog(os.Getenv("LOG_TEST_PROJECT"), os.Getenv("LOG_TEST_LOGSTORE"), hash, "127.0.0.1", "topic", log)
if err != nil {
fmt.Println(err)
}
time.Sleep(time.Millisecond * 500)
}

if _, ok := <-ch; ok {
fmt.Println("Get the shutdown signal and start to shut down")
producerInstance.Close(60000)
}
}

0 comments on commit 5a60a6f

Please sign in to comment.