Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply close_timeout also when output is blocked #3511

Merged
merged 1 commit into from
Feb 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Add the `pipeline` config option at the prospector level, for configuring the Ingest Node pipeline ID. {pull}3433[3433]
- Update regular expressions used for matching file names or lines (multiline, include/exclude functionality) to new matchers improving performance of simple string matches. {pull}3469[3469]
- The `symlinks` and `harvester_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]

*Heartbeat*

Expand Down
6 changes: 5 additions & 1 deletion filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package harvester
import (
"errors"
"fmt"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -39,6 +40,8 @@ type Harvester struct {
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
prospectorDone chan struct{}
once sync.Once
done chan struct{}
}

Expand All @@ -53,7 +56,8 @@ func NewHarvester(
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
done: done,
prospectorDone: done,
done: make(chan struct{}),
}

if err := cfg.Unpack(&h.config); err != nil {
Expand Down
65 changes: 42 additions & 23 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,29 @@ func (h *Harvester) Harvest(r reader.Reader) {
defer h.close()

// Channel to stop internal harvester routines
harvestDone := make(chan struct{})
defer close(harvestDone)
defer h.stop()

// Closes reader after timeout or when done channel is closed
// This routine is also responsible to properly stop the reader
go func() {
var closeTimeout <-chan time.Time

closeTimeout := make(<-chan time.Time)
// starts close_timeout timer
if h.config.CloseTimeout > 0 {
closeTimeout = time.After(h.config.CloseTimeout)
}

select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
logp.Info("Closing harvester because close_timeout was reached.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see how access to h.state.Source can race. Consider printing the last state in defer statement when Harvest returns (but after workers have been finished).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a log message in the closing part where the state.Source is included. So I don't think it is necessary here.

// Required for shutdown when hanging inside reader
case <-h.done:
case <-h.prospectorDone:
// Required when reader loop returns and reader finished
case <-harvestDone:
case <-h.done:
}

h.stop()
h.fileReader.Close()
}()

Expand Down Expand Up @@ -122,9 +125,10 @@ func (h *Harvester) Harvest(r reader.Reader) {
// Update offset
h.state.Offset += int64(message.Bytes)

// Create state event
event := input.NewEvent(h.getState())
state := h.getState()

// Create state event
event := input.NewEvent(state)
text := string(message.Content)

// Check if data should be added to event. Only export non empty events.
Expand All @@ -147,12 +151,21 @@ func (h *Harvester) Harvest(r reader.Reader) {
if !h.sendEvent(event) {
return
}
// Update state of harvester as successfully sent
h.state = state
}
}

// stop signals the harvester-internal goroutines to shut down by closing
// the done channel. It is safe to call from multiple paths (close_timeout,
// prospector shutdown, reader-loop return); sync.Once guarantees the
// channel is closed exactly once, avoiding a panic on double close.
func (h *Harvester) stop() {
	h.once.Do(func() { close(h.done) })
}

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {

select {
case <-h.done:
return false
Expand All @@ -161,6 +174,21 @@ func (h *Harvester) sendEvent(event *input.Event) bool {
}
}

// sendStateUpdate publishes an empty event carrying the harvester's current
// state so the registry can be updated. close_timeout deliberately does NOT
// apply here: if the output is blocked, the harvester stays open to make sure
// no second harvester is started for the same file. As soon as the output
// becomes available again, the finished state is written and processing
// continues.
func (h *Harvester) sendStateUpdate() {
	logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)

	stateEvent := input.NewEvent(h.state)

	select {
	case <-h.prospectorDone: // prospector is shutting down; drop the update
	case h.prospectorChan <- stateEvent: // ship the new event downstream
	}
}

// shouldExportLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
Expand Down Expand Up @@ -260,22 +288,18 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
return file.Seek(0, os.SEEK_CUR)
}

// sendStateUpdate send an empty event with the current state to update the registry
func (h *Harvester) sendStateUpdate() bool {
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
event := input.NewEvent(h.getState())
return h.sendEvent(event)
}

// getState returns an updated copy of the harvester state
//
// NOTE(review): this span is a unified-diff rendering with the +/- markers
// stripped — it contains BOTH the pre-change lines (which mutate h.state in
// place and return it) and the post-change lines (which update only the local
// copy and return that); only the copy-based variant is the merged code.
// Do not read this as one coherent function body.
func (h *Harvester) getState() file.State {

	// stdin has no file-backed state to track; return the zero value
	if h.config.InputType == config.StdinInputType {
		return file.State{}
	}

	// take a copy so callers get a snapshot
	state := h.state

	// refreshes the values in State with the values from the harvester itself
	h.state.FileStateOS = file.GetOSState(h.state.Fileinfo)
	return h.state
	state.FileStateOS = file.GetOSState(h.state.Fileinfo)
	return state
}

func (h *Harvester) close() {
Expand All @@ -289,6 +313,7 @@ func (h *Harvester) close() {
// If file was never opened, it can't be closed
if h.file != nil {

// close file handler
h.file.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
Expand Down Expand Up @@ -350,9 +375,3 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

return reader.NewLimit(r, h.config.MaxBytes), nil
}

/*

TODO: introduce new structure: log_file —[raw bytes]—> (line —[utf8 bytes]—> encode) —[message]—> …`

*/
1 change: 0 additions & 1 deletion filebeat/harvester/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func TestReadLine(t *testing.T) {
},
file: f,
}
assert.NotNil(t, h)

var ok bool
h.encodingFactory, ok = encoding.FindEncoding(h.config.Encoding)
Expand Down
6 changes: 3 additions & 3 deletions filebeat/tests/system/test_multiline.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def test_close_timeout_with_multiline(self):
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
close_timeout="2s",
)

os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -286,7 +286,7 @@ def test_close_timeout_with_multiline(self):
# close_timeout must have closed the reader exactly twice
self.wait_until(
lambda: self.log_contains_count(
"Closing harvester because close_timeout was reached") == 2,
"Closing harvester because close_timeout was reached") >= 1,
max_timeout=15)

output = self.read_output()
Expand All @@ -302,7 +302,7 @@ def test_consecutive_newline(self):
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
close_timeout="2s",
)

logentry1 = """[2016-09-02 19:54:23 +0000] Started 2016-09-02 19:54:23 +0000 "GET" for /gaq?path=%2FCA%2FFallbrook%2F1845-Acacia-Ln&referer=http%3A%2F%2Fwww.xxxxx.com%2FAcacia%2BLn%2BFallbrook%2BCA%2Baddresses&search_bucket=none&page_controller=v9%2Faddresses&page_action=show at 23.235.47.31
Expand Down