Skip to content

Support contents only for loki flusher #1256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions docs/cn/data-pipeline/flusher/loki.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,25 @@ Alpha

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|----------|---------------|----------------------------------------------------------------------|
| Type | String | 是 | 插件类型 |
| Convert | Struct | 否 | ilogtail数据转换协议配置 |
| Convert.Protocol | String | 否 | ilogtail数据转换协议,可选值:`custom_single`,`otlp_log_v1`。默认值:`custom_single` |
| Convert.Encoding | String | 否 | ilogtail数据转换编码,可选值:`json`、`none`、`protobuf`,默认值:`json` |
| Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags`和`time` |
| URL | String | 是 | Loki 推送地址,例如:`http://localhost:3100/loki/api/v1/push` |
| TenantID | String | 否 | Loki 的租户 ID(需要 Loki 开启该功能),默认为空,表示单租户模式 |
| MaxMessageWait | Int | 否 | 发送 batch 前的最长等待时间,默认 `1` 秒 |
| MaxMessageBytes | Int | 否 | 发送积累的最大的 batch 大小,默认 `1024 * 1024` bytes |
| Timeout | Int | 否 | 等待 Loki 响应的最大时间,默认 `10` 秒 |
| MinBackoff | Int | 否 | 重试之间的最短退避时间,默认 `500`毫秒 |
| MaxBackoff | Int | 否 | 重试的最长退避时间,默认 `5`分钟 |
| MaxRetries | Int | 否 | 最大重试次数,默认 `10` |
| DynamicLabels | String数组 | 两种Label至少选择一项 | 需要从日志中动态解析的标签列表,例如:`content.field1` |
| StaticLabels | Map | 两种Label至少选择一项 | 需要添加到每条日志上的静态标签 |
| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|----------|---------------|-----------------------------------------------------------------------------------------------|
| Type | String | 是 | 插件类型 |
| Convert | Struct | 否 | ilogtail数据转换协议配置 |
| Convert.Protocol | String | 否 | ilogtail数据转换协议,可选值:`custom_single`, `custom_single_flatten`,`otlp_log_v1`。默认值:`custom_single` |
| Convert.Encoding | String | 否 | ilogtail数据转换编码,可选值:`json`、`none`、`protobuf`,默认值:`json` |
| Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags`和`time` |
| Convert.OnlyContents | Bool | 否 | 仅发送contents中的字段,目前只能和`custom_single_flatten`协议一起使用,默认值:`false` |
| URL | String | 是 | Loki 推送地址,例如:`http://localhost:3100/loki/api/v1/push` |
| TenantID | String | 否 | Loki 的租户 ID(需要 Loki 开启该功能),默认为空,表示单租户模式 |
| MaxMessageWait | Int | 否 | 发送 batch 前的最长等待时间,默认 `1` 秒 |
| MaxMessageBytes | Int | 否 | 发送积累的最大的 batch 大小,默认 `1024 * 1024` bytes |
| Timeout | Int | 否 | 等待 Loki 响应的最大时间,默认 `10` 秒 |
| MinBackoff | Int | 否 | 重试之间的最短退避时间,默认 `500`毫秒 |
| MaxBackoff | Int | 否 | 重试的最长退避时间,默认 `5`分钟 |
| MaxRetries | Int | 否 | 最大重试次数,默认 `10` |
| DynamicLabels | String数组 | 两种Label至少选择一项 | 需要从日志中动态解析的标签列表,例如:`content.field1` |
| StaticLabels | Map | 两种Label至少选择一项 | 需要添加到每条日志上的静态标签 |

## 样例

Expand Down Expand Up @@ -123,7 +124,7 @@ flushers:
- Type: flusher_loki
URL: http://<loki 服务的地址与端口>/loki/api/v1/push
DynamicLabels:
- tag.host.name
- tag.my.host.name
Convert:
TagFieldsRename:
host.name: my.host.name
Expand Down
1 change: 1 addition & 0 deletions pkg/protocol/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type Converter struct {
Encoding string
Separator string
IgnoreUnExpectedData bool
OnlyContents bool
TagKeyRenameMap map[string]string
ProtocolKeyRenameMap map[string]string
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/protocol/converter/converter_single_log_flatten.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@ func (c *Converter) ConvertToSingleProtocolLogsFlatten(logGroup *protocol.LogGro
}
desiredValues[i] = desiredValue

logLength := len(contents) + len(tags) + 1
logLength := 1 + len(contents)
if !c.OnlyContents {
logLength += len(tags)
}

customSingleLog := make(map[string]interface{}, logLength)
// merge contents to final logs
for k, v := range contents {
customSingleLog[k] = v
}
// merge tags to final logs
for k, v := range tags {
customSingleLog[k] = v
if !c.OnlyContents {
// merge tags to final logs
for k, v := range tags {
customSingleLog[k] = v
}
}

if newKey, ok := c.ProtocolKeyRenameMap[protocolKeyTime]; ok {
Expand Down
15 changes: 12 additions & 3 deletions plugins/flusher/loki/flusher_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"errors"
"time"

"github.com/grafana/loki-client-go/pkg/labelutil"

"github.com/grafana/loki-client-go/loki"
"github.com/grafana/loki-client-go/pkg/backoff"
"github.com/grafana/loki-client-go/pkg/labelutil"
"github.com/grafana/loki-client-go/pkg/urlutil"
promconf "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -73,6 +72,8 @@ type convertConfig struct {
// Convert encoding, default value:json
// The options are: 'json'
Encoding string
// Include only contents in the final result.
OnlyContents bool
}

func NewFlusherLoki() *FlusherLoki {
Expand Down Expand Up @@ -179,7 +180,15 @@ func (f *FlusherLoki) Stop() error {
func (f *FlusherLoki) getConverter() (*converter.Converter, error) {
logger.Debug(f.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", f.Convert.Protocol,
"Encoding", f.Convert.Encoding, "TagFieldsRename", f.Convert.TagFieldsRename, "ProtocolFieldsRename", f.Convert.ProtocolFieldsRename)
return converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename)
cvt, err := converter.NewConverter(f.Convert.Protocol, f.Convert.Encoding, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename)
if err != nil {
return nil, err
}
// only custom_single_flatten support contents_only
if f.Convert.Protocol == converter.ProtocolCustomSingleFlatten && f.Convert.OnlyContents {
cvt.OnlyContents = true
}
return cvt, nil
}

func (f *FlusherLoki) buildLokiConfig() (loki.Config, error) {
Expand Down