Skip to content

Commit

Permalink
Merge pull request #187 from chaolee50/master
Browse files Browse the repository at this point in the history
add GetLogsToCompleted
  • Loading branch information
shabicheng authored Nov 23, 2022
2 parents d0078ce + 0101a07 commit 9afea25
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 0 deletions.
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ var RetryOnServerErrorEnabled = true

var GlobalDebugLevel = 0

var MaxCompletedRetryCount = 20

var MaxCompletedRetryLatency = 5 * time.Minute

// compress type
const (
Compress_LZ4 = iota // 0
Expand Down
7 changes: 7 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ type ClientInterface interface {
GetLogsV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error)
GetLogLinesV2(project, logstore string, req *GetLogRequest) (*GetLogLinesResponse, error)

// GetHistogramsToCompleted query logs with [from, to) time range to completed
GetHistogramsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogsToCompleted query logs with [from, to) time range to completed
GetLogsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string, maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error)
// GetLogsToCompletedV2 query logs with [from, to) time range to completed
GetLogsToCompletedV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error)

// #################### Index Operations #####################
// CreateIndex ...
CreateIndex(project, logstore string, index Index) error
Expand Down
19 changes: 19 additions & 0 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,26 @@ func (c *Client) GetHistograms(project, logstore string, topic string, from int6
return ls.GetHistograms(topic, from, to, queryExp)
}

// GetHistogramsToCompleted query logs with [from, to) time range to completed
func (c *Client) GetHistogramsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetHistogramsToCompleted(topic, from, to, queryExp)
}

// GetLogs query logs with [from, to) time range
func (c *Client) GetLogs(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
}

// GetLogsToCompleted query logs with [from, to) time range to completed
func (c *Client) GetLogsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsToCompleted(topic, from, to, queryExp, maxLineNum, offset, reverse)
}

// GetLogLines ...
func (c *Client) GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogLinesResponse, error) {
Expand All @@ -217,6 +230,12 @@ func (c *Client) GetLogsV2(project, logstore string, req *GetLogRequest) (*GetLo
return ls.GetLogsV2(req)
}

// GetLogsToCompletedV2 ...
func (c *Client) GetLogsToCompletedV2(project, logstore string, req *GetLogRequest) (*GetLogsResponse, error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsToCompletedV2(req)
}

// GetLogLinesV2 ...
func (c *Client) GetLogLinesV2(project, logstore string, req *GetLogRequest) (*GetLogLinesResponse, error) {
ls := convertLogstore(c, project, logstore)
Expand Down
70 changes: 70 additions & 0 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sls
import (
"encoding/json"
"fmt"
"time"

"io/ioutil"
"net/http"
Expand Down Expand Up @@ -635,6 +636,75 @@ func (s *LogStore) GetLogs(topic string, from int64, to int64, queryExp string,
return s.GetLogsV2(&req)
}

func (s *LogStore) getToCompleted(f func() (bool, error)) {
interval := 100 * time.Millisecond
retryCount := MaxCompletedRetryCount
isCompleted := false
timeoutTime := time.Now().Add(MaxCompletedRetryLatency)
for retryCount > 0 && timeoutTime.After(time.Now()) {
var err error
isCompleted, err = f()
if err != nil || isCompleted {
return
}
time.Sleep(interval)
retryCount--
if interval < 10*time.Second {
interval = interval * 2
}
if interval > 10*time.Second {
interval = 10 * time.Second
}
}
return
}

// GetLogsToCompleted query logs with [from, to) time range to completed
func (s *LogStore) GetLogsToCompleted(topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (*GetLogsResponse, error) {
var res *GetLogsResponse
var err error
f := func() (bool, error) {
res, err = s.GetLogs(topic, from, to, queryExp, maxLineNum, offset, reverse)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}

// GetLogsToCompletedV2 query logs with [from, to) time range to completed
func (s *LogStore) GetLogsToCompletedV2(req *GetLogRequest) (*GetLogsResponse, error) {
var res *GetLogsResponse
var err error
f := func() (bool, error) {
res, err = s.GetLogsV2(req)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}

// GetHistogramsToCompleted query logs with [from, to) time range to completed
func (s *LogStore) GetHistogramsToCompleted(topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
var res *GetHistogramsResponse
var err error
f := func() (bool, error) {
res, err = s.GetHistograms(topic, from, to, queryExp)
if err == nil {
return res.IsComplete(), nil
}
return false, err
}
s.getToCompleted(f)
return res, err
}

// GetLogsV2 query logs with [from, to) time range
func (s *LogStore) GetLogsV2(req *GetLogRequest) (*GetLogsResponse, error) {
rsp, b, logRsp, err := s.getLogs(req)
Expand Down
31 changes: 31 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,16 @@ func (c *TokenAutoUpdateClient) GetHistograms(project, logstore string, topic st
return
}

func (c *TokenAutoUpdateClient) GetHistogramsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string) (h *GetHistogramsResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
h, err = c.logClient.GetHistogramsToCompleted(project, logstore, topic, from, to, queryExp)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) GetLogsV2(project, logstore string, req *GetLogRequest) (r *GetLogsResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.GetLogsV2(project, logstore, req)
Expand All @@ -779,6 +789,16 @@ func (c *TokenAutoUpdateClient) GetLogsV2(project, logstore string, req *GetLogR
return
}

func (c *TokenAutoUpdateClient) GetLogsToCompletedV2(project, logstore string, req *GetLogRequest) (r *GetLogsResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.GetLogsToCompletedV2(project, logstore, req)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) GetLogLinesV2(project, logstore string, req *GetLogRequest) (r *GetLogLinesResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.GetLogLinesV2(project, logstore, req)
Expand All @@ -800,6 +820,17 @@ func (c *TokenAutoUpdateClient) GetLogs(project, logstore string, topic string,
return
}

func (c *TokenAutoUpdateClient) GetLogsToCompleted(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (r *GetLogsResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
r, err = c.logClient.GetLogsToCompleted(project, logstore, topic, from, to, queryExp, maxLineNum, offset, reverse)
if !c.processError(err) {
return
}
}
return
}

func (c *TokenAutoUpdateClient) GetLogLines(project, logstore string, topic string, from int64, to int64, queryExp string,
maxLineNum int64, offset int64, reverse bool) (r *GetLogLinesResponse, err error) {
for i := 0; i < c.maxTryTimes; i++ {
Expand Down

0 comments on commit 9afea25

Please sign in to comment.