From a4a27d7d7d4a6a434571c6151ab4f24d9eb6f4da Mon Sep 17 00:00:00 2001 From: ruflin Date: Wed, 1 Feb 2017 19:55:07 +0100 Subject: [PATCH] Apply close_timeout also when output is blocked Currently `close_timeout` does not apply in case the output is blocked. This PR changes the behavior of `close_timeout` to also close a file handler when the output is blocked. It is important to note that this closes the file handler but NOT the harvester. This is important as the closing of the harvester requires a state update to set `state.Finished=true`. If this would not happen and the harvester is closed, processing would not continue when the output becomes available again. Previously the internal state of a harvester was updated when the event was created. This could lead to the issue that, in case an event was not sent but the state update went through, an event would be missing. This is now prevented by overwriting the internal state only when the event was successfully sent. The done channels from prospector and harvester are renamed to make it more obvious which one belongs to what: h.done -> h.prospectorDone, h.harvestDone -> h.done. As the harvester channel is closed with the `stop` method in all cases, `h.done` is sufficient in most places. This PR does not solve the problem related to reloading and stopping a harvester mentioned in https://github.com/elastic/beats/pull/3511#issuecomment-277264760. This will be done in a follow-up PR. 
--- CHANGELOG.asciidoc | 1 + filebeat/harvester/harvester.go | 6 ++- filebeat/harvester/log.go | 65 ++++++++++++++++--------- filebeat/harvester/log_test.go | 1 - filebeat/tests/system/test_multiline.py | 6 +-- 5 files changed, 51 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1e0f4936bfdd..2fff60689288 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Add the `pipeline` config option at the prospector level, for configuring the Ingest Node pipeline ID. {pull}3433[3433] - Update regular expressions used for matching file names or lines (multiline, include/exclude functionality) to new matchers improving performance of simple string matches. {pull}3469[3469] - The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525] +- close_timeout is also applied when the output is blocking. {pull}3511[3511] *Heartbeat* diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 02525294ba6c..3e9755f16624 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -14,6 +14,7 @@ package harvester import ( "errors" "fmt" + "sync" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" @@ -39,6 +40,8 @@ type Harvester struct { fileReader *LogFile encodingFactory encoding.EncodingFactory encoding encoding.Encoding + prospectorDone chan struct{} + once sync.Once done chan struct{} } @@ -53,7 +56,8 @@ func NewHarvester( config: defaultConfig, state: state, prospectorChan: prospectorChan, - done: done, + prospectorDone: done, + done: make(chan struct{}), } if err := cfg.Unpack(&h.config); err != nil { diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index ea7414b57630..95027045d5e2 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -58,13 +58,14 @@ func (h 
*Harvester) Harvest(r reader.Reader) { defer h.close() // Channel to stop internal harvester routines - harvestDone := make(chan struct{}) - defer close(harvestDone) + defer h.stop() // Closes reader after timeout or when done channel is closed // This routine is also responsible to properly stop the reader go func() { - var closeTimeout <-chan time.Time + + closeTimeout := make(<-chan time.Time) + // starts close_timeout timer if h.config.CloseTimeout > 0 { closeTimeout = time.After(h.config.CloseTimeout) } @@ -72,12 +73,14 @@ func (h *Harvester) Harvest(r reader.Reader) { select { // Applies when timeout is reached case <-closeTimeout: - logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source) + logp.Info("Closing harvester because close_timeout was reached.") // Required for shutdown when hanging inside reader - case <-h.done: + case <-h.prospectorDone: // Required when reader loop returns and reader finished - case <-harvestDone: + case <-h.done: } + + h.stop() h.fileReader.Close() }() @@ -122,9 +125,10 @@ func (h *Harvester) Harvest(r reader.Reader) { // Update offset h.state.Offset += int64(message.Bytes) - // Create state event - event := input.NewEvent(h.getState()) + state := h.getState() + // Create state event + event := input.NewEvent(state) text := string(message.Content) // Check if data should be added to event. Only export non empty events. 
@@ -147,12 +151,21 @@ func (h *Harvester) Harvest(r reader.Reader) { if !h.sendEvent(event) { return } + // Update state of harvester as successfully sent + h.state = state } } +func (h *Harvester) stop() { + h.once.Do(func() { + close(h.done) + }) +} + // sendEvent sends event to the spooler channel // Return false if event was not sent func (h *Harvester) sendEvent(event *input.Event) bool { + select { case <-h.done: return false @@ -161,6 +174,21 @@ func (h *Harvester) sendEvent(event *input.Event) bool { } } +// sendStateUpdate send an empty event with the current state to update the registry +// close_timeout does not apply here to make sure a harvester is closed properly. In +// case the output is blocked the harvester will stay open to make sure no new harvester +// is started. As soon as the output becomes available again, the finished state is written +// and processing can continue. +func (h *Harvester) sendStateUpdate() { + logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + event := input.NewEvent(h.state) + + select { + case <-h.prospectorDone: + case h.prospectorChan <- event: // ship the new event downstream + } +} + // shouldExportLine decides if the line is exported or not based on // the include_lines and exclude_lines options. 
func (h *Harvester) shouldExportLine(line string) bool { @@ -260,22 +288,18 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) { return file.Seek(0, os.SEEK_CUR) } -// sendStateUpdate send an empty event with the current state to update the registry -func (h *Harvester) sendStateUpdate() bool { - logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) - event := input.NewEvent(h.getState()) - return h.sendEvent(event) -} - +// getState returns an updated copy of the harvester state func (h *Harvester) getState() file.State { if h.config.InputType == config.StdinInputType { return file.State{} } + state := h.state + // refreshes the values in State with the values from the harvester itself - h.state.FileStateOS = file.GetOSState(h.state.Fileinfo) - return h.state + state.FileStateOS = file.GetOSState(h.state.Fileinfo) + return state } func (h *Harvester) close() { @@ -289,6 +313,7 @@ func (h *Harvester) close() { // If file was never opened, it can't be closed if h.file != nil { + // close file handler h.file.Close() logp.Debug("harvester", "Closing file: %s", h.state.Source) @@ -350,9 +375,3 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { return reader.NewLimit(r, h.config.MaxBytes), nil } - -/* - -TODO: introduce new structure: log_file —[raw bytes]—> (line —[utf8 bytes]—> encode) —[message]—> …` - -*/ diff --git a/filebeat/harvester/log_test.go b/filebeat/harvester/log_test.go index 8862b15c8dc2..c08a7ace01c4 100644 --- a/filebeat/harvester/log_test.go +++ b/filebeat/harvester/log_test.go @@ -69,7 +69,6 @@ func TestReadLine(t *testing.T) { }, file: f, } - assert.NotNil(t, h) var ok bool h.encodingFactory, ok = encoding.FindEncoding(h.config.Encoding) diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py index 74d9fc5d3b4a..064d2f46caef 100644 --- a/filebeat/tests/system/test_multiline.py +++ b/filebeat/tests/system/test_multiline.py @@ -247,7 +247,7 @@ def 
test_close_timeout_with_multiline(self): pattern="^\[", negate="true", match="after", - close_timeout="1s", + close_timeout="2s", ) os.mkdir(self.working_dir + "/log/") @@ -286,7 +286,7 @@ def test_close_timeout_with_multiline(self): # close_timeout must have closed the reader exactly twice self.wait_until( lambda: self.log_contains_count( - "Closing harvester because close_timeout was reached") == 2, + "Closing harvester because close_timeout was reached") >= 1, max_timeout=15) output = self.read_output() @@ -302,7 +302,7 @@ def test_consecutive_newline(self): pattern="^\[", negate="true", match="after", - close_timeout="1s", + close_timeout="2s", ) logentry1 = """[2016-09-02 19:54:23 +0000] Started 2016-09-02 19:54:23 +0000 "GET" for /gaq?path=%2FCA%2FFallbrook%2F1845-Acacia-Ln&referer=http%3A%2F%2Fwww.xxxxx.com%2FAcacia%2BLn%2BFallbrook%2BCA%2Baddresses&search_bucket=none&page_controller=v9%2Faddresses&page_action=show at 23.235.47.31