Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down
1 change: 1 addition & 0 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type config struct {
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"`
IgnoreOlder time.Duration `config:"ignore_older"`
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
}

type closerConfig struct {
Expand Down
57 changes: 51 additions & 6 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,48 @@
package filestream

import (
"fmt"
"time"

"github.com/urso/sderr"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

type ignoreInactiveType uint8

const (
prospectorDebugKey = "file_prospector"
InvalidIgnoreInactive = iota
IgnoreInactiveSinceLastStart
IgnoreInactiveSinceFirstStart

ignoreInactiveSinceLastStartStr = "since_last_start"
ignoreInactiveSinceFirstStartStr = "since_first_start"
prospectorDebugKey = "file_prospector"
)

var (
ignoreInactiveSettings = map[string]ignoreInactiveType{
ignoreInactiveSinceLastStartStr: IgnoreInactiveSinceLastStart,
ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart,
}
)

// fileProspector implements the Prospector interface.
// It contains a file scanner which returns file system events.
// The FS events then trigger either new Harvester runs or updates
// the statestore.
type fileProspector struct {
filewatcher loginp.FSWatcher
identifier fileIdentifier
ignoreOlder time.Duration
cleanRemoved bool
stateChangeCloser stateChangeCloserConfig
filewatcher loginp.FSWatcher
identifier fileIdentifier
ignoreOlder time.Duration
ignoreInactiveSince ignoreInactiveType
cleanRemoved bool
stateChangeCloser stateChangeCloserConfig
}

func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error {
Expand Down Expand Up @@ -101,6 +119,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
})

tg.Go(func() error {
ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent)

for ctx.Cancelation.Err() == nil {
fe := p.filewatcher.Event()

Expand Down Expand Up @@ -130,6 +150,11 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
break
}
}
if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 {
log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath)
break
}

hg.Start(ctx, src)

case loginp.OpTruncate:
Expand Down Expand Up @@ -217,3 +242,23 @@ func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.Harveste
func (p *fileProspector) Test() error {
panic("TODO: implement me")
}

func getIgnoreSince(t ignoreInactiveType, info beat.Info) time.Time {
switch t {
case IgnoreInactiveSinceLastStart:
return info.StartTime
case IgnoreInactiveSinceFirstStart:
return info.FirstStart
default:
return time.Time{}
}
}

func (t *ignoreInactiveType) Unpack(v string) error {
val, ok := ignoreInactiveSettings[v]
if !ok {
return fmt.Errorf("invalid ignore_inactive setting: %s", v)
}
*t = val
return nil
}
8 changes: 7 additions & 1 deletion libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package beat

import "github.com/gofrs/uuid"
import (
"time"

"github.com/gofrs/uuid"
)

// Info stores a beats instance meta data.
type Info struct {
Expand All @@ -29,6 +33,8 @@ type Info struct {
Hostname string // hostname
ID uuid.UUID // ID assigned to beat machine
EphemeralID uuid.UUID // ID assigned to beat process invocation (PID)
FirstStart time.Time // The time of the first start of the Beat.
StartTime time.Time // The time of last start of the Beat. Updated when the Beat is started or restarted.

// Monitoring-related fields
Monitoring struct {
Expand Down
16 changes: 13 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) {
Name: hostname,
Hostname: hostname,
ID: id,
FirstStart: time.Now(),
StartTime: time.Now(),
EphemeralID: metrics.EphemeralID(),
},
Fields: fields,
Expand Down Expand Up @@ -695,7 +697,8 @@ func (b *Beat) configure(settings Settings) error {

func (b *Beat) loadMeta(metaPath string) error {
type meta struct {
UUID uuid.UUID `json:"uuid"`
UUID uuid.UUID `json:"uuid"`
FirstStart time.Time `json:"first_start"`
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will need some testing. the stdlib JSON encoder/decoder might not play well with that type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my manual tests, it worked like a charm. What might be the problem?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern would be that timestamp serialization and parsing is not supported. If it works +1. We have some tests for loadMeta I think. Still would be nice to enhance the existing tests.

}

logp.Debug("beat", "Beat metadata path: %v", metaPath)
Expand All @@ -713,14 +716,21 @@ func (b *Beat) loadMeta(metaPath string) error {
}

f.Close()

if !m.FirstStart.IsZero() {
b.Info.FirstStart = m.FirstStart
}
valid := m.UUID != uuid.Nil
if valid {
b.Info.ID = m.UUID
}

if valid && !m.FirstStart.IsZero() {
return nil
}
}

// file does not exist or ID is invalid, let's create a new one
// file does not exist or ID is invalid or first start time is not defined, let's create a new one

// write temporary file first
tempFile := metaPath + ".new"
Expand All @@ -729,7 +739,7 @@ func (b *Beat) loadMeta(metaPath string) error {
return fmt.Errorf("Failed to create Beat meta file: %s", err)
}

encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID})
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = f.Sync()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %s", err)
Expand Down
28 changes: 28 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,31 @@ func TestEmptyMetaJson(t *testing.T) {
assert.Equal(t, nil, err, "Unable to load meta file properly")
assert.NotEqual(t, uuid.Nil, b.Info.ID, "Beats UUID is not set")
}

func TestMetaJsonWithTimestamp(t *testing.T) {
firstBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
if err != nil {
panic(err)
}
firstStart := firstBeat.Info.FirstStart

metaFile, err := ioutil.TempFile("../test", "meta.json")
assert.Equal(t, nil, err, "Unable to create temporary meta file")

metaPath := metaFile.Name()
metaFile.Close()
defer os.Remove(metaPath)

err = firstBeat.loadMeta(metaPath)
assert.Equal(t, nil, err, "Unable to load meta file properly")

secondBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
if err != nil {
panic(err)
}
assert.False(t, firstStart.Equal(secondBeat.Info.FirstStart), "Before meta.json is loaded, first start must be different")
secondBeat.loadMeta(metaPath)

assert.Equal(t, nil, err, "Unable to load meta file properly")
assert.True(t, firstStart.Equal(secondBeat.Info.FirstStart), "Cannot load first start")
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default, so no files are ignored by setting it to "".
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down