Skip to content

Commit

Permalink
fix(core): Making blocks reclaimable for chunks that are dropped for …
Browse files Browse the repository at this point in the history
…cassandra
  • Loading branch information
vishramachandran committed Jan 24, 2018
1 parent 2133da8 commit 4a502ed
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/resources/filodb-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ filodb {
retry-interval = 10s
retry-interval-max-jitter = 10s

ingestion-consistency-level = "QUORUM" // TODO change to ONE in later PR
ingestion-consistency-level = "ONE"
}

spark {
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,21 @@ class TimeSeriesShard(dataset: Dataset, config: Config, val shardNum: Int, sink:
case Seq(Success, Success) => commitCheckpoint(dataset.ref, shardNum, flushGroup)
case Seq(er: ErrorResponse, _) => Future.successful(er)
case Seq(_, er: ErrorResponse) => Future.successful(er)
}.map {
case Success => blockHolder.markUsedBlocksReclaimable()
shardStats.flushesSuccessful.increment
Success
case other: Any => other
}.map { case resp =>
blockHolder.markUsedBlocksReclaimable()
resp
}.recover { case e =>
logger.error("Should have not reached this point. Possible programming error", e)
logger.error("Internal Error - should have not reached this state", e)
DataDropped
}
Task.fromFuture(taskFuture)
}

private def commitCheckpoint(ref: DatasetRef, shardNum: Int, flushGroup: FlushGroup): Future[Response] = {
val fut = metastore.writeCheckpoint(ref, shardNum, flushGroup.groupNum,
flushGroup.flushWatermark).recover { case e =>
val fut = metastore.writeCheckpoint(ref, shardNum, flushGroup.groupNum, flushGroup.flushWatermark).map { r =>
shardStats.flushesSuccessful.increment
r
}.recover { case e =>
logger.error("Critical! Checkpoint persistence skipped", e)
shardStats.flushesFailedOther.increment
// skip the checkpoint write
Expand Down

0 comments on commit 4a502ed

Please sign in to comment.