Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse a byte buffer for holding XML #3118

Merged
merged 2 commits into from
Dec 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff]
*Winlogbeat*

- Add `event_logs.batch_read_size` configuration option. {pull}2641[2641]
- Reduced amount of memory allocated while reading event log records. {pull}3113[3113] {pull}3118[3113]

==== Deprecated

Expand Down
35 changes: 20 additions & 15 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package eventlog

import (
"fmt"
"io"
"syscall"
"time"

Expand Down Expand Up @@ -75,9 +76,10 @@ type winEventLog struct {
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.

render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
cache *messageFilesCache // Cached mapping of source name to event message file handles.
render func(event win.EvtHandle, out io.Writer) error // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
outputBuf *sys.ByteBuffer // Buffer for receiving XML
cache *messageFilesCache // Cached mapping of source name to event message file handles.

logPrefix string // String to prefix on log messages.
eventMetadata common.EventMetadata // Field and tags to add to each event.
Expand Down Expand Up @@ -132,20 +134,22 @@ func (l *winEventLog) Read() ([]Record, error) {

var records []Record
for _, h := range handles {
x, err := l.render(h)
l.outputBuf.Reset()
err := l.render(h, l.outputBuf)
if bufErr, ok := err.(sys.InsufficientBufferError); ok {
detailf("%s Increasing render buffer size to %d", l.logPrefix,
bufErr.RequiredSize)
l.renderBuf = make([]byte, bufErr.RequiredSize)
x, err = l.render(h)
l.outputBuf.Reset()
err = l.render(h, l.outputBuf)
}
if err != nil && x == "" {
if err != nil && l.outputBuf.Len() == 0 {
logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
continue
}

r, err := l.buildRecordFromXML(x, err)
r, err := l.buildRecordFromXML(l.outputBuf.Bytes(), err)
if err != nil {
logp.Err("%s Dropping event. %v", l.logPrefix, err)
incrementMetric(dropReasons, err)
Expand Down Expand Up @@ -192,8 +196,8 @@ func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
}
}

func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML([]byte(x))
func (l *winEventLog) buildRecordFromXML(x []byte, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML(x)
if err != nil {
return Record{}, fmt.Errorf("Failed to unmarshal XML='%s'. %v", x, err)
}
Expand All @@ -213,7 +217,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if logp.IsDebug(detailSelector) {
detailf("%s XML=%s Event=%+v", l.logPrefix, x, e)
detailf("%s XML=%s Event=%+v", l.logPrefix, string(x), e)
}

r := Record{
Expand All @@ -223,7 +227,7 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
}

if l.config.IncludeXML {
r.XML = x
r.XML = string(x)
}

return r, nil
Expand Down Expand Up @@ -270,6 +274,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
channelName: c.Name,
maxRead: c.BatchReadSize,
renderBuf: make([]byte, renderBufferSize),
outputBuf: sys.NewByteBuffer(renderBufferSize),
cache: newMessageFilesCache(c.Name, eventMetadataHandle, freeHandle),
logPrefix: fmt.Sprintf("WinEventLog[%s]", c.Name),
eventMetadata: c.EventMetadata,
Expand All @@ -281,12 +286,12 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
switch {
case c.Forwarded == nil && c.Name == "ForwardedEvents",
c.Forwarded != nil && *c.Forwarded == true:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEventXML(event, l.renderBuf)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEventXML(event, l.renderBuf, out)
}
default:
l.render = func(event win.EvtHandle) (string, error) {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get)
l.render = func(event win.EvtHandle, out io.Writer) error {
return win.RenderEvent(event, 0, l.renderBuf, l.cache.get, out)
}
}

Expand Down
46 changes: 46 additions & 0 deletions winlogbeat/sys/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sys

// ByteBuffer is an expandable buffer backed by a byte slice.
type ByteBuffer struct {
buf []byte
offset int
}

// NewByteBuffer creates a new ByteBuffer with an initial capacity of
// initialSize.
func NewByteBuffer(initialSize int) *ByteBuffer {
return &ByteBuffer{buf: make([]byte, initialSize)}
}

// Write appends the contents of p to the buffer, growing the buffer as needed.
// The return value is the length of p; err is always nil.
func (b *ByteBuffer) Write(p []byte) (int, error) {
if len(b.buf) < b.offset+len(p) {
// Create a buffer larger than needed so we don't spend lots of time
// allocating and copying.
spaceNeeded := len(b.buf) - b.offset + len(p)
largerBuf := make([]byte, 2*len(b.buf)+spaceNeeded)
copy(largerBuf, b.buf[:b.offset])
b.buf = largerBuf
}
n := copy(b.buf[b.offset:], p)
b.offset += n
return n, nil
}

// Reset resets the buffer to be empty. It retains the same underlying storage.
func (b *ByteBuffer) Reset() {
b.offset = 0
b.buf = b.buf[:cap(b.buf)]
}

// Bytes returns a slice of length b.Len() holding the bytes that have been
// written to the buffer.
func (b *ByteBuffer) Bytes() []byte {
return b.buf[:b.offset]
}

// Len returns the number of bytes that have been written to the buffer.
func (b *ByteBuffer) Len() int {
return b.offset
}
102 changes: 102 additions & 0 deletions winlogbeat/sys/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package sys

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/assert"
)

var _ io.Writer = &ByteBuffer{}

func TestByteBuffer(t *testing.T) {
input := "hello"
length := len(input)
buf := NewByteBuffer(1024)

n, err := buf.Write([]byte(input))
if err != nil {
t.Fatal(err)
}
assert.Equal(t, length, n)

assert.Equal(t, input, string(buf.Bytes()))
assert.Equal(t, length, len(buf.Bytes()))
assert.Equal(t, length, buf.Len())
}

func TestByteBufferGrow(t *testing.T) {
input := "hello"
length := len(input)
buf := NewByteBuffer(0)

n, err := buf.Write([]byte(input))
if err != nil {
t.Fatal(err)
}
assert.Equal(t, length, n)

assert.Equal(t, input, string(buf.Bytes()))
assert.Equal(t, length, len(buf.Bytes()))
assert.Equal(t, length, buf.Len())
assert.Equal(t, length, len(buf.buf))

n, err = buf.Write([]byte(input))
if err != nil {
t.Fatal(err)
}
assert.Equal(t, length, n)

assert.Equal(t, input+input, string(buf.Bytes()))
assert.Equal(t, 2*length, len(buf.Bytes()))
assert.Equal(t, 2*length, buf.Len())
}

func BenchmarkByteBuffer(b *testing.B) {
input := []byte("test writing this sentence to a buffer")

b.Run("byteBuffer", func(b *testing.B) {
buf := NewByteBuffer(1024)
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write(input)
buf.Bytes()
buf.Reset()
}
})

b.Run("bytes.Buffer", func(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write(input)
buf.Bytes()
buf.Reset()
}
})
}

func BenchmarkByteBufferGrow(b *testing.B) {
b.Run("byteBuffer", func(b *testing.B) {
buf := NewByteBuffer(0)
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write([]byte("a"))
buf.Bytes()
}
})

b.Run("bytes.Buffer", func(b *testing.B) {
buf := bytes.NewBuffer(make([]byte, 0))
b.ResetTimer()

for i := 0; i < b.N; i++ {
buf.Write([]byte("a"))
buf.Bytes()
}
})
}
58 changes: 58 additions & 0 deletions winlogbeat/sys/strings.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,69 @@
package sys

import (
"errors"
"fmt"
"io"
"strings"
"unicode/utf16"
"unicode/utf8"
)

// The conditions replacementChar==unicode.ReplacementChar and
// maxRune==unicode.MaxRune are verified in the tests.
// Defining them locally avoids this package depending on package unicode.

const (
replacementChar = '\uFFFD' // Unicode replacement character
maxRune = '\U0010FFFF' // Maximum valid Unicode code point.
)

const (
// 0xd800-0xdc00 encodes the high 10 bits of a pair.
// 0xdc00-0xe000 encodes the low 10 bits of a pair.
// the value is those 20 bits plus 0x10000.
surr1 = 0xd800
surr2 = 0xdc00
surr3 = 0xe000

surrSelf = 0x10000
)

var ErrBufferTooSmall = errors.New("buffer too small")

func UTF16ToUTF8Bytes(in []byte, out io.Writer) error {
if len(in)%2 != 0 {
return fmt.Errorf("input buffer must have an even length (length=%d)", len(in))
}

var runeBuf [4]byte
var v1, v2 uint16
for i := 0; i < len(in); i += 2 {
v1 = uint16(in[i]) | uint16(in[i+1])<<8

switch {
case v1 < surr1, surr3 <= v1:
n := utf8.EncodeRune(runeBuf[:], rune(v1))
out.Write(runeBuf[:n])
case surr1 <= v1 && v1 < surr2 && len(in) > i+2:
v2 = uint16(in[i+2]) | uint16(in[i+3])<<8
if surr2 <= v2 && v2 < surr3 {
// valid surrogate sequence
r := utf16.DecodeRune(rune(v1), rune(v2))
n := utf8.EncodeRune(runeBuf[:], r)
out.Write(runeBuf[:n])
}
i += 2
default:
// invalid surrogate sequence
n := utf8.EncodeRune(runeBuf[:], replacementChar)
out.Write(runeBuf[:n])
}
}

return nil
}

// UTF16BytesToString returns a string that is decoded from the UTF-16 bytes.
// The byte slice must be of even length otherwise an error will be returned.
// The integer returned is the offset to the start of the next string with
Expand Down
23 changes: 23 additions & 0 deletions winlogbeat/sys/strings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,26 @@ func BenchmarkUTF16BytesToString(b *testing.B) {
}
})
}

func TestUTF16ToUTF8(t *testing.T) {
input := "abc白鵬翔\u145A6"
utf16Bytes := toUTF16Bytes(input)

outputBuf := &bytes.Buffer{}
err := UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, []byte(input), outputBuf.Bytes())
}

func BenchmarkUTF16ToUTF8(b *testing.B) {
utf16Bytes := toUTF16Bytes("A logon was attempted using explicit credentials.")
outputBuf := &bytes.Buffer{}
b.ResetTimer()

for i := 0; i < b.N; i++ {
UTF16ToUTF8Bytes(utf16Bytes, outputBuf)
outputBuf.Reset()
}
}
Loading