Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions frontend/src/lib/components/charts/PercentileLineChart.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
x="date"
y="value"
series={[
{ label: 'p95', color: '#4B92FF', index: 19 },
{ label: 'p50', color: '#7DFFB3', index: 10 },
{ label: 'p5', color: '#FDDD61', index: 1 }
{ label: 'p95', color: '#4B92FF', index: 2 },
{ label: 'p50', color: '#7DFFB3', index: 1 },
{ label: 'p5', color: '#FDDD61', index: 0 }
].map((c) => {
const timestamps = data.timestamps ?? [];
const values = data.percentiles?.[c.index] ?? [];
Expand Down
18 changes: 17 additions & 1 deletion online/src/main/scala/ai/chronon/online/stats/DriftStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import java.util.ArrayList

class DriftStore(kvStore: KVStore,
summaryDataset: String = Constants.TiledSummaryDataset,
Expand Down Expand Up @@ -62,14 +63,29 @@ class DriftStore(kvStore: KVStore,
private case class SummaryResponseContext(summaries: Array[(TileSummary, Long)], tileKey: TileKey, groupName: String)

case class TileSummaryInfo(key: TileSeriesKey, summaries: Array[(TileSummary, Long)]) {
private def filterPercentiles(summary: TileSummary): TileSummary = {
val filtered = new TileSummary(summary)
if (summary.getPercentiles != null) {
val filteredPercentiles = new java.util.ArrayList[java.lang.Double]()
// Keep only 5%, 50%, 95% percentiles (indices 1, 10, 19)
Seq(1, 10, 19).foreach(i => filteredPercentiles.add(summary.getPercentiles.get(i)))
filtered.setPercentiles(filteredPercentiles)
}
filtered
}

def toDriftSeries(driftMetric: DriftMetric, lookBack: Window, startMs: Long): TileDriftSeries = {
val driftsArray = TileDriftCalculator.toTileDrifts(summaries, driftMetric, startMs, lookBack)
val result = PivotUtils.pivot(driftsArray)
result.setKey(key)
}

def toSeries: TileSummarySeries = {
val result = PivotUtils.pivot(summaries)
// Filter percentiles before pivoting
val filteredSummaries = summaries.map { case (summary, timestamp) =>
(filterPercentiles(summary), timestamp)
}
val result = PivotUtils.pivot(filteredSummaries)
result.setKey(key)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object PivotUtils {
while (seriesIndex < seriesLength) {
val list = lists(seriesIndex)
val value: T = if (list == null) {
null.asInstanceOf[T]
Constants.magicNullDouble.asInstanceOf[T]
} else {
val v = list.get(pctIndex)
if (v == null || (v.isInstanceOf[Double] && v.asInstanceOf[Double].isNaN)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class DriftTest extends AnyFlatSpec with Matchers {
|""".stripMargin)

summaryTotals should be > 0
summaryNulls.toDouble / summaryTotals.toDouble should be < 0.1
summaryNulls.toDouble / summaryTotals.toDouble should be < 0.2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has become flaky since the pr - can we revert or forward fix?

logger.info("Summary series fetched successfully".green)

val startTs = 1673308800000L
Expand Down Expand Up @@ -218,7 +218,7 @@ class DriftTest extends AnyFlatSpec with Matchers {
if (isNumeric) {
val percentileSeriesPerBreak = summarySeries.percentiles.toScala
val timeStamps = summarySeries.timestamps.toScala
val breaks = DriftStore.breaks(20)
val breaks = Seq("5%", "50%", "95%")
percentileSeriesPerBreak.zip(breaks).flatMap { case (percentileSeries, break) =>
percentileSeries.toScala.zip(timeStamps).map { case (value, ts) => TimeSeriesPoint(value, ts, Some(break)) }
}
Expand Down