This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add writeQueue buffer to memoryIdx #1365

Merged · 4 commits · Jun 26, 2019
Changes from all commits

docker/docker-chaos/metrictank.ini (6 additions, 0 deletions)

@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
 find-cache-invalidate-max-wait = 5s
 # amount of time to disable the findCache when the invalidate queue fills up.
 find-cache-backoff-time = 60s
+# enable buffering new metricDefinitions and writing them to the index in batches
+write-queue-enabled = false
+# maximum delay between flushing buffered metric writes to the index
+write-queue-delay = 30s
+# maximum number of metricDefinitions that can be added to the index in a single batch
+write-max-batch-size = 5000
 
 ### Bigtable index
 [bigtable-idx]

docker/docker-cluster/metrictank.ini (6 additions, 0 deletions)

@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
 find-cache-invalidate-max-wait = 5s
 # amount of time to disable the findCache when the invalidate queue fills up.
 find-cache-backoff-time = 60s
+# enable buffering new metricDefinitions and writing them to the index in batches
+write-queue-enabled = false
+# maximum delay between flushing buffered metric writes to the index
+write-queue-delay = 30s
+# maximum number of metricDefinitions that can be added to the index in a single batch
+write-max-batch-size = 5000
 
 ### Bigtable index
 [bigtable-idx]

docker/docker-dev-custom-cfg-kafka/metrictank.ini (6 additions, 0 deletions)

@@ -454,6 +454,12 @@ find-cache-invalidate-max-size = 100
 find-cache-invalidate-max-wait = 5s
 # amount of time to disable the findCache when the invalidate queue fills up.
 find-cache-backoff-time = 60s
+# enable buffering new metricDefinitions and writing them to the index in batches
+write-queue-enabled = false
+# maximum delay between flushing buffered metric writes to the index
+write-queue-delay = 30s
+# maximum number of metricDefinitions that can be added to the index in a single batch
+write-max-batch-size = 5000
 
 ### Bigtable index
 [bigtable-idx]

docs/config.md (6 additions, 0 deletions)

@@ -528,6 +528,12 @@ find-cache-invalidate-max-size = 100
 find-cache-invalidate-max-wait = 5s
 # amount of time to disable the findCache when the invalidate queue fills up.
 find-cache-backoff-time = 60s
+# enable buffering new metricDefinitions and writing them to the index in batches
+write-queue-enabled = false
+# maximum delay between flushing buffered metric writes to the index
+write-queue-delay = 30s
+# maximum number of metricDefinitions that can be added to the index in a single batch
+write-max-batch-size = 5000
 ```
 
 ### Bigtable index

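As context for how the three new settings interact, here is a minimal, self-contained Go sketch of a size-or-delay batched write queue. It is not metrictank's actual memory-index implementation; the writeQueue type, its flush hook, and the MetricDef stand-in are illustrative, but the flushing behaviour follows the comments above: a batch is written once write-max-batch-size definitions have accumulated, or after at most write-queue-delay.

```go
package main

import (
    "fmt"
    "time"
)

// MetricDef stands in for a metric definition; illustrative only.
type MetricDef struct {
    Id string
}

// writeQueue buffers incoming definitions and flushes them to the index either
// when maxBatchSize entries have accumulated or when flushDelay elapses,
// mirroring write-max-batch-size and write-queue-delay above.
type writeQueue struct {
    in           chan *MetricDef
    maxBatchSize int
    flushDelay   time.Duration
    flush        func(batch []*MetricDef) // hypothetical hook, e.g. a bulk add to the index
}

func (wq *writeQueue) run() {
    buf := make([]*MetricDef, 0, wq.maxBatchSize)
    ticker := time.NewTicker(wq.flushDelay)
    defer ticker.Stop()
    for {
        select {
        case def, ok := <-wq.in:
            if !ok {
                if len(buf) > 0 {
                    wq.flush(buf) // flush whatever is left on shutdown
                }
                return
            }
            buf = append(buf, def)
            if len(buf) >= wq.maxBatchSize {
                wq.flush(buf)
                buf = buf[:0] // flush is synchronous here, so the slice can be reused
            }
        case <-ticker.C:
            if len(buf) > 0 {
                wq.flush(buf)
                buf = buf[:0]
            }
        }
    }
}

func main() {
    done := make(chan struct{})
    wq := &writeQueue{
        in:           make(chan *MetricDef),
        maxBatchSize: 5000,             // write-max-batch-size
        flushDelay:   30 * time.Second, // write-queue-delay
        flush: func(batch []*MetricDef) {
            fmt.Printf("flushing %d defs to the index\n", len(batch))
        },
    }
    go func() {
        wq.run()
        close(done)
    }()
    wq.in <- &MetricDef{Id: "example.metric.1"}
    close(wq.in) // closing the input forces the final flush in this demo
    <-done
}
```

With write-queue-enabled = false (the default shipped here), this buffering path would presumably be bypassed and new definitions added to the index as they arrive.
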
idx/bigtable/bigtable.go (2 additions, 2 deletions)

@@ -286,7 +286,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
         log.Debugf("bigtable-idx: updating def %s in index.", archive.MetricDefinition.Id)
         b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
         archive.LastSave = now
-        b.MemoryIndex.UpdateArchive(archive)
+        b.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
     } else {
         // perform a non-blocking write to the writeQueue. If the queue is full, then
         // this will fail and we won't update the LastSave timestamp. The next time
@@ -297,7 +297,7 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
         select {
         case b.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
             archive.LastSave = now
-            b.MemoryIndex.UpdateArchive(archive)
+            b.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
         default:
             statSaveSkipped.Inc()
             log.Debugf("bigtable-idx: writeQueue is full, update of %s not saved this time", archive.MetricDefinition.Id)

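The unchanged context around both hunks relies on Go's non-blocking send idiom: a select with a default branch, so a full writeQueue skips the save instead of stalling ingestion. Below is a stripped-down, runnable sketch of that idiom; the writeReq here is simplified to a string payload and is not the real metrictank type.

```go
package main

import (
    "fmt"
    "time"
)

// writeReq mirrors the shape of the queue entries in the diff above; the
// MetricDefinition field is reduced to a string for this sketch.
type writeReq struct {
    recvTime time.Time
    def      string
}

// tryEnqueue shows the non-blocking send pattern from updateBigtable and
// updateCassandra: if the buffered channel has capacity the request is queued
// and the caller can mark the definition as saved; if it is full, the default
// branch runs and the save is skipped.
func tryEnqueue(queue chan writeReq, def string) bool {
    select {
    case queue <- writeReq{recvTime: time.Now(), def: def}:
        return true // caller would now bump LastSave in the memory index
    default:
        return false // queue full: skip, retry on a later update
    }
}

func main() {
    queue := make(chan writeReq, 1)            // capacity 1 to demonstrate the full case
    fmt.Println(tryEnqueue(queue, "metric.a")) // true: queued
    fmt.Println(tryEnqueue(queue, "metric.b")) // false: queue full, save skipped
}
```

Because a skipped save leaves LastSave untouched, a later data point for the same series triggers another attempt, which is what the in-line comments in the diff describe.
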
idx/cassandra/cassandra.go (2 additions, 2 deletions)

@@ -320,7 +320,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
         log.Debugf("cassandra-idx: updating def %s in index.", archive.MetricDefinition.Id)
         c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
         archive.LastSave = now
-        c.MemoryIndex.UpdateArchive(archive)
+        c.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
     } else {
         // perform a non-blocking write to the writeQueue. If the queue is full, then
         // this will fail and we won't update the LastSave timestamp. The next time
@@ -331,7 +331,7 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
         select {
         case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
             archive.LastSave = now
-            c.MemoryIndex.UpdateArchive(archive)
+            c.MemoryIndex.UpdateArchiveLastSave(archive.Id, archive.Partition, now)
         default:
            statSaveSkipped.Inc()
            log.Debugf("cassandra-idx: writeQueue is full, update of %s not saved this time.", archive.MetricDefinition.Id)

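Both index backends make the same substitution: instead of handing a whole idx.Archive copy back via UpdateArchive, they now call UpdateArchiveLastSave with just the id, partition, and timestamp. The sketch below is hypothetical and is not the MemoryIndex code in this PR; it only illustrates the shape of such a targeted update, where a single field is changed under the index lock and the caller never passes back a full archive copy.

```go
package main

import (
    "fmt"
    "sync"
)

// Archive is a stripped-down stand-in for idx.Archive; the real type carries a
// full MetricDefinition plus schema and aggregation ids.
type Archive struct {
    Id        string
    Partition int32
    LastSave  uint32
}

// memoryIdx is a hypothetical sketch, not metrictank's MemoryIndex. It only
// shows the shape of a targeted UpdateArchiveLastSave: look the entry up by
// id and partition, then bump LastSave under the lock.
type memoryIdx struct {
    sync.RWMutex
    defs map[string]*Archive
}

func (m *memoryIdx) UpdateArchiveLastSave(id string, partition int32, lastSave uint32) {
    m.Lock()
    defer m.Unlock()
    if a, ok := m.defs[id]; ok && a.Partition == partition {
        a.LastSave = lastSave // only LastSave changes; other fields stay as the index has them
    }
}

func main() {
    idx := &memoryIdx{defs: map[string]*Archive{
        "1.abc": {Id: "1.abc", Partition: 0, LastSave: 100},
    }}
    idx.UpdateArchiveLastSave("1.abc", 0, 200)
    fmt.Println(idx.defs["1.abc"].LastSave) // prints 200
}
```
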
idx/cassandra/cassandra_test.go (1 addition, 1 deletion)

@@ -248,7 +248,7 @@ func TestAddToWriteQueue(t *testing.T) {
 
         archive, _ := ix.Get(mkey)
         archive.LastSave = uint32(time.Now().Unix() - 100)
-        ix.UpdateArchive(archive)
+        ix.UpdateArchiveLastSave(archive.Id, archive.Partition, archive.LastSave)
     }
     for _, s := range metrics {
         mkey, err := schema.MKeyFromString(s.Id)