Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

calculate TTL relative to now when inserting into cassandra #1448

Merged
merged 5 commits into from
Aug 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions mdata/chunk/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,22 @@ func init() {
RevChunkSpans[v] = SpanCode(k)
}
}

// SpanOfChunk takes a chunk and tries to determine its span.
// It returns 0 if it failed to determine the span, this could fail
// either because the given chunk is invalid or because it has an old format
func ExtractChunkSpan(chunk []byte) uint32 {
if len(chunk) < 2 {
return 0
}

if Format(chunk[0]) != FormatStandardGoTszWithSpan && Format(chunk[0]) != FormatGoTszLongWithSpan {
return 0
}

if int(chunk[1]) >= len(ChunkSpans) {
return 0
}

return ChunkSpans[SpanCode(chunk[1])]
}
22 changes: 21 additions & 1 deletion store/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,26 @@ func (c *CassandraStore) insertChunk(key string, t0, ttl uint32, data []byte) er
return nil
}

span := chunk.ExtractChunkSpan(data)
if span == 0 {
span = mdata.MaxChunkSpan()
}

// we calculate ttl like this:
// - the chunk's t0 plus its span is the ts of last possible datapoint in the chunk
// - the timestamp of the last datapoint + ttl is the timestamp until when we want to keep this chunk
// - then we subtract the current time stamp to get the difference relative to now
// - the result is the ttl in seconds relative to now
relativeTtl := int64(t0+span+ttl) - time.Now().Unix()

// if the ttl relative to now is <=0 then we can omit the insert
if relativeTtl <= 0 {
if log.IsLevelEnabled(log.DebugLevel) {
log.Debugf("Omitting insert of chunk with ttl %d: %s, %d", relativeTtl, key, t0)
}
return nil
}

table, ok := c.TTLTables[ttl]
if !ok {
return errTableNotFound
Expand All @@ -363,7 +383,7 @@ func (c *CassandraStore) insertChunk(key string, t0, ttl uint32, data []byte) er
row_key := fmt.Sprintf("%s_%d", key, t0/Month_sec) // "month number" based on unix timestamp (rounded down)
pre := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
ret := c.Session.Query(table.QueryWrite, row_key, t0, data, ttl).WithContext(ctx).Exec()
ret := c.Session.Query(table.QueryWrite, row_key, t0, data, uint32(relativeTtl)).WithContext(ctx).Exec()
cancel()
cassPutExecDuration.Value(time.Now().Sub(pre))
return ret
Expand Down