-
Notifications
You must be signed in to change notification settings - Fork 5k
[libbeat] Disk queue implementation #21176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 90 commits
e76a41b
1d8bf65
e8c8128
97a7ed5
f5ad9a2
a78b85c
b12020c
67540d6
94f125c
f29a96f
7ce01f9
a04980e
26b4248
f30f30b
c312d69
1a40b06
ce65718
22ae148
3f5f8fe
61fa5d7
0191dc6
3bf35ff
50bd450
04c9b60
7a2e09a
132ba8e
e73f55f
6d2ca31
988cef6
7774dc4
7146525
1001565
bb56b8f
a99e869
89da2b2
6fa9d33
e77dfa6
d2a65dc
b93fe83
3a3bfcd
cb1107d
3020962
df31c0e
4a22ccd
d4e1dcd
0e12495
56fe5ba
eaf6e2b
5f0376d
d63cbf0
98e61f0
e48c815
1575640
d495aa8
dd08e2c
8db0acf
26d226d
442a513
1a1742e
5ca039d
fdd4be6
f7a446d
79a81de
f88481c
8b61f5b
8c32477
0a6f9f4
91bc3b2
aea4cad
a8ca56a
6d91ab5
767aabb
0bf668e
857077c
d4cd0bb
91ce56f
f9dbfb1
93d3fff
0ac493c
3af1421
333cb76
0dfffe1
43c4aaa
0b33c4b
7b2f4ba
a626cec
53da09a
565a99f
008fbe7
b5ec926
11e55a8
ff04bb5
cc49879
2baf1d4
30cf56c
c619d6a
5b41351
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package diskqueue | ||
|
|
||
| import ( | ||
| "os" | ||
| "sync" | ||
|
|
||
| "github.com/elastic/beats/v7/libbeat/logp" | ||
| ) | ||
|
|
||
| // queuePosition represents a logical position within the queue buffer. | ||
| type queuePosition struct { | ||
| segmentID segmentID | ||
| offset segmentOffset | ||
| } | ||
|
|
||
| type diskQueueACKs struct { | ||
| logger *logp.Logger | ||
|
|
||
| // This lock must be held to access diskQueueACKs fields (except for | ||
| // diskQueueACKs.done, which is always safe). | ||
| lock sync.Mutex | ||
|
|
||
| // The id and position of the first unacknowledged frame. | ||
| nextFrameID frameID | ||
| nextPosition queuePosition | ||
|
|
||
| // If a frame has been ACKed, then frameSize[frameID] contains its size on | ||
| // disk. The size is used to track the queuePosition of the oldest | ||
| // remaining frame, which is written to disk as ACKs are received. (We do | ||
| // this to avoid duplicating events if the beat terminates without a clean | ||
| // shutdown.) | ||
| frameSize map[frameID]uint64 | ||
|
|
||
| // segmentBoundaries maps the first frameID of each segment to its | ||
| // corresponding segment ID. | ||
| segmentBoundaries map[frameID]segmentID | ||
|
|
||
| // When a segment has been completely acknowledged by a consumer, it sends | ||
| // the segment ID to this channel, where it is read by the core loop and | ||
| // scheduled for deletion. | ||
| segmentACKChan chan segmentID | ||
|
|
||
| // An open writable file handle to the file that stores the queue position. | ||
| // This position is advanced as we receive ACKs, confirming it is safe | ||
| // to move forward, so the acking code is responsible for updating this | ||
| // file. | ||
| positionFile *os.File | ||
|
|
||
| // When the queue is closed, diskQueueACKs.done is closed to signal that | ||
| // the core loop will not accept any more acked segments and any future | ||
| // ACKs should be ignored. | ||
| done chan struct{} | ||
| } | ||
|
|
||
| func newDiskQueueACKs( | ||
| logger *logp.Logger, position queuePosition, positionFile *os.File, | ||
| ) *diskQueueACKs { | ||
| return &diskQueueACKs{ | ||
| logger: logger, | ||
| nextFrameID: 0, | ||
| nextPosition: position, | ||
| frameSize: make(map[frameID]uint64), | ||
| segmentBoundaries: make(map[frameID]segmentID), | ||
| segmentACKChan: make(chan segmentID), | ||
| positionFile: positionFile, | ||
| done: make(chan struct{}), | ||
| } | ||
| } | ||
|
|
||
| func (dqa *diskQueueACKs) addFrames(frames []*readFrame) { | ||
| dqa.lock.Lock() | ||
| defer dqa.lock.Unlock() | ||
| select { | ||
| case <-dqa.done: | ||
| // We are already done and should ignore any leftover ACKs we receive. | ||
| return | ||
| default: | ||
| } | ||
| for _, frame := range frames { | ||
| segment := frame.segment | ||
| if frame.id != 0 && frame.id == segment.firstFrameID { | ||
| // This is the first frame in its segment, mark it so we know when | ||
| // we're starting a new segment. | ||
| // | ||
| // Subtlety: we don't count the very first frame as a "boundary" even | ||
| // though it is the first frame we read from its segment. This prevents | ||
| // us from resetting our segment offset to zero, in case the initial | ||
| // offset was restored from a previous session instead of starting at | ||
| // the beginning of the first file. | ||
| dqa.segmentBoundaries[frame.id] = segment.id | ||
| } | ||
| dqa.frameSize[frame.id] = frame.bytesOnDisk | ||
| } | ||
| oldSegmentID := dqa.nextPosition.segmentID | ||
| if dqa.frameSize[dqa.nextFrameID] != 0 { | ||
| for ; dqa.frameSize[dqa.nextFrameID] != 0; dqa.nextFrameID++ { | ||
| newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID] | ||
| if ok { | ||
| // This is the start of a new segment. Remove this frame from the | ||
| // segment boundary list and set the position to the start of the | ||
| // new segment. | ||
| delete(dqa.segmentBoundaries, dqa.nextFrameID) | ||
| dqa.nextPosition = queuePosition{ | ||
| segmentID: newSegment, | ||
| offset: 0, | ||
| } | ||
| } | ||
| dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID]) | ||
| delete(dqa.frameSize, dqa.nextFrameID) | ||
| } | ||
| // We advanced the ACK position at least somewhat, so write its | ||
| // new value. | ||
| err := writeQueuePositionToHandle(dqa.positionFile, dqa.nextPosition) | ||
| if err != nil { | ||
| // TODO: Don't spam this warning on every ACK if it's a permanent error. | ||
| dqa.logger.Warnf("Couldn't save queue position: %v", err) | ||
| } | ||
| } | ||
| if oldSegmentID != dqa.nextPosition.segmentID { | ||
| // We crossed at least one segment boundary, inform the listener that | ||
| // everything before the current segment has been acknowledged (but bail | ||
| // out if our done channel has been closed, since that means there is no | ||
| // listener on the other end.) | ||
| select { | ||
| case dqa.segmentACKChan <- dqa.nextPosition.segmentID - 1: | ||
| case <-dqa.done: | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package diskqueue | ||
|
|
||
| import ( | ||
| "encoding/binary" | ||
| "hash/crc32" | ||
| ) | ||
|
|
||
| // Computes the checksum that should be written / read in a frame footer | ||
| // based on the raw content of that frame (excluding header / footer). | ||
| func computeChecksum(data []byte) uint32 { | ||
| hash := crc32.NewIEEE() | ||
| frameLength := uint32(len(data) + frameMetadataSize) | ||
| binary.Write(hash, binary.LittleEndian, &frameLength) | ||
| hash.Write(data) | ||
| return hash.Sum32() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package diskqueue | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "path/filepath" | ||
|
|
||
| "github.com/elastic/beats/v7/libbeat/common" | ||
| "github.com/elastic/beats/v7/libbeat/common/cfgtype" | ||
| "github.com/elastic/beats/v7/libbeat/paths" | ||
| "github.com/elastic/beats/v7/libbeat/publisher/queue" | ||
| ) | ||
|
|
||
| // Settings contains the configuration fields to create a new disk queue | ||
| // or open an existing one. | ||
| type Settings struct { | ||
| // The path on disk of the queue's containing directory, which will be | ||
| // created if it doesn't exist. Within the directory, the queue's state | ||
| // is stored in state.dat and each segment's data is stored in | ||
| // {segmentIndex}.seg | ||
| // If blank, the default directory is "diskqueue" within the beat's data | ||
| // directory. | ||
| Path string | ||
|
|
||
| // MaxBufferSize is the maximum number of bytes that the queue should | ||
| // ever occupy on disk. A value of 0 means the queue can grow until the | ||
| // disk is full (this is not recommended on a primary system disk). | ||
| MaxBufferSize uint64 | ||
|
|
||
| // MaxSegmentSize is the maximum number of bytes that should be written | ||
| // to a single segment file before creating a new one. | ||
| MaxSegmentSize uint64 | ||
|
|
||
| // How many events will be read from disk while waiting for a consumer | ||
| // request. | ||
| ReadAheadLimit int | ||
|
|
||
| // How many events will be queued in memory waiting to be written to disk. | ||
| // This setting should rarely matter in practice, but if data is coming | ||
| // in faster than it can be written to disk for an extended period, | ||
| // this limit can keep it from overflowing memory. | ||
| WriteAheadLimit int | ||
|
|
||
| // A listener that should be sent ACKs when an event is successfully | ||
| // written to disk. | ||
| WriteToDiskListener queue.ACKListener | ||
| } | ||
|
|
||
| // userConfig holds the parameters for a disk queue that are configurable | ||
| // by the end user in the beats yml file. | ||
| type userConfig struct { | ||
| Path string `config:"path"` | ||
| MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"` | ||
| SegmentSize *cfgtype.ByteSize `config:"segment_size"` | ||
| ReadAheadLimit *int `config:"read_ahead"` | ||
| WriteAheadLimit *int `config:"write_ahead"` | ||
| } | ||
|
|
||
| func (c *userConfig) Validate() error { | ||
| // If the segment size is explicitly specified, the total queue size must | ||
| // be at least twice as large. | ||
| if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 { | ||
| return errors.New( | ||
| "Disk queue max_size must be at least twice as big as segment_size") | ||
| } | ||
|
|
||
| // We require a total queue size of at least 10MB, and a segment size of | ||
| // at least 1MB. The queue can support lower thresholds, but it will perform | ||
| // terribly, so we give an explicit error in that case. | ||
| // These bounds are still extremely low for Beats ingestion, but if all you | ||
| // need is for a low-volume stream on a tiny device to persist between | ||
| // restarts, it will work fine. | ||
| if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 { | ||
| return errors.New( | ||
| "Disk queue max_size cannot be less than 10MB") | ||
| } | ||
| if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 { | ||
| return errors.New( | ||
| "Disk queue segment_size cannot be less than 1MB") | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // DefaultSettings returns a Settings object with reasonable default values | ||
| // for all important fields. | ||
| func DefaultSettings() Settings { | ||
| return Settings{ | ||
| MaxSegmentSize: 100 * (1 << 20), // 100MiB | ||
| MaxBufferSize: (1 << 30), // 1GiB | ||
|
|
||
| ReadAheadLimit: 256, | ||
| WriteAheadLimit: 1024, | ||
| } | ||
| } | ||
|
|
||
| // SettingsForUserConfig returns a Settings struct initialized with the | ||
| // end-user-configurable settings in the given config tree. | ||
| func SettingsForUserConfig(config *common.Config) (Settings, error) { | ||
| userConfig := userConfig{} | ||
| if err := config.Unpack(&userConfig); err != nil { | ||
| return Settings{}, err | ||
|
||
| } | ||
| settings := DefaultSettings() | ||
| settings.Path = userConfig.Path | ||
|
|
||
| settings.MaxBufferSize = uint64(userConfig.MaxSize) | ||
| if userConfig.SegmentSize != nil { | ||
| settings.MaxSegmentSize = uint64(*userConfig.SegmentSize) | ||
| } else { | ||
| // If no value is specified, default segment size is total queue size | ||
| // divided by 10. | ||
| settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10 | ||
| } | ||
|
|
||
| return settings, nil | ||
| } | ||
|
|
||
| // | ||
| // bookkeeping helpers | ||
| // | ||
|
|
||
| func (settings Settings) directoryPath() string { | ||
| if settings.Path == "" { | ||
| return paths.Resolve(paths.Data, "diskqueue") | ||
| } | ||
|
|
||
| return settings.Path | ||
| } | ||
|
|
||
| func (settings Settings) stateFilePath() string { | ||
| return filepath.Join(settings.directoryPath(), "state.dat") | ||
| } | ||
|
|
||
| func (settings Settings) segmentPath(segmentID segmentID) string { | ||
| return filepath.Join( | ||
| settings.directoryPath(), | ||
| fmt.Sprintf("%v.seg", segmentID)) | ||
| } | ||
|
|
||
| func (settings Settings) maxSegmentOffset() segmentOffset { | ||
| return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.