diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index 0b13992518d5..b88c11d932b1 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -27,8 +27,8 @@ import ( cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/reader/json" "github.com/elastic/beats/filebeat/reader/multiline" + "github.com/elastic/beats/filebeat/reader/readjson" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/logp" @@ -100,7 +100,7 @@ type config struct { IncludeLines []match.Matcher `config:"include_lines"` MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` Multiline *multiline.Config `config:"multiline"` - JSON *json.Config `config:"json"` + JSON *readjson.Config `config:"json"` // Hidden on purpose, used by the docker input: DockerJSON *struct { diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index d1d4f1481851..d5ffb3a38285 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -50,13 +50,10 @@ import ( "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/docker_json" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/json" - "github.com/elastic/beats/filebeat/reader/limit" "github.com/elastic/beats/filebeat/reader/multiline" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" + "github.com/elastic/beats/filebeat/reader/readjson" "github.com/elastic/beats/filebeat/util" ) @@ -320,7 +317,7 @@ func (h *Harvester) Run() error { } if h.config.JSON != nil && len(jsonFields) > 0 { - ts := json.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) if !ts.IsZero() { // there was a `@timestamp` key in the event, so overwrite // the resulting timestamp @@ -552,21 +549,21 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return nil, err } - r, err = encode.New(h.log, h.encoding, h.config.BufferSize) + r, err = readfile.NewEncodeReader(h.log, h.encoding, h.config.BufferSize) if err != nil { return nil, err } if h.config.DockerJSON != nil { // Docker json-file format, add custom parsing to the pipeline - r = docker_json.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial) + r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial) } if h.config.JSON != nil { - r = json.New(r, h.config.JSON) + r = readjson.NewJSONReader(r, h.config.JSON) } - r = strip_newline.New(r) + r = readfile.NewStripNewline(r) if h.config.Multiline != nil { r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline) @@ -575,5 +572,5 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { } } - return limit.New(r, h.config.MaxBytes), nil + return readfile.NewLimitReader(r, h.config.MaxBytes), nil } diff --git a/filebeat/input/log/harvester_test.go b/filebeat/input/log/harvester_test.go index a311743cc934..f3f6697a8de3 100644 --- a/filebeat/input/log/harvester_test.go +++ b/filebeat/input/log/harvester_test.go @@ -31,7 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode/encoding" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common" ) diff --git a/filebeat/reader/multiline/multiline.go b/filebeat/reader/multiline/multiline.go index f1b871f6c747..57209be94cdc 100644 --- a/filebeat/reader/multiline/multiline.go +++ b/filebeat/reader/multiline/multiline.go @@ -23,7 +23,7 @@ import ( "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/timeout" + "github.com/elastic/beats/filebeat/reader/readfile" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/logp" ) @@ -112,7 +112,7 @@ func New( } if tout > 0 { - r = timeout.New(r, sigMultilineTimeout, tout) + r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout) } mlr := &Reader{ diff --git a/filebeat/reader/multiline/multiline_test.go b/filebeat/reader/multiline/multiline_test.go index 50ad18f17d13..6fe05fde0d28 100644 --- a/filebeat/reader/multiline/multiline_test.go +++ b/filebeat/reader/multiline/multiline_test.go @@ -30,9 +30,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common/match" ) @@ -190,12 +189,12 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reade } var r reader.Reader - r, err = encode.New(in, enc, 4096) + r, err = readfile.NewEncodeReader(in, enc, 4096) if err != nil { t.Fatalf("Failed to initialize line reader: %v", err) } - r, err = New(strip_newline.New(r), "\n", 1<<20, &cfg) + r, err = New(readfile.NewStripNewline(r), "\n", 1<<20, &cfg) if err != nil { t.Fatalf("failed to initialize reader: %v", err) } diff --git a/filebeat/reader/encode/encode.go b/filebeat/reader/readfile/encode.go similarity index 81% rename from filebeat/reader/encode/encode.go rename to filebeat/reader/readfile/encode.go index d0306689200e..a86e6541bec6 100644 --- a/filebeat/reader/encode/encode.go +++ b/filebeat/reader/readfile/encode.go @@ -15,37 +15,36 @@ // specific language governing permissions and limitations // under the License. -package encode +package readfile import ( "io" "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/line" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" ) // Reader produces lines by reading lines from an io.Reader // through a decoder converting the reader it's encoding to utf-8. -type Reader struct { - reader *line.Reader +type EncoderReader struct { + reader *LineReader } // New creates a new Encode reader from input reader by applying // the given codec. -func New( +func NewEncodeReader( r io.Reader, codec encoding.Encoding, bufferSize int, -) (Reader, error) { - eReader, err := line.New(r, codec, bufferSize) - return Reader{eReader}, err +) (EncoderReader, error) { + eReader, err := NewLineReader(r, codec, bufferSize) + return EncoderReader{eReader}, err } // Next reads the next line from it's initial io.Reader // This converts a io.Reader to a reader.reader -func (r Reader) Next() (reader.Message, error) { +func (r EncoderReader) Next() (reader.Message, error) { c, sz, err := r.reader.Next() // Creating message object return reader.Message{ diff --git a/filebeat/reader/encode/encoding/encoding.go b/filebeat/reader/readfile/encoding/encoding.go similarity index 100% rename from filebeat/reader/encode/encoding/encoding.go rename to filebeat/reader/readfile/encoding/encoding.go diff --git a/filebeat/reader/encode/encoding/mixed.go b/filebeat/reader/readfile/encoding/mixed.go similarity index 100% rename from filebeat/reader/encode/encoding/mixed.go rename to filebeat/reader/readfile/encoding/mixed.go diff --git a/filebeat/reader/encode/encoding/utf16.go b/filebeat/reader/readfile/encoding/utf16.go similarity index 100% rename from filebeat/reader/encode/encoding/utf16.go rename to filebeat/reader/readfile/encoding/utf16.go diff --git a/filebeat/reader/encode/encoding/utf16_test.go b/filebeat/reader/readfile/encoding/utf16_test.go similarity index 100% rename from filebeat/reader/encode/encoding/utf16_test.go rename to filebeat/reader/readfile/encoding/utf16_test.go diff --git a/filebeat/reader/limit/limit.go b/filebeat/reader/readfile/limit.go similarity index 85% rename from filebeat/reader/limit/limit.go rename to filebeat/reader/readfile/limit.go index 356ba4395fb0..1d7b46e2a471 100644 --- a/filebeat/reader/limit/limit.go +++ b/filebeat/reader/readfile/limit.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package limit +package readfile import ( "github.com/elastic/beats/filebeat/reader" @@ -23,18 +23,18 @@ import ( // Reader sets an upper limited on line length. Lines longer // then the max configured line length will be snapped short. -type Reader struct { +type LimitReader struct { reader reader.Reader maxBytes int } // New creates a new reader limiting the line length. -func New(r reader.Reader, maxBytes int) *Reader { - return &Reader{reader: r, maxBytes: maxBytes} +func NewLimitReader(r reader.Reader, maxBytes int) *LimitReader { + return &LimitReader{reader: r, maxBytes: maxBytes} } // Next returns the next line. -func (r *Reader) Next() (reader.Message, error) { +func (r *LimitReader) Next() (reader.Message, error) { message, err := r.reader.Next() if len(message.Content) > r.maxBytes { message.Content = message.Content[:r.maxBytes] diff --git a/filebeat/reader/line/line.go b/filebeat/reader/readfile/line.go similarity index 94% rename from filebeat/reader/line/line.go rename to filebeat/reader/readfile/line.go index e3b3c1764216..be2714b498f1 100644 --- a/filebeat/reader/line/line.go +++ b/filebeat/reader/readfile/line.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package line +package readfile import ( "io" @@ -30,7 +30,7 @@ import ( // lineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. -type Reader struct { +type LineReader struct { reader io.Reader codec encoding.Encoding bufferSize int @@ -43,7 +43,7 @@ type Reader struct { } // New creates a new reader object -func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, error) { +func NewLineReader(input io.Reader, codec encoding.Encoding, bufferSize int) (*LineReader, error) { encoder := codec.NewEncoder() // Create newline char based on encoding @@ -52,7 +52,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err return nil, err } - return &Reader{ + return &LineReader{ reader: input, codec: codec, bufferSize: bufferSize, @@ -64,7 +64,7 @@ func New(input io.Reader, codec encoding.Encoding, bufferSize int) (*Reader, err } // Next reads the next line until the new line character -func (r *Reader) Next() ([]byte, int, error) { +func (r *LineReader) Next() ([]byte, int, error) { // This loop is need in case advance detects an line ending which turns out // not to be one when decoded. If that is the case, reading continues. for { @@ -108,7 +108,7 @@ func (r *Reader) Next() ([]byte, int, error) { // Reads from the buffer until a new line character is detected // Returns an error otherwise -func (r *Reader) advance() error { +func (r *LineReader) advance() error { // Initial check if buffer has already a newLine character idx := r.inBuffer.IndexFrom(r.inOffset, r.nl) @@ -163,7 +163,7 @@ func (r *Reader) advance() error { return err } -func (r *Reader) decode(end int) (int, error) { +func (r *LineReader) decode(end int) (int, error) { var err error buffer := make([]byte, 1024) inBytes := r.inBuffer.Bytes() diff --git a/filebeat/reader/line/line_test.go b/filebeat/reader/readfile/line_test.go similarity index 96% rename from filebeat/reader/line/line_test.go rename to filebeat/reader/readfile/line_test.go index 6b420b7645ac..9d244f0decfa 100644 --- a/filebeat/reader/line/line_test.go +++ b/filebeat/reader/readfile/line_test.go @@ -17,7 +17,7 @@ // +build !integration -package line +package readfile import ( "bytes" @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/assert" "golang.org/x/text/transform" - "github.com/elastic/beats/filebeat/reader/encode/encoding" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" ) // Sample texts are from http://www.columbia.edu/~kermit/utf8.html @@ -68,7 +68,7 @@ func TestReaderEncodings(t *testing.T) { } // create line reader - reader, err := New(buffer, codec, 1024) + reader, err := NewLineReader(buffer, codec, 1024) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -159,7 +159,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) { // initialize reader buffer := bytes.NewBuffer(inputStream) codec, _ := encoding.Plain(buffer) - reader, err := New(buffer, codec, buffer.Len()) + reader, err := NewLineReader(buffer, codec, buffer.Len()) if err != nil { t.Fatalf("Error initializing reader: %v", err) } diff --git a/filebeat/reader/strip_newline/strip_newline.go b/filebeat/reader/readfile/strip_newline.go similarity index 96% rename from filebeat/reader/strip_newline/strip_newline.go rename to filebeat/reader/readfile/strip_newline.go index b62ba4381f6d..3394cb9289e4 100644 --- a/filebeat/reader/strip_newline/strip_newline.go +++ b/filebeat/reader/readfile/strip_newline.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package strip_newline +package readfile import ( "github.com/elastic/beats/filebeat/reader" @@ -28,7 +28,7 @@ type StripNewline struct { } // New creates a new line reader stripping the last tailing newline. -func New(r reader.Reader) *StripNewline { +func NewStripNewline(r reader.Reader) *StripNewline { return &StripNewline{r} } diff --git a/filebeat/reader/strip_newline/strip_newline_test.go b/filebeat/reader/readfile/strip_newline_test.go similarity index 98% rename from filebeat/reader/strip_newline/strip_newline_test.go rename to filebeat/reader/readfile/strip_newline_test.go index 8d18ea44e41c..543056393e53 100644 --- a/filebeat/reader/strip_newline/strip_newline_test.go +++ b/filebeat/reader/readfile/strip_newline_test.go @@ -17,7 +17,7 @@ // +build !integration -package strip_newline +package readfile import ( "testing" diff --git a/filebeat/reader/timeout/timeout.go b/filebeat/reader/readfile/timeout.go similarity index 90% rename from filebeat/reader/timeout/timeout.go rename to filebeat/reader/readfile/timeout.go index 2e3fe3ef895e..ea73bd607375 100644 --- a/filebeat/reader/timeout/timeout.go +++ b/filebeat/reader/readfile/timeout.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package timeout +package readfile import ( "errors" @@ -30,7 +30,7 @@ var ( // timeoutProcessor will signal some configurable timeout error if no // new line can be returned in time. -type Reader struct { +type TimeoutReader struct { reader reader.Reader timeout time.Duration signal error @@ -44,12 +44,12 @@ type lineMessage struct { } // New returns a new timeout reader from an input line reader. -func New(reader reader.Reader, signal error, t time.Duration) *Reader { +func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader { if signal == nil { signal = errTimeout } - return &Reader{ + return &TimeoutReader{ reader: reader, signal: signal, timeout: t, @@ -62,7 +62,7 @@ func New(reader reader.Reader, signal error, t time.Duration) *Reader { // For handline timeouts a goroutine is started for reading lines from // configured line reader. Only when underlying reader returns an error, the // goroutine will be finished. -func (r *Reader) Next() (reader.Message, error) { +func (r *TimeoutReader) Next() (reader.Message, error) { if !r.running { r.running = true go func() { diff --git a/filebeat/reader/docker_json/docker_json.go b/filebeat/reader/readjson/docker_json.go similarity index 93% rename from filebeat/reader/docker_json/docker_json.go rename to filebeat/reader/readjson/docker_json.go index df87e9a161fc..1ec2514b7873 100644 --- a/filebeat/reader/docker_json/docker_json.go +++ b/filebeat/reader/readjson/docker_json.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package docker_json +package readjson import ( "bytes" @@ -29,8 +29,8 @@ import ( "github.com/pkg/errors" ) -// Reader processor renames a given field -type Reader struct { +// DockerJSONReader processor renames a given field +type DockerJSONReader struct { reader reader.Reader // stream filter, `all`, `stderr` or `stdout` stream string @@ -52,8 +52,8 @@ type crioLog struct { } // New creates a new reader renaming a field -func New(r reader.Reader, stream string, partial bool) *Reader { - return &Reader{ +func New(r reader.Reader, stream string, partial bool) *DockerJSONReader { + return &DockerJSONReader{ stream: stream, partial: partial, reader: r, @@ -110,7 +110,7 @@ func parseDockerJSONLog(message reader.Message, msg *dockerLog) (reader.Message, } // Next returns the next line. -func (p *Reader) Next() (reader.Message, error) { +func (p *DockerJSONReader) Next() (reader.Message, error) { for { message, err := p.reader.Next() if err != nil { diff --git a/filebeat/reader/docker_json/docker_json_test.go b/filebeat/reader/readjson/docker_json_test.go similarity index 99% rename from filebeat/reader/docker_json/docker_json_test.go rename to filebeat/reader/readjson/docker_json_test.go index 2b838f0e079a..7bb997581d28 100644 --- a/filebeat/reader/docker_json/docker_json_test.go +++ b/filebeat/reader/readjson/docker_json_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package docker_json +package readjson import ( "testing" diff --git a/filebeat/reader/json/json.go b/filebeat/reader/readjson/json.go similarity index 93% rename from filebeat/reader/json/json.go rename to filebeat/reader/readjson/json.go index 0f62bca6c156..2b6207a74dc2 100644 --- a/filebeat/reader/json/json.go +++ b/filebeat/reader/readjson/json.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package json +package readjson import ( "bytes" @@ -30,19 +30,20 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -type JSON struct { +// JSONReader parses JSON inputs +type JSONReader struct { reader reader.Reader cfg *Config } // NewJSONReader creates a new reader that can decode JSON. -func New(r reader.Reader, cfg *Config) *JSON { - return &JSON{reader: r, cfg: cfg} +func NewJSONReader(r reader.Reader, cfg *Config) *JSONReader { + return &JSONReader{reader: r, cfg: cfg} } // decodeJSON unmarshals the text parameter into a MapStr and // returns the new text column if one was requested. -func (r *JSON) decode(text []byte) ([]byte, common.MapStr) { +func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) { var jsonFields map[string]interface{} err := unmarshal(text, &jsonFields) @@ -93,7 +94,7 @@ func unmarshal(text []byte, fields *map[string]interface{}) error { } // Next decodes JSON and returns the filled Line object. -func (r *JSON) Next() (reader.Message, error) { +func (r *JSONReader) Next() (reader.Message, error) { message, err := r.reader.Next() if err != nil { return message, err diff --git a/filebeat/reader/json/json_config.go b/filebeat/reader/readjson/json_config.go similarity index 98% rename from filebeat/reader/json/json_config.go rename to filebeat/reader/readjson/json_config.go index edadceb9c9b9..a95b9db08447 100644 --- a/filebeat/reader/json/json_config.go +++ b/filebeat/reader/readjson/json_config.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package json +package readjson // Config holds the options a JSON reader. type Config struct { diff --git a/filebeat/reader/json/json_test.go b/filebeat/reader/readjson/json_test.go similarity index 99% rename from filebeat/reader/json/json_test.go rename to filebeat/reader/readjson/json_test.go index 7ffe931b44cf..310bd373b5f0 100644 --- a/filebeat/reader/json/json_test.go +++ b/filebeat/reader/readjson/json_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package json +package readjson import ( "testing" @@ -176,7 +176,7 @@ func TestDecodeJSON(t *testing.T) { for _, test := range tests { - var p JSON + var p JSONReader p.cfg = &test.Config text, M := p.decode([]byte(test.Text)) assert.Equal(t, test.ExpectedText, string(text)) diff --git a/filebeat/scripts/tester/main.go b/filebeat/scripts/tester/main.go index 5e4538ba191d..3300b8ef623a 100644 --- a/filebeat/scripts/tester/main.go +++ b/filebeat/scripts/tester/main.go @@ -30,11 +30,9 @@ import ( "time" "github.com/elastic/beats/filebeat/reader" - "github.com/elastic/beats/filebeat/reader/encode" - "github.com/elastic/beats/filebeat/reader/encode/encoding" - "github.com/elastic/beats/filebeat/reader/limit" "github.com/elastic/beats/filebeat/reader/multiline" - "github.com/elastic/beats/filebeat/reader/strip_newline" + "github.com/elastic/beats/filebeat/reader/readfile" + "github.com/elastic/beats/filebeat/reader/readfile/encoding" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" ) @@ -135,12 +133,12 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { } var r reader.Reader - r, err = encode.New(f, enc, 4096) + r, err = readfile.NewEncodeReader(f, enc, 4096) if err != nil { return nil, err } - r = strip_newline.New(r) + r = readfile.NewStripNewline(r) if conf.multiPattern != "" { p, err := match.Compile(conf.multiPattern) @@ -158,7 +156,7 @@ func getLogsFromFile(logfile string, conf *logReaderConfig) ([]string, error) { return nil, err } } - r = limit.New(r, conf.maxBytes) + r = readfile.NewLimitReader(r, conf.maxBytes) var logs []string for {