33 changes: 25 additions & 8 deletions libbeat/publisher/queue/diskqueue/acks.go
@@ -24,10 +24,25 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

// queuePosition represents a logical position within the queue buffer.
// queuePosition represents the position of a data frame within the queue: the
// containing segment, and a byte index into that segment on disk.
// It also stores the 0-based index of the current frame within its segment
// file. (Note that this depends only on the segment file itself, and is
// unrelated to the frameID type used to identify frames in memory.)
// The frame index is logically redundant with the byte index, but
// calculating it requires a linear scan of the segment file, so we store
// both values, letting us track frame counts without reading the whole segment.
// When referencing a data frame, a byteIndex of 0 / uninitialized is
// understood to mean the first frame on disk (the header offset is
// added during handling); thus, `queuePosition{segmentID: 5}` always points
// to the first frame of segment 5, even though the logical position on
// disk depends on the header size, which can vary across schema versions.
// However, a nonzero byteIndex is always interpreted as an exact
// file position.
type queuePosition struct {
segmentID segmentID
offset segmentOffset
segmentID segmentID
byteIndex uint64
frameIndex uint64
}
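To make the zero-value convention above concrete, here is a minimal sketch (not part of this PR; the helper and its headerSize parameter are hypothetical) of how a queuePosition might be resolved to an absolute byte offset within its segment file:

// resolvePosition illustrates the documented convention: a byteIndex of 0
// means "the first frame", so the segment's header size is substituted when
// the position is used; any nonzero byteIndex is taken as an exact offset.
func resolvePosition(pos queuePosition, headerSize uint64) uint64 {
	if pos.byteIndex == 0 {
		return headerSize
	}
	return pos.byteIndex
}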

type diskQueueACKs struct {
@@ -114,15 +129,17 @@ func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
if ok {
// This is the start of a new segment. Remove this frame from the
// segment boundary list and set the position to the start of the
// new segment.
// segment boundary list and reset the byte index to immediately
// after the segment header.
delete(dqa.segmentBoundaries, dqa.nextFrameID)
dqa.nextPosition = queuePosition{
segmentID: newSegment,
offset: 0,
segmentID: newSegment,
byteIndex: segmentHeaderSize,
frameIndex: 0,
}
}
dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
dqa.nextPosition.byteIndex += dqa.frameSize[dqa.nextFrameID]
dqa.nextPosition.frameIndex++
delete(dqa.frameSize, dqa.nextFrameID)
}
// We advanced the ACK position at least somewhat, so write its
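As a rough, standalone illustration of the bookkeeping in addFrames above (invented frame sizes; queuePosition and segmentHeaderSize are the type and constant from the diskqueue package): acknowledging two frames of 100 and 200 bytes at the start of segment 7 leaves the ACK position just past those frames.

// Simplified model of how the ACK position advances across acknowledged
// frames within a single segment.
pos := queuePosition{segmentID: 7, byteIndex: segmentHeaderSize, frameIndex: 0}
for _, frameSize := range []uint64{100, 200} {
	pos.byteIndex += frameSize
	pos.frameIndex++
}
// pos is now {segmentID: 7, byteIndex: segmentHeaderSize + 300, frameIndex: 2}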
14 changes: 8 additions & 6 deletions libbeat/publisher/queue/diskqueue/config.go
@@ -89,7 +89,7 @@ func (c *userConfig) Validate() error {
// be at least twice as large.
if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 {
return errors.New(
"Disk queue max_size must be at least twice as big as segment_size")
"disk queue max_size must be at least twice as big as segment_size")
}

// We require a total queue size of at least 10MB, and a segment size of
@@ -100,17 +100,17 @@ func (c *userConfig) Validate() error {
// restarts, it will work fine.
if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 {
return fmt.Errorf(
"Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
"disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
}
if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 {
return fmt.Errorf(
"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
"disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
}

if c.RetryInterval != nil && c.MaxRetryInterval != nil &&
*c.MaxRetryInterval < *c.RetryInterval {
return fmt.Errorf(
"Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
"disk queue max_retry_interval (%v) can't be less than retry_interval (%v)",
*c.MaxRetryInterval, *c.RetryInterval)
}

@@ -189,8 +189,10 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
fmt.Sprintf("%v.seg", segmentID))
}

func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
// maxValidFrameSize returns the size of the largest possible frame that
// can be stored with the current queue settings.
func (settings Settings) maxValidFrameSize() uint64 {
return settings.MaxSegmentSize - segmentHeaderSize
}

// Given a retry interval, nextRetryInterval returns the next higher level
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/diskqueue/consumer.go
@@ -40,14 +40,14 @@ type diskQueueBatch struct {

func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) {
if consumer.closed {
return nil, fmt.Errorf("Tried to read from a closed disk queue consumer")
return nil, fmt.Errorf("tried to read from a closed disk queue consumer")
}

// Read at least one frame. This is guaranteed to eventually
// succeed unless the queue is closed.
frame, ok := <-consumer.queue.readerLoop.output
if !ok {
return nil, fmt.Errorf("Tried to read from a closed disk queue")
return nil, fmt.Errorf("tried to read from a closed disk queue")
}
frames := []*readFrame{frame}
eventLoop:
117 changes: 56 additions & 61 deletions libbeat/publisher/queue/diskqueue/core_loop.go
@@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
// than an entire segment all by itself (as long as it isn't, it is
// guaranteed to eventually enter the queue assuming no disk errors).
frameSize := request.frame.sizeOnDisk()
if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) {
if frameSize > dq.settings.maxValidFrameSize() {
dq.logger.Warnf(
"Rejecting event with size %v because the segment buffer limit is %v",
frameSize, dq.settings.maxSegmentOffset())
frameSize, dq.settings.maxValidFrameSize())
request.responseChan <- false
return
}
@@ -129,14 +129,15 @@ func (dq *diskQueue) handleWriterLoopResponse(response writerLoopResponse) {
// The writer loop response contains the number of bytes written to
// each segment that appeared in the request. Entries always appear in
// the same sequence as (the beginning of) segments.writing.
for index, bytesWritten := range response.bytesWritten {
for index, segmentEntry := range response.segments {
// Update the segment with its new size.
dq.segments.writing[index].endOffset += segmentOffset(bytesWritten)
dq.segments.writing[index].byteCount += segmentEntry.bytesWritten
dq.segments.writing[index].frameCount += segmentEntry.framesWritten
}

// If there is more than one segment in the response, then all but the
// last have been closed and are ready to move to the reading list.
closedCount := len(response.bytesWritten) - 1
closedCount := len(response.segments) - 1
if closedCount > 0 {
// Remove the prefix of the writing array and append it to reading.
closedSegments := dq.segments.writing[:closedCount]
@@ -151,37 +152,19 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) {

// Advance the frame / offset based on what was just completed.
dq.segments.nextReadFrameID += frameID(response.frameCount)
dq.segments.nextReadOffset += segmentOffset(response.byteCount)
dq.segments.nextReadPosition += response.byteCount

var segment *queueSegment
if len(dq.segments.reading) > 0 {
// A segment is finished if we have read all the data, or
// the read response reports an error.
// Segments in the reading list have been completely written,
// so we can rely on their endOffset field to determine their size.
segment = dq.segments.reading[0]
if dq.segments.nextReadOffset >= segment.endOffset || response.err != nil {
dq.segments.reading = dq.segments.reading[1:]
dq.segments.acking = append(dq.segments.acking, segment)
dq.segments.nextReadOffset = 0
}
} else {
// A segment in the writing list can't be finished writing,
// so we don't check the endOffset.
segment = dq.segments.writing[0]
if response.err != nil {
// Errors reading a writing segment are awkward since we can't discard
// them until the writer loop is done with them. Instead we just seek
// to the end of the current data region. If we're lucky this lets us
// skip the intervening errors; if not, the segment will be cleaned up
// after the writer loop is done with it.
dq.segments.nextReadOffset = segment.endOffset
}
}
segment := dq.segments.readingSegment()
segment.framesRead += response.frameCount

// If there was an error, report it.
if response.err != nil {
// If there's an error, we advance to the end of the current segment.
// If the segment is in the reading list, it will be removed on the
// next call to maybeReadPending.
// If the segment is still in the writing list, we can't discard it
// until the writer loop is done with it, but we can hope that advancing
// to the current write position will get us out of our error state.
dq.segments.nextReadPosition = segment.byteCount
Review comment:
Given the way readers and writers are coordinated, an error in a segment that is currently being written would likely indicate a bug in the business logic, a bug in the framing done by the writer, or an unexpected race condition. The system will most likely recover once a new segment file is started. In this case we might want to use logger.Criticalf, in order to encourage users to report bugs.

Errors on older, already closed segment files might indicate a broken/invalid segment file, or a bug in the framing. The former is to be expected if the system was not shut down cleanly. Error level would be enough, I think.


dq.logger.Errorf(
"Error reading segment file %s: %v",
dq.settings.segmentPath(segment.id), response.err)
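A hedged sketch of what the reviewer's suggestion might look like, assuming the logger exposes a Criticalf method as the comment implies (the stillWriting check mirrors readingSegment's logic: an empty reading list means the segment being read is still on the writing list):

// Hypothetical severity split per the review comment: errors on a segment
// that is still being written likely indicate a bug and are logged as
// critical; errors on an already-closed segment are logged as plain errors.
stillWriting := len(dq.segments.reading) == 0
if stillWriting {
	dq.logger.Criticalf("Error reading segment file %s: %v",
		dq.settings.segmentPath(segment.id), response.err)
} else {
	dq.logger.Errorf("Error reading segment file %s: %v",
		dq.settings.segmentPath(segment.id), response.err)
}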
Expand All @@ -197,7 +180,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
// This segment had an error, so it stays in the acked list.
newAckedSegments = append(newAckedSegments, dq.segments.acked[i])
errors = append(errors,
fmt.Errorf("Couldn't delete segment %d: %w",
fmt.Errorf("couldn't delete segment %d: %w",
dq.segments.acked[i].id, err))
}
}
@@ -209,7 +192,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) {
}
dq.segments.acked = newAckedSegments
if len(errors) > 0 {
dq.logger.Errorw("Deleting segment files", "errors", errors)
dq.logger.Errorw("deleting segment files", "errors", errors)
}
}

@@ -304,7 +287,7 @@ func (dq *diskQueue) handleShutdown() {
// delete things before the current segment.
if len(dq.segments.writing) > 0 &&
finalPosition.segmentID == dq.segments.writing[0].id &&
finalPosition.offset >= dq.segments.writing[0].endOffset {
finalPosition.byteIndex >= dq.segments.writing[0].byteCount {
dq.handleSegmentACK(finalPosition.segmentID)
} else if finalPosition.segmentID > 0 {
dq.handleSegmentACK(finalPosition.segmentID - 1)
@@ -353,43 +336,53 @@ func (segments *diskQueueSegments) readingSegment() *queueSegment {
return nil
}

// If the first entry of the reading list has been completely consumed,
// move it to the acking list and update the read position.
func (dq *diskQueue) maybeAdvanceReadingList() {
if len(dq.segments.reading) > 0 {
segment := dq.segments.reading[0]
if dq.segments.nextReadPosition >= segment.byteCount {
dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0])
dq.segments.reading = dq.segments.reading[1:]
dq.segments.nextReadPosition = 0
}
}
}

// If the reading list is nonempty, and there are no outstanding read
// requests, send one.
func (dq *diskQueue) maybeReadPending() {
if dq.reading {
// A read request is already pending
return
}
// Check if the next reading segment has already been completely read. (This
// can happen if it was being written and read simultaneously.) In this case
// we should move it to the acking list and proceed to the next segment.
if len(dq.segments.reading) > 0 &&
dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset {
dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0])
dq.segments.reading = dq.segments.reading[1:]
dq.segments.nextReadOffset = 0
}
// If the current segment has already been completely read, move to
// the next one.
dq.maybeAdvanceReadingList()

// Get the next available segment from the reading or writing lists.
segment := dq.segments.readingSegment()
if segment == nil ||
dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) {
dq.segments.nextReadPosition >= segment.byteCount {
// Nothing to read
return
}
if dq.segments.nextReadOffset == 0 {
// If we're reading the beginning of this segment, assign its firstFrameID
// so we can recognize its acked frames later.
// The first segment we read might not have its initial nextReadOffset
// set to 0 if the segment was already partially read on a previous run.
// However that can only happen when nextReadFrameID == 0, so we don't
// need to do anything in that case.
if dq.segments.nextReadPosition == 0 {
// If we're reading this segment for the first time, assign its
// firstFrameID so we can recognize its acked frames later, and advance
// the reading position to the end of the segment header.
// The first segment we read might not have the initial nextReadPosition
// set to 0 if it was already partially read on a previous run.
// However that can only happen when nextReadFrameID == 0, so in that
// case firstFrameID is already initialized to the correct value.
segment.firstFrameID = dq.segments.nextReadFrameID
dq.segments.nextReadPosition = segment.headerSize()
}
request := readerLoopRequest{
segment: segment,
startFrameID: dq.segments.nextReadFrameID,
startOffset: dq.segments.nextReadOffset,
endOffset: segment.endOffset,
segment: segment,
startFrameID: dq.segments.nextReadFrameID,
startPosition: dq.segments.nextReadPosition,
endPosition: segment.byteCount,
}
dq.readerLoop.requestChan <- request
dq.reading = true
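The reader loop itself is not part of this diff, but as a rough mental model of how such a request might be serviced (readNextFrame and output are hypothetical names), the loop starts at startPosition, decodes frames until it reaches endPosition or hits an error, and reports the consumed frame and byte counts back in a readerLoopResponse.

// Hypothetical outline of servicing a readerLoopRequest; not the actual
// reader loop implementation.
pos := request.startPosition
frameCount := uint64(0)
for pos < request.endPosition {
	frame, size, err := readNextFrame(request.segment, pos) // hypothetical decoder
	if err != nil {
		break // reported back via readerLoopResponse.err
	}
	output <- frame
	pos += size
	frameCount++
}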
@@ -433,18 +426,20 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
if len(dq.segments.writing) > 0 {
segment = dq.segments.writing[len(dq.segments.writing)-1]
}
frameLen := segmentOffset(frame.sizeOnDisk())
newSegmentSize := dq.segments.writingSegmentSize + frame.sizeOnDisk()
// If segment is nil, or the new segment exceeds its bounds,
// we need to create a new writing segment.
if segment == nil ||
dq.segments.nextWriteOffset+frameLen > dq.settings.maxSegmentOffset() {
newSegmentSize > dq.settings.MaxSegmentSize {
segment = &queueSegment{id: dq.segments.nextID}
dq.segments.writing = append(dq.segments.writing, segment)
dq.segments.nextID++
dq.segments.nextWriteOffset = 0
// Reset the on-disk size to its initial value, the file's header size
// with no frame data.
newSegmentSize = segmentHeaderSize
}

dq.segments.nextWriteOffset += frameLen
dq.segments.writingSegmentSize = newSegmentSize
dq.pendingFrames = append(dq.pendingFrames, segmentedFrame{
frame: frame,
segment: segment,
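To put concrete (invented) numbers on the rollover check above: with a 1 MB MaxSegmentSize, a current segment holding about 900 KB, and a 200 KB frame, the projected size exceeds the limit, so a new segment is started and the size counter falls back to just the header.

// Standalone arithmetic sketch of the rollover decision (values invented).
const maxSegmentSize = 1_000_000
writingSegmentSize := uint64(900_000) // header plus frames written so far
frameSize := uint64(200_000)
newSegmentSize := writingSegmentSize + frameSize // 1_100_000 > maxSegmentSize
if newSegmentSize > maxSegmentSize {
	// roll over: the fresh segment contains only its header
	newSegmentSize = segmentHeaderSize
}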