-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prospector and Harvester Cleanups #2020
Conversation
99dd8cc
to
921504c
Compare
@@ -148,11 +160,12 @@ func (h *Harvester) shouldExportLine(line string) bool { | |||
// the file system is scanned | |||
func (h *Harvester) openFile() error { | |||
|
|||
f, err := file.ReadOpen(h.path) | |||
f, err := file.ReadOpen(h.state.Source) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After doing file.ReadOpen
a number of checks that might return an error are executed. Question: in case of openFile returning an error, who is closing the file handle?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right :-( This could be a reason we leak some file descriptors. We must backport this fix to 1.3 @tsg FYI
f94045f
to
6ff1beb
Compare
"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset) | ||
_, err = file.Seek(h.getOffset(), os.SEEK_SET) | ||
logp.Debug("harvester", "harvest: %q position:%d (offset snapshot:%d)", h.state.Source, h.state.Offset, offset) | ||
_, err = file.Seek(h.state.Offset, os.SEEK_SET) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing offset = h.state.Offset
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, glad you spotted this as otherwise it gets overwritten below :-(
6ff1beb
to
72f41b5
Compare
@@ -186,52 +216,38 @@ func (h *Harvester) openFile() error { | |||
return err | |||
} | |||
|
|||
// yay, open file | |||
h.file = source.File{f} | |||
return nil | |||
} | |||
|
|||
func (h *Harvester) initFileOffset(file *os.File) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@urso After your input I decided to rewrite / cleanup this part here. I think it is identical to before, but now max 1 Seek instead of 2 are needed and it seems easier to read. Can you have a close look if it looks sane for you?
@ruflin needs rebase |
df9e833
to
37e9b4f
Compare
return nil | ||
} | ||
|
||
func (h *Harvester) initFileOffset(file *os.File) error { | ||
offset, err := file.Seek(0, os.SEEK_CUR) | ||
func (h *Harvester) getFileOffset(file *os.File) (int64, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think initFileOffset
still matches this methods functionality better. getFileOffset
sounds like re-usable getter.
See elastic#1913 * Make sure ignore_older is not only applied to new files but also existing files * Improve logging in prospector * Add expvar to prospector and harvester * Cleanup offset handling * Add truncation handling to prospector * Handle all offsets as part of the state * Remove path from harvester and use state.Source instead * Remove obsolete exclude / include options * Remove unnecessary getOffset() * Fix potential file handler leak. Close file handler in case of errors. * Implement cleaner file handler closing * Cleanup initOffset handling * Add build-image task to generate json in metricbeat as it was missing * Only send state if file was properly initialised * Only set offset if seek succeeds
37e9b4f
to
c428d42
Compare
See elastic#1913