Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/reader/json"
"github.com/elastic/beats/filebeat/reader/multiline"
"github.com/elastic/beats/filebeat/reader/readjson"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -100,7 +100,7 @@ type config struct {
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *json.Config `config:"json"`
JSON *readjson.Config `config:"json"`

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
Expand Down
21 changes: 9 additions & 12 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,10 @@ import (
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/docker_json"
"github.com/elastic/beats/filebeat/reader/encode"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/json"
"github.com/elastic/beats/filebeat/reader/limit"
"github.com/elastic/beats/filebeat/reader/multiline"
"github.com/elastic/beats/filebeat/reader/strip_newline"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/filebeat/reader/readjson"
"github.com/elastic/beats/filebeat/util"
)

Expand Down Expand Up @@ -320,7 +317,7 @@ func (h *Harvester) Run() error {
}

if h.config.JSON != nil && len(jsonFields) > 0 {
ts := json.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
if !ts.IsZero() {
// there was a `@timestamp` key in the event, so overwrite
// the resulting timestamp
Expand Down Expand Up @@ -552,21 +549,21 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
return nil, err
}

r, err = encode.New(h.log, h.encoding, h.config.BufferSize)
r, err = readfile.NewEncodeReader(h.log, h.encoding, h.config.BufferSize)
if err != nil {
return nil, err
}

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial)
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial)
}

if h.config.JSON != nil {
r = json.New(r, h.config.JSON)
r = readjson.NewJSONReader(r, h.config.JSON)
}

r = strip_newline.New(r)
r = readfile.NewStripNewline(r)

if h.config.Multiline != nil {
r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
Expand All @@ -575,5 +572,5 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
}
}

return limit.New(r, h.config.MaxBytes), nil
return readfile.NewLimitReader(r, h.config.MaxBytes), nil
}
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/libbeat/common"
)

Expand Down
4 changes: 2 additions & 2 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"time"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/timeout"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -112,7 +112,7 @@ func New(
}

if tout > 0 {
r = timeout.New(r, sigMultilineTimeout, tout)
r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout)
}

mlr := &Reader{
Expand Down
9 changes: 4 additions & 5 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/strip_newline"
"github.com/elastic/beats/filebeat/reader/readfile"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
"github.com/elastic/beats/libbeat/common/match"
)

Expand Down Expand Up @@ -190,12 +189,12 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade
}

var r reader.Reader
r, err = encode.New(in, enc, 4096)
r, err = readfile.NewEncodeReader(in, enc, 4096)
if err != nil {
t.Fatalf("Failed to initialize line reader: %v", err)
}

r, err = New(strip_newline.New(r), "\n", 1<<20, &cfg)
r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg)
if err != nil {
t.Fatalf("failed to initialize reader: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,36 @@
// specific language governing permissions and limitations
// under the License.

package encode
package readfile

import (
"io"
"time"

"github.com/elastic/beats/filebeat/reader"
"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/line"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
)

// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type Reader struct {
reader *line.Reader
type EncoderReader struct {
reader *LineReader
}

// New creates a new Encode reader from input reader by applying
// the given codec.
func New(
func NewEncodeReader(
r io.Reader,
codec encoding.Encoding,
bufferSize int,
) (Reader, error) {
eReader, err := line.New(r, codec, bufferSize)
return Reader{eReader}, err
) (EncoderReader, error) {
eReader, err := NewLineReader(r, codec, bufferSize)
return EncoderReader{eReader}, err
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r Reader) Next() (reader.Message, error) {
func (r EncoderReader) Next() (reader.Message, error) {
c, sz, err := r.reader.Next()
// Creating message object
return reader.Message{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,26 @@
// specific language governing permissions and limitations
// under the License.

package limit
package readfile

import (
"github.com/elastic/beats/filebeat/reader"
)

// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type Reader struct {
type LimitReader struct {
reader reader.Reader
maxBytes int
}

// New creates a new reader limiting the line length.
func New(r reader.Reader, maxBytes int) *Reader {
return &Reader{reader: r, maxBytes: maxBytes}
func NewLimitReader(r reader.Reader, maxBytes int) *LimitReader {
return &LimitReader{reader: r, maxBytes: maxBytes}
}

// Next returns the next line.
func (r *Reader) Next() (reader.Message, error) {
func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package line
package readfile

import (
"io"
Expand All @@ -30,7 +30,7 @@ import (
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type Reader struct {
type LineReader struct {
reader io.Reader
codec encoding.Encoding
bufferSize int
Expand All @@ -43,7 +43,7 @@ type Reader struct {
}

// New creates a new reader object
func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, error) {
func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) {
encoder := codec.NewEncoder()

// Create newline char based on encoding
Expand All @@ -52,7 +52,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err
return nil, err
}

return &Reader{
return &LineReader{
reader: input,
codec: codec,
bufferSize: bufferSize,
Expand All @@ -64,7 +64,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err
}

// Next reads the next line until the new line character
func (r *Reader) Next() ([]byte, int, error) {
func (r *LineReader) Next() ([]byte, int, error) {
// This loop is need in case advance detects an line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (r *Reader) Next() ([]byte, int, error) {

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *Reader) advance() error {
func (r *LineReader) advance() error {
// Initial check if buffer has already a newLine character
idx := r.inBuffer.IndexFrom(r.inOffset, r.nl)

Expand Down Expand Up @@ -163,7 +163,7 @@ func (r *Reader) advance() error {
return err
}

func (r *Reader) decode(end int) (int, error) {
func (r *LineReader) decode(end int) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build !integration

package line
package readfile

import (
"bytes"
Expand All @@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/text/transform"

"github.com/elastic/beats/filebeat/reader/encode/encoding"
"github.com/elastic/beats/filebeat/reader/readfile/encoding"
)

// Sample texts are from http://www.columbia.edu/~kermit/utf8.html
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := New(buffer, codec, 1024)
reader, err := NewLineReader(buffer, codec, 1024)
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -159,7 +159,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := New(buffer, codec, buffer.Len())
reader, err := NewLineReader(buffer, codec, buffer.Len())
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package strip_newline
package readfile

import (
"github.com/elastic/beats/filebeat/reader"
Expand All @@ -28,7 +28,7 @@ type StripNewline struct {
}

// New creates a new line reader stripping the last tailing newline.
func New(r reader.Reader) *StripNewline {
func NewStripNewline(r reader.Reader) *StripNewline {
return &StripNewline{r}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// +build !integration

package strip_newline
package readfile

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package timeout
package readfile

import (
"errors"
Expand All @@ -30,7 +30,7 @@ var (

// timeoutProcessor will signal some configurable timeout error if no
// new line can be returned in time.
type Reader struct {
type TimeoutReader struct {
reader reader.Reader
timeout time.Duration
signal error
Expand All @@ -44,12 +44,12 @@ type lineMessage struct {
}

// New returns a new timeout reader from an input line reader.
func New(reader reader.Reader, signal error, t time.Duration) *Reader {
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
if signal == nil {
signal = errTimeout
}

return &Reader{
return &TimeoutReader{
reader: reader,
signal: signal,
timeout: t,
Expand All @@ -62,7 +62,7 @@ func New(reader reader.Reader, signal error, t time.Duration) *Reader {
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *Reader) Next() (reader.Message, error) {
func (r *TimeoutReader) Next() (reader.Message, error) {
if !r.running {
r.running = true
go func() {
Expand Down
Loading