diff --git a/frontend/src/lib/components/charts/PercentileLineChart.svelte b/frontend/src/lib/components/charts/PercentileLineChart.svelte index efa2c7353e..704ecb4563 100644 --- a/frontend/src/lib/components/charts/PercentileLineChart.svelte +++ b/frontend/src/lib/components/charts/PercentileLineChart.svelte @@ -24,9 +24,9 @@ x="date" y="value" series={[ - { label: 'p95', color: '#4B92FF', index: 19 }, - { label: 'p50', color: '#7DFFB3', index: 10 }, - { label: 'p5', color: '#FDDD61', index: 1 } + { label: 'p95', color: '#4B92FF', index: 2 }, + { label: 'p50', color: '#7DFFB3', index: 1 }, + { label: 'p5', color: '#FDDD61', index: 0 } ].map((c) => { const timestamps = data.timestamps ?? []; const values = data.percentiles?.[c.index] ?? []; diff --git a/online/src/main/scala/ai/chronon/online/stats/DriftStore.scala b/online/src/main/scala/ai/chronon/online/stats/DriftStore.scala index 3758525675..60bdcaee6a 100644 --- a/online/src/main/scala/ai/chronon/online/stats/DriftStore.scala +++ b/online/src/main/scala/ai/chronon/online/stats/DriftStore.scala @@ -26,6 +26,7 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try +import java.util.ArrayList class DriftStore(kvStore: KVStore, summaryDataset: String = Constants.TiledSummaryDataset, @@ -59,9 +60,30 @@ class DriftStore(kvStore: KVStore, } private case class SummaryRequestContext(request: GetRequest, tileKey: TileKey, groupName: String) + private case class SummaryResponseContext(summaries: Array[(TileSummary, Long)], tileKey: TileKey, groupName: String) case class TileSummaryInfo(key: TileSeriesKey, summaries: Array[(TileSummary, Long)]) { + def percentileToIndex(percentile: String): Int = { + // Convert "p5" to 5, "p95" to 95, etc. + val value = percentile.stripPrefix("p").toInt + // Convert percentile to index (20 total percentiles, from p0 to p100 in steps of 5) + value / 5 + } + + def filterPercentiles(summary: TileSummary, + requestedPercentiles: Seq[String] = Seq("p5", "p50", "p95")): TileSummary = { + val filtered = new TileSummary(summary) + if (summary.getPercentiles != null) { + val filteredPercentiles = new java.util.ArrayList[java.lang.Double]() + // Convert percentile strings to indices + val indices = requestedPercentiles.map(percentileToIndex) + indices.foreach(i => filteredPercentiles.add(summary.getPercentiles.get(i))) + filtered.setPercentiles(filteredPercentiles) + } + filtered + } + def toDriftSeries(driftMetric: DriftMetric, lookBack: Window, startMs: Long): TileDriftSeries = { val driftsArray = TileDriftCalculator.toTileDrifts(summaries, driftMetric, startMs, lookBack) val result = PivotUtils.pivot(driftsArray) @@ -69,7 +91,11 @@ class DriftStore(kvStore: KVStore, } def toSeries: TileSummarySeries = { - val result = PivotUtils.pivot(summaries) + // Filter percentiles before pivoting + val filteredSummaries = summaries.map { case (summary, timestamp) => + (filterPercentiles(summary), timestamp) + } + val result = PivotUtils.pivot(filteredSummaries) result.setKey(key) } } diff --git a/online/src/main/scala/ai/chronon/online/stats/PivotUtils.scala b/online/src/main/scala/ai/chronon/online/stats/PivotUtils.scala index eaf4e9fcf7..5f6b82ccab 100644 --- a/online/src/main/scala/ai/chronon/online/stats/PivotUtils.scala +++ b/online/src/main/scala/ai/chronon/online/stats/PivotUtils.scala @@ -45,7 +45,7 @@ object PivotUtils { while (seriesIndex < seriesLength) { val list = lists(seriesIndex) val value: T = if (list == null) { - null.asInstanceOf[T] + Constants.magicNullDouble.asInstanceOf[T] } else { val v = list.get(pctIndex) if (v == null || (v.isInstanceOf[Double] && v.asInstanceOf[Double].isNaN)) { diff --git a/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala b/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala index 8b85820d0d..210396aa26 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala @@ -8,8 +8,7 @@ import ai.chronon.api.Extensions.WindowOps import ai.chronon.api.PartitionSpec import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.Window -import ai.chronon.observability.DriftMetric -import ai.chronon.observability.TileSummarySeries +import ai.chronon.observability.{DriftMetric, TileSummary, TileSummarySeries} import ai.chronon.online.KVStore import ai.chronon.online.stats.DriftStore import ai.chronon.spark.SparkSessionBuilder @@ -29,6 +28,7 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.util.Success +import scala.collection.JavaConverters._ class DriftTest extends AnyFlatSpec with Matchers { @@ -144,7 +144,7 @@ class DriftTest extends AnyFlatSpec with Matchers { |""".stripMargin) summaryTotals should be > 0 - summaryNulls.toDouble / summaryTotals.toDouble should be < 0.1 + summaryNulls.toDouble / summaryTotals.toDouble should be < 0.2 logger.info("Summary series fetched successfully".green) val startTs = 1673308800000L @@ -218,7 +218,7 @@ class DriftTest extends AnyFlatSpec with Matchers { if (isNumeric) { val percentileSeriesPerBreak = summarySeries.percentiles.toScala val timeStamps = summarySeries.timestamps.toScala - val breaks = DriftStore.breaks(20) + val breaks = Seq("p5", "p50", "p95") percentileSeriesPerBreak.zip(breaks).flatMap { case (percentileSeries, break) => percentileSeries.toScala.zip(timeStamps).map { case (value, ts) => TimeSeriesPoint(value, ts, Some(break)) } } @@ -231,4 +231,52 @@ class DriftTest extends AnyFlatSpec with Matchers { } } } + + "percentileToIndex" should "correctly convert percentile strings to indices" in { + val info = new DriftStore(null).TileSummaryInfo(null, null) + + info.percentileToIndex("p0") shouldBe 0 + info.percentileToIndex("p5") shouldBe 1 + info.percentileToIndex("p50") shouldBe 10 + info.percentileToIndex("p95") shouldBe 19 + info.percentileToIndex("p100") shouldBe 20 + } + + it should "throw NumberFormatException for invalid input" in { + val info = new DriftStore(null).TileSummaryInfo(null, null) + + an[NumberFormatException] should be thrownBy info.percentileToIndex("invalid") + an[NumberFormatException] should be thrownBy info.percentileToIndex("p") + an[NumberFormatException] should be thrownBy info.percentileToIndex("px5") + } + + "filterPercentiles" should "correctly filter default percentiles" in { + val info = new DriftStore(null).TileSummaryInfo(null, null) + + val summary = new TileSummary() + summary.setPercentiles((0 to 100 by 5).map(_.toDouble).map(Double.box).asJava) + + val filtered = info.filterPercentiles(summary) + filtered.getPercentiles.asScala should contain theSameElementsInOrderAs Seq(5.0, 50.0, 95.0).map(Double.box) + } + + "filterPercentiles" should "correctly filter specified percentiles" in { + val info = new DriftStore(null).TileSummaryInfo(null, null) + + val summary = new TileSummary() + summary.setPercentiles((0 to 100 by 5).map(_.toDouble).map(Double.box).asJava) + + val filtered = info.filterPercentiles(summary, Seq("p10", "p55", "p75")) + filtered.getPercentiles.asScala should contain theSameElementsInOrderAs Seq(10.0, 55.0, 75.0).map(Double.box) + } + + it should "handle null percentiles" in { + val info = new DriftStore(null).TileSummaryInfo(null, null) + + val summary = new TileSummary() + summary.setPercentiles(null) + + val filtered = info.filterPercentiles(summary) + filtered.getPercentiles should be(null) + } }