This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Fix deadlock when write queue full #1569

Merged: 4 commits merged on Dec 12, 2019
Changes from 2 commits
23 changes: 10 additions & 13 deletions mdata/aggmetric.go
@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/grafana/metrictank/cluster"
@@ -43,7 +44,6 @@ type AggMetric struct {
ingestFromT0 uint32
ttl uint32
lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
lastWrite uint32 // wall clock time of when last point was successfully added (possibly to the ROB)
firstTs uint32 // timestamp of first point seen
}
@@ -92,14 +92,11 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey,
// Sync the saved state of a chunk by its T0.
func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback {
return func() {
a.Lock()
if ts > a.lastSaveFinish {
a.lastSaveFinish = ts
lastSaveStart := atomic.LoadUint32(&a.lastSaveStart)
if ts > lastSaveStart {
atomic.StoreUint32(&a.lastSaveStart, ts)
Contributor:

race condition!
I think you need something like

newVal := ts
prev := atomic.SwapUint32(&a.lastSaveStart, newVal)
for prev > newVal {
	newVal = prev
	prev = atomic.SwapUint32(&a.lastSaveStart, newVal)
}

(i took this from idx/memory.bumpLastUpdate())

Contributor Author:

good catch, i'll update that
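
The swap loop quoted above is the standard lock-free way to keep an atomic value monotonically non-decreasing. A minimal, self-contained sketch of the pattern, assuming an illustrative helper name (bumpUint32) that does not appear in this PR:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// bumpUint32 raises *loc to val but never lowers it, even when several
// goroutines race to store different values (modeled on the snippet the
// reviewer quoted from idx/memory.bumpLastUpdate).
func bumpUint32(loc *uint32, val uint32) {
	prev := atomic.SwapUint32(loc, val)
	for prev > val {
		// another writer had already stored a higher value: put it back
		// and re-check, since yet another writer may have raced with us
		val = prev
		prev = atomic.SwapUint32(loc, val)
	}
}

func main() {
	var lastSaveStart uint32
	var wg sync.WaitGroup
	for _, t0 := range []uint32{600, 1800, 1200} {
		wg.Add(1)
		go func(ts uint32) {
			defer wg.Done()
			bumpUint32(&lastSaveStart, ts)
		}(t0)
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint32(&lastSaveStart)) // always 1800
}

Whichever goroutine loses the race re-publishes the higher value it displaced, so the stored value can only move forward.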

}
if ts > a.lastSaveStart {
a.lastSaveStart = ts
}
a.Unlock()

log.Debugf("AM: metric %s at chunk T0=%d has been saved.", a.key, ts)
if sendPersist {
SendPersistMessage(a.key.String(), ts)
@@ -360,7 +357,8 @@ func (a *AggMetric) persist(pos int) {
chunk := a.chunks[pos]
pre := time.Now()

if a.lastSaveStart >= chunk.Series.T0 {
lastSaveStart := atomic.LoadUint32(&a.lastSaveStart)
if lastSaveStart >= chunk.Series.T0 {
// this can happen if
// a) there are 2 primary MT nodes both saving chunks to Cassandra
// b) a primary failed and this node was promoted to be primary but metric consuming is lagging.
@@ -391,7 +389,7 @@ func (a *AggMetric) persist(pos int) {
previousPos += len(a.chunks)
}
previousChunk := a.chunks[previousPos]
for (previousChunk.Series.T0 < chunk.Series.T0) && (a.lastSaveStart < previousChunk.Series.T0) {
for (previousChunk.Series.T0 < chunk.Series.T0) && (lastSaveStart < previousChunk.Series.T0) {
log.Debugf("AM: persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.key, previousChunk.Series.T0)
cwr := NewChunkWriteRequest(
a.SyncChunkSaveState(previousChunk.Series.T0, true),
}

// Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue.
a.lastSaveStart = chunk.Series.T0
atomic.StoreUint32(&a.lastSaveStart, chunk.Series.T0)
Contributor:

similar to my comment in SyncChunkSaveState, we need to account for others setting higher values using a swap loop.
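
With a swap-loop helper along the lines of the bumpUint32 sketch above, the plain store a few lines up would presumably become a monotonic bump rather than an unconditional write; this is illustrative only, not the code that was eventually merged:

// instead of: atomic.StoreUint32(&a.lastSaveStart, chunk.Series.T0)
bumpUint32(&a.lastSaveStart, chunk.Series.T0) // never lowers a value a concurrent save already raised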


log.Debugf("AM: persist(): sending %d chunks to write queue", len(pending))

@@ -492,8 +490,7 @@ func (a *AggMetric) add(ts uint32, val float64) {
log.Debugf("AM: %s Add(): created first chunk with first point: %v", a.key, a.chunks[0])
a.lastWrite = uint32(time.Now().Unix())
if a.dropFirstChunk {
a.lastSaveStart = t0
a.lastSaveFinish = t0
atomic.StoreUint32(&a.lastSaveStart, t0)
}
Contributor:

same here. need swap loop to account for chunksaves coming in concurrently.

a.addAggregators(ts, val)
return