From 9b631656ce58741a6b12030d73264ba86a2e0a7d Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 14:38:07 +0200 Subject: [PATCH 01/11] Fix filebeat registry meta being nil vs empty --- filebeat/input/docker/input.go | 3 + filebeat/input/file/state.go | 7 ++- filebeat/input/input.go | 2 +- filebeat/input/log/input.go | 7 ++- filebeat/registrar/registrar.go | 74 +++++++++++++++++++++++++ filebeat/tests/system/test_registrar.py | 1 + 6 files changed, 90 insertions(+), 4 deletions(-) diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index e1b3eb49af7b..ce34276e5479 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -73,6 +73,9 @@ func NewInput( // Add stream to meta to ensure different state per stream if config.Containers.Stream != "all" { + if context.Meta == nil { + context.Meta = map[string]string{} + } context.Meta["stream"] = config.Containers.Stream } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index 1f93a259f799..96c5abf2071f 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -44,6 +44,9 @@ type State struct { // NewState creates a new file state func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State { + if len(meta) == 0 { + meta = nil + } return State{ Fileinfo: fileInfo, Source: path, @@ -60,7 +63,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin func (s *State) ID() string { // Generate id on first request. This is needed as id is not set when converting back from json if s.Id == "" { - if s.Meta == nil { + if len(s.Meta) == 0 { s.Id = s.FileStateOS.String() } else { hashValue, _ := hashstructure.Hash(s.Meta, nil) @@ -91,6 +94,6 @@ func (s *State) IsEqual(c *State) bool { func (s *State) IsEmpty() bool { return s.FileStateOS == file.StateOS{} && s.Source == "" && - s.Meta == nil && + len(s.Meta) == 0 && s.Timestamp.IsZero() } diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 414d20963fe6..98eea51db8a5 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -96,7 +96,7 @@ func New( Done: input.done, BeatDone: input.beatDone, DynamicFields: dynFields, - Meta: map[string]string{}, + Meta: nil, } var ipt Input ipt, err = f(conf, outlet, context) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 6964b513c893..9e8230c1edfd 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -93,6 +93,11 @@ func NewInput( // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + meta := context.Meta + if len(meta) == 0 { + meta = nil + } + p := &Input{ config: defaultConfig, cfg: cfg, @@ -101,7 +106,7 @@ func NewInput( stateOutlet: stateOut, states: file.NewStates(), done: context.Done, - meta: context.Meta, + meta: meta, } if err := cfg.Unpack(&p.config); err != nil { diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 804ef5692034..f03c7eb00cfd 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -139,6 +139,7 @@ func (r *Registrar) loadStates() error { return fmt.Errorf("Error decoding states: %s", err) } + states = fixStates(states) states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) @@ -146,6 +147,79 @@ func (r *Registrar) loadStates() error { return nil } +// fixStates cleans up the regsitry states when updating from an older version +// of filebeat potentially writing invalid entries. +func fixStates(states []file.State) []file.State { + if len(states) == 0 { + return states + } + + // we use a map of states here, so to identify and merge duplicate entries. + idx := map[string]*file.State{} + for i := range states { + state := &states[i] + id := state.ID() + old, exists := idx[id] + if !exists { + idx[id] = state + } + + mergeStates(old, state) // overwrite the entry in 'old' + } + + if len(idx) == len(states) { + return states + } + + i := 0 + newStates := make([]file.State, len(idx)) + for _, state := range idx { + newStates[i] = *state + i++ + } + return newStates +} + +// mergeStates merges 2 states by trying to determine the 'newer' state. +// The st state is overwritten with the updated fields. +func mergeStates(st, other *file.State) { + st.Finished = st.Finished || other.Finished + if st.Offset < other.Offset { // always select the higher offset + st.Offset = other.Offset + } + + // update file meta-data. As these are updated concurrently by the + // prospectors, select the newer state based on the update timestamp. + var meta, metaOld, metaNew map[string]string + if st.Timestamp.Before(other.Timestamp) { + st.Source = other.Source + st.Timestamp = other.Timestamp + st.TTL = other.TTL + st.FileStateOS = other.FileStateOS + + metaOld, metaNew = st.Meta, other.Meta + } else { + metaOld, metaNew = other.Meta, st.Meta + } + + if len(metaOld) == 0 || len(metaNew) == 0 { + meta = metaNew + } else { + meta = map[string]string{} + for k, v := range metaOld { + meta[k] = v + } + for k, v := range metaNew { + meta[k] = v + } + } + + if len(meta) == 0 { + meta = nil + } + st.Meta = meta +} + // resetStates sets all states to finished and disable TTL on restart // For all states covered by an input, TTL will be overwritten with the input value func resetStates(states []file.State) []file.State { diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 73d303245232..dd74266fa815 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -70,6 +70,7 @@ def test_registrar_file_content(self): "offset": iterations * line_len, }, record) self.assertTrue("FileStateOS" in record) + self.assertIsNone(record["meta"]) file_state_os = record["FileStateOS"] if os.name == "nt": From beace65882f69eeb50e6f880cffcf8bb8719d104 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 21:32:59 +0200 Subject: [PATCH 02/11] Add a another len()==0 check --- filebeat/input/log/input.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 9e8230c1edfd..afba5fb42202 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -692,6 +692,10 @@ func (p *Input) updateState(state file.State) error { state.TTL = p.config.CleanInactive } + if len(state.Meta) == 0 { + state.Meta = nil + } + // Update first internal state p.states.Update(state) From 6ab9e6093d1089d36597b38fc1eda98374f5ad49 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 21:39:56 +0200 Subject: [PATCH 03/11] Separate parsing from opening the file --- filebeat/registrar/registrar.go | 36 ++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index f03c7eb00cfd..5a2a6d0c1353 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -20,6 +20,7 @@ package registrar import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sync" @@ -132,21 +133,28 @@ func (r *Registrar) loadStates() error { logp.Info("Loading registrar data from %s", r.registryFile) - decoder := json.NewDecoder(f) - states := []file.State{} - err = decoder.Decode(&states) + states, err := readStatesFrom(f) if err != nil { - return fmt.Errorf("Error decoding states: %s", err) + return err } - - states = fixStates(states) - states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) return nil } +func readStatesFrom(in io.Reader) ([]file.State, error) { + states := []file.State{} + decoder := json.NewDecoder(in) + if err := decoder.Decode(&states); err != nil { + return nil, fmt.Errorf("Error decoding states: %s", err) + } + + states = fixStates(states) + states = resetStates(states) + return states, nil +} + // fixStates cleans up the regsitry states when updating from an older version // of filebeat potentially writing invalid entries. func fixStates(states []file.State) []file.State { @@ -158,13 +166,15 @@ func fixStates(states []file.State) []file.State { idx := map[string]*file.State{} for i := range states { state := &states[i] + fixState(state) + id := state.ID() old, exists := idx[id] if !exists { idx[id] = state + } else { + mergeStates(old, state) // overwrite the entry in 'old' } - - mergeStates(old, state) // overwrite the entry in 'old' } if len(idx) == len(states) { @@ -180,6 +190,14 @@ func fixStates(states []file.State) []file.State { return newStates } +// fixState updates a read state to fullfil required invariantes: +// - "Meta" must be nil if len(Meta) == 0 +func fixState(st *file.State) { + if len(st.Meta) == 0 { + st.Meta = nil + } +} + // mergeStates merges 2 states by trying to determine the 'newer' state. // The st state is overwritten with the updated fields. func mergeStates(st, other *file.State) { From 78b26df93d26bc45671aa2c6bebd3a39b85d59d5 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 21:40:35 +0200 Subject: [PATCH 04/11] Add unit tests for parsing different registry formats --- filebeat/registrar/registrar_test.go | 187 +++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) create mode 100644 filebeat/registrar/registrar_test.go diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go new file mode 100644 index 000000000000..9671348ab243 --- /dev/null +++ b/filebeat/registrar/registrar_test.go @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package registrar + +import ( + "reflect" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/filebeat/input/file" +) + +func TestRegistrarRead(t *testing.T) { + type testCase struct { + input string + expected []file.State + } + + zone := time.FixedZone("+0000", 0) + + cases := map[string]testCase{ + "ok registry with one entry": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config without meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1 + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config with empty meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "requires merge without meta-data": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 100, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + }, + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:10+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 10, 0, zone), + Offset: 100, + TTL: -2, // loader always resets states + Meta: nil, + }, + }, + }, + } + + matchState := func(t *testing.T, i int, expected, actual file.State) { + check := func(name string, a, b interface{}) { + if !reflect.DeepEqual(a, b) { + t.Errorf("State %v: %v mismatch (expected=%v, actual=%v)", i, name, a, b) + } + } + + check("id", expected.ID(), actual.ID()) + check("source", expected.Source, actual.Source) + check("offset", expected.Offset, actual.Offset) + check("ttl", expected.TTL, actual.TTL) + check("meta", expected.Meta, actual.Meta) + check("type", expected.Type, actual.Type) + + if t1, t2 := expected.Timestamp, actual.Timestamp; !t1.Equal(t2) { + t.Errorf("State %v: timestamp mismatch (expected=%v, actual=%v)", i, t1, t2) + } + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + in := strings.NewReader(test.input) + + states, err := readStatesFrom(in) + require.NoError(t, err) + + actual := sortedStates(states) + expected := sortedStates(test.expected) + if len(actual) != len(expected) { + t.Errorf("expected %v state, but registrar did load %v states", + len(expected), len(actual)) + return + } + + for i := range expected { + matchState(t, i, expected[i], actual[i]) + } + }) + } +} + +func sortedStates(states []file.State) []file.State { + tmp := make([]file.State, len(states)) + copy(tmp, states) + sort.Slice(tmp, func(i, j int) bool { + return tmp[i].ID() < tmp[j].ID() + }) + return tmp +} From 62007fd4b1fd81b0947f807fc3b4a63f7631fb15 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 21:58:29 +0200 Subject: [PATCH 05/11] Use assert, not require package --- filebeat/registrar/registrar_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go index 9671348ab243..102b073dffad 100644 --- a/filebeat/registrar/registrar_test.go +++ b/filebeat/registrar/registrar_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/filebeat/input/file" ) @@ -160,7 +160,9 @@ func TestRegistrarRead(t *testing.T) { in := strings.NewReader(test.input) states, err := readStatesFrom(in) - require.NoError(t, err) + if !assert.NoError(t, err) { + return + } actual := sortedStates(states) expected := sortedStates(test.expected) From f74762dcdef65a8f6032778bb0ffbc1b8310b6b7 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:13:12 +0200 Subject: [PATCH 06/11] Add system tests loading registry file from older versions --- .../files/registry/test-2lines-registry-6.3.0 | 1 + .../files/registry/test-2lines-registry-6.3.1 | 1 + .../test-2lines-registry-6.3.1-faulty | 4 + .../registry/test-2lines-registry-latest | 1 + .../tests/system/test_registrar_upgrade.py | 89 +++++++++++++++++++ 5 files changed, 96 insertions(+) create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.0 create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.1 create mode 100644 filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty create mode 100644 filebeat/tests/files/registry/test-2lines-registry-latest create mode 100644 filebeat/tests/system/test_registrar_upgrade.py diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.0 b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 new file mode 100644 index 000000000000..5f7414b9cf3a --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.1 b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 new file mode 100644 index 000000000000..a4c2ccf126c6 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty new file mode 100644 index 000000000000..2606e69bbbc3 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty @@ -0,0 +1,4 @@ +[ + {"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}, + {"source":"test.log","offset":0,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}} +] diff --git a/filebeat/tests/files/registry/test-2lines-registry-latest b/filebeat/tests/files/registry/test-2lines-registry-latest new file mode 100644 index 000000000000..110dc1613d1b --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-latest @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py new file mode 100644 index 000000000000..7f54ab1d996c --- /dev/null +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +"""Test the registrar with old registry file formats""" + +import os +import json + +from nose.plugins.skip import Skip, SkipTest + +from filebeat import BaseTest + + +class Test(BaseTest): + def test_upgrade_from_6_3_0(self): + template = "test-2lines-registry-6.3.0" + self.run_with_single_registry_format(template) + + def test_upgrade_from_6_3_1(self): + template = "test-2lines-registry-6.3.1" + self.run_with_single_registry_format(template) + + def test_upgrade_from_faulty_6_3_1(self): + template = "test-2lines-registry-6.3.1-faulty" + self.run_with_single_registry_format(template) + + def test_upgrade_from_latest(self): + template = "test-2lines-registry-latest" + self.run_with_single_registry_format(template) + + + def run_with_single_registry_format(self, template): + # prepare log file + testfile, file_state = self.prepare_log() + + # prepare registry file + self.apply_registry_tempate(template, testfile, file_state) + + self.run_and_validate() + + + def run_and_validate(self): + filebeat = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=15) + + # stop filebeat and enforce one last registry update + filebeat.check_kill_and_wait() + + data = self.get_registry() + assert len(data) == 1 + assert data[0]["offset"] == 20 + + # check only second line has been written + output = self.read_output() + assert len(output) == 1 + assert output[0]["message"] == "abcdefghi" + + def apply_registry_tempate(self, template, testfile, file_state): + source = self.beat_path + "/tests/files/registry/" + template + with open(source) as f: + registry = json.loads(f.read()) + + for state in registry: + state["source"] = testfile + state["FileStateOS"] = file_state + with open(self.working_dir + "/registry", 'w') as f: + f.write(json.dumps(registry)) + + + def prepare_log(self): + # test is current skipped on windows, due to FileStateOS must match the + # current OS format. + if os.name == "nt": + SkipTest + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*" + ) + + os.mkdir(self.working_dir + "/log/") + + testfile_path = self.working_dir + "/log/test.log" + with open(testfile_path, 'w') as f: + f.write("123456789\n") + f.write("abcdefghi\n") + + st = os.stat(testfile_path) + file_state = {"inode": st.st_ino, "device": st.st_dev} + return testfile_path, file_state From 56bbdf1ff6ce0581e89a5714a280bdd029047e34 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:15:45 +0200 Subject: [PATCH 07/11] Add changelog entry --- CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ef447503f312..974f481257df 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Fix offset field pointing at end of a line. {issue}6514[6514] - Fix an issue when parsing ISO8601 dates with timezone definition {issue}7367[7367] - Fix Grok pattern of MongoDB module. {pull}7568[7568] +- Fix registry duplicates and log resending on upgrade. {issue}7634[7634] *Heartbeat* - Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616] From 453e9827158f9480065e7389514971cf5787b88e Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:18:34 +0200 Subject: [PATCH 08/11] pep8 --- filebeat/tests/system/test_registrar_upgrade.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py index 7f54ab1d996c..077b05de8785 100644 --- a/filebeat/tests/system/test_registrar_upgrade.py +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -26,7 +26,6 @@ def test_upgrade_from_latest(self): template = "test-2lines-registry-latest" self.run_with_single_registry_format(template) - def run_with_single_registry_format(self, template): # prepare log file testfile, file_state = self.prepare_log() @@ -36,7 +35,6 @@ def run_with_single_registry_format(self, template): self.run_and_validate() - def run_and_validate(self): filebeat = self.start_beat() self.wait_until( @@ -56,7 +54,7 @@ def run_and_validate(self): assert output[0]["message"] == "abcdefghi" def apply_registry_tempate(self, template, testfile, file_state): - source = self.beat_path + "/tests/files/registry/" + template + source = self.beat_path + "/tests/files/registry/" + template with open(source) as f: registry = json.loads(f.read()) @@ -66,7 +64,6 @@ def apply_registry_tempate(self, template, testfile, file_state): with open(self.working_dir + "/registry", 'w') as f: f.write(json.dumps(registry)) - def prepare_log(self): # test is current skipped on windows, due to FileStateOS must match the # current OS format. From 43bd3ad7b5f8b2c9175c2e7fb2fab522a443e6f8 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:21:09 +0200 Subject: [PATCH 09/11] typo --- .../tests/system/test_registrar_upgrade.py | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py index 077b05de8785..d5b89ac35bca 100644 --- a/filebeat/tests/system/test_registrar_upgrade.py +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -31,29 +31,11 @@ def run_with_single_registry_format(self, template): testfile, file_state = self.prepare_log() # prepare registry file - self.apply_registry_tempate(template, testfile, file_state) + self.apply_registry_template(template, testfile, file_state) self.run_and_validate() - def run_and_validate(self): - filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=1), - max_timeout=15) - - # stop filebeat and enforce one last registry update - filebeat.check_kill_and_wait() - - data = self.get_registry() - assert len(data) == 1 - assert data[0]["offset"] == 20 - - # check only second line has been written - output = self.read_output() - assert len(output) == 1 - assert output[0]["message"] == "abcdefghi" - - def apply_registry_tempate(self, template, testfile, file_state): + def apply_registry_template(self, template, testfile, file_state): source = self.beat_path + "/tests/files/registry/" + template with open(source) as f: registry = json.loads(f.read()) @@ -84,3 +66,22 @@ def prepare_log(self): st = os.stat(testfile_path) file_state = {"inode": st.st_ino, "device": st.st_dev} return testfile_path, file_state + + def run_and_validate(self): + filebeat = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=15) + + # stop filebeat and enforce one last registry update + filebeat.check_kill_and_wait() + + data = self.get_registry() + assert len(data) == 1 + assert data[0]["offset"] == 20 + + # check only second line has been written + output = self.read_output() + assert len(output) == 1 + assert output[0]["message"] == "abcdefghi" + From 12778ab0d01e5a55600412ab7dd8eeda67044c3f Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:34:16 +0200 Subject: [PATCH 10/11] make autopep8 happy? --- filebeat/tests/system/test_registrar_upgrade.py | 1 - 1 file changed, 1 deletion(-) diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py index d5b89ac35bca..639e33447bef 100644 --- a/filebeat/tests/system/test_registrar_upgrade.py +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -84,4 +84,3 @@ def run_and_validate(self): output = self.read_output() assert len(output) == 1 assert output[0]["message"] == "abcdefghi" - From 064fdfa017269797fad8acd093a0383f3454e4d2 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 18 Jul 2018 23:52:55 +0200 Subject: [PATCH 11/11] Fix windows test skipping --- filebeat/tests/system/test_registrar_upgrade.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py index 639e33447bef..21569e9c384a 100644 --- a/filebeat/tests/system/test_registrar_upgrade.py +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -50,7 +50,7 @@ def prepare_log(self): # test is current skipped on windows, due to FileStateOS must match the # current OS format. if os.name == "nt": - SkipTest + raise SkipTest self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*"