Skip to content

Commit

Permalink
Make the Elaticsearch index as pattern
Browse files Browse the repository at this point in the history
This transforms the fixed index into a pattern, somehow similar to how
Logstash is doing it. However, logstash is using the Joda format, for which
we don't have Go libraries. So instead of writing `filebeat-%{+YYYY.MM.dd}` you
would need to write `filebeat-%{+2006.01.02}` (i.e. layout by example).

Because the Go layouts don't support ISO 8601 weeks, in order to support weekly
indices, the special `isoweek` keyword was introduced, which is the equivalent
of Joda `xxxx.ww`. The layout `filebeat-%{+isoweek}` results in `filebeat-2016.29`
for today.

Part of elastic#2074. Closes elastic#921.
  • Loading branch information
Tudor Golubenco committed Jul 21, 2016
1 parent 245dd0a commit 42d8c57
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 22 deletions.
70 changes: 52 additions & 18 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -21,8 +22,8 @@ import (

type Client struct {
Connection
index string
params map[string]string
indexPattern string
params map[string]string

// buffered bulk requests
bulkRequ *bulkRequest
Expand Down Expand Up @@ -75,7 +76,7 @@ var (
)

func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
esURL, indexPattern string, proxyURL *url.URL, tls *tls.Config,
username, password string,
params map[string]string,
timeout time.Duration,
Expand Down Expand Up @@ -127,8 +128,8 @@ func NewClient(
},
encoder: encoder,
},
index: index,
params: params,
indexPattern: indexPattern,
params: params,

bulkRequ: bulkRequ,

Expand All @@ -155,7 +156,7 @@ func (client *Client) Clone() *Client {
transport := client.http.Transport.(*http.Transport)
c, _ := NewClient(
client.URL,
client.index,
client.indexPattern,
client.proxyURL,
transport.TLSClientConfig,
client.Username,
Expand Down Expand Up @@ -186,7 +187,7 @@ func (client *Client) PublishEvents(

// encode events into bulk request buffer, dropping failed elements from
// events slice
events = bulkEncodePublishRequest(body, client.index, events)
events = bulkEncodePublishRequest(body, client.indexPattern, events)
if len(events) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -224,16 +225,16 @@ func (client *Client) PublishEvents(
return nil, nil
}

// fillBulkRequest encodes all bulk requests and returns slice of events
// bulkEncodePublishRequest encodes all bulk requests and returns slice of events
// successfully added to bulk request.
func bulkEncodePublishRequest(
body bulkWriter,
index string,
indexPattern string,
events []common.MapStr,
) []common.MapStr {
okEvents := events[:0]
for _, event := range events {
meta := eventBulkMeta(index, event)
meta := eventBulkMeta(indexPattern, event)
err := body.Add(meta, event)
if err != nil {
logp.Err("Failed to encode event: %s", err)
Expand All @@ -245,8 +246,8 @@ func bulkEncodePublishRequest(
return okEvents
}

func eventBulkMeta(index string, event common.MapStr) bulkMeta {
index = getIndex(event, index)
func eventBulkMeta(indexPattern string, event common.MapStr) bulkMeta {
index := getIndex(event, indexPattern)
meta := bulkMeta{
Index: bulkMetaIndex{
Index: index,
Expand All @@ -259,31 +260,64 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta {
// getIndex returns the full index name
// Index is either defined in the config as part of the output
// or can be overload by the event through setting index
func getIndex(event common.MapStr, index string) string {
func getIndex(event common.MapStr, indexPattern string) string {

ts := time.Time(event["@timestamp"].(common.Time)).UTC()

// Check for dynamic index
// XXX: is this used/needed?
if _, ok := event["beat"]; ok {
beatMeta, ok := event["beat"].(common.MapStr)
if ok {
// Check if index is set dynamically
if dynamicIndex, ok := beatMeta["index"]; ok {
dynamicIndexValue, ok := dynamicIndex.(string)
if ok {
index = dynamicIndexValue
// Form the index by appending the timestamp to the
// dynamicIndexValue
return fmt.Sprintf("%s-%d.%02d.%02d", dynamicIndexValue,
ts.Year(), ts.Month(), ts.Day())
}
}
}
}

// Append timestamp to index
index = fmt.Sprintf("%s-%d.%02d.%02d", index,
ts.Year(), ts.Month(), ts.Day())
return formatIndex(indexPattern, ts)
}

// formatIndex applies a timestamp in a pattern like beatname-%{+2006.01.02}
func formatIndex(indexPattern string, ts time.Time) string {

index := indexPattern
for {
// find %{+...}
start := strings.Index(index, "%{+")
if start == -1 {
break
}
length := strings.Index(index[start:], "}") + 1
if length == 0 {
break
}

patternStart := start + 3
patternEnd := start + length - 1

formattedTs := formatTs(index[patternStart:patternEnd], ts)

index = index[0:start] + formattedTs + index[start+length:]
}
return index
}

func formatTs(layout string, ts time.Time) string {
if layout == "isoweek" {
year, week := ts.ISOWeek()
return fmt.Sprintf("%04d.%02d", year, week)
}
return ts.Format(layout)
}

// bulkCollectPublishFails checks per item errors returning all events
// to be tried again due to error code returned for that items. If indexing an
// event failed due to some error in the event itself (e.g. does not respect mapping),
Expand Down Expand Up @@ -427,7 +461,7 @@ func itemStatusInner(reader *jsonReader) (int, []byte, error) {
}

func (client *Client) PublishEvent(event common.MapStr) error {
index := getIndex(event, client.index)
index := getIndex(event, client.indexPattern)
debugf("Publish event: %s", event)

// insert the events one by one
Expand Down
42 changes: 42 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func TestCollectPublishFailAll(t *testing.T) {
assert.Equal(t, events, res)
}

/*
func TestGetIndexStandard(t *testing.T) {
time := time.Now().UTC()
Expand All @@ -131,6 +132,7 @@ func TestGetIndexStandard(t *testing.T) {
index := getIndex(event, "beatname")
assert.Equal(t, index, "beatname-"+extension)
}
*/

func TestGetIndexOverwrite(t *testing.T) {

Expand Down Expand Up @@ -215,3 +217,43 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
}
}
}

func TestFormatIndex(t *testing.T) {
ts, err := time.Parse("Jan 2 15:04:05 2006", "Feb 3 16:05:06 2007")
assert.NoError(t, err)

tests := []struct {
Input string
Expected string
}{
{"beatname-%{+2006.01.02}", "beatname-2007.02.03"}, // daily
{"beatname-%{+2006.01.02-15}", "beatname-2007.02.03-16"}, // hourly
{"filebeat-%{+isoweek}", "filebeat-2007.05"}, // weekly
{"filebeat", "filebeat"}, // using alias/rollover
{"beatname-%{+2006.01.02}-%{+Mon}", "beatname-2007.02.03-Sat"},
{"beatname-%{+2006.01.02}-%{+Mon}urday", "beatname-2007.02.03-Saturday"},
{"filebeat-1", "filebeat-1"},
{"filebeat-test-%{+06-01}-1", "filebeat-test-07-02-1"},
{"filebeat-%{+2006.01.02-15:04:05.000}", "filebeat-2007.02.03-16:05:06.000"},
}

for _, test := range tests {
res := formatIndex(test.Input, ts)
assert.Equal(t, test.Expected, res, "Failed on format: %s", test.Input)
}

}

func BenchmarkFormatIndex(b *testing.B) {
ts, err := time.Parse("Jan 2 15:04:05 2006", "Feb 3 16:05:06 2007")
if err != nil {
b.Fatal(err)
}

for i := 0; i < b.N; i++ {
res := formatIndex("beatname-%{+2006.01.02}", ts)
if res != "beatname-2007.02.03" {
b.Fail()
}
}
}
8 changes: 4 additions & 4 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &elasticsearchOutput{beatName: beatName}
err := output.init(cfg, topologyExpire)
if err != nil {
Expand All @@ -77,6 +73,10 @@ func (out *elasticsearchOutput) init(
return err
}

if config.Index == "" {
config.Index = out.beatName + "-%{+2006.01.02}"
}

tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
Expand Down

0 comments on commit 42d8c57

Please sign in to comment.