This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Fix deadlock when write queue full #1569

Merged: 4 commits, Dec 12, 2019
7 changes: 2 additions & 5 deletions idx/memory/memory.go
@@ -19,6 +19,7 @@ import (
 	"github.com/grafana/metrictank/mdata"
 	"github.com/grafana/metrictank/schema"
 	"github.com/grafana/metrictank/stats"
+	"github.com/grafana/metrictank/util"
 	log "github.com/sirupsen/logrus"
 )

@@ -334,11 +335,7 @@ func (m *UnpartitionedMemoryIdx) Stop() {
 // * by the time we look at the previous value and try to restore it, someone else may have updated it to a higher value
 // all these scenarios are unlikely but we should accommodate them anyway.
 func bumpLastUpdate(loc *int64, newVal int64) {
-	prev := atomic.SwapInt64(loc, newVal)
-	for prev > newVal {
-		newVal = prev
-		prev = atomic.SwapInt64(loc, newVal)
-	}
+	util.AtomicBumpInt64(loc, newVal)
 }
 
 // updates the partition and lastUpdate ts in an archive. Returns the previously set partition
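
For context, a small standalone sketch (not part of the PR; the main wrapper and timestamps are made up) of the behaviour bumpLastUpdate now delegates to util.AtomicBumpInt64: lastUpdate only ever moves forward, even when datapoints arrive out of order.

package main

import (
	"fmt"

	"github.com/grafana/metrictank/util"
)

func main() {
	var lastUpdate int64

	// datapoints arriving out of order: the index's lastUpdate must never move backwards
	for _, ts := range []int64{100, 160, 130, 190, 175} {
		util.AtomicBumpInt64(&lastUpdate, ts)
		fmt.Println(lastUpdate) // prints 100, 160, 160, 190, 190
	}
}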
23 changes: 9 additions & 14 deletions mdata/aggmetric.go
@@ -7,6 +7,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/grafana/metrictank/cluster"
@@ -16,6 +17,7 @@ import (
 	"github.com/grafana/metrictank/mdata/chunk"
 	mdataerrors "github.com/grafana/metrictank/mdata/errors"
 	"github.com/grafana/metrictank/schema"
+	"github.com/grafana/metrictank/util"
 	log "github.com/sirupsen/logrus"
 )

@@ -43,7 +45,6 @@ type AggMetric struct {
 	ingestFromT0   uint32
 	ttl            uint32
 	lastSaveStart  uint32 // last chunk T0 that was added to the write Queue.
-	lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
 	lastWrite      uint32 // wall clock time of when last point was successfully added (possibly to the ROB)
 	firstTs        uint32 // timestamp of first point seen
 }
@@ -92,14 +93,8 @@ func NewAggMetric(store Store, cachePusher cache.CachePusher, key schema.AMKey,
 // Sync the saved state of a chunk by its T0.
 func (a *AggMetric) SyncChunkSaveState(ts uint32, sendPersist bool) ChunkSaveCallback {
 	return func() {
-		a.Lock()
-		if ts > a.lastSaveFinish {
-			a.lastSaveFinish = ts
-		}
-		if ts > a.lastSaveStart {
-			a.lastSaveStart = ts
-		}
-		a.Unlock()
+		util.AtomicBumpUint32(&a.lastSaveStart, ts)
+
 		log.Debugf("AM: metric %s at chunk T0=%d has been saved.", a.key, ts)
 		if sendPersist {
 			SendPersistMessage(a.key.String(), ts)
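
To make the motivation concrete, here is a minimal, hypothetical reproduction of the failure mode the PR title describes, assuming the old save callback acquired the per-metric lock while persist() pushes chunk write requests onto a bounded queue under that same lock. None of the names below are metrictank code; the queue size and callbacks are simplified for illustration.

package main

import "sync"

func main() {
	var mu sync.Mutex                  // stands in for the per-metric (AggMetric) lock
	writeQueue := make(chan func(), 1) // stands in for the bounded chunk write queue, shrunk to 1 slot

	// consumer: drains the queue and runs each "chunk saved" callback
	go func() {
		for cb := range writeQueue {
			cb()
		}
	}()

	// old-style callback: needs the metric lock to record the save
	saved := func() { mu.Lock(); mu.Unlock() }

	mu.Lock() // producer (persist-style): hold the metric lock while queueing writes
	writeQueue <- saved
	writeQueue <- saved
	writeQueue <- saved // blocks forever: the consumer is stuck inside saved() waiting for mu,
	// so the queue never drains and the lock is never released.
	mu.Unlock() // never reached; the Go runtime aborts with "all goroutines are asleep - deadlock!"
}

Replacing the lock in the callback with an atomic bump, as this hunk does, removes the lock/queue cycle.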
@@ -360,7 +355,8 @@ func (a *AggMetric) persist(pos int) {
 	chunk := a.chunks[pos]
 	pre := time.Now()
 
-	if a.lastSaveStart >= chunk.Series.T0 {
+	lastSaveStart := atomic.LoadUint32(&a.lastSaveStart)
+	if lastSaveStart >= chunk.Series.T0 {
 		// this can happen if
 		// a) there are 2 primary MT nodes both saving chunks to Cassandra
 		// b) a primary failed and this node was promoted to be primary but metric consuming is lagging.
@@ -391,7 +387,7 @@ func (a *AggMetric) persist(pos int) {
 		previousPos += len(a.chunks)
 	}
 	previousChunk := a.chunks[previousPos]
-	for (previousChunk.Series.T0 < chunk.Series.T0) && (a.lastSaveStart < previousChunk.Series.T0) {
+	for (previousChunk.Series.T0 < chunk.Series.T0) && (lastSaveStart < previousChunk.Series.T0) {
 		log.Debugf("AM: persist(): old chunk needs saving. Adding %s:%d to writeQueue", a.key, previousChunk.Series.T0)
 		cwr := NewChunkWriteRequest(
 			a.SyncChunkSaveState(previousChunk.Series.T0, true),
@@ -410,7 +406,7 @@ func (a *AggMetric) persist(pos int) {
 	}
 
 	// Every chunk with a T0 <= this chunks' T0 is now either saved, or in the writeQueue.
-	a.lastSaveStart = chunk.Series.T0
+	util.AtomicBumpUint32(&a.lastSaveStart, chunk.Series.T0)
 
 	log.Debugf("AM: persist(): sending %d chunks to write queue", len(pending))
 
@@ -492,8 +488,7 @@ func (a *AggMetric) add(ts uint32, val float64) {
 		log.Debugf("AM: %s Add(): created first chunk with first point: %v", a.key, a.chunks[0])
 		a.lastWrite = uint32(time.Now().Unix())
 		if a.dropFirstChunk {
-			a.lastSaveStart = t0
-			a.lastSaveFinish = t0
+			util.AtomicBumpUint32(&a.lastSaveStart, t0)
 		}
Contributor review comment on this change:
same here. need swap loop to account for chunksaves coming in concurrently.

 		a.addAggregators(ts, val)
 		return
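
The review comment above is the rationale for using the swap-loop helper rather than a plain assignment in the dropFirstChunk branch. A small standalone sketch of the difference (the scenario and timestamps are invented; bump mirrors util.AtomicBumpUint32):

package main

import (
	"fmt"
	"sync/atomic"
)

// bump is the swap-loop pattern from util.AtomicBumpUint32.
func bump(loc *uint32, newVal uint32) {
	prev := atomic.SwapUint32(loc, newVal)
	for prev > newVal {
		newVal = prev
		prev = atomic.SwapUint32(loc, newVal)
	}
}

func main() {
	// Suppose a concurrent chunk-save callback already advanced the watermark to T0=1800.
	var lastSaveStart uint32 = 1800

	// A plain store of an older T0 silently rewinds it:
	atomic.StoreUint32(&lastSaveStart, 600)
	fmt.Println(lastSaveStart) // prints 600: the completed save at 1800 is forgotten

	// The swap loop notices the displaced value was higher and puts it back:
	lastSaveStart = 1800
	bump(&lastSaveStart, 600)
	fmt.Println(lastSaveStart) // prints 1800: the watermark never moves backwards
}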
25 changes: 25 additions & 0 deletions util/atomic.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+	"sync/atomic"
+)
+
+// AtomicBumpInt64 assures the given address stores the highest int64 between the current value,
+// the value provided and the value provided by any other concurrently executing call
+func AtomicBumpInt64(loc *int64, newVal int64) {
+	prev := atomic.SwapInt64(loc, newVal)
+	for prev > newVal {
+		newVal = prev
+		prev = atomic.SwapInt64(loc, newVal)
+	}
+}
+
+// AtomicBumpUint32 assures the given address stores the highest uint32 between the current value,
+// the value provided and the value provided by any other concurrently executing call
+func AtomicBumpUint32(loc *uint32, newVal uint32) {
+	prev := atomic.SwapUint32(loc, newVal)
+	for prev > newVal {
+		newVal = prev
+		prev = atomic.SwapUint32(loc, newVal)
+	}
+}
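
As a usage sketch (not part of the PR; the T0 values are made up), concurrent bumps converge on the maximum regardless of interleaving:

package main

import (
	"fmt"
	"sync"

	"github.com/grafana/metrictank/util"
)

func main() {
	var lastSaveStart uint32
	var wg sync.WaitGroup

	// e.g. several chunk-save callbacks completing in arbitrary order
	for _, t0 := range []uint32{600, 1800, 1200, 2400, 300} {
		wg.Add(1)
		go func(t0 uint32) {
			defer wg.Done()
			util.AtomicBumpUint32(&lastSaveStart, t0)
		}(t0)
	}
	wg.Wait()

	fmt.Println(lastSaveStart) // always 2400: the highest T0 wins
}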