diff --git a/libbeat/publisher/queue/diskqueue/acks.go b/libbeat/publisher/queue/diskqueue/acks.go index ed9d7589db2d..397d52f25304 100644 --- a/libbeat/publisher/queue/diskqueue/acks.go +++ b/libbeat/publisher/queue/diskqueue/acks.go @@ -24,10 +24,25 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" ) -// queuePosition represents a logical position within the queue buffer. +// queuePosition represents the position of a data frame within the queue: the +// containing segment, and a byte index into that segment on disk. +// It also stores the 0-based index of the current frame within its segment +// file. (Note that this depends only on the segment file itself, and is +// unrelated to the frameID type used to identify frames in memory.) +// The frame index is logically redundant with the byte index, but +// calculating it requires a linear scan of the segment file, so we store +// both values so we can track frame counts without reading the whole segment. +// When referencing a data frame, a byteIndex of 0 / uninitialized is +// understood to mean the first frame on disk (the header offset is +// added during handling); thus, `queuePosition{segmentID: 5}` always points +// to the first frame of segment 5, even though the logical position on +// disk depends on the header size, which can vary across schema version/s. +// However, a nonzero byteIndex is always interpreted as an exact +// file position. type queuePosition struct { - segmentID segmentID - offset segmentOffset + segmentID segmentID + byteIndex uint64 + frameIndex uint64 } type diskQueueACKs struct { @@ -114,15 +129,17 @@ func (dqa *diskQueueACKs) addFrames(frames []*readFrame) { newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID] if ok { // This is the start of a new segment. Remove this frame from the - // segment boundary list and set the position to the start of the - // new segment. + // segment boundary list and reset the byte index to immediately + // after the segment header. delete(dqa.segmentBoundaries, dqa.nextFrameID) dqa.nextPosition = queuePosition{ - segmentID: newSegment, - offset: 0, + segmentID: newSegment, + byteIndex: segmentHeaderSize, + frameIndex: 0, } } - dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID]) + dqa.nextPosition.byteIndex += dqa.frameSize[dqa.nextFrameID] + dqa.nextPosition.frameIndex++ delete(dqa.frameSize, dqa.nextFrameID) } // We advanced the ACK position at least somewhat, so write its diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index b8ef456d03d4..78fdbbe36022 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -89,7 +89,7 @@ func (c *userConfig) Validate() error { // be at least twice as large. if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 { return errors.New( - "Disk queue max_size must be at least twice as big as segment_size") + "disk queue max_size must be at least twice as big as segment_size") } // We require a total queue size of at least 10MB, and a segment size of @@ -100,17 +100,17 @@ func (c *userConfig) Validate() error { // restarts, it will work fine. 
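The queuePosition convention documented in the acks.go hunk above is worth pinning down with a concrete example: a byteIndex of 0 is shorthand for "first frame of the segment" and gets resolved against the header size of whatever schema wrote the file, while any nonzero byteIndex is an exact file offset. The sketch below illustrates that resolution with pared-down stand-in types; resolveReadPosition is a hypothetical helper, not code from this patch, and only headerSize mirrors the logic added in segments.go.

package main

import "fmt"

// Simplified stand-ins for the real diskqueue types.
type segmentID uint64

type queuePosition struct {
    segmentID  segmentID
    byteIndex  uint64 // absolute byte offset into the segment file
    frameIndex uint64 // 0-based frame index within the segment
}

type queueSegment struct {
    id            segmentID
    schemaVersion *uint32
}

// headerSize mirrors the helper added in segments.go: schema 0 had only a
// 4-byte version field, later schemas use the full 8-byte header.
func (s *queueSegment) headerSize() uint64 {
    if s.schemaVersion != nil && *s.schemaVersion < 1 {
        return 4
    }
    return 8
}

// resolveReadPosition is a hypothetical helper showing how a stored position
// is turned into a concrete file offset: byteIndex 0 means "first frame",
// i.e. immediately after the header; anything else is taken literally.
func resolveReadPosition(pos queuePosition, segment *queueSegment) uint64 {
    if pos.byteIndex == 0 {
        return segment.headerSize()
    }
    return pos.byteIndex
}

func main() {
    v0 := uint32(0)
    oldSegment := &queueSegment{id: 5, schemaVersion: &v0}
    newSegment := &queueSegment{id: 6} // current schema

    start := queuePosition{segmentID: 5} // byteIndex 0 -> first frame
    fmt.Println(resolveReadPosition(start, oldSegment)) // 4
    fmt.Println(resolveReadPosition(start, newSegment)) // 8

    resumed := queuePosition{segmentID: 6, byteIndex: 1234, frameIndex: 17}
    fmt.Println(resolveReadPosition(resumed, newSegment)) // 1234, exact offset
}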
if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 { return fmt.Errorf( - "Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize) + "disk queue max_size (%d) cannot be less than 10MB", c.MaxSize) } if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 { return fmt.Errorf( - "Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize) + "disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize) } if c.RetryInterval != nil && c.MaxRetryInterval != nil && *c.MaxRetryInterval < *c.RetryInterval { return fmt.Errorf( - "Disk queue max_retry_interval (%v) can't be less than retry_interval (%v)", + "disk queue max_retry_interval (%v) can't be less than retry_interval (%v)", *c.MaxRetryInterval, *c.RetryInterval) } @@ -189,8 +189,10 @@ func (settings Settings) segmentPath(segmentID segmentID) string { fmt.Sprintf("%v.seg", segmentID)) } -func (settings Settings) maxSegmentOffset() segmentOffset { - return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize) +// maxValidFrameSize returns the size of the largest possible frame that +// can be stored with the current queue settings. +func (settings Settings) maxValidFrameSize() uint64 { + return settings.MaxSegmentSize - segmentHeaderSize } // Given a retry interval, nextRetryInterval returns the next higher level diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index b2922778ea54..b88c6e59786b 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -40,14 +40,14 @@ type diskQueueBatch struct { func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) { if consumer.closed { - return nil, fmt.Errorf("Tried to read from a closed disk queue consumer") + return nil, fmt.Errorf("tried to read from a closed disk queue consumer") } // Read at least one frame. This is guaranteed to eventually // succeed unless the queue is closed. frame, ok := <-consumer.queue.readerLoop.output if !ok { - return nil, fmt.Errorf("Tried to read from a closed disk queue") + return nil, fmt.Errorf("tried to read from a closed disk queue") } frames := []*readFrame{frame} eventLoop: diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index ac6e22c52d86..fd7e710372ae 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) { // than an entire segment all by itself (as long as it isn't, it is // guaranteed to eventually enter the queue assuming no disk errors). frameSize := request.frame.sizeOnDisk() - if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) { + if frameSize > dq.settings.maxValidFrameSize() { dq.logger.Warnf( "Rejecting event with size %v because the segment buffer limit is %v", - frameSize, dq.settings.maxSegmentOffset()) + frameSize, dq.settings.maxValidFrameSize()) request.responseChan <- false return } @@ -129,14 +129,15 @@ func (dq *diskQueue) handleWriterLoopResponse(response writerLoopResponse) { // The writer loop response contains the number of bytes written to // each segment that appeared in the request. Entries always appear in // the same sequence as (the beginning of) segments.writing. - for index, bytesWritten := range response.bytesWritten { + for index, segmentEntry := range response.segments { // Update the segment with its new size. 
- dq.segments.writing[index].endOffset += segmentOffset(bytesWritten) + dq.segments.writing[index].byteCount += segmentEntry.bytesWritten + dq.segments.writing[index].frameCount += segmentEntry.framesWritten } // If there is more than one segment in the response, then all but the // last have been closed and are ready to move to the reading list. - closedCount := len(response.bytesWritten) - 1 + closedCount := len(response.segments) - 1 if closedCount > 0 { // Remove the prefix of the writing array and append to to reading. closedSegments := dq.segments.writing[:closedCount] @@ -151,37 +152,19 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) { // Advance the frame / offset based on what was just completed. dq.segments.nextReadFrameID += frameID(response.frameCount) - dq.segments.nextReadOffset += segmentOffset(response.byteCount) + dq.segments.nextReadPosition += response.byteCount - var segment *queueSegment - if len(dq.segments.reading) > 0 { - // A segment is finished if we have read all the data, or - // the read response reports an error. - // Segments in the reading list have been completely written, - // so we can rely on their endOffset field to determine their size. - segment = dq.segments.reading[0] - if dq.segments.nextReadOffset >= segment.endOffset || response.err != nil { - dq.segments.reading = dq.segments.reading[1:] - dq.segments.acking = append(dq.segments.acking, segment) - dq.segments.nextReadOffset = 0 - } - } else { - // A segment in the writing list can't be finished writing, - // so we don't check the endOffset. - segment = dq.segments.writing[0] - if response.err != nil { - // Errors reading a writing segment are awkward since we can't discard - // them until the writer loop is done with them. Instead we just seek - // to the end of the current data region. If we're lucky this lets us - // skip the intervening errors; if not, the segment will be cleaned up - // after the writer loop is done with it. - dq.segments.nextReadOffset = segment.endOffset - } - } + segment := dq.segments.readingSegment() segment.framesRead += response.frameCount - - // If there was an error, report it. if response.err != nil { + // If there's an error, we advance to the end of the current segment. + // If the segment is in the reading list, it will be removed on the + // next call to maybeReadPending. + // If the segment is still in the writing list, we can't discard it + // until the writer loop is done with it, but we can hope that advancing + // to the current write position will get us out of our error state. + dq.segments.nextReadPosition = segment.byteCount + dq.logger.Errorf( "Error reading segment file %s: %v", dq.settings.segmentPath(segment.id), response.err) @@ -197,7 +180,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) { // This segment had an error, so it stays in the acked list. newAckedSegments = append(newAckedSegments, dq.segments.acked[i]) errors = append(errors, - fmt.Errorf("Couldn't delete segment %d: %w", + fmt.Errorf("couldn't delete segment %d: %w", dq.segments.acked[i].id, err)) } } @@ -209,7 +192,7 @@ func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) { } dq.segments.acked = newAckedSegments if len(errors) > 0 { - dq.logger.Errorw("Deleting segment files", "errors", errors) + dq.logger.Errorw("deleting segment files", "errors", errors) } } @@ -304,7 +287,7 @@ func (dq *diskQueue) handleShutdown() { // delete things before the current segment. 
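The handleWriterLoopResponse change above relies on an ordering invariant that is easy to miss: the response carries one entry per segment touched by the request, in the same order as the head of segments.writing, and every entry except the last belongs to a segment that is now closed. Below is a minimal stand-alone sketch of folding such a response into the writing and reading lists, using simplified stand-in types; foldWriterResponse is a hypothetical name, not the real handler.

package main

import "fmt"

// Pared-down stand-ins for the real queue types.
type queueSegment struct {
    id         uint64
    byteCount  uint64
    frameCount uint32
}

type writerLoopResponseSegment struct {
    framesWritten uint32
    bytesWritten  uint64
}

type writerLoopResponse struct {
    segments []writerLoopResponseSegment
}

// foldWriterResponse applies a response the same way handleWriterLoopResponse
// does: accumulate the per-segment totals onto the writing list, then move
// every segment except the last one to the reading list.
func foldWriterResponse(
    writing, reading []*queueSegment, response writerLoopResponse,
) (newWriting, newReading []*queueSegment) {
    for i, entry := range response.segments {
        writing[i].byteCount += entry.bytesWritten
        writing[i].frameCount += entry.framesWritten
    }
    closedCount := len(response.segments) - 1
    if closedCount > 0 {
        reading = append(reading, writing[:closedCount]...)
        writing = writing[closedCount:]
    }
    return writing, reading
}

func main() {
    writing := []*queueSegment{{id: 1, byteCount: 100}, {id: 2}, {id: 3}}
    response := writerLoopResponse{segments: []writerLoopResponseSegment{
        {framesWritten: 4, bytesWritten: 400}, // segment 1, now closed
        {framesWritten: 2, bytesWritten: 150}, // segment 2, still open
    }}
    writing, reading := foldWriterResponse(writing, nil, response)
    fmt.Println(len(writing), len(reading)) // 2 1
    fmt.Println(reading[0].byteCount)       // 500
    fmt.Println(writing[0].frameCount)      // 2
}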
if len(dq.segments.writing) > 0 && finalPosition.segmentID == dq.segments.writing[0].id && - finalPosition.offset >= dq.segments.writing[0].endOffset { + finalPosition.byteIndex >= dq.segments.writing[0].byteCount { dq.handleSegmentACK(finalPosition.segmentID) } else if finalPosition.segmentID > 0 { dq.handleSegmentACK(finalPosition.segmentID - 1) @@ -353,6 +336,19 @@ func (segments *diskQueueSegments) readingSegment() *queueSegment { return nil } +// If the first entry of the reading list has been completely consumed, +// move it to the acking list and update the read position. +func (dq *diskQueue) maybeAdvanceReadingList() { + if len(dq.segments.reading) > 0 { + segment := dq.segments.reading[0] + if dq.segments.nextReadPosition >= segment.byteCount { + dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0]) + dq.segments.reading = dq.segments.reading[1:] + dq.segments.nextReadPosition = 0 + } + } +} + // If the reading list is nonempty, and there are no outstanding read // requests, send one. func (dq *diskQueue) maybeReadPending() { @@ -360,36 +356,33 @@ func (dq *diskQueue) maybeReadPending() { // A read request is already pending return } - // Check if the next reading segment has already been completely read. (This - // can happen if it was being written and read simultaneously.) In this case - // we should move it to the acking list and proceed to the next segment. - if len(dq.segments.reading) > 0 && - dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset { - dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0]) - dq.segments.reading = dq.segments.reading[1:] - dq.segments.nextReadOffset = 0 - } + // If the current segment has already been completely read, move to + // the next one. + dq.maybeAdvanceReadingList() + // Get the next available segment from the reading or writing lists. segment := dq.segments.readingSegment() if segment == nil || - dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) { + dq.segments.nextReadPosition >= segment.byteCount { // Nothing to read return } - if dq.segments.nextReadOffset == 0 { - // If we're reading the beginning of this segment, assign its firstFrameID - // so we can recognize its acked frames later. - // The first segment we read might not have its initial nextReadOffset - // set to 0 if the segment was already partially read on a previous run. - // However that can only happen when nextReadFrameID == 0, so we don't - // need to do anything in that case. + if dq.segments.nextReadPosition == 0 { + // If we're reading this segment for the first time, assign its + // firstFrameID so we can recognize its acked frames later, and advance + // the reading position to the end of the segment header. + // The first segment we read might not have the initial nextReadPosition + // set to 0 if it was already partially read on a previous run. + // However that can only happen when nextReadFrameID == 0, so in that + // case firstFrameID is already initialized to the correct value. 
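The maybeReadPending logic above boils down to: on the first read of a segment, record which frame IDs it will produce and skip over the header; otherwise resume at nextReadPosition; either way, read up to the segment's current byteCount. A minimal sketch of that request construction follows, assuming the current 8-byte header (the real code calls segment.headerSize() so older 4-byte-header files are handled too); buildReadRequest and the trimmed types are illustrative stand-ins.

package main

import "fmt"

type frameID uint64

type queueSegment struct {
    id           uint64
    byteCount    uint64
    firstFrameID frameID
}

type readerLoopRequest struct {
    segment       *queueSegment
    startFrameID  frameID
    startPosition uint64 // absolute offset, must be a frame boundary
    endPosition   uint64 // absolute offset, typically segment.byteCount
}

const segmentHeaderSize = 8 // current schema: version + frame count

// buildReadRequest mirrors the request construction in maybeReadPending.
func buildReadRequest(
    segment *queueSegment, nextReadPosition uint64, nextReadFrameID frameID,
) readerLoopRequest {
    if nextReadPosition == 0 {
        // First read of this segment: remember which frame IDs it will
        // produce, and start immediately after the header.
        segment.firstFrameID = nextReadFrameID
        nextReadPosition = segmentHeaderSize
    }
    return readerLoopRequest{
        segment:       segment,
        startFrameID:  nextReadFrameID,
        startPosition: nextReadPosition,
        endPosition:   segment.byteCount,
    }
}

func main() {
    segment := &queueSegment{id: 1, byteCount: 1000}

    fresh := buildReadRequest(segment, 0, 5)
    fmt.Println(fresh.startPosition, fresh.endPosition) // 8 1000

    resumed := buildReadRequest(segment, 500, 10)
    fmt.Println(resumed.startPosition, resumed.endPosition) // 500 1000
}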
segment.firstFrameID = dq.segments.nextReadFrameID + dq.segments.nextReadPosition = segment.headerSize() } request := readerLoopRequest{ - segment: segment, - startFrameID: dq.segments.nextReadFrameID, - startOffset: dq.segments.nextReadOffset, - endOffset: segment.endOffset, + segment: segment, + startFrameID: dq.segments.nextReadFrameID, + startPosition: dq.segments.nextReadPosition, + endPosition: segment.byteCount, } dq.readerLoop.requestChan <- request dq.reading = true @@ -433,18 +426,20 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) { if len(dq.segments.writing) > 0 { segment = dq.segments.writing[len(dq.segments.writing)-1] } - frameLen := segmentOffset(frame.sizeOnDisk()) + newSegmentSize := dq.segments.writingSegmentSize + frame.sizeOnDisk() // If segment is nil, or the new segment exceeds its bounds, // we need to create a new writing segment. if segment == nil || - dq.segments.nextWriteOffset+frameLen > dq.settings.maxSegmentOffset() { + newSegmentSize > dq.settings.MaxSegmentSize { segment = &queueSegment{id: dq.segments.nextID} dq.segments.writing = append(dq.segments.writing, segment) dq.segments.nextID++ - dq.segments.nextWriteOffset = 0 + // Reset the on-disk size to its initial value, the file's header size + // with no frame data. + newSegmentSize = segmentHeaderSize } - dq.segments.nextWriteOffset += frameLen + dq.segments.writingSegmentSize = newSegmentSize dq.pendingFrames = append(dq.pendingFrames, segmentedFrame{ frame: frame, segment: segment, diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index 1eb9ff54a157..3bf53e9b8a53 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -78,9 +78,9 @@ func TestHandleProducerWriteRequest(t *testing.T) { }, "accept with frame in new segment if current segment is full": { segments: diskQueueSegments{ - writing: []*queueSegment{{}}, - nextWriteOffset: 600, - nextID: 1, + writing: []*queueSegment{{}}, + writingSegmentSize: 600, + nextID: 1, }, frameSize: 500, shouldBlock: false, @@ -90,7 +90,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { "reject when full and shouldBlock=false": { segments: diskQueueSegments{ reading: []*queueSegment{ - {endOffset: 9600}, + {byteCount: 9600}, }, }, frameSize: 500, @@ -100,7 +100,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { "block when full and shouldBlock=true": { segments: diskQueueSegments{ reading: []*queueSegment{ - {endOffset: 9600}, + {byteCount: 9600}, }, }, frameSize: 500, @@ -212,7 +212,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { func TestHandleWriterLoopResponse(t *testing.T) { // handleWriterLoopResponse should: - // - Add the values in the bytesWritten array, in order, to the endOffset + // - Add the values in the bytesWritten array, in order, to the byteCount // of the segments in segments.writing (these represent the amount // written to each segment as a result of the preceding writer loop // request). 
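The enqueueWriteFrame change earlier in this hunk replaces the old per-segment write offset with writingSegmentSize, the expected on-disk size of the current write segment once every scheduled frame lands. Below is a small sketch of that rollover decision under an assumed 1000-byte segment limit (far below the real 1MB minimum, purely for readability); writeScheduler and schedule are hypothetical names, and the fresh segment's tracked size here is taken to include the header plus the frame that triggered the rollover.

package main

import "fmt"

const segmentHeaderSize = 8

// writeScheduler is an illustrative stand-in for the rollover bookkeeping:
// writingSegmentSize tracks what the current write segment will occupy on
// disk once every scheduled frame is flushed, so the decision can be made
// before anything is written.
type writeScheduler struct {
    maxSegmentSize     uint64
    writingSegmentSize uint64
    segmentCount       int
}

// schedule returns the index of the segment the frame was assigned to.
func (ws *writeScheduler) schedule(frameSize uint64) int {
    newSize := ws.writingSegmentSize + frameSize
    if ws.segmentCount == 0 || newSize > ws.maxSegmentSize {
        // Roll over: a fresh segment starts at its header size plus the
        // frame that is about to be scheduled into it.
        ws.segmentCount++
        newSize = segmentHeaderSize + frameSize
    }
    ws.writingSegmentSize = newSize
    return ws.segmentCount - 1
}

func main() {
    ws := &writeScheduler{maxSegmentSize: 1000}
    for _, frameSize := range []uint64{400, 400, 400} {
        fmt.Println(ws.schedule(frameSize), ws.writingSegmentSize)
    }
    // Output:
    // 0 408
    // 0 808
    // 1 408  <- a third frame would exceed 1000 bytes, so a new segment starts
}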
@@ -225,7 +225,7 @@ func TestHandleWriterLoopResponse(t *testing.T) { settings: DefaultSettings(), segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 1, endOffset: 100}, + {id: 1, byteCount: 100}, {id: 2}, {id: 3}, {id: 4}, @@ -235,71 +235,76 @@ func TestHandleWriterLoopResponse(t *testing.T) { // Write to one segment (no segments should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - bytesWritten: []int64{100}, + segments: []writerLoopResponseSegment{ + {bytesWritten: 100}, + }, }) if len(dq.segments.writing) != 4 || len(dq.segments.reading) != 0 { t.Fatalf("expected 4 writing and 0 reading segments, got %v writing "+ "and %v reading", len(dq.segments.writing), len(dq.segments.reading)) } - if dq.segments.writing[0].endOffset != 200 { + if dq.segments.writing[0].byteCount != 200 { t.Errorf("expected first writing segment to be size 200, got %v", - dq.segments.writing[0].endOffset) + dq.segments.writing[0].byteCount) } // Write to two segments (the first one should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - bytesWritten: []int64{100, 100}, + segments: []writerLoopResponseSegment{ + {bytesWritten: 100}, + {bytesWritten: 100}, + }, }) if len(dq.segments.writing) != 3 || len(dq.segments.reading) != 1 { t.Fatalf("expected 3 writing and 1 reading segments, got %v writing "+ "and %v reading", len(dq.segments.writing), len(dq.segments.reading)) } - if dq.segments.reading[0].endOffset != 300 { + if dq.segments.reading[0].byteCount != 300 { t.Errorf("expected first reading segment to be size 300, got %v", - dq.segments.reading[0].endOffset) + dq.segments.reading[0].byteCount) } - if dq.segments.writing[0].endOffset != 100 { + if dq.segments.writing[0].byteCount != 100 { t.Errorf("expected first writing segment to be size 100, got %v", - dq.segments.writing[0].endOffset) + dq.segments.writing[0].byteCount) } // Write to three segments (the first two should be moved to reading list) dq.handleWriterLoopResponse(writerLoopResponse{ - bytesWritten: []int64{100, 100, 500}, + segments: []writerLoopResponseSegment{ + {bytesWritten: 100}, + {bytesWritten: 100}, + {bytesWritten: 500}, + }, }) if len(dq.segments.writing) != 1 || len(dq.segments.reading) != 3 { t.Fatalf("expected 1 writing and 3 reading segments, got %v writing "+ "and %v reading", len(dq.segments.writing), len(dq.segments.reading)) } - if dq.segments.reading[0].endOffset != 300 { + if dq.segments.reading[0].byteCount != 300 { t.Errorf("expected first reading segment to be size 300, got %v", - dq.segments.reading[0].endOffset) + dq.segments.reading[0].byteCount) } - if dq.segments.reading[1].endOffset != 200 { + if dq.segments.reading[1].byteCount != 200 { t.Errorf("expected second reading segment to be size 200, got %v", - dq.segments.reading[1].endOffset) + dq.segments.reading[1].byteCount) } - if dq.segments.reading[2].endOffset != 100 { + if dq.segments.reading[2].byteCount != 100 { t.Errorf("expected third reading segment to be size 100, got %v", - dq.segments.reading[2].endOffset) + dq.segments.reading[2].byteCount) } - if dq.segments.writing[0].endOffset != 500 { + if dq.segments.writing[0].byteCount != 500 { t.Errorf("expected first writing segment to be size 500, got %v", - dq.segments.writing[0].endOffset) + dq.segments.writing[0].byteCount) } } func TestHandleReaderLoopResponse(t *testing.T) { // handleReaderLoopResponse should: - // - advance segments.{nextReadFrameID, nextReadOffset} by the values in + // - advance segments.{nextReadFrameID, nextReadPosition} 
by the values in // response.{frameCount, byteCount} // - advance the target segment's framesRead field by response.frameCount - // - if reading[0] encountered an error or was completely read, move it from - // the reading list to the acking list and reset nextReadOffset to zero - // - if writing[0] encountered an error, advance nextReadOffset to the - // segment's current endOffset (we can't discard the active writing - // segment like we do for errors in the reading list, but we can still - // mark the remaining data as processed) + // - if there was an error reading the current segment, set + // nextReadPosition to the end of the segment. testCases := map[string]struct { // The segment structure to start with before calling @@ -307,14 +312,13 @@ func TestHandleReaderLoopResponse(t *testing.T) { segments diskQueueSegments response readerLoopResponse - expectedFrameID frameID - expectedOffset segmentOffset - expectedACKingSegment *segmentID + expectedFrameID frameID + expectedPosition uint64 }{ "completely read first reading segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -322,14 +326,13 @@ func TestHandleReaderLoopResponse(t *testing.T) { frameCount: 10, byteCount: 1000, }, - expectedFrameID: 15, - expectedOffset: 0, - expectedACKingSegment: segmentIDRef(1), + expectedFrameID: 15, + expectedPosition: 1000, }, "read first half of first reading segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -337,29 +340,28 @@ func TestHandleReaderLoopResponse(t *testing.T) { frameCount: 5, byteCount: 500, }, - expectedFrameID: 10, - expectedOffset: 500, + expectedFrameID: 10, + expectedPosition: 500, }, "read second half of first reading segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, - nextReadFrameID: 5, - nextReadOffset: 500, + nextReadFrameID: 5, + nextReadPosition: 500, }, response: readerLoopResponse{ frameCount: 5, byteCount: 500, }, - expectedFrameID: 10, - expectedOffset: 0, - expectedACKingSegment: segmentIDRef(1), + expectedFrameID: 10, + expectedPosition: 1000, }, "read of first reading segment aborted by error": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -368,14 +370,13 @@ func TestHandleReaderLoopResponse(t *testing.T) { byteCount: 100, err: fmt.Errorf("something bad happened"), }, - expectedFrameID: 6, - expectedOffset: 0, - expectedACKingSegment: segmentIDRef(1), + expectedFrameID: 6, + expectedPosition: 1000, }, "completely read first writing segment": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -383,13 +384,13 @@ func TestHandleReaderLoopResponse(t *testing.T) { frameCount: 10, byteCount: 1000, }, - expectedFrameID: 15, - expectedOffset: 1000, + expectedFrameID: 15, + expectedPosition: 1000, }, "read first half of first writing segment": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -397,28 +398,28 @@ func TestHandleReaderLoopResponse(t *testing.T) { frameCount: 5, byteCount: 500, }, - expectedFrameID: 10, - expectedOffset: 500, + expectedFrameID: 10, + expectedPosition: 500, }, "read second half of first writing 
segment": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, - nextReadOffset: 500, - nextReadFrameID: 5, + nextReadPosition: 500, + nextReadFrameID: 5, }, response: readerLoopResponse{ frameCount: 5, byteCount: 500, }, - expectedFrameID: 10, - expectedOffset: 1000, + expectedFrameID: 10, + expectedPosition: 1000, }, - "error reading a writing segments skips remaining data": { + "error reading a writing segment skips remaining data": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, nextReadFrameID: 5, }, @@ -427,8 +428,8 @@ func TestHandleReaderLoopResponse(t *testing.T) { byteCount: 100, err: fmt.Errorf("something bad happened"), }, - expectedFrameID: 6, - expectedOffset: 1000, + expectedFrameID: 6, + expectedPosition: 1000, }, } @@ -444,21 +445,9 @@ func TestHandleReaderLoopResponse(t *testing.T) { t.Errorf("%s: expected nextReadFrameID = %d, got %d", description, test.expectedFrameID, dq.segments.nextReadFrameID) } - if dq.segments.nextReadOffset != test.expectedOffset { - t.Errorf("%s: expected nextReadOffset = %d, got %d", - description, test.expectedOffset, dq.segments.nextReadOffset) - } - if test.expectedACKingSegment != nil { - if len(dq.segments.acking) == 0 { - t.Errorf("%s: expected acking segment %d, got none", - description, *test.expectedACKingSegment) - } else if dq.segments.acking[0].id != *test.expectedACKingSegment { - t.Errorf("%s: expected acking segment %d, got %d", - description, *test.expectedACKingSegment, dq.segments.acking[0].id) - } - } else if len(dq.segments.acking) != 0 { - t.Errorf("%s: expected no acking segment, got %v", - description, *dq.segments.acking[0]) + if dq.segments.nextReadPosition != test.expectedPosition { + t.Errorf("%s: expected nextReadPosition = %d, got %d", + description, test.expectedPosition, dq.segments.nextReadPosition) } } } @@ -466,16 +455,16 @@ func TestHandleReaderLoopResponse(t *testing.T) { func TestMaybeReadPending(t *testing.T) { // maybeReadPending should: // - If diskQueue.reading is true, do nothing and return immediately. - // - If any unread data is available in a reading or writing segment, - // send a readerLoopRequest for the full amount available in the - // first such segment, and set diskQueue.reading to true. - // - When creating a readerLoopRequest that includes the beginning of - // a segment (startOffset == 0), set that segment's firstFrameID - // to segments.nextReadFrameID (so ACKs based on frame ID can be linked - // back to the segment that generated them). - // - If the first reading segment has already been completely read (which - // can happen if it was read while still in the writing list), move it to - // the acking list and set segments.nextReadOffset to 0. + // - If the first reading segment has already been completely read, + // move it to the acking list and set segments.nextReadPosition to 0. + // - If nextReadPosition is / becomes 0, and a segment is available to + // read, set that segment's firstFrameID to segments.nextReadFrameID + // (so ACKs based on frame ID can be linked + // back to the segment that generated them), and set nextReadPosition + // to the end of the segment header. + // - If there is unread data in the next available segment, + // send a readerLoopRequest for the full amount and set + // diskQueue.reading to true. 
testCases := map[string]struct { // The segment structure to start with before calling maybeReadPending @@ -491,7 +480,7 @@ func TestMaybeReadPending(t *testing.T) { "read one full segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, // The next read request should start with frame 5 nextReadFrameID: 5, @@ -499,14 +488,16 @@ func TestMaybeReadPending(t *testing.T) { expectedRequest: &readerLoopRequest{ segment: &queueSegment{id: 1}, startFrameID: 5, - startOffset: 0, - endOffset: 1000, + // startPosition is 8, the end of the segment header in the + // current file schema. + startPosition: 8, + endPosition: 1000, }, }, "do nothing if reading flag is set": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, }, reading: true, @@ -515,35 +506,35 @@ func TestMaybeReadPending(t *testing.T) { "read the end of a segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, // The next read request should start with frame 5 nextReadFrameID: 5, // Start reading at position 500 - nextReadOffset: 500, + nextReadPosition: 500, }, expectedRequest: &readerLoopRequest{ segment: &queueSegment{id: 1}, startFrameID: 5, - // Should be reading from nextReadOffset (500) to the end of + // Should be reading from nextReadPosition (500) to the end of // the segment (1000). - startOffset: 500, - endOffset: 1000, + startPosition: 500, + endPosition: 1000, }, }, "ignore writing segments if reading is available": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, writing: []*queueSegment{ - {id: 2, endOffset: 1000}, + {id: 2, byteCount: 1000}, }, }, expectedRequest: &readerLoopRequest{ - segment: &queueSegment{id: 1}, - startOffset: 0, - endOffset: 1000, + segment: &queueSegment{id: 1}, + startPosition: 8, + endPosition: 1000, }, }, "do nothing if no segments are available": { @@ -553,50 +544,69 @@ func TestMaybeReadPending(t *testing.T) { "read the writing segment if no reading segments are available": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 2, endOffset: 1000}, + {id: 2, byteCount: 1000}, }, - nextReadOffset: 500, + nextReadPosition: 500, }, expectedRequest: &readerLoopRequest{ - segment: &queueSegment{id: 2}, - startOffset: 500, - endOffset: 1000, + segment: &queueSegment{id: 2}, + startPosition: 500, + endPosition: 1000, }, }, "do nothing if the writing segment has already been fully read": { segments: diskQueueSegments{ writing: []*queueSegment{ - {id: 2, endOffset: 1000}, + {id: 2, byteCount: 1000}, }, - nextReadOffset: 1000, + nextReadPosition: 1000, }, expectedRequest: nil, }, "skip the first reading segment if it's already been fully read": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, - {id: 2, endOffset: 500}, + {id: 1, byteCount: 1000}, + {id: 2, byteCount: 500}, }, - nextReadOffset: 1000, + nextReadPosition: 1000, }, expectedRequest: &readerLoopRequest{ - segment: &queueSegment{id: 2}, - startOffset: 0, - endOffset: 500, + segment: &queueSegment{id: 2}, + startPosition: 8, + endPosition: 500, }, expectedACKingSegment: segmentIDRef(1), }, "move empty reading segment to the acking list if it's the only one": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, endOffset: 1000}, + {id: 1, byteCount: 1000}, }, - nextReadOffset: 1000, + nextReadPosition: 1000, }, 
expectedRequest: nil, expectedACKingSegment: segmentIDRef(1), }, + "reading the beginning of an old segment file uses the right header size": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + { + id: 1, + byteCount: 1000, + schemaVersion: makeUint32Ptr(0)}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + // The header size for schema version 0 was 4 bytes. + startPosition: 4, + endPosition: 1000, + }, + }, } for description, test := range testCases { @@ -621,7 +631,7 @@ func TestMaybeReadPending(t *testing.T) { t.Errorf("%s: expected request %v, got %v", description, *test.expectedRequest, request) } - if request.startOffset == 0 && + if request.startPosition == 0 && request.segment.firstFrameID != firstFrameID { t.Errorf( "%s: maybeReadPending should update firstFrameID", description) @@ -644,10 +654,6 @@ func TestMaybeReadPending(t *testing.T) { t.Errorf("%s: expected acking segment %v, got %v", description, *test.expectedACKingSegment, dq.segments.acking[0].id) } - if dq.segments.nextReadOffset != 0 { - t.Errorf("%s: expected read offset 0 after acking segment, got %v", - description, dq.segments.nextReadOffset) - } } else if len(dq.segments.acking) != 0 { t.Errorf("%s: expected no acking segment, got %v", description, *dq.segments.acking[0]) @@ -963,12 +969,16 @@ func makeWriteFrameWithSize(size int) *writeFrame { return &writeFrame{serialized: make([]byte, size-frameMetadataSize)} } +func makeUint32Ptr(value uint32) *uint32 { + return &value +} + func segmentWithSize(size int) *queueSegment { if size < segmentHeaderSize { // Can't have a segment smaller than the segment header return nil } - return &queueSegment{endOffset: segmentOffset(size - segmentHeaderSize)} + return &queueSegment{byteCount: uint64(size)} } func equalReaderLoopRequests( @@ -976,8 +986,8 @@ func equalReaderLoopRequests( ) bool { // We compare segment ids rather than segment pointers because it's // awkward to include the same pointer repeatedly in the test definition. - return r0.startOffset == r1.startOffset && - r0.endOffset == r1.endOffset && + return r0.startPosition == r1.startPosition && + r0.endPosition == r1.endPosition && r0.segment.id == r1.segment.id && r0.startFrameID == r1.startFrameID } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index aaa897458766..7ac0e8ef3ae2 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -137,6 +137,15 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { // warning and fall back on the oldest existing segment, if any. logger.Warnf("Couldn't load most recent queue position: %v", err) } + if nextReadPosition.frameIndex == 0 { + // If the previous state was written by an older version, it may lack + // the frameIndex field. In this case we reset the read offset within + // the segment, which may cause one-time retransmission of some events + // from a previous version, but ensures that our metrics are consistent. + // In the more common case that frameIndex is 0 because this segment + // simply hasn't been read yet, setting byteIndex to 0 is a no-op. 
+ nextReadPosition.byteIndex = 0 + } positionFile, err := os.OpenFile( settings.stateFilePath(), os.O_WRONLY|os.O_CREATE, 0600) if err != nil { @@ -150,7 +159,8 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { } // Index any existing data segments to be placed in segments.reading. - initialSegments, err := scanExistingSegments(settings.directoryPath()) + initialSegments, err := + scanExistingSegments(logger, settings.directoryPath()) if err != nil { return nil, err } @@ -177,14 +187,29 @@ func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { nextReadPosition = queuePosition{segmentID: initialSegments[0].id} } + // We can compute the active frames right now but still need a way to report + // them to the global beat metrics. For now, just log the total. + // Note that for consistency with existing queue behavior, this excludes + // events that are still present on disk but were already sent and + // acknowledged on a previous run (we probably want to track these as well + // in the future.) + // TODO: pass in a context that queues can use to report these events. + activeFrameCount := 0 + for _, segment := range initialSegments { + activeFrameCount += int(segment.frameCount) + } + activeFrameCount -= int(nextReadPosition.frameIndex) + logger.Infof("Found %d existing events on queue start", activeFrameCount) + queue := &diskQueue{ logger: logger, settings: settings, segments: diskQueueSegments{ - reading: initialSegments, - nextID: nextSegmentID, - nextReadOffset: nextReadPosition.offset, + reading: initialSegments, + acked: ackedSegments, + nextID: nextSegmentID, + nextReadPosition: nextReadPosition.byteIndex, }, acks: newDiskQueueACKs(logger, nextReadPosition, positionFile), diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index 5b30f03e81d2..a31cc5b8a777 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -23,11 +23,13 @@ import ( "os" ) +// startPosition and endPosition are absolute byte offsets into the segment +// file on disk, and must point to frame boundaries. type readerLoopRequest struct { - segment *queueSegment - startOffset segmentOffset - startFrameID frameID - endOffset segmentOffset + segment *queueSegment + startPosition uint64 + startFrameID frameID + endPosition uint64 } type readerLoopResponse struct { @@ -102,13 +104,14 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() - _, err = handle.Seek( - segmentHeaderSize+int64(request.startOffset), os.SEEK_SET) + // getReader positions us at the start of the data region, so we use + // a relative seek to advance to the request position. 
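Since readerLoopRequest.startPosition is now an absolute offset into the segment file (header included), the reader has to account for wherever the handle already sits: either seek absolutely from the start of the file, or seek relatively by startPosition minus the header bytes already consumed. Here is a small sketch of the absolute variant against a throwaway file; seekToFramePosition is a hypothetical helper, not the patch's code.

package main

import (
    "fmt"
    "io"
    "os"
)

// seekToFramePosition positions an open segment file at an absolute byte
// offset, which must be a frame boundary. If the header has already been
// read from the handle, an equivalent relative seek would be
// handle.Seek(int64(startPosition-headerSize), io.SeekCurrent).
func seekToFramePosition(handle *os.File, startPosition uint64) error {
    _, err := handle.Seek(int64(startPosition), io.SeekStart)
    return err
}

func main() {
    // A throwaway file standing in for a segment: an 8-byte header followed
    // by some opaque frame data.
    f, err := os.CreateTemp("", "example-*.seg")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()
    if _, err := f.Write(make([]byte, 8+100)); err != nil {
        panic(err)
    }

    if err := seekToFramePosition(f, 8); err != nil {
        panic(err)
    }
    pos, _ := f.Seek(0, io.SeekCurrent)
    fmt.Println(pos) // 8: positioned at the first frame, just past the header
}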
+ _, err = handle.Seek(int64(request.startPosition), os.SEEK_CUR) if err != nil { return readerLoopResponse{err: err} } - targetLength := uint64(request.endOffset - request.startOffset) + targetLength := uint64(request.endPosition - request.startPosition) for { remainingLength := targetLength - byteCount diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 617b089110ed..f666a1941c8a 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -20,11 +20,15 @@ package diskqueue import ( "encoding/binary" "fmt" + "io" "io/ioutil" "os" + "path" "sort" "strconv" "strings" + + "github.com/elastic/beats/v7/libbeat/logp" ) // diskQueueSegments encapsulates segment-related queue metadata. @@ -60,42 +64,39 @@ type diskQueueSegments struct { // to the next queueSegment we create. nextID segmentID - // nextWriteOffset is the segment offset at which the next new frame - // should be written. This offset always applies to the last entry of - // writing[]. This is distinct from the endOffset field within a segment: - // endOffset tracks how much data _has_ been written to a segment, while - // nextWriteOffset also includes everything that is _scheduled_ to be - // written. - nextWriteOffset segmentOffset + // writingSegmentSize tracks the expected on-disk size of the current write + // segment after all scheduled frames have finished writing. This is used in + // diskQueue.enqueueWriteFrame to detect when to roll over to a new segment. + writingSegmentSize uint64 // nextReadFrameID is the first frame ID in the current or pending // read request. nextReadFrameID frameID - // nextReadOffset is the segment offset corresponding to the frame - // nextReadFrameID. This offset always applies to the first reading - // segment: either reading[0], or writing[0] if reading is empty. - nextReadOffset segmentOffset + // nextReadPosition is the next absolute byte offset on disk that should be + // read from the current read segment. The current read segment is either + // reading[0], or writing[0] if the reading list is empty. + nextReadPosition uint64 } // segmentID is a unique persistent integer id assigned to each created // segment in ascending order. type segmentID uint64 -// segmentOffset is a byte index into the segment's data region. -// An offset of 0 means the first byte after the segment file header. -type segmentOffset uint64 - // The metadata for a single segment file. type queueSegment struct { // A segment id is globally unique within its originating queue. id segmentID - // The byte offset of the end of the segment's data region. This is - // updated when the segment is written to, and should always correspond - // to the end of a complete data frame. The total size of a segment file - // on disk is segmentHeaderSize + segment.endOffset. - endOffset segmentOffset + // If this segment was loaded from a previous session, schemaVersion + // points to the file schema version that was read from its header. + // This is only used by queueSegment.headerSize(), which is used in + // maybeReadPending to calculate the position of the first data frame. + schemaVersion *uint32 + + // The number of bytes occupied by this segment on-disk, as of the most + // recent completed writerLoop request. + byteCount uint64 // The ID of the first frame that was / will be read from this segment. 
// This field is only valid after a read request has been sent for @@ -103,6 +104,11 @@ type queueSegment struct { // which can only happen after reading has begun on the segment.) firstFrameID frameID + // The number of frames contained in this segment on disk, as of the + // most recent completed writerLoop request (this does not include + // segments which are merely scheduled to be written). + frameCount uint32 + // The number of frames read from this segment during this session. This // does not necessarily equal the number of frames in the segment, even // after reading is complete, since the segment may have been partially @@ -113,11 +119,24 @@ type queueSegment struct { } type segmentHeader struct { + // The schema version for this segment file. Current schema version is 1. version uint32 + + // If the segment file has been completely written, this field contains + // the number of data frames, which is used to track the number of + // pending events left in the queue from previous sessions. + // If the segment file has not been completely written, this field is zero. + // Only present in schema version >= 1. + frameCount uint32 } -// Segment headers are currently just a 32-bit version. -const segmentHeaderSize = 4 +const currentSegmentVersion = 1 + +// Segment headers are currently a 4-byte version plus a 4-byte frame count. +// In contexts where the segment may have been created by an earlier version, +// instead use (queueSegment).headerSize() which accounts for the schema +// version of the target segment. +const segmentHeaderSize = 8 // Sort order: we store loaded segments in ascending order by their id. type bySegmentID []*queueSegment @@ -128,30 +147,39 @@ func (s bySegmentID) Less(i, j int) bool { return s[i].id < s[j].id } // Scan the given path for segment files, and return them in a list // ordered by segment id. -func scanExistingSegments(path string) ([]*queueSegment, error) { - files, err := ioutil.ReadDir(path) +func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, error) { + files, err := ioutil.ReadDir(pathStr) if err != nil { - return nil, fmt.Errorf("Couldn't read queue directory '%s': %w", path, err) + return nil, fmt.Errorf("couldn't read queue directory '%s': %w", pathStr, err) } segments := []*queueSegment{} for _, file := range files { - if file.Size() <= segmentHeaderSize { - // Ignore segments that don't have at least some data beyond the - // header (this will always be true of segments we write unless there - // is an error). - continue - } components := strings.Split(file.Name(), ".") if len(components) == 2 && strings.ToLower(components[1]) == "seg" { // Parse the id as base-10 64-bit unsigned int. We ignore file names that // don't match the "[uint64].seg" pattern. if id, err := strconv.ParseUint(components[0], 10, 64); err == nil { - segments = append(segments, - &queueSegment{ - id: segmentID(id), - endOffset: segmentOffset(file.Size() - segmentHeaderSize), - }) + fullPath := path.Join(pathStr, file.Name()) + header, err := readSegmentHeaderWithFrameCount(fullPath) + if header == nil { + logger.Errorf("couldn't load segment file '%v': %v", fullPath, err) + continue + } + // If we get an error but still got a valid header back, then we + // were able to read at least some frames, so we keep this segment + // but issue a warning. 
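The schema-v1 header introduced above is simply two little-endian uint32 values: the version, then the frame count. The following self-contained sketch encodes and decodes that layout with encoding/binary, including the 4-byte schema-0 fallback; the segmentHeader type and helper names are local stand-ins mirroring segments.go.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

const currentSegmentVersion = 1

// Local stand-in mirroring the segmentHeader struct in segments.go.
type segmentHeader struct {
    version    uint32
    frameCount uint32
}

// encodeHeader writes the 8-byte schema-v1 header: version, then frame
// count, both little-endian.
func encodeHeader(out io.Writer, frameCount uint32) error {
    version := uint32(currentSegmentVersion)
    if err := binary.Write(out, binary.LittleEndian, version); err != nil {
        return err
    }
    return binary.Write(out, binary.LittleEndian, frameCount)
}

// decodeHeader reads a header, accepting the 4-byte schema-0 layout
// (version only) as well as the current 8-byte layout.
func decodeHeader(in io.Reader) (*segmentHeader, error) {
    header := &segmentHeader{}
    if err := binary.Read(in, binary.LittleEndian, &header.version); err != nil {
        return nil, err
    }
    if header.version >= 1 {
        if err := binary.Read(in, binary.LittleEndian, &header.frameCount); err != nil {
            return nil, err
        }
    }
    return header, nil
}

func main() {
    var buf bytes.Buffer
    if err := encodeHeader(&buf, 42); err != nil {
        panic(err)
    }
    fmt.Println(buf.Len()) // 8

    header, err := decodeHeader(&buf)
    if err != nil {
        panic(err)
    }
    fmt.Println(header.version, header.frameCount) // 1 42
}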
+ if err != nil { + logger.Warnf( + "error loading segment file '%v', data may be incomplete: %v", + fullPath, err) + } + segments = append(segments, &queueSegment{ + id: segmentID(id), + schemaVersion: &header.version, + frameCount: header.frameCount, + byteCount: uint64(file.Size()), + }) } } } @@ -159,11 +187,19 @@ func scanExistingSegments(path string) ([]*queueSegment, error) { return segments, nil } -func (segment *queueSegment) sizeOnDisk() uint64 { - return uint64(segment.endOffset) + segmentHeaderSize +// headerSize returns the logical size ("logical" because it may not have +// been written to disk yet) of this segment file's header region. The +// segment's first data frame begins immediately after the header. +func (segment *queueSegment) headerSize() uint64 { + if segment.schemaVersion != nil && *segment.schemaVersion < 1 { + // Schema 0 had nothing except the 4-byte version. + return 4 + } + return segmentHeaderSize } -// Should only be called from the reader loop. +// Should only be called from the reader loop. If successful, returns an open +// file handle positioned at the beginning of the segment's data region. func (segment *queueSegment) getReader( queueSettings Settings, ) (*os.File, error) { @@ -173,8 +209,8 @@ func (segment *queueSegment) getReader( return nil, fmt.Errorf( "Couldn't open segment %d: %w", segment.id, err) } - // Right now there is only one valid header (indicating schema version - // zero) so we don't need the value itself. + // We don't need the header contents here, we just want to advance past the + // header region, so discard the return value. _, err = readSegmentHeader(file) if err != nil { file.Close() @@ -193,8 +229,7 @@ func (segment *queueSegment) getWriter( if err != nil { return nil, err } - header := &segmentHeader{version: 0} - err = writeSegmentHeader(file, header) + err = writeSegmentHeader(file, 0) if err != nil { return nil, fmt.Errorf("Couldn't write segment header: %w", err) } @@ -222,20 +257,116 @@ func (segment *queueSegment) getWriterWithRetry( return file, err } -func readSegmentHeader(in *os.File) (*segmentHeader, error) { +// readSegmentHeaderWithFrameCount reads the header from the beginning +// of the file at the given path. If the header's frameCount is 0 +// (whether because it is from an old version or because the segment +// file was not closed cleanly), it attempts to calculate it manually +// by scanning the file, and returns a struct with the "correct" +// frame count. +func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) { + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf( + "couldn't open segment file '%s': %w", path, err) + } + defer file.Close() + // Wrap the handle to retry non-fatal errors and always return the full + // requested data length if possible, then read the raw header. + reader := autoRetryReader{file} + header, err := readSegmentHeader(reader) + if err != nil { + return nil, err + } + // If the header has a positive frame count then there is + // no more work to do, so return immediately. + if header.frameCount > 0 { + return header, nil + } + // If we made it here, we loaded a valid header but the frame count is + // zero, so we need to check it with a manual scan. This can + // only happen in one of two uncommon situations: + // - The segment was created by an old version that didn't track frame count + // - The segment file was not closed cleanly during the previous session + // and still has the placeholder value of 0. 
+ // In either case, the right thing to do is to scan the file + // and fill in the frame count manually. + for { + var frameLength uint32 + err = binary.Read(reader, binary.LittleEndian, &frameLength) + if err != nil { + // EOF at a frame boundary means we successfully scanned all frames. + if err == io.EOF && header.frameCount > 0 { + return header, nil + } + // All other errors mean we are done scanning, exit the loop. + break + } + // Length is encoded in both the first and last four bytes of a frame. To + // detect truncated / corrupted frames, seek to the last four bytes of + // the current frame to make sure the trailing length matches before + // advancing to the next frame (otherwise we might accept an impossible + // length). + _, err = file.Seek(int64(frameLength-8), os.SEEK_CUR) + if err != nil { + break + } + var duplicateLength uint32 + err = binary.Read(reader, binary.LittleEndian, &duplicateLength) + if err != nil { + break + } + if frameLength != duplicateLength { + err = fmt.Errorf( + "mismatched frame length: %v vs %v", frameLength, duplicateLength) + break + } + + header.frameCount++ + } + // If we ended up here instead of returning directly, then + // we encountered an error. We still return a valid header as + // long as we successfully scanned at least one frame first. + if header.frameCount > 0 { + return header, err + } + return nil, err +} + +// readSegmentHeader decodes a raw header from the given reader and +// returns it as a struct. +func readSegmentHeader(in io.Reader) (*segmentHeader, error) { header := &segmentHeader{} err := binary.Read(in, binary.LittleEndian, &header.version) if err != nil { return nil, err } - if header.version != 0 { + if header.version > currentSegmentVersion { return nil, fmt.Errorf("Unrecognized schema version %d", header.version) } + if header.version >= 1 { + err = binary.Read(in, binary.LittleEndian, &header.frameCount) + if err != nil { + return nil, err + } + } return header, nil } -func writeSegmentHeader(out *os.File, header *segmentHeader) error { - err := binary.Write(out, binary.LittleEndian, header.version) +// writeSegmentHeader seeks to the beginning of the given file handle and +// writes a segment header with the current schema version, containing the +// given frameCount. 
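The recovery scan above leans on the frame framing format: every frame stores its total length in both its first and last four bytes, so a scanner can hop from frame to frame and stop at the first mismatch or short read. The hedged, in-memory sketch below shows the same counting loop over a synthetic data region; the frame layout is simplified (real frames also carry a checksum in the region between the two length fields), and appendFrame and countFrames are illustrative names.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

// appendFrame appends a simplified frame to buf: a uint32 total length, an
// opaque middle section, and the same uint32 length repeated at the end.
func appendFrame(buf *bytes.Buffer, payload []byte) {
    frameLength := uint32(len(payload) + 8) // 4-byte length on each side
    binary.Write(buf, binary.LittleEndian, frameLength)
    buf.Write(payload)
    binary.Write(buf, binary.LittleEndian, frameLength)
}

// countFrames scans a data region frame by frame, verifying that each
// frame's trailing length matches its leading length, and returns how many
// complete frames it found before hitting EOF or an inconsistency.
func countFrames(r io.ReadSeeker) (uint32, error) {
    var count uint32
    for {
        var frameLength uint32
        if err := binary.Read(r, binary.LittleEndian, &frameLength); err != nil {
            if err == io.EOF {
                return count, nil // clean end at a frame boundary
            }
            return count, err
        }
        if frameLength < 8 {
            return count, fmt.Errorf("impossible frame length %d", frameLength)
        }
        // Skip the frame body and read the duplicate length at the end.
        if _, err := r.Seek(int64(frameLength-8), io.SeekCurrent); err != nil {
            return count, err
        }
        var duplicateLength uint32
        if err := binary.Read(r, binary.LittleEndian, &duplicateLength); err != nil {
            return count, err
        }
        if frameLength != duplicateLength {
            return count, fmt.Errorf(
                "mismatched frame length: %v vs %v", frameLength, duplicateLength)
        }
        count++
    }
}

func main() {
    var buf bytes.Buffer
    appendFrame(&buf, []byte("hello"))
    appendFrame(&buf, []byte("disk queue"))

    count, err := countFrames(bytes.NewReader(buf.Bytes()))
    fmt.Println(count, err) // 2 <nil>
}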
+func writeSegmentHeader(out *os.File, frameCount uint32) error { + _, err := out.Seek(0, io.SeekStart) + if err != nil { + return err + } + + version := uint32(currentSegmentVersion) + err = binary.Write(out, binary.LittleEndian, version) + if err != nil { + return err + } + err = binary.Write(out, binary.LittleEndian, frameCount) return err } @@ -244,16 +375,16 @@ func writeSegmentHeader(out *os.File, header *segmentHeader) error { func (segments *diskQueueSegments) sizeOnDisk() uint64 { total := uint64(0) for _, segment := range segments.writing { - total += segment.sizeOnDisk() + total += segment.byteCount } for _, segment := range segments.reading { - total += segment.sizeOnDisk() + total += segment.byteCount } for _, segment := range segments.acking { - total += segment.sizeOnDisk() + total += segment.byteCount } for _, segment := range segments.acked { - total += segment.sizeOnDisk() + total += segment.byteCount } return total } diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 9db8e7b1bd9f..456f180665b0 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -51,11 +51,6 @@ type entry struct { Fields common.MapStr } -const ( - // If - flagGuaranteed uint8 = 1 << 0 -) - func newEventEncoder() *eventEncoder { e := &eventEncoder{} e.reset() diff --git a/libbeat/publisher/queue/diskqueue/state_file.go b/libbeat/publisher/queue/diskqueue/state_file.go index d8cbb5690ac2..9247598eaeb5 100644 --- a/libbeat/publisher/queue/diskqueue/state_file.go +++ b/libbeat/publisher/queue/diskqueue/state_file.go @@ -24,6 +24,8 @@ import ( "os" ) +const currentStateFileVersion = 1 + // Given an open file handle to the queue state, decode the current position // and return the result if successful, otherwise an error. func queuePositionFromHandle( @@ -40,7 +42,7 @@ func queuePositionFromHandle( if err != nil { return queuePosition{}, err } - if version != 0 { + if version > currentStateFileVersion { return queuePosition{}, fmt.Errorf("Unsupported queue metadata version (%d)", version) } @@ -52,11 +54,19 @@ func queuePositionFromHandle( } err = binary.Read( - reader, binary.LittleEndian, &position.offset) + reader, binary.LittleEndian, &position.byteIndex) if err != nil { return queuePosition{}, err } + // frameIndex field was added in schema version 1 + if version >= 1 { + err = binary.Read( + reader, binary.LittleEndian, &position.frameIndex) + if err != nil { + return queuePosition{}, err + } + } return position, nil } @@ -82,7 +92,8 @@ func writeQueuePositionToHandle( } // Want to write: version (0), segment id, segment offset. 
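After this change the queue position record on disk is four little-endian fields: a uint32 state-file version, then the uint64 segmentID, byteIndex, and frameIndex, with readers still accepting version-0 files that stop after byteIndex. Below is a self-contained round-trip sketch using local stand-in types; writePosition and readPosition are hypothetical names for the state_file.go helpers.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"
)

const currentStateFileVersion = 1

// Local stand-in for queuePosition.
type queuePosition struct {
    segmentID  uint64
    byteIndex  uint64
    frameIndex uint64
}

// writePosition encodes a version-1 position record: version, segment id,
// byte index, frame index, all little-endian.
func writePosition(out io.Writer, pos queuePosition) error {
    fields := []interface{}{
        uint32(currentStateFileVersion),
        pos.segmentID, pos.byteIndex, pos.frameIndex,
    }
    for _, field := range fields {
        if err := binary.Write(out, binary.LittleEndian, field); err != nil {
            return err
        }
    }
    return nil
}

// readPosition decodes a position record, tolerating version-0 files that
// predate the frameIndex field.
func readPosition(in io.Reader) (queuePosition, error) {
    var version uint32
    var pos queuePosition
    if err := binary.Read(in, binary.LittleEndian, &version); err != nil {
        return pos, err
    }
    if version > currentStateFileVersion {
        return pos, fmt.Errorf("unsupported queue metadata version (%d)", version)
    }
    if err := binary.Read(in, binary.LittleEndian, &pos.segmentID); err != nil {
        return pos, err
    }
    if err := binary.Read(in, binary.LittleEndian, &pos.byteIndex); err != nil {
        return pos, err
    }
    if version >= 1 {
        if err := binary.Read(in, binary.LittleEndian, &pos.frameIndex); err != nil {
            return pos, err
        }
    }
    return pos, nil
}

func main() {
    var buf bytes.Buffer
    original := queuePosition{segmentID: 3, byteIndex: 4096, frameIndex: 17}
    if err := writePosition(&buf, original); err != nil {
        panic(err)
    }
    loaded, err := readPosition(&buf)
    fmt.Println(loaded, err) // {3 4096 17} <nil>
}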
- err = binary.Write(file, binary.LittleEndian, uint32(0)) + err = binary.Write( + file, binary.LittleEndian, uint32(currentStateFileVersion)) if err != nil { return err } @@ -90,6 +101,10 @@ func writeQueuePositionToHandle( if err != nil { return err } - err = binary.Write(file, binary.LittleEndian, position.offset) + err = binary.Write(file, binary.LittleEndian, position.byteIndex) + if err != nil { + return err + } + err = binary.Write(file, binary.LittleEndian, position.frameIndex) return err } diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go index ff1ff97616a6..ce7bed6b2b06 100644 --- a/libbeat/publisher/queue/diskqueue/writer_loop.go +++ b/libbeat/publisher/queue/diskqueue/writer_loop.go @@ -49,12 +49,19 @@ type writerLoopRequest struct { frames []segmentedFrame } +// A writerLoopResponseSegment specifies the number of frames and bytes +// written to a single segment as a result of a writerLoopRequest. +type writerLoopResponseSegment struct { + framesWritten uint32 + bytesWritten uint64 +} + // A writerLoopResponse reports the number of bytes written to each // segment in the request. There is guaranteed to be one entry for each // segment that appeared in the request, in the same order. If there is // more than one entry, then all but the last segment have been closed. type writerLoopResponse struct { - bytesWritten []int64 + segments []writerLoopResponseSegment } type writerLoop struct { @@ -100,20 +107,28 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop { func (wl *writerLoop) run() { for { - block, ok := <-wl.requestChan + request, ok := <-wl.requestChan if !ok { - // The request channel is closed, we are done + // The request channel is closed, we are done. If there is an active + // segment file, finalize its frame count and close it. + if wl.outputFile != nil { + writeSegmentHeader(wl.outputFile, wl.currentSegment.frameCount) + wl.outputFile.Sync() + wl.outputFile.Close() + wl.outputFile = nil + } return } - bytesWritten := wl.processRequest(block) - wl.responseChan <- writerLoopResponse{bytesWritten: bytesWritten} + wl.responseChan <- wl.processRequest(request) } } // processRequest writes the frames in the given request to disk and returns // the number of bytes written to each segment, in the order they were // encountered. -func (wl *writerLoop) processRequest(request writerLoopRequest) []int64 { +func (wl *writerLoop) processRequest( + request writerLoopRequest, +) writerLoopResponse { // retryWriter wraps the file handle with timed retries. // retryWriter.Write is guaranteed to return only if the write // completely succeeded or the queue is being closed. @@ -126,8 +141,11 @@ func (wl *writerLoop) processRequest(request writerLoopRequest) []int64 { totalACKCount := 0 producerACKCounts := make(map[*diskQueueProducer]int) - var bytesWritten []int64 // Bytes written to all segments. - curBytesWritten := int64(0) // Bytes written to the current segment. + // responseEntry tracks the number of frames and bytes written to the + // current segment. + var curSegment writerLoopResponseSegment + // response + var response writerLoopResponse outerLoop: for _, frameRequest := range request.frames { // If the new segment doesn't match the last one, we need to open a new @@ -136,14 +154,17 @@ outerLoop: wl.logger.Debugf( "Creating new segment file with id %v\n", frameRequest.segment.id) if wl.outputFile != nil { - // Try to sync to disk, then close the file. 
+ // Update the header with the frame count (including the ones we + // just wrote), try to sync to disk, then close the file. + writeSegmentHeader(wl.outputFile, + wl.currentSegment.frameCount+curSegment.framesWritten) wl.outputFile.Sync() wl.outputFile.Close() wl.outputFile = nil - // We are done with this segment, add the byte count to the list and - // reset the current counter. - bytesWritten = append(bytesWritten, curBytesWritten) - curBytesWritten = 0 + // We are done with this segment, add the totals to the response and + // reset the current counters. + response.segments = append(response.segments, curSegment) + curSegment = writerLoopResponseSegment{} } wl.currentSegment = frameRequest.segment file, err := wl.currentSegment.getWriterWithRetry( @@ -183,11 +204,12 @@ outerLoop: if err != nil { break } - // Update the byte count as the last step: that way if we abort while - // a frame is partially written, we only report up to the last - // complete frame. (This almost never matters, but it allows for + // Update the frame and byte count as the last step: that way if we + // abort while a frame is partially written, we only report up to the + // last complete frame. (This almost never matters, but it allows for // more controlled recovery after a bad shutdown.) - curBytesWritten += int64(frameSize) + curSegment.framesWritten++ + curSegment.bytesWritten += uint64(frameSize) // Update the ACKs that will be sent at the end of the request. totalACKCount++ @@ -215,8 +237,9 @@ outerLoop: producer.config.ACK(ackCount) } - // Return the total byte counts, including the final segment. - return append(bytesWritten, curBytesWritten) + // Add the final segment to the response and return it. + response.segments = append(response.segments, curSegment) + return response } func (wl *writerLoop) applyRetryBackoff() {
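Taken together, the writer-loop changes implement a placeholder-then-finalize pattern: a new segment's header is written with a frame count of 0, frames are appended behind it, and when the segment (or the whole queue) is shut down the header is rewritten in place with the real count before syncing. The minimal end-to-end sketch below shows that pattern against a temp file; writeHeader and the fake frame payloads are illustrative, not the queue's real frame format.

package main

import (
    "encoding/binary"
    "fmt"
    "io"
    "os"
)

const currentSegmentVersion = 1

// writeHeader seeks to the start of the file and writes the 8-byte header:
// schema version, then frame count, both little-endian.
func writeHeader(f *os.File, frameCount uint32) error {
    if _, err := f.Seek(0, io.SeekStart); err != nil {
        return err
    }
    version := uint32(currentSegmentVersion)
    if err := binary.Write(f, binary.LittleEndian, version); err != nil {
        return err
    }
    return binary.Write(f, binary.LittleEndian, frameCount)
}

func main() {
    f, err := os.CreateTemp("", "example-*.seg")
    if err != nil {
        panic(err)
    }
    defer os.Remove(f.Name())
    defer f.Close()

    // 1. New segment: placeholder header with frameCount 0.
    if err := writeHeader(f, 0); err != nil {
        panic(err)
    }

    // 2. Append some (fake) frame data after the header.
    frameCount := uint32(0)
    for _, frame := range [][]byte{[]byte("frame one"), []byte("frame two")} {
        if _, err := f.Write(frame); err != nil {
            panic(err)
        }
        frameCount++
    }

    // 3. Segment is being closed: rewrite the header in place with the real
    // count, then sync, as the writer loop does before moving on.
    if err := writeHeader(f, frameCount); err != nil {
        panic(err)
    }
    if err := f.Sync(); err != nil {
        panic(err)
    }

    // Re-read the header to confirm the finalized count.
    if _, err := f.Seek(0, io.SeekStart); err != nil {
        panic(err)
    }
    var version, count uint32
    if err := binary.Read(f, binary.LittleEndian, &version); err != nil {
        panic(err)
    }
    if err := binary.Read(f, binary.LittleEndian, &count); err != nil {
        panic(err)
    }
    fmt.Println(version, count) // 1 2
}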