Skip to content
Merged
27 changes: 25 additions & 2 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,16 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) {
// A segment in the writing list can't be finished writing,
// so we don't check the endOffset.
segment = dq.segments.writing[0]
if response.err != nil {
// Errors reading a writing segment are awkward since we can't discard
// them until the writer loop is done with them. Instead we just seek
// to the end of the current data region. If we're lucky this lets us
// skip the intervening errors; if not, the segment will be cleaned up
// after the writer loop is done with it.
dq.segments.nextReadOffset = segment.endOffset
}
}
segment.framesRead = uint64(dq.segments.nextReadFrameID - segment.firstFrameID)
segment.framesRead += response.frameCount

// If there was an error, report it.
if response.err != nil {
Expand Down Expand Up @@ -346,14 +354,29 @@ func (dq *diskQueue) maybeReadPending() {
// A read request is already pending
return
}
// Check if the next reading segment has already been completely read. (This
// can happen if it was being written and read simultaneously.) In this case
// we should move it to the acking list and proceed to the next segment.
if len(dq.segments.reading) > 0 &&
dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset {
dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0])
dq.segments.reading = dq.segments.reading[1:]
dq.segments.nextReadOffset = 0
}
// Get the next available segment from the reading or writing lists.
segment := dq.segments.readingSegment()
if segment == nil ||
dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) {
// Nothing to read
return
}
if dq.segments.nextReadOffset == 0 {
// If we're reading the beginning of this segment, assign its firstFrameID.
// If we're reading the beginning of this segment, assign its firstFrameID
// so we can recognize its acked frames later.
// The first segment we read might not have its initial nextReadOffset
// set to 0 if the segment was already partially read on a previous run.
// However that can only happen when nextReadFrameID == 0, so we don't
// need to do anything in that case.
segment.firstFrameID = dq.segments.nextReadFrameID
}
request := readerLoopRequest{
Expand Down
Loading