Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch_read_size config to Winlogbeat #2641

Merged
merged 2 commits into from
Oct 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di

*Filebeat*

- Add command line option -once to run filebeat only once and then close {pull}2456[2456]
- Add command line option -once to run filebeat only once and then close. {pull}2456[2456]

*Winlogbeat*

- Add `event_logs.batch_read_size` configuration option. {pull}2641[2641]

==== Deprecated

*Affecting all Beats*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ winlogbeat.event_logs:
- name: Application
--------------------------------------------------------------------------------

===== event_logs.batch_read_size

The maximum number of event log records to read from the Windows API in a single
batch. The default batch size is 100. *{vista_and_newer}*

Winlogbeat starts a goroutine (a lightweight thread) to read from each
individual event log. The goroutine reads a batch of event log records using the
Windows API, applies any processors to the events, publishes them to the
configured outputs, and waits for an acknowledgement from the outputs before
reading additional event log records.

[[configuration-winlogbeat-options-event_logs-name]]
===== event_logs.name

Expand Down
29 changes: 16 additions & 13 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
)

const (
// defaultMaxNumRead is the maximum number of event Read will return.
defaultMaxNumRead = 100

// renderBufferSize is the size in bytes of the buffer used to render events.
renderBufferSize = 1 << 14

Expand All @@ -28,15 +25,21 @@ const (
winEventLogAPIName = "wineventlog"
)

var winEventLogConfigKeys = append(commonConfigKeys, "ignore_older", "include_xml",
"event_id", "forwarded", "level", "provider")
var winEventLogConfigKeys = append(commonConfigKeys, "batch_read_size",
"ignore_older", "include_xml", "event_id", "forwarded", "level", "provider")

type winEventLogConfig struct {
ConfigCommon `config:",inline"`
IncludeXML bool `config:"include_xml"`
Forwarded *bool `config:"forwarded"`
SimpleQuery query `config:",inline"`
Raw map[string]interface{} `config:",inline"`
ConfigCommon `config:",inline"`
BatchReadSize int `config:"batch_read_size"` // Maximum number of events that Read will return.
IncludeXML bool `config:"include_xml"`
Forwarded *bool `config:"forwarded"`
SimpleQuery query `config:",inline"`
Raw map[string]interface{} `config:",inline"`
}

// defaultWinEventLogConfig is the default configuration for new wineventlog readers.
var defaultWinEventLogConfig = winEventLogConfig{
BatchReadSize: 100,
}

// query contains parameters used to customize the event log data that is
Expand Down Expand Up @@ -121,7 +124,7 @@ func (l *winEventLog) Read() ([]Record, error) {
return nil, nil
}
if err != nil {
logp.Warn("%s EventHandles returned error %v Errno: %d", l.logPrefix, err)
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
return nil, err
}
defer func() {
Expand Down Expand Up @@ -218,7 +221,7 @@ func reportDrop(reason interface{}) {
// newWinEventLog creates and returns a new EventLog for reading event logs
// using the Windows Event Log.
func newWinEventLog(options map[string]interface{}) (EventLog, error) {
var c winEventLogConfig
c := defaultWinEventLogConfig
if err := readConfig(options, &c, winEventLogConfigKeys); err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +257,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
config: c,
query: query,
channelName: c.Name,
maxRead: defaultMaxNumRead,
maxRead: c.BatchReadSize,
renderBuf: make([]byte, renderBufferSize),
cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle),
logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name),
Expand Down
54 changes: 54 additions & 0 deletions winlogbeat/eventlog/wineventlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// +build windows

package eventlog

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestWinEventLogBatchReadSize(t *testing.T) {
configureLogp()
log, err := initLog(providerName, sourceName, eventCreateMsgFile)
if err != nil {
t.Fatal(err)
}
defer func() {
err := uninstallLog(providerName, sourceName, log)
if err != nil {
t.Fatal(err)
}
}()

// Publish test messages:
for k, m := range messages {
err = log.Report(m.eventType, k, []string{m.message})
if err != nil {
t.Fatal(err)
}
}

batchReadSize := 2
eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize})
if err != nil {
t.Fatal(err)
}
err = eventlog.Open(0)
if err != nil {
t.Fatal(err)
}
defer func() {
err := eventlog.Close()
if err != nil {
t.Fatal(err)
}
}()

records, err := eventlog.Read()
if err != nil {
t.Fatal(err)
}

assert.Len(t, records, batchReadSize)
}