Refactor flink job and add support to run chained groupBys #1247
Conversation
Walkthrough
This pull request introduces a major refactoring of the Flink streaming pipeline architecture, establishing a base abstraction layer.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Env as StreamExecutionEnvironment
    participant CGB as ChainedGroupByJob
    participant JEA as JoinEnrichmentAsyncFunction
    participant JSQF as JoinSourceQueryFunction
    participant Tile as Tiling & Aggregation
    participant KVStore as AsyncKVStoreWriter
    Env->>CGB: runTiledGroupByJob(env)
    CGB->>CGB: read ProjectedEvent stream (watermarking)
    CGB->>JEA: async enrich via join
    JEA->>JEA: fetch join data (Fetcher.fetchJoin)
    JEA-->>CGB: enriched ProjectedEvent
    CGB->>JSQF: optional: apply join query transform
    JSQF->>JSQF: execute SQL on enriched event
    JSQF-->>CGB: transformed ProjectedEvent
    CGB->>Tile: tumbling window (event-time)
    Tile->>Tile: aggregate rows (FlinkRowAggProcessFunction)
    Tile->>Tile: track late data (side output)
    Tile->>CGB: TimestampedTile
    CGB->>CGB: convert to AvroCodecOutput (TiledAvroCodecFn)
    CGB->>KVStore: async write (unordered waits)
    KVStore-->>Env: DataStream[WriteResponse]
```
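The flow above corresponds to fairly standard DataStream wiring. A condensed sketch follows; the operator class names mirror the diagram, while the timeout, capacity, window size, and constructor arguments are illustrative assumptions, not the PR's actual values:

```scala
// Sketch only: operator names mirror the diagram; arguments are assumed.
val events: DataStream[ProjectedEvent] = sourceStream // watermarked upstream

// Async join enrichment (JEA), unordered to preserve throughput.
val enriched = AsyncDataStream.unorderedWait(
  events,
  new JoinEnrichmentAsyncFunction(joinConf, api),
  1000L, TimeUnit.MILLISECONDS, 100)

// Optional join-source query transform (JSQF), executed via Catalyst SQL.
val transformed = enriched.flatMap(new JoinSourceQueryFunction(joinSource, api))

// Event-time tumbling windows, with late events routed to a side output.
val tiles = transformed
  .keyBy(KeySelectorBuilder.build(groupBy))
  .window(TumblingEventTimeWindows.of(Time.milliseconds(smallestTailHopMillis)))
  .sideOutputLateData(lateEventsTag)
  .aggregate(rowAggFn, rowAggProcessFn) // FlinkRowAggregationFunction + FlinkRowAggProcessFunction

// Avro-encode tiles (TiledAvroCodecFn), then async-write to the KV store.
val writes: DataStream[WriteResponse] = AsyncKVStoreWriter.withUnorderedWaits(
  tiles.flatMap(new TiledAvroCodecFn(servingInfo)),
  sinkFn,
  groupByName)
```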
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Areas requiring extra attention:
Suggested reviewers
Poem
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Actionable comments posted: 12
🧹 Nitpick comments (24)
python/test/canary/joins/gcp/demo_parent.py (1)
1-9: Remove shadowed import. EventSource is imported from both thrift (line 1) and ai.chronon.source (line 9). The second import shadows the first, making line 1's EventSource unused.
Apply this diff:
```diff
-from gen_thrift.api.ttypes import EventSource, Source, JoinSource
+from gen_thrift.api.ttypes import Source, JoinSource
```

api/src/main/scala/ai/chronon/api/planner/MonolithJoinPlanner.scala (1)
47-77: Deduplicate metadata deps to avoid repeats. Multiple JoinSources can add the same upstream join dep. Dedup by table name to keep the graph lean.
```diff
- val allDeps = Option(join.joinParts).map(_.asScala).getOrElse(Seq.empty).flatMap { joinPart =>
+ val allDeps = Option(join.joinParts).map(_.asScala).getOrElse(Seq.empty).flatMap { joinPart =>
  …
-   Seq(groupByDep) ++ upstreamJoinDeps
- }
+   Seq(groupByDep) ++ upstreamJoinDeps
+ }
+   .groupBy(_.tableInfo.table)
+   .map(_._2.head)
+   .toSeq
```

api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1)
92-104: Stream deps: dedup and avoid constructing node for name.
- Dedup deps by table to prevent repeats.
- Use the known suffix instead of instantiating uploadToKVNode for its table.
```diff
- val uploadToKVDep = new TableDependency()
+ val uploadToKVTable = groupBy.metaData.outputTable + s"__${GroupByPlanner.UploadToKV}"
+ val uploadToKVDep = new TableDependency()
    .setTableInfo(
      new TableInfo()
-       .setTable(uploadToKVNode.metaData.outputTable)
+       .setTable(uploadToKVTable)
    )
    .setStartOffset(WindowUtils.zero())
    .setEndOffset(WindowUtils.zero())
- val streamingTableDeps = Seq(uploadToKVDep) ++ joinSourceDeps
+ val streamingTableDeps =
+   (Seq(uploadToKVDep) ++ joinSourceDeps)
+     .groupBy(_.tableInfo.table)
+     .map(_._2.head)
+     .toSeq
```

flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (1)
4-4: Remove unused import.
SourceProjectionDeserializationSchema is no longer referenced.

```diff
-import ai.chronon.flink.deser.{DeserializationSchemaBuilder, ProjectedEvent, SourceProjectionDeserializationSchema}
+import ai.chronon.flink.deser.{DeserializationSchemaBuilder, ProjectedEvent}
```

api/src/test/scala/ai/chronon/api/test/planner/TableDependenciesTest.scala (1)
147-160: Minor: fix test title typo. Readability nit in the spec description.
```diff
-it should "prioritize partitionLag over shiftwhen both are provided" in {
+it should "prioritize partitionLag over shift when both are provided" in {
```

api/src/test/scala/ai/chronon/api/test/planner/GroupByPlannerTest.scala (1)
222-350: Add an assertion to guard against duplicate upstream deps. If multiple JoinSources reference the same upstream join, ensure only one dep is present.
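A self-contained sketch of what that guard checks (plain Scala with stand-in types; the real test would run against the planner's streaming node deps):

```scala
// Stand-in model for the planner's dependency types (hypothetical shapes).
case class TableInfo(table: String)
case class TableDependency(tableInfo: TableInfo)

// Two JoinSources referencing the same upstream join produce a duplicate dep.
val deps = Seq(
  TableDependency(TableInfo("db.upstream_join")),
  TableDependency(TableInfo("db.upstream_join")),
  TableDependency(TableInfo("db.group_by__uploadToKV"))
)

// Dedup by table name, as the planner diff above does...
val deduped = deps.groupBy(_.tableInfo.table).map(_._2.head).toSeq

// ...and the assertions the test should carry:
assert(deduped.map(_.tableInfo.table).distinct.size == deduped.size)
assert(deduped.size == 2)
```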
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (2)
28-44: Metrics mocking: prefer typed matchers. Use typed any[...] to avoid matcher ambiguity with Scala overloading.
```diff
-when(mockSubGroup.addGroup(anyString(), anyString())).thenReturn(mockSubGroup)
-when(mockSubGroup.counter(anyString())).thenReturn(mockCounter)
-when(mockSubGroup.histogram(anyString(), any())).thenReturn(mockHistogram)
+when(mockSubGroup.addGroup(any[String], any[String])).thenReturn(mockSubGroup)
+when(mockSubGroup.counter(any[String])).thenReturn(mockCounter)
+when(mockSubGroup.histogram(any[String], any[org.apache.flink.metrics.Histogram])).thenReturn(mockHistogram)
```
126-189: Test title vs behavior. “handle query errors gracefully” doesn’t induce an error. Consider stubbing performSql to throw and assert pass-through + error counter increment.
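One way to make the title accurate (a sketch only; it assumes performSql is visible and stubbable on the function under test, and that the mock names exist in the test's setup):

```scala
// Hypothetical sketch: induce a query error and assert pass-through.
val fn = spy(new JoinSourceQueryFunction(joinSource, mockApi))
doThrow(new RuntimeException("boom")).when(fn).performSql(any())

val out = new java.util.ArrayList[ProjectedEvent]()
fn.flatMap(enrichedEvent, new ListCollector(out))

out.size() shouldBe 1          // event passed through unmodified
verify(mockErrorCounter).inc() // error counter incremented
```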
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
36-41: Encoder cast safety. Add a require to fail fast if encoder isn’t an ExpressionEncoder.
```diff
-val eventExprEncoder = encoder.asInstanceOf[ExpressionEncoder[T]]
+require(encoder.isInstanceOf[ExpressionEncoder[T]], "SparkExpressionEvalFn requires an ExpressionEncoder")
+val eventExprEncoder = encoder.asInstanceOf[ExpressionEncoder[T]]
```

flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (1)
34-42: Metrics mocking: use typed matchers. Avoid ambiguity with Scala/Mockito overloads.
```diff
-when(mockSubGroup.addGroup(anyString(), anyString())).thenReturn(mockSubGroup)
-when(mockSubGroup.counter(anyString())).thenReturn(mockCounter)
-when(mockSubGroup.histogram(anyString(), any())).thenReturn(mockHistogram)
+when(mockSubGroup.addGroup(any[String], any[String])).thenReturn(mockSubGroup)
+when(mockSubGroup.counter(any[String])).thenReturn(mockCounter)
+when(mockSubGroup.histogram(any[String], any[org.apache.flink.metrics.Histogram])).thenReturn(mockHistogram)
```

api/src/test/scala/ai/chronon/api/test/planner/MonolithJoinPlannerTest.scala (2)
511-525: Avoid brittle size thresholds; assert membership explicitly.
`tableDeps.size should be >= 4` can flap as deps evolve. Assert presence of expected tables (both GBs and both upstream joins) and drop the size check.
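A sketch of the membership-style assertions (ScalaTest matchers; the table names below are placeholders for the expected GroupBy upload and upstream-join output tables, not values from the PR):

```scala
// Assert presence of each expected dependency instead of a size bound.
val depTables = tableDeps.map(_.getTableInfo.getTable).toSet
depTables should contain("db.group_by_a__uploadToKV") // placeholder
depTables should contain("db.group_by_b__uploadToKV") // placeholder
depTables should contain("db.upstream_join_a")        // placeholder
depTables should contain("db.upstream_join_b")        // placeholder
// No tableDeps.size assertion: new dep kinds won't break the test.
```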
569-576: Relax exact-size check; rely on contains.
`tableDeps.size should be(1)` is fragile. Keep the positive check for the streaming GB dep and assert the absence of upstream join deps; drop the size equality.

flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (4)
59-63: Defensive parse for kv_concurrency. String.toInt may throw. Default safely and clamp > 0.
```diff
-private val kvStoreCapacity = FlinkUtils
-  .getProperty("kv_concurrency", props, topicInfo)
-  .map(_.toInt)
-  .getOrElse(AsyncKVStoreWriter.kvStoreConcurrency)
+import scala.util.Try
+private val kvStoreCapacity = FlinkUtils
+  .getProperty("kv_concurrency", props, topicInfo)
+  .flatMap(s => Try(s.toInt).toOption)
+  .filter(_ > 0)
+  .getOrElse(AsyncKVStoreWriter.kvStoreConcurrency)
```
148-156: Make buffer duration configurable. 100 ms is hard-coded; expose e.g. trigger_buffer_ms via props/topicInfo.
```diff
-val trigger = getTrigger()
+val trigger = getTrigger()
```

And:
```diff
-private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
+private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
   FlinkUtils
     .getProperty("trigger", props, topicInfo)
     .map {
-      case "always_fire" => new AlwaysFireOnElementTrigger()
-      case "buffered"    => new BufferedProcessingTimeTrigger(100L)
+      case "always_fire" => new AlwaysFireOnElementTrigger()
+      case "buffered" =>
+        val bufMs = FlinkUtils.getProperty("trigger_buffer_ms", props, topicInfo).flatMap(s => scala.util.Try(s.toLong).toOption).getOrElse(100L)
+        new BufferedProcessingTimeTrigger(bufMs)
       case t =>
         throw new IllegalArgumentException(s"Unsupported trigger type: $t. Supported: 'always_fire', 'buffered'")
     }
     .getOrElse(new AlwaysFireOnElementTrigger())
 }
```
171-185: Expose allowed lateness. Default is 0; add allowed_lateness_ms to control late-data side output volume.
```diff
-val tilingDS: SingleOutputStreamOperator[TimestampedTile] =
-  sparkExprEvalDSAndWatermarks
-    .keyBy(KeySelectorBuilder.build(groupByServingInfoParsed.groupBy))
-    .window(window)
+val tilingDS: SingleOutputStreamOperator[TimestampedTile] = {
+  val allowedMs = FlinkUtils.getProperty("allowed_lateness_ms", props, topicInfo)
+    .flatMap(s => scala.util.Try(s.toLong).toOption).getOrElse(0L)
+  val w = sparkExprEvalDSAndWatermarks
+    .keyBy(KeySelectorBuilder.build(groupByServingInfoParsed.groupBy))
+    .window(window)
+    .allowedLateness(Time.milliseconds(allowedMs))
     .trigger(trigger)
     .sideOutputLateData(tilingLateEventsTag)
     .aggregate(
       new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, inputSchema, enableDebug),
       new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, inputSchema, enableDebug)
     )
-    .uid(s"tiling-01-$groupByName")
-    .name(s"Tiling for $groupByName")
-    .setParallelism(sourceSparkProjectedStream.getParallelism)
+  w.uid(s"tiling-01-$groupByName")
+    .name(s"Tiling for $groupByName")
+    .setParallelism(sourceSparkProjectedStream.getParallelism)
+}
```
105-111: Writer timeout configurability. Consider kv_timeout_ms to tune AsyncDataStream timeout in different environments.
```diff
-AsyncKVStoreWriter.withUnorderedWaits(
+AsyncKVStoreWriter.withUnorderedWaits(
   putRecordDS,
   sinkFn,
   groupByName,
-  capacity = kvStoreCapacity
+  timeoutMillis = FlinkUtils.getProperty("kv_timeout_ms", props, topicInfo).flatMap(s => scala.util.Try(s.toLong).toOption).getOrElse(AsyncKVStoreWriter.defaultTimeoutMillis),
+  capacity = kvStoreCapacity
 )
```

Also applies to: 200-205
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (1)
79-105: Close SparkExpressionEval after schema extraction. Avoid lingering state; close explicitly.
```diff
-val outputSchema = new SparkExpressionEval(encoder, query, groupBy.getMetaData.getName, groupBy.dataModel).getOutputSchema
+val eval = new SparkExpressionEval(encoder, query, groupBy.getMetaData.getName, groupBy.dataModel)
+val outputSchema = eval.getOutputSchema
+eval.close()
```

flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (2)
95-121: Reduce noisy test logs. Replace println with a logger to keep CI logs clean.
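For example, with SLF4J (already on Flink's classpath; the helper shape here is illustrative, not the file's actual structure):

```scala
import org.slf4j.{Logger, LoggerFactory}

object JoinTestUtils {
  private val logger: Logger = LoggerFactory.getLogger(getClass)

  // Replaces println; debug-level output stays out of CI logs by default.
  def logEvent(event: Any): Unit =
    logger.debug("processed event: {}", event)
}
```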
197-207: Set timeColumn in join-source query. Explicitly set timeColumn (e.g., "created") to avoid defaulting to "ts".
```diff
-query =
-  Builders.Query(
+query =
+  Builders.Query(
+    timeColumn = "created",
     selects = Map(
       "user_id" -> "user_id",
       "listing_id" -> "listing_id",
       "final_price" -> "price_discounted_last",
     ),
   )
```

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
35-44: Unify metrics group label. Use a single label (e.g., "feature_group") for both to simplify dashboards.
```diff
-.addGroup("group_by", groupByName)
+.addGroup("feature_group", groupByName)
```

Also applies to: 118-123
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (1)
163-166: Guard against duplicate column names in joined schema. Concatenating leftSourceSchema ++ joinCodec.valueSchema may collide (e.g., key or ts), causing ambiguous columns.
```diff
-val joinFields = leftSourceSchema.fields ++ joinCodec.valueSchema.fields
+val leftNames = leftSourceSchema.fields.map(_.name).toSet
+val joinNames = joinCodec.valueSchema.fields.map(_.name).toSet
+val dupes = leftNames.intersect(joinNames)
+if (dupes.nonEmpty)
+  throw new IllegalArgumentException(s"Duplicate columns in join schema: ${dupes.mkString(",")}")
+val joinFields = leftSourceSchema.fields ++ joinCodec.valueSchema.fields
```

flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (3)
98-105: Make buffered trigger interval configurable. Hard-coded 100 ms. Read from props to tune latency/throughput.
```diff
-private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
+private val bufferedTriggerMs: Long =
+  FlinkUtils.getProperty("buffered_trigger_ms", props, topicInfo).getOrElse("100").toLong
+private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
   FlinkUtils.getProperty("trigger", props, topicInfo).getOrElse("always_fire") match {
     case "always_fire" => new AlwaysFireOnElementTrigger()
-    case "buffered"    => new BufferedProcessingTimeTrigger(100L)
+    case "buffered"    => new BufferedProcessingTimeTrigger(bufferedTriggerMs)
     case t =>
       throw new IllegalArgumentException(s"Unsupported trigger type: $t. Supported: 'always_fire', 'buffered'")
   }
 }
```
224-231: Expose KV-writer timeout via props. Currently uses default (1000 ms). Make it configurable.
```diff
-// Write to KV store using existing AsyncKVStoreWriter
+// Write to KV store using existing AsyncKVStoreWriter
+val kvTimeoutMillis: Long =
+  FlinkUtils.getProperty("kv_timeout_ms", props, topicInfo)
+    .getOrElse(AsyncKVStoreWriter.defaultTimeoutMillis.toString).toLong
 AsyncKVStoreWriter.withUnorderedWaits(
   avroConvertedStream,
   sinkFn,
   groupByName,
-  capacity = kvStoreCapacity
+  timeoutMillis = kvTimeoutMillis,
+  capacity = kvStoreCapacity
 )
```
81-84: Remove unused vals (keys/values/eventTimeColumn). They’re defined but unused.
```diff
-val keyColumns: Array[String] = groupByConf.keyColumns.toScala.toArray
-val valueColumns: Array[String] = groupByConf.aggregationInputs
-val eventTimeColumn = Constants.TimeColumn
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (34)
- api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/planner/MonolithJoinPlanner.scala (1 hunks)
- api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (1 hunks)
- api/src/test/scala/ai/chronon/api/test/planner/GroupByPlannerTest.scala (1 hunks)
- api/src/test/scala/ai/chronon/api/test/planner/MonolithJoinPlannerTest.scala (2 hunks)
- api/src/test/scala/ai/chronon/api/test/planner/TableDependenciesTest.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (7 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (4 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
- python/test/canary/compiled/group_bys/gcp/user_activities_chained.chained_user_gb__0 (1 hunks)
- python/test/canary/compiled/joins/gcp/demo_chaining.downstream_join__0 (1 hunks)
- python/test/canary/compiled/joins/gcp/demo_parent.parent_join__0 (1 hunks)
- python/test/canary/group_bys/gcp/user_activities_chained.py (1 hunks)
- python/test/canary/joins/gcp/demo_chaining.py (1 hunks)
- python/test/canary/joins/gcp/demo_parent.py (1 hunks)
💤 Files with no reviewable changes (1)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
🧰 Additional context used
🧬 Code graph analysis (29)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (5)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
Response(68-68)Request(59-62)flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (3)
open(39-59)asyncInvoke(61-124)timeout(126-133)
python/test/canary/group_bys/gcp/user_activities_chained.py (1)
python/src/ai/chronon/group_by.py (3)
Operation(72-158)TimeUnit(188-190)Window(252-253)
flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (5)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2)
FlinkTestUtils(109-215)MockAsyncKVStoreWriter(87-96)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1)
SparkConversions(56-159)flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (5)
JoinTestUtils(156-223)build(132-153)JoinTestEvent(21-21)buildJoinSourceTerminalGroupBy(163-222)get(66-68)flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
runTiledGroupByJob(116-231)
online/src/main/scala/ai/chronon/online/Api.scala (1)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
buildFetcher(60-62)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
api/src/main/scala/ai/chronon/api/planner/MonolithJoinPlanner.scala (3)
api/src/main/scala/ai/chronon/api/Extensions.scala (4)
streamingSource(578-584)outputTable(165-173)WindowUtils(90-155)zero(154-154)api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (2)
GroupByPlanner(15-140)GroupByPlanner(142-145)api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (2)
TableDependencies(9-147)fromJoinSources(129-145)
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
outputTable(165-173)WindowUtils(90-155)zero(154-154)api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (2)
TableDependencies(9-147)fromJoinSources(129-145)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (8)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)flink/src/main/scala/ai/chronon/flink/window/Trigger.scala (2)
AlwaysFireOnElementTrigger(12-50)BufferedProcessingTimeTrigger(88-177)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1)
withUnorderedWaits(31-49)
flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (10)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (3)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)withUnorderedWaits(31-49)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
BaseFlinkJob(33-45)flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
TiledAvroCodecFn(110-175)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (5)
flatMap(65-91)JoinSourceQueryFunction(27-92)JoinSourceQueryFunction(100-179)buildJoinSchema(145-178)buildCatalystUtil(103-140)
api/src/test/scala/ai/chronon/api/test/planner/GroupByPlannerTest.scala (3)
api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)Source(107-141)Query(42-67)MetaData(265-321)joinSource(133-140)api/src/main/scala/ai/chronon/api/Extensions.scala (3)
table(440-440)keyColumns(1033-1038)topic(467-477)api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (4)
GroupByPlanner(15-140)GroupByPlanner(142-145)buildPlan(119-139)streamingNode(89-117)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (6)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)api/src/main/scala/ai/chronon/api/DataType.scala (1)
StructField(218-218)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2)
SparkConversions(56-159)toChrononType(61-87)online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
getOutputSparkSchema(153-153)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)dataModel(379-384)dataModel(536-543)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)
python/test/canary/joins/gcp/demo_chaining.py (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)streamingSource(578-584)cleanName(158-158)api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)
python/test/canary/joins/gcp/demo_parent.py (2)
api/src/main/scala/ai/chronon/api/Builders.scala (2)
Source(107-141)Query(42-67)python/src/ai/chronon/group_by.py (3)
Operation(72-158)TimeUnit(188-190)Window(252-253)
api/src/test/scala/ai/chronon/api/test/planner/MonolithJoinPlannerTest.scala (3)
api/src/main/scala/ai/chronon/api/planner/GroupByPlanner.scala (3)
GroupByPlanner(15-140)GroupByPlanner(142-145)buildPlan(119-139)api/src/main/scala/ai/chronon/api/planner/MonolithJoinPlanner.scala (3)
MonolithJoinPlanner(11-107)buildPlan(89-106)metadataUploadNode(44-87)api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)Source(107-141)Query(42-67)MetaData(265-321)joinSource(133-140)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (4)
Extensions(39-1350)query(396-404)dataModel(379-384)dataModel(536-543)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (4)
api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)Source(107-141)Query(42-67)MetaData(265-321)joinSource(133-140)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (2)
open(40-63)flatMap(65-91)
api/src/test/scala/ai/chronon/api/test/planner/TableDependenciesTest.scala (3)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
WindowUtils(90-155)MetadataOps(157-222)outputTable(165-173)api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)MetaData(265-321)Source(107-141)Query(42-67)joinSource(133-140)api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (3)
TableDependencies(9-147)fromJoinSources(129-145)dep(34-37)
flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
DeserializationSchemaBuilder(31-62)buildSourceProjectionDeserSchema(44-53)buildSourceProjectionDeserSchema(55-61)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(92-92)SourceProjectionDeserializationSchema(94-169)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (4)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1)
DirectExecutionContext(21-29)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Request(59-62)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (8)
api/src/main/scala/ai/chronon/api/DataType.scala (3)
DoubleType(197-197)StringType(207-207)StructField(218-218)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)online/src/main/scala/ai/chronon/online/Api.scala (11)
KVStore(36-53)GetRequest(40-43)GetResponse(45-47)PutRequest(48-48)Api(211-288)LoggableResponse(138-142)LoggableResponse(144-184)buildFetcher(258-271)get(106-114)put(74-74)create(62-62)online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)
FetchContext(9-27)online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
MetadataStore(57-421)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Response(68-68)
api/src/main/scala/ai/chronon/api/planner/TableDependencies.scala (1)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
WindowUtils(90-155)zero(154-154)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (6)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (2)
FlinkGroupByStreamingJob(40-219)runTiledGroupByJob(127-206)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(92-92)projectedSchema(108-114)flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
FlinkJoinSourceJob(49-252)flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
FlinkSourceProvider(6-31)build(7-18)online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (4)
DataStream(72-124)TopicInfo(33-33)TopicInfo(34-70)parse(45-69)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: udafs_tests
- GitHub Check: kv_store_tests
- GitHub Check: flink_tests
- GitHub Check: online_tests
- GitHub Check: stats_tests
- GitHub Check: aggregator_tests
- GitHub Check: join_tests
- GitHub Check: groupby_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: python_lint
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: python_tests
- GitHub Check: service_tests
- GitHub Check: spark_tests
- GitHub Check: batch_tests
🔇 Additional comments (21)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
5-6: LGTM! Standard import for the DirectExecutionContext class.
18-29: No changes needed. DirectExecutionContext is properly used as a private instance field in JoinEnrichmentAsyncFunction and AsyncKVStoreWriter. The current local singleton pattern per class is appropriate; a global companion object singleton is unnecessary here.
python/test/canary/joins/gcp/demo_chaining.py (1)
1-24: LGTM - Clean chained join setup. The downstream join correctly wires demo_parent.source to the chained GroupBy.
python/test/canary/group_bys/gcp/user_activities_chained.py (1)
1-28: LGTM - Chained GroupBy configured correctly. Using JoinSource as a GroupBy source is the intended pattern for chaining.
python/test/canary/joins/gcp/demo_parent.py (3)
12-24: LGTM - EventSource configured correctly. Pub/Sub source with appropriate query projections.
33-44: LGTM - Parent join structure correct. Enriches event stream with listing features.
46-56: LGTM - JoinSource projection correct. Query correctly maps joined columns for downstream GroupBy.
python/test/canary/compiled/group_bys/gcp/user_activities_chained.chained_user_gb__0 (1)
1-190: Generated artifact - structure looks consistent. Compiled configuration matches the source definition.
python/test/canary/compiled/joins/gcp/demo_parent.parent_join__0 (1)
1-183: Generated artifact - configuration correct. Parent join compiled structure matches the source.
python/test/canary/compiled/joins/gcp/demo_chaining.downstream_join__0 (1)
1-240: Generated artifact - chaining structure correct. Downstream join compilation properly nests the join chain.
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (1)
78-80: No action required: makeGroupBy correctly provides a streaming source. Both makeGroupBy implementations in test utilities create a Source with `topic = "events.my_stream"`, satisfying the `streamingSource.isDefined` requirement. The code at lines 78-80 is safe.

online/src/main/scala/ai/chronon/online/Api.scala (1)
255-270: Do not restrict buildFetcher visibility; it is part of the public API. The method has extensive external callers across test and production code in the spark, online, and flink modules. Making it `protected` would break these dependencies. The thread-safety comment can be improved to clarify that `lazy val` ensures safe initialization, but the visibility restriction is infeasible.
Likely an incorrect or invalid review comment.
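The lazy val point generalizes: Scala initializes a `lazy val` under a monitor, so the initializer body runs at most once even when many threads race on first access. A minimal standalone sketch (the `Holder` class and field names are hypothetical, standing in for the Api/buildFetcher pair):

```scala
class Holder {
  var initCount = 0
  // The lazy val initializer is guarded by the enclosing instance's monitor,
  // so concurrent first readers block until it has run exactly once.
  lazy val fetcher: String = { initCount += 1; "fetcher-instance" }
}

val holder = new Holder
val threads = (1 to 8).map { _ =>
  new Thread(new Runnable { def run(): Unit = { holder.fetcher; () } })
}
threads.foreach(_.start())
threads.foreach(_.join())
```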
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (2)
23-30: Constructor migration LGTM. Using queryFromGroupBy and passing groupBy.metaData.name aligns with the new API. Tests read well.
81-89: Entity path coverage looks good. Explicitly passing groupBy.dataModel ensures correct ENTITIES filters/time columns.
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (1)
227-234: Schema access verification LGTM. Asserting valueSchema access validates the join schema wiring during open().
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1)
132-137: Validation wiring LGTM. Using queryFromGroupBy with the new SparkExpressionEvalFn signature is correct.
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
44-53: Overloads align with the query-driven eval path. Signatures and usage look correct; the groupBy variant intentionally requires a streaming source. Ensure JoinSource callers use the (query, groupByName, dataModel) overload.
Also applies to: 55-61
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (1)
157-181: LGTM: test updated to the new APIs and query path. Constructor, schema derivation, and job type changes are consistent.
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
152-156: Trigger end-of-window semantics. AlwaysFireOnElementTrigger does not FIRE at window end; ensure tiles are marked "closed" as intended. If closure depends on a window-end FIRE, this may miss a final emission.
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
194-204: Helper API looks good. queryFromGroupBy and the transform/filter construction are clear and align with DataModel.
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
229-235: LGTM: metrics sink wiring. Stable UIDs/names per groupBy; parallelism follows upstream.
```scala
  // Check if this is a JoinSource GroupBy
  if (servingInfo.groupBy.streamingSource.get.isSetJoinSource) {
    buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
  } else {
    buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
  }
}
```
Avoid .get on optional streamingSource.
NPE risk when groupBy lacks streaming source. Handle None explicitly.
```diff
- if (servingInfo.groupBy.streamingSource.get.isSetJoinSource) {
-   buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
- } else {
-   buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
- }
+ servingInfo.groupBy.streamingSource match {
+   case Some(src) if src.isSetJoinSource =>
+     buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
+   case Some(_) =>
+     buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
+   case None =>
+     throw new IllegalArgumentException(s"Invalid groupBy: $groupByName. No streaming source configured.")
+ }
```
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
  // Check if this is a JoinSource GroupBy
  servingInfo.groupBy.streamingSource match {
    case Some(src) if src.isSetJoinSource =>
      buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
    case Some(_) =>
      buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
    case None =>
      throw new IllegalArgumentException(s"Invalid groupBy: $groupByName. No streaming source configured.")
  }
}
```
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/FlinkJob.scala around lines 243 to 249,
avoid calling .get on servingInfo.groupBy.streamingSource which can be None and
cause an NPE; instead explicitly pattern-match or use Option combinators to
handle the None case and decide behavior: if streamingSource is Some(ss) and
ss.isSetJoinSource then call buildJoinSourceFlinkJob(...), if Some(_) call
buildGroupByStreamingJob(...), and if None either log/raise a clear error or
choose a sensible default pathway; update the conditional accordingly so no .get
is used and all three cases are handled.
```scala
// Compute the output schema after JoinSourceQueryFunction transformations using Catalyst
val postTransformationSchema = computePostTransformationSchemaWithCatalyst(joinSource, inputSchema)

// Calculate tiling window size based on the GroupBy configuration
val tilingWindowSizeInMillis: Long =
  ResolutionUtils.getSmallestTailHopMillis(groupByServingInfoParsed.groupBy)

// Configure tumbling window for tiled aggregations
val window = TumblingEventTimeWindows
  .of(Time.milliseconds(tilingWindowSizeInMillis))
  .asInstanceOf[WindowAssigner[ProjectedEvent, TimeWindow]]

// Configure trigger (default to always fire on element)
val trigger = getTrigger()

// We use Flink "Side Outputs" to track any late events that aren't computed.
val tilingLateEventsTag = new OutputTag[ProjectedEvent]("tiling-late-events") {}

// Tiled aggregation: key by entity keys, window, and aggregate
val tilingDS: SingleOutputStreamOperator[TimestampedTile] =
  processedStream
    .keyBy(KeySelectorBuilder.build(groupByServingInfoParsed.groupBy))
    .window(window)
    .trigger(trigger)
    .sideOutputLateData(tilingLateEventsTag)
    .aggregate(
      // Aggregation function that maintains incremental IRs in state
      new FlinkRowAggregationFunction(groupByServingInfoParsed.groupBy, postTransformationSchema, enableDebug),
      // Process function that marks tiles as closed for client-side caching
      new FlinkRowAggProcessFunction(groupByServingInfoParsed.groupBy, postTransformationSchema, enableDebug)
    )
    .uid(s"tiling-$groupByName")
    .name(s"Tiling for $groupByName")
```
🧩 Analysis chain
Ensure schema consistency if query operator errors.
Downstream agg uses postTransformationSchema. If JoinSourceQueryFunction falls back to passthrough on error, shapes may diverge. After applying the earlier operator fix, confirm agg receives schema-aligned records.
🏁 Script executed:

```shell
#!/bin/bash
# Grep for any occurrences of collecting the original enriched event after query errors.
rg -nP 'JoinSourceQueryFunction|Error applying join source query|out\.collect\(enrichedEvent\)' -C2
```

Length of output: 10710

🏁 Script executed:

```shell
#!/bin/bash
# Find FlinkRowAggregationFunction implementation
fd FlinkRowAggregationFunction --type f
```

Length of output: 87

🏁 Script executed:

```shell
#!/bin/bash
# Search for FlinkRowAggregationFunction class definition
rg -nP 'class\s+FlinkRowAggregationFunction|new\s+FlinkRowAggregationFunction' -A10 --type scala
```

Length of output: 9763

🏁 Script executed:

```shell
#!/bin/bash
# Find add() method in FlinkRowAggregationFunction
rg -nP 'def add\(' flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala -A20
```

Length of output: 826

🏁 Script executed:

```shell
#!/bin/bash
# Find toChrononRow method
rg -nP 'def toChrononRow' flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala -A15
```

Length of output: 787
Schema consistency bug: error passthrough causes runtime failure in aggregation.
If JoinSourceQueryFunction errors (line 89 of JoinSourceQueryFunction.scala), it passes the original enrichedEvent unchanged. However, FlinkRowAggregationFunction at line 201 expects postTransformationSchema. When toChrononRow (line 166 of FlinkRowAggregators.scala) tries to map valueColumns, missing fields will cause KeyNotFoundException.
Fix: Apply the earlier operator fix to JoinSourceQueryFunction to ensure it either succeeds or fails the record properly, rather than passing through mismatched schema. Or add schema validation/coercion between processedStream and the aggregation operator to normalize records before aggregation.
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala
around lines 174-206, the aggregation pipeline assumes records match
postTransformationSchema but JoinSourceQueryFunction may pass through an
unchanged enrichedEvent on error leading to KeyNotFoundException in
FlinkRowAggregationFunction/toChrononRow; fix by either applying the earlier
operator change to JoinSourceQueryFunction so it fails or drops the record on
error (do not pass through the original enrichedEvent), or insert a
validation/coercion step immediately before the .keyBy(...).window(...) chain
that checks and normalizes each record to postTransformationSchema (e.g., map
that verifies presence of required valueColumns and either transforms missing
fields to defaults or routes the record to a side-output/failure path), ensuring
only schema-consistent records reach FlinkRowAggregationFunction.
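The normalization step described above can be sketched with plain collections; the column names below are hypothetical stand-ins for the columns of `postTransformationSchema`:

```scala
// A record conforms when it carries every column the downstream aggregation
// schema requires; non-conforming records are filtered out here, though in
// the real pipeline they could instead be routed to a side output.
def conformsToSchema(fields: Map[String, Any], requiredColumns: Seq[String]): Boolean =
  requiredColumns.forall(fields.contains)

val requiredColumns = Seq("user_id", "price", "ts")

val good = Map[String, Any]("user_id" -> "u1", "price" -> 9.99, "ts" -> 1700000000000L)
val bad  = Map[String, Any]("user_id" -> "u2", "ts" -> 1700000000001L) // missing "price"

val schemaAligned = Seq(good, bad).filter(conformsToSchema(_, requiredColumns))
```

Placing a guard like this between the query operator and `keyBy` ensures only schema-consistent records ever reach the aggregation function.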
fair call out - instead of passing through on errors / timeouts, we will swallow the events so that they don't end up being poison pills for our app.
@piyush-zlai, sounds good! Swallowing errored events will prevent schema-mismatched records from reaching the aggregator and avoid the KeyNotFoundException. 👍
flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala
flink/src/test/scala/ai/chronon/flink/chaining/ChainedGroupByJobIntegrationTest.scala
flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (outdated)
```scala
val joinFuture = Promise[Seq[Fetcher.Response]]()
joinFuture.success(Seq(joinResponse))
when(mockFetcher.fetchJoin(any(), any())).thenReturn(joinFuture.future)

val function = new JoinEnrichmentAsyncFunction(joinRequestName, "testGB", mockApi, enableDebug)
```
Wrong mock signature for fetchJoin; tests will not match prod call.
Production calls fetchJoin(Seq(request)). Stub and verify the 1‑arg method.
```diff
-when(mockFetcher.fetchJoin(any(), any())).thenReturn(joinFuture.future)
+when(mockFetcher.fetchJoin(any[Seq[Fetcher.Request]])).thenReturn(joinFuture.future)
```

Also update the verification:

```diff
-verify(mockFetcher).fetchJoin(any(), any())
+verify(mockFetcher).fetchJoin(any[Seq[Fetcher.Request]])
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
val joinFuture = Promise[Seq[Fetcher.Response]]()
joinFuture.success(Seq(joinResponse))
when(mockFetcher.fetchJoin(any[Seq[Fetcher.Request]])).thenReturn(joinFuture.future)

val function = new JoinEnrichmentAsyncFunction(joinRequestName, "testGB", mockApi, enableDebug)
```
🤖 Prompt for AI Agents
In
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala
around lines 55-59, the test stubs mockFetcher.fetchJoin with a two-arg
signature (any(), any()) but production calls fetchJoin(Seq(request)) with a
single Seq argument; change the stub to mock the single-argument method
(accepting Seq[Fetcher.Request] or any[Seq[Fetcher.Request]]), return the same
joinFuture.future, and update the verification to verify the single-arg
fetchJoin call (matching the expected Seq) instead of verifying a two-argument
call.
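Separately from the matcher arity, the stub relies on a pre-completed Promise, so the future it hands back is already fulfilled and the async callback fires immediately in tests. A standalone sketch of that pattern, with a simplified `Response` case class standing in for `Fetcher.Response`:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

case class Response(values: Map[String, Any])

// Build a Future that is already completed at hand-out time, so a test's
// async code path runs synchronously instead of waiting on real I/O.
def preCompletedJoinFuture(resp: Response): scala.concurrent.Future[Seq[Response]] = {
  val joinFuture = Promise[Seq[Response]]()
  joinFuture.success(Seq(resp))
  joinFuture.future
}

val result = Await.result(preCompletedJoinFuture(Response(Map("price" -> 42.0))), 1.second)
```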
7b51303 to b8ce89c (compare)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1)
55-67: Add close() to release evaluator resources. SparkDFVsCatalystComparisonFn opens SparkExpressionEvalFn but never closes it. Implement close() and delegate to sparkExpressionEvalFn.close().

```diff
 class SparkDFVsCatalystComparisonFn(sparkExpressionEvalFn: SparkExpressionEvalFn[Row])
   extends RichAllWindowFunction[EventRecord, ValidationStats, GlobalWindow] {
@@
   override def open(parameters: Configuration): Unit = {
     super.open(parameters)
     sparkExpressionEvalFn.setRuntimeContext(this.getRuntimeContext)
     try {
       sparkExpressionEvalFn.open(parameters)
@@
     }
   }
+
+  override def close(): Unit = {
+    try sparkExpressionEvalFn.close()
+    finally super.close()
+  }
 }
```
♻️ Duplicate comments (11)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (2)
3-3: Compile-time break: importing a private class. `DirectExecutionContext` is `private` in FlinkUtils.scala (line 21) and not visible here.
107-114: Don't fail the stream on join errors; degrade gracefully. For best-effort enrichment, emit the original event instead of calling `completeExceptionally`.
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
44-44: Metric label inconsistency: use "group_by". Dashboards expect `"group_by"`, not `"feature_group"`. Change for consistency.
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (2)
57-57: Wrong mock signature; tests won't match production. Production calls `fetchJoin(Seq(request))` (single arg). The mock should use `any[Seq[Fetcher.Request]]`.
98-98: Fix the verification to match the single-arg signature. Change to `verify(mockFetcher).fetchJoin(any[Seq[Fetcher.Request]])`.
flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (2)
102-102: NPE risk: a JoinSource-only GroupBy has no `streamingSource`. `.get` will throw. Find the JoinSource from `groupBy.sources` instead.
92-98: Decode binary keys; avoid `new String(keyBytes)`. Keys are Avro-encoded (TileKey). Decode them to extract `user_id`, then assert membership.
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
243-249: NPE risk: handle the None case for streamingSource. Calling `.get` without validating `streamingSource.isDefined` risks an NPE. Use pattern matching or explicit None handling as suggested in the prior review.
258-267: Validate leftSource.query before use. `leftSource.query` may be null; add a defensive check before passing it to the deserialization schema builder. Apply the suggestion from the prior review to require that the query is non-null.
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (1)
84-90: PII leak and schema mismatch on error. Logging the full `enrichedEvent.fields` risks PII exposure. Passing through the original enriched event violates the downstream schema expectation (`postTransformationSchema`), causing runtime failures in aggregation. Apply the fix from the prior review: redact field values in logs and emit a schema-aligned fallback record.
flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
174-207: Schema mismatch on upstream query error. If `JoinSourceQueryFunction` errors and passes through the original enriched event (line 89), the aggregation function at line 201 will fail because it expects `postTransformationSchema`. Fix the upstream error handler first (JoinSourceQueryFunction.scala lines 84-90). Apply the upstream fix suggested in the prior review to ensure schema-aligned records reach aggregation.
🧹 Nitpick comments (1)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
95-120: Reduce println noise in tests. The TestFetcher logs to stdout; prefer a test logger or guard it behind a debug flag to keep CI logs clean.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (7 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (4 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
💤 Files with no reviewable changes (1)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
🚧 Files skipped from review as they are similar to previous changes (3)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala
🧰 Additional context used
🧬 Code graph analysis (18)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (5)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (1)
setupFunctionWithMockedMetrics(28-44)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
Response(68-68)Request(59-62)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (3)
open(39-59)asyncInvoke(61-124)timeout(126-133)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)dataModel(379-384)dataModel(536-543)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (3)
api/src/main/scala/ai/chronon/api/Builders.scala (4)
Builders(28-474)Source(107-141)Query(42-67)joinSource(133-140)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (2)
open(40-63)flatMap(65-91)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
online/src/main/scala/ai/chronon/online/Api.scala (1)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
buildFetcher(60-62)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (4)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1)
DirectExecutionContext(21-29)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Request(59-62)
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
Extensions(39-1350)GroupByOps(501-732)query(396-404)dataModel(379-384)dataModel(536-543)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (9)
api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)Source(107-141)Query(42-67)MetaData(265-321)joinSource(133-140)
api/src/main/scala/ai/chronon/api/DataType.scala (3)
DoubleType(197-197)StringType(207-207)StructField(218-218)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)
online/src/main/scala/ai/chronon/online/Api.scala (8)
KVStore(36-53)GetRequest(40-43)GetResponse(45-47)PutRequest(48-48)Api(211-288)get(106-114)put(74-74)create(62-62)
online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)
FetchContext(9-27)
online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Response(68-68)
online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
AvroConversions(33-281)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)streamingSource(578-584)cleanName(158-158)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (6)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)
online/src/main/scala/ai/chronon/online/JoinCodec.scala (1)
toMap(51-51)
online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
getOutputSparkSchema(153-153)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (2)
buildFetcher(60-62)buildJoinCodec(124-124)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (4)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (1)
buildFlinkJob(79-105)
flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (4)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2)
FlinkTestUtils(109-215)MockAsyncKVStoreWriter(87-96)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1)
SparkConversions(56-159)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (5)
JoinTestUtils(156-223)build(132-153)JoinTestEvent(21-21)buildJoinSourceTerminalGroupBy(163-222)get(66-68)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (9)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (2)
FlinkGroupByStreamingJob(40-219)runTiledGroupByJob(127-206)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
DeserializationSchemaBuilder(31-62)buildSourceProjectionDeserSchema(44-53)buildSourceProjectionDeserSchema(55-61)
flink/src/main/scala/ai/chronon/flink/deser/FlinkSerDeProvider.scala (2)
FlinkSerDeProvider(9-40)build(17-31)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(92-92)projectedSchema(108-114)
flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
FlinkJoinSourceJob(49-252)
flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
FlinkSourceProvider(6-31)build(7-18)
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (1)
WriteResponse(119-148)
online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (4)
DataStream(72-124)TopicInfo(33-33)TopicInfo(34-70)parse(45-69)
flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (11)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (3)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)withUnorderedWaits(31-49)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)
flink/src/main/scala/ai/chronon/flink/RichMetricsOperators.scala (1)
LateEventCounter(13-27)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
TiledAvroCodecFn(110-175)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)
flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)
flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (2)
FlinkRowAggProcessFunction(172-260)FlinkRowAggregationFunction(38-169)
flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (5)
flatMap(65-91)JoinSourceQueryFunction(27-92)JoinSourceQueryFunction(100-179)buildJoinSchema(145-178)buildCatalystUtil(103-140)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
- GitHub Check: service_commons_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: aggregator_tests
- GitHub Check: streaming_tests
- GitHub Check: udafs_tests
- GitHub Check: groupby_tests
- GitHub Check: spark_tests
- GitHub Check: kv_store_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: join_tests
- GitHub Check: analyzer_tests
- GitHub Check: stats_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (10)
online/src/main/scala/ai/chronon/online/Api.scala (1)
258-258: LGTM: allows test overrides. Removing `final` enables test scaffolding to override `buildFetcher` (e.g., JoinTestUtils), improving testability with no production impact.
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
34-44: Constructor refactor looks good. Query-based evaluation is cleaner; the `queryFromGroupBy` helper supports the migration path.
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (1)
23-30: Test correctly adapted to the new API. Query extraction and updated constructor usage align with the refactor.
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (1)
78-79: Test correctly updated for the query-based path. Schema inference now uses the query-aware constructor.
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (1)
132-137: Constructor migration LGTM. Using queryFromGroupBy and passing (encoder, query, groupByName, dataModel) is correct and consistent across the codebase.
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
35-42: Identity builder change LGTM.Passing groupByName into SourceIdentityDeserializationSchema is clear and avoids heavier GroupBy dependency.
47-53: Projection builder LGTM.Deriving Query via SparkExpressionEval.queryFromGroupBy and threading dataModel/groupByName keeps callers consistent.
55-61: Helpful overload.The explicit (query, groupByName, dataModel) constructor simplifies non-GroupBy call sites.
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
176-189: Verify mock field consistency.Aggregation uses inputColumn = "price_discounted" but the mocked source doesn’t expose that field. If intentional (never executed), add a comment; otherwise align the field names.
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (1)
157-165: Migration to query-driven eval LGTM.Deriving query via SparkExpressionEval.queryFromGroupBy and updating both SparkExpressionEvalFn and output-schema path are correct.
Also applies to: 174-181
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala
| class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore(), null)) with Serializable { | ||
| override def buildJoinCodec(join: Join, refreshOnFail: Boolean = false): JoinCodec = TestJoinCodec.build(join) | ||
| } |
Avoid null in FetchContext.
Passing null for metadataDataset is risky. Use default args.
-class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore(), null)) with Serializable {
+class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore())) with Serializable {

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore(), null)) with Serializable { | |
| override def buildJoinCodec(join: Join, refreshOnFail: Boolean = false): JoinCodec = TestJoinCodec.build(join) | |
| } | |
| class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore())) with Serializable { | |
| override def buildJoinCodec(join: Join, refreshOnFail: Boolean = false): JoinCodec = TestJoinCodec.build(join) | |
| } |
🤖 Prompt for AI Agents
In flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala around
lines 123 to 125, the TestMetadataStore is creating FetchContext with an
explicit null for metadataDataset; replace the explicit null by relying on
FetchContext's default parameter (i.e., call FetchContext(new TestKVStore()) or
omit the metadataDataset argument) so no null is passed.
b8ce89c to
82f864b
Compare
Actionable comments posted: 0
♻️ Duplicate comments (7)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (4)
58-62: Wrong mock signature for fetchJoin; tests will not match the production call. Production calls fetchJoin(Seq(request)) (single argument). Update the stub:

-when(mockFetcher.fetchJoin(any(), any())).thenReturn(joinFuture.future)
+when(mockFetcher.fetchJoin(any[Seq[Fetcher.Request]])).thenReturn(joinFuture.future)

101-101: Verification signature also incorrect.

-verify(mockFetcher).fetchJoin(any(), any())
+verify(mockFetcher).fetchJoin(any[Seq[Fetcher.Request]])

151-151: Same signature issue in the failure test. Apply the same fix here.

194-194: Same signature issue in the empty-response test. Apply the same fix here.
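As context for the stubbing fix above, a dependency-free alternative to Mockito here is a hand-rolled fake that records the requests it was called with. The `Request`/`Response`/`Fetcher` shapes below are simplified illustrative stand-ins, not the real Chronon API:

```scala
import scala.concurrent.Future
import scala.collection.mutable

// Illustrative stand-ins for the fetcher request/response types.
case class Request(name: String, keys: Map[String, Any])
case class Response(values: Map[String, Any])

trait Fetcher {
  // Single-argument shape, matching how production code invokes the fetch.
  def fetchJoin(requests: Seq[Request]): Future[Seq[Response]]
}

// Hand-rolled fake: records every argument list, returns a canned response.
class RecordingFetcher(canned: Seq[Response]) extends Fetcher {
  val calls = mutable.Buffer.empty[Seq[Request]]
  def fetchJoin(requests: Seq[Request]): Future[Seq[Response]] = {
    calls += requests
    Future.successful(canned)
  }
}

object RecordingFetcherDemo extends App {
  val fetcher = new RecordingFetcher(Seq(Response(Map("f" -> 1))))
  fetcher.fetchJoin(Seq(Request("join", Map("id" -> 7))))
  assert(fetcher.calls.size == 1)            // called exactly once
  assert(fetcher.calls.head.head.name == "join")
  println("recording fetcher ok")
}
```

Because the fake implements the exact single-argument signature, an arity mismatch like the one flagged above fails at compile time instead of silently not matching at runtime.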
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
238-249: Avoid .get on the optional streamingSource (NPE risk). Use pattern matching to safely handle the None case.

Apply this diff:

 private def buildFlinkJob(groupByName: String,
                           props: Map[String, String],
                           api: Api,
                           servingInfo: GroupByServingInfoParsed,
                           enableDebug: Boolean = false): BaseFlinkJob = {
-  // Check if this is a JoinSource GroupBy
-  if (servingInfo.groupBy.streamingSource.get.isSetJoinSource) {
-    buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
-  } else {
-    buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
-  }
+  servingInfo.groupBy.streamingSource match {
+    case Some(src) if src.isSetJoinSource =>
+      buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
+    case Some(_) =>
+      buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
+    case None =>
+      throw new IllegalArgumentException(s"Invalid groupBy: $groupByName. No streaming source configured.")
+  }
 }
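The safe-dispatch pattern in that diff can be exercised in isolation. The sketch below uses hypothetical stand-ins for the source and job types, not the real Flink job builders:

```scala
// Hypothetical stand-ins for the streaming source and job types.
sealed trait Job
case object JoinSourceJob extends Job
case object GroupByStreamingJob extends Job

case class Source(isSetJoinSource: Boolean)

// Mirrors the suggested fix: no .get, every Option case handled explicitly.
def buildJob(groupByName: String, streamingSource: Option[Source]): Job =
  streamingSource match {
    case Some(src) if src.isSetJoinSource => JoinSourceJob
    case Some(_)                          => GroupByStreamingJob
    case None =>
      throw new IllegalArgumentException(
        s"Invalid groupBy: $groupByName. No streaming source configured.")
  }

object BuildJobDemo extends App {
  assert(buildJob("gb", Some(Source(isSetJoinSource = true))) == JoinSourceJob)
  assert(buildJob("gb", Some(Source(isSetJoinSource = false))) == GroupByStreamingJob)
  // None now fails fast with a clear message instead of a NullPointerException.
  assert(scala.util.Try(buildJob("gb", None)).isFailure)
  println("dispatch ok")
}
```

The win over `.get` is that the misconfiguration surfaces as a descriptive IllegalArgumentException naming the groupBy, rather than an anonymous NPE deep in job setup.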
265-274: Validate leftSource.query is non-null before use. Add a defensive check to prevent passing null to the deserialization schema.

Apply this diff:

 // Use left source query for deserialization schema - this is the topic & schema we use to drive
 // the JoinSource processing
 val leftSourceQuery = leftSource.query
+require(leftSourceQuery != null,
+  s"JoinSource left source missing query for groupBy: $groupByName, join: ${joinSource.getJoin.getMetaData.getName}")
 val leftSourceGroupByName = s"left_source_${joinSource.getJoin.getMetaData.getName}"
84-89: Redact PII from error logs. Logging the full enrichedEvent.fields at line 88 risks exposing sensitive data. Log field names or a redacted summary only.

Apply this diff:

 case ex: Exception =>
   // we swallow the event on error
   errorCounter.inc()
   queryLatencyHistogram.update(System.currentTimeMillis() - startTime)
-  logger.error(s"Error applying join source query to event: ${enrichedEvent.fields}", ex)
+  val keys = if (enableDebug) enrichedEvent.fields.keySet.mkString(",") else "redacted"
+  logger.error(s"Join source query error; input keys=[$keys]", ex)
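The redaction logic from that diff is easy to factor out and unit test on its own. This is a minimal sketch with illustrative field names, not the actual JoinSourceQueryFunction code:

```scala
// Sketch: expose field *names* only when debug is enabled, never field values.
def describeFields(fields: Map[String, Any], enableDebug: Boolean): String =
  if (enableDebug) fields.keySet.mkString(",") else "redacted"

object RedactionDemo extends App {
  val event: Map[String, Any] = Map("user_id" -> "u123", "card_number" -> "4111-xxxx")

  // Production path: no key names, no values leak into logs.
  assert(describeFields(event, enableDebug = false) == "redacted")

  // Debug path: key names only; values still never appear.
  assert(describeFields(event, enableDebug = true).split(",").toSet ==
    Set("user_id", "card_number"))
  println("redaction ok")
}
```

Keeping the helper pure makes it trivial to assert that no value ever reaches the log line, independent of the Flink runtime.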
🧹 Nitpick comments (2)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
123-123: Use the default parameter instead of an explicit null. FetchContext has a default for metadataDataset; omit the second argument to avoid the null risk.

Apply this diff:

-class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore(), null)) with Serializable {
+class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore())) with Serializable {

flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
82-83: Unused variables: valueColumns and eventTimeColumn. These are assigned but never referenced in the class.

-  val keyColumns: Array[String] = groupByConf.keyColumns.toScala.toArray
-  val valueColumns: Array[String] = groupByConf.aggregationInputs
-  val eventTimeColumn = Constants.TimeColumn
+  val keyColumns: Array[String] = groupByConf.keyColumns.toScala.toArray
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (7 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (4 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
💤 Files with no reviewable changes (1)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
🚧 Files skipped from review as they are similar to previous changes (4)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala
- flink/src/test/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJobIntegrationTest.scala
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunctionTest.scala
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-28T15:00:37.643Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala:99-104
Timestamp: 2025-10-28T15:00:37.643Z
Learning: In flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala, AvroCodecFn and TiledAvroCodecFn do not require explicit imports from ai.chronon.flink.deser as the code compiles and runs fine without them.
Applied to files:
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala
📚 Learning: 2025-10-28T15:02:05.684Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala:42-47
Timestamp: 2025-10-28T15:02:05.684Z
Learning: In Flink jobs (e.g., SparkExpressionEvalFn.scala), the metric group uses "feature_group" as the tag name, not "group_by", because existing dashboards depend on this naming convention.
Applied to files:
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala
📚 Learning: 2025-10-28T14:59:45.241Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.241Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
- flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala
- flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala
- flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala
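The swallow-on-error contract captured in this learning can be sketched with plain Futures, with no Flink types involved. The shapes below are illustrative stand-ins for the enrichment flow, not the actual JoinEnrichmentAsyncFunction:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative stand-in: on fetch failure we emit *no* event rather than an
// un-enriched one, so downstream join-source queries never see missing fields,
// and the stream is not failed (which would restart the Flink app).
def enrich(event: Map[String, Any],
           fetch: Map[String, Any] => Future[Map[String, Any]]): Future[Seq[Map[String, Any]]] =
  fetch(event)
    .map(enriched => Seq(event ++ enriched))
    .recover { case _ => Seq.empty } // swallow the event on error

object SwallowDemo extends App {
  val ok = Await.result(
    enrich(Map("id" -> 1), _ => Future.successful(Map("f" -> 2))), 1.second)
  assert(ok == Seq(Map("id" -> 1, "f" -> 2)))

  val failed = Await.result(
    enrich(Map("id" -> 1), _ => Future.failed(new RuntimeException("kv down"))), 1.second)
  assert(failed.isEmpty) // event dropped, stream keeps running
  println("swallow-on-error ok")
}
```

The trade-off this encodes: dropped events show up as an error-counter metric rather than as partially enriched records in the aggregations.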
🧬 Code graph analysis (17)
flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
DeserializationSchemaBuilder(31-62)buildSourceProjectionDeserSchema(44-53)buildSourceProjectionDeserSchema(55-61)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(92-92)SourceProjectionDeserializationSchema(94-169)
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (8)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)online/src/main/scala/ai/chronon/online/Api.scala (10)
KVStore(36-53)GetRequest(40-43)GetResponse(45-47)PutRequest(48-48)Api(211-288)LoggableResponse(138-142)LoggableResponse(144-184)buildFetcher(258-271)get(106-114)put(74-74)online/src/main/scala/ai/chronon/online/GroupByServingInfoParsed.scala (2)
GroupByServingInfoParsed(31-146)keyCodec(69-69)online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)
FetchContext(9-27)online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
MetadataStore(57-421)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Response(68-68)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
query(396-404)streamingSource(578-584)api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (7)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
DeserializationSchemaBuilder(31-62)buildSourceProjectionDeserSchema(44-53)buildSourceProjectionDeserSchema(55-61)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(92-92)projectedSchema(108-114)flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (1)
FlinkJoinSourceJob(49-252)flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
FlinkSourceProvider(6-31)build(7-18)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (1)
WriteResponse(119-148)online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (4)
DataStream(72-124)TopicInfo(33-33)TopicInfo(34-70)parse(45-69)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (5)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
Response(68-68)Request(59-62)flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (3)
open(39-60)asyncInvoke(62-116)timeout(118-126)
flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (10)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (3)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)withUnorderedWaits(31-49)flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
TiledAvroCodecFn(110-175)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (2)
FlinkRowAggProcessFunction(172-260)FlinkRowAggregationFunction(38-169)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (5)
flatMap(65-90)JoinSourceQueryFunction(27-91)JoinSourceQueryFunction(99-178)buildJoinSchema(144-177)buildCatalystUtil(102-139)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (11)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
FlinkJob(47-365)map(70-73)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)flink/src/main/scala/ai/chronon/flink/window/Trigger.scala (2)
AlwaysFireOnElementTrigger(12-50)BufferedProcessingTimeTrigger(88-177)flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (2)
FlinkRowAggProcessFunction(172-260)FlinkRowAggregationFunction(38-169)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (2)
AvroCodecFn(70-102)TiledAvroCodecFn(110-175)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1)
withUnorderedWaits(31-49)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
online/src/main/scala/ai/chronon/online/Api.scala (1)
flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (1)
buildFetcher(60-62)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (6)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1)
DirectExecutionContext(21-29)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)flink/src/test/scala/ai/chronon/flink/joinrunner/JoinTestUtils.scala (2)
buildFetcher(60-62)fetchJoin(95-120)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Request(59-62)flink/src/test/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunctionTest.scala (4)
complete(74-77)complete(122-125)complete(165-168)complete(208-211)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)
flink/src/main/scala/ai/chronon/flink/joinrunner/JoinSourceQueryFunction.scala (6)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(92-92)online/src/main/scala/ai/chronon/online/Api.scala (2)
Api(211-288)buildFetcher(258-271)online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2)
SparkConversions(56-159)toChrononType(61-87)online/src/main/scala/ai/chronon/online/JoinCodec.scala (1)
toMap(51-51)online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
getOutputSparkSchema(153-153)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
- GitHub Check: flink_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: udafs_tests
- GitHub Check: stats_tests
- GitHub Check: kv_store_tests
- GitHub Check: join_tests
- GitHub Check: streaming_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (25)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
5-5: LGTM: refactor aligns with query-based evaluation. The import and query extraction follow the new pattern consistently.

Also applies to: 132-132

135-136: Constructor signature updated correctly. The new parameters (query, groupByName, dataModel) match the refactored API.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5)

21-21: Good: using the recommended Scala 2.13+ collection converters.

25-26: Constructor refactor looks correct. Documentation and signature align with the query-based evaluation pattern.

Also applies to: 30-38

44-44: Query-based initialization is consistent. Setups extraction and schema naming align with the new pattern.

Also applies to: 50-50, 81-82, 116-117

195-200: Good helper for query extraction. Validation is clear and consistent with usage patterns.

202-232: Refactored transforms and filters logic is correct. DataModel-based branching for EVENTS vs ENTITIES is appropriate.

flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (1)

4-4: Test updated to use the builder pattern. DeserializationSchemaBuilder usage is consistent with the refactor.

Also applies to: 68-68

online/src/main/scala/ai/chronon/online/Api.scala (1)

258-258: Removing final enables test overrides. This change supports the test infrastructure introduced in this PR.

flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (1)

3-3: Test updated for query-based evaluation. Changes align with the refactored SparkExpressionEval API.

Also applies to: 78-79

flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (3)

3-3: Tests updated to the query-based constructor. The event-based test correctly uses the new signature.

Also applies to: 6-6, 23-30

53-60: Null filter test updated correctly. Follows the same pattern as the first test.

81-88: Entity-based test includes the dataModel parameter. Correctly passes groupBy.dataModel for entity handling.

flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)

4-7: Imports support query-based deserialization. The new imports enable the builder overloads.

35-42: Identity builder refactored with an overload. The existing builder delegates to the constructor with groupByName; the new overload accepts an explicit name.

47-61: Projection builder refactored with an overload. Query extraction and the explicit parameter overload follow the consistent pattern.

flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala (1)

1-131: LGTM: async enrichment implementation is sound. Error handling correctly swallows events on failure/timeout/not-found to prevent stream restarts and avoid propagating incomplete data. Metrics properly scoped.

flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)

1-219: LGTM: clean job structure. Clear separation of tiled/untiled paths, proper watermarking, windowing, and configurable triggers. No issues found.

flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (1)

157-182: LGTM: test updated for query-based evaluation. Properly derives the query via queryFromGroupBy and wires it through the updated constructors. Aligns with the broader refactoring.

flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (1)

79-105: LGTM: entity test follows the same query-based pattern. Consistent with the event test refactoring. No issues.

flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)

16-169: LGTM: deserialization schema refactored for query-based evaluation. Clean shift from GroupBy-centric to query/groupByName/dataModel parameters. Consistent metrics naming.

flink/src/main/scala/ai/chronon/flink/joinrunner/FlinkJoinSourceJob.scala (4)

86-105: LGTM! Configuration properties with sensible defaults and clear error handling for unsupported trigger types.

152-172: LGTM! Conditional query transform application is correct, using the enriched stream directly when no transformations are defined.

174-231: LGTM! Schema consistency is maintained: postTransformationSchema correctly flows into the aggregation functions, and error handling in JoinSourceQueryFunction swallows events rather than passing through mismatched records. Based on learnings.

237-251: LGTM! Helper correctly determines the post-transformation schema for both the query and no-query cases, ensuring downstream aggregation receives schema-aligned records.
| .name(s"Spark expression eval with timestamps for $groupByName") | ||
| .setParallelism(sourceSparkProjectedStream.getParallelism) | ||
|
|
||
| val enrichmentFunction = new JoinEnrichmentAsyncFunction( |
I'm not too familiar with how things work under the hood here, but should we be running the source projection after the enrichment? Otherwise you'd only be able to project from the data stream?
There are two levels of source projection -
One on the left src (so on the source object here:
| source = EventSource( |
The second spark eval is after the join enrichment while evaluating the join source to feed into the GB's aggregations (so the upstream_join_src object here:
| upstream_join_source = JoinSource( |
| * @param api API implementation for join fetching and KV store access | ||
| * @param enableDebug If enabled will log additional debug info per processed event | ||
| */ | ||
| class FlinkJoinSourceJob(eventSrc: FlinkSource[ProjectedEvent], |
ai.chronon.flink.joinRunner.FlinkJoinSourceJob
flink and join and runner+job appear twice.
wonder if it is better for this to read as below instead.
ai.chronon.flink.ChainedGroupbyJob
hmm yeah I was trying to keep the names a bit consistent with the Spark ones and was leaning into the join source runner bit. Can rename - I like ChainedGroupByJob so lets go with that. I'd like to package these though so I'm thinking of ai.chronon.flink.chaining.ChainedGroupByJob (as we implement the model source chaining stuff that would also go in this package).
Let me know if you want to name this differently as well.
| * @param eventSrc Provider of a Flink Datastream[ProjectedEvent] for the given topic and groupBy. The event | ||
| * consists of a field Map as well as metadata columns such as processing start time (to track | ||
| * metrics). The Map contains projected columns from the source data based on projections and filters | ||
| * in the GroupBy. | ||
| * @param inputSchema Schema of the left source (before enrichment) | ||
| * @param sinkFn Async Flink writer function to help us write to the KV store | ||
| * @param groupByServingInfoParsed The GroupBy we are working with | ||
| * @param parallelism Parallelism to use for the Flink job | ||
| * @param api API implementation for join fetching and KV store access | ||
| * @param enableDebug If enabled will log additional debug info per processed event | ||
| */ |
https://testing.googleblog.com/2024/08/less-is-more-principles-for-simple.html
the blog has other examples on how to write high signal comments.
I trimmed this down a bit - let me know if this reads better and if not, what specifically you'd like changed.
82f864b to
167c36b
Compare
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
online/src/main/scala/ai/chronon/online/Api.scala (1)
212-218: Overridable buildFetcher + racy caching = potential double-init.
lazy val fetcher uses a mutable var fetcherObj without synchronization. Making buildFetcher overridable increases the risk of non-idempotent construction under races. Prefer a plain, thread-safe lazy val and drop the mutable cache:
-  lazy val fetcher: Fetcher = {
-    if (fetcherObj == null)
-      fetcherObj = buildFetcher()
-    fetcherObj
-  }
-  private var fetcherObj: Fetcher = null
+  // Thread-safe lazy construction; ensure setTimeout/setFlagStore are called before first access.
+  lazy val fetcher: Fetcher = buildFetcher()

Alternatively, guard the current pattern with a synchronized double-check. Also document override expectations (idempotent, serializable).
Also applies to: 258-269
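The synchronized double-check alternative mentioned above can be sketched as follows. The `Fetcher`/`Api` shapes are simplified stand-ins; note that in Scala a plain `lazy val` already provides this at-most-once guarantee:

```scala
class Fetcher // simplified stand-in for the real fetcher

class Api {
  protected def buildFetcher(): Fetcher = new Fetcher

  @volatile private var fetcherObj: Fetcher = null

  // Double-checked locking: at most one buildFetcher() call across threads.
  // The first (unsynchronized) check avoids taking the lock on the hot path.
  def fetcher: Fetcher = {
    if (fetcherObj == null) {
      synchronized {
        if (fetcherObj == null) fetcherObj = buildFetcher()
      }
    }
    fetcherObj
  }
}

object DoubleCheckDemo extends App {
  val api = new Api
  assert(api.fetcher eq api.fetcher) // same instance on repeated access
  println("double-check ok")
}
```

The `@volatile` annotation is what makes the unsynchronized first read safe; without it, another thread could observe a partially constructed reference.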
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
48-51: Fix null-handling: use Option(...) not Some(...). Some(null) yields a Some and NPEs downstream. Wrap with Option to drop nulls.

-val maybeMutation = Some(deserSchemaProvider.fromBytes(messageBytes))
+val maybeMutation = Option(deserSchemaProvider.fromBytes(messageBytes))
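The Some-vs-Option distinction is easy to check in isolation; the null deserializer result below is an illustrative stand-in:

```scala
object OptionVsSomeDemo extends App {
  // e.g. a deserializer that returns null on unparseable input
  val bytes: Array[Byte] = null

  // Some(...) happily wraps null; the None-check passes and the NPE
  // only surfaces later, when the wrapped value is actually used.
  assert(Some(bytes).isDefined)

  // Option(...) collapses null to None, so downstream .map/.foreach are safe.
  assert(Option(bytes).isEmpty)
  println("Option handles null, Some does not")
}
```

`Option.apply` is specified to return `None` for a null argument, which is exactly the behavior the suggested fix relies on.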
♻️ Duplicate comments (7)
flink/src/test/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunctionTest.scala (1)
60-61: Wrong mock signature for fetchJoin; align with the prod single-arg API. Stub/verify the 1-arg `fetchJoin(Seq[Fetcher.Request])`.

```diff
-when(mockFetcher.fetchJoin(any(), any())).thenReturn(joinFuture.future)
+when(mockFetcher.fetchJoin(any[Seq[Fetcher.Request]])).thenReturn(joinFuture.future)
...
-verify(mockFetcher).fetchJoin(any(), any())
+verify(mockFetcher).fetchJoin(any[Seq[Fetcher.Request]])
```

Apply the same change in other tests where `fetchJoin` is stubbed.

```shell
#!/bin/bash
# Verify Fetcher.fetchJoin signature(s)
rg -nP 'def\s+fetchJoin\s*\(' online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala || rg -nP 'def\s+fetchJoin\s*\('
# Show call sites to confirm single-arg usage
rg -nP '\.fetchJoin\s*\(' -C2
```

Also applies to: 101-101, 151-151, 195-195
flink/src/test/scala/ai/chronon/flink/chaining/ChainedGroupByJobIntegrationTest.scala (1)
91-98: Decode binary keys; avoid `new String(keyBytes)`.
TileKey). Decode to extractuser_idfor validation.Apply proper decoding:
- // Extract and verify user IDs (keys contain dataset name + user_id in binary format) - val responseKeys = outputs.map(r => new String(r.keyBytes)).toSet - val expectedUsers = elements.map(_.user_id).toSet - - // Check that each expected user ID appears somewhere in at least one response key - expectedUsers.foreach { expectedUser => - responseKeys.exists(_.contains(expectedUser)) should be (true) - } + // Extract and verify user IDs by decoding key bytes + import ai.chronon.api.TilingUtils + val (_, gbInfo) = buildFlinkJoinSourceJob(groupBy, elements, testApi) + val decodedUsers: Set[String] = outputs.flatMap { r => + val tileKey = TilingUtils.deserializeTileKey(r.keyBytes) + val keyBytes = tileKey.keyBytes.toScala.toArray.map(_.asInstanceOf[Byte]) + val rec = gbInfo.keyCodec.decode(keyBytes) + Option(rec.get("user_id")).map(_.toString) + }.toSet + val expectedUsers = elements.map(_.user_id).toSet + expectedUsers.foreach(u => decodedUsers should contain (u))flink/src/test/scala/ai/chronon/flink/chaining/JoinTestUtils.scala (1)
123-125: Avoid passing null to FetchContext; use the default metadataDataset. Using the default param prevents NPEs and matches prod behavior.

```diff
-class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore(), null)) with Serializable {
+class TestMetadataStore extends MetadataStore(FetchContext(new TestKVStore())) with Serializable {
```

flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (1)
34-39: Redact PII and emit a schema-aligned fallback on errors. Avoid logging full fields; on failure, output an event matching outputSchema to keep downstream operators stable.

```diff
  @transient private var catalystUtil: CatalystUtil = _
+ @transient private var expectedFieldNames: Array[String] = _
@@
  val result = JoinSourceQueryFunction.buildCatalystUtil(joinSource, inputSchema, api, enableDebug)
  catalystUtil = result.catalystUtil
+ expectedFieldNames = result.outputSchema.map(_._1).toArray
@@
- if (enableDebug) {
-   logger.info(s"Join source query input: ${enrichedEvent.fields}")
-   logger.info(s"Join source query results: $queryResults")
- }
+ if (enableDebug) {
+   logger.info(s"Join source query input keys: ${enrichedEvent.fields.keySet}")
+   logger.info(s"Join source query results count: ${queryResults.size}")
+ }
@@
- case ex: Exception =>
-   // we swallow the event on error
-   errorCounter.inc()
-   queryLatencyHistogram.update(System.currentTimeMillis() - startTime)
-   logger.error(s"Error applying join source query to event: ${enrichedEvent.fields}", ex)
+ case ex: Exception =>
+   errorCounter.inc()
+   queryLatencyHistogram.update(System.currentTimeMillis() - startTime)
+   val keys = if (enableDebug) enrichedEvent.fields.keySet.mkString(",") else "redacted"
+   logger.error(s"Join source query error; input keys=[$keys]", ex)
+   // Emit schema-aligned fallback to avoid downstream schema failures
+   val filtered = expectedFieldNames.iterator.map(n => n -> enrichedEvent.fields.getOrElse(n, null)).toMap
+   out.collect(ProjectedEvent(filtered, enrichedEvent.startProcessingTimeMillis))
```

Also applies to: 45-47, 73-76, 84-89
flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (1)
3-3: Remove private class import causing cross-package compile error.
`DirectExecutionContext` is private to `ai.chronon.flink`; importing it in `ai.chronon.flink.chaining` fails at compile time. Replace with an inline executor.

Line 3:

```diff
-import ai.chronon.flink.DirectExecutionContext
```

Lines 130–131:

```diff
- private val ExecutionContextInstance: ExecutionContext = new DirectExecutionContext
+ private val ExecutionContextInstance: ExecutionContext =
+   scala.concurrent.ExecutionContext.fromExecutor((r: Runnable) => r.run())
```

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
243-249: Avoid `.get` on `streamingSource`. Handle None explicitly to prevent runtime NPEs.

```diff
- if (servingInfo.groupBy.streamingSource.get.isSetJoinSource) {
-   buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
- } else {
-   buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
- }
+ servingInfo.groupBy.streamingSource match {
+   case Some(src) if src.isSetJoinSource =>
+     buildJoinSourceFlinkJob(groupByName, props, api, servingInfo, enableDebug)
+   case Some(_) =>
+     buildGroupByStreamingJob(groupByName, props, api, servingInfo, enableDebug)
+   case None =>
+     throw new IllegalArgumentException(s"Invalid groupBy: $groupByName. No streaming source configured.")
+ }
```
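A tiny standalone illustration (not from the PR; type and method names are made up) of why the explicit `match` is safer than `.get`:

```scala
object OptionMatchSketch {
  sealed trait Src
  case object JoinSrc extends Src
  case object KafkaSrc extends Src

  // Mirrors the suggested shape: handle Some/None explicitly instead of .get.
  def describe(streamingSource: Option[Src]): String =
    streamingSource match {
      case Some(JoinSrc) => "join-source job"
      case Some(_)       => "group-by streaming job"
      case None          => throw new IllegalArgumentException("No streaming source configured.")
    }

  def main(args: Array[String]): Unit = {
    assert(describe(Some(JoinSrc)) == "join-source job")
    assert(describe(Some(KafkaSrc)) == "group-by streaming job")
    val failed =
      try { describe(None); false }
      catch { case _: IllegalArgumentException => true }
    // Clear, descriptive error instead of a NoSuchElementException from .get.
    assert(failed)
    println("ok")
  }
}
```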
269-279: Require left-source query presence. Fail fast if missing, to avoid passing null to the deser schema.

```diff
+ require(leftSource.isSetQuery && leftSource.getQuery != null,
+   s"JoinSource left source missing query for groupBy: $groupByName")
  val leftSourceQuery = leftSource.query
```
🧹 Nitpick comments (11)
flink/src/test/scala/ai/chronon/flink/chaining/JoinSourceQueryFunctionTest.scala (1)
28-44: Solid coverage; add quick assertions on metrics and multi-row results. Consider verifying that counters (`success`/`errors`) increment, and add a case where the SQL emits multiple rows to ensure `flatMap` forwards all results.

Also applies to: 46-123
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
55-61: API polish: add a default for enableDebug to mirror other builders. For consistency with other overloads, give `enableDebug` a default:

```diff
- def buildSourceProjectionDeserSchema(provider: SerDe,
-                                      query: Query,
-                                      groupByName: String,
-                                      dataModel: DataModel,
-                                      enableDebug: Boolean): ChrononDeserializationSchema[ProjectedEvent] = {
+ def buildSourceProjectionDeserSchema(provider: SerDe,
+                                      query: Query,
+                                      groupByName: String,
+                                      dataModel: DataModel,
+                                      enableDebug: Boolean = false): ChrononDeserializationSchema[ProjectedEvent] = {
```

flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (2)
45-52: Standardize the metrics tag: use feature_group, not group_by. Dashboards expect "feature_group"; align for consistency with other operators.

```diff
- val group = getRuntimeContext.getMetricGroup
-   .addGroup("chronon")
-   .addGroup("group_by", groupByName)
-   .addGroup("join_enrichment", joinRequestName)
+ val group = getRuntimeContext.getMetricGroup
+   .addGroup("chronon")
+   .addGroup("feature_group", groupByName)
+   .addGroup("join_enrichment", joinRequestName)
```

Based on learnings
79-86: Clarify success vs not_found metrics. Currently successCounter increments even when responses is empty; either move successCounter.inc() inside the nonEmpty branch or rename the counter to "requests".

```diff
- joinFetchLatencyHistogram.update(System.currentTimeMillis() - startTime)
- successCounter.inc()
+ joinFetchLatencyHistogram.update(System.currentTimeMillis() - startTime)
@@
- if (responses.nonEmpty) {
+ if (responses.nonEmpty) {
+   successCounter.inc()
```

Also applies to: 100-104
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
208-218: Make the buffered trigger interval configurable. Read the buffer millis from props/topic to tune latency/throughput per job.

```diff
- private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
-   FlinkUtils
+ private def getTrigger(): Trigger[ProjectedEvent, TimeWindow] = {
+   val bufferMillis =
+     FlinkUtils.getProperty("trigger_buffer_ms", props, topicInfo).map(_.toLong).getOrElse(100L)
+   FlinkUtils
     .getProperty("trigger", props, topicInfo)
     .map {
       case "always_fire" => new AlwaysFireOnElementTrigger()
-      case "buffered"    => new BufferedProcessingTimeTrigger(100L)
+      case "buffered"    => new BufferedProcessingTimeTrigger(bufferMillis)
       case t =>
         throw new IllegalArgumentException(s"Unsupported trigger type: $t. Supported: 'always_fire', 'buffered'")
     }
     .getOrElse(new AlwaysFireOnElementTrigger())
 }
```

flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (1)
48-56: Standardize metrics tag to feature_group.

```diff
- val group = getRuntimeContext.getMetricGroup
-   .addGroup("chronon")
-   .addGroup("join_source", joinSource.join.metaData.getName)
-   .addGroup("group_by", groupByName)
+ val group = getRuntimeContext.getMetricGroup
+   .addGroup("chronon")
+   .addGroup("join_source", joinSource.join.metaData.getName)
+   .addGroup("feature_group", groupByName)
```

Based on learnings
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
114-116: Avoid shadowing the member `evaluator`. Rename the local to clarify, or reuse the member.

```diff
- val evaluator = new SparkExpressionEval[Row](sourceEventEncoder, query, groupByName, dataModel)
- evaluator.getOutputSchema.fields.map { field =>
+ val tmpEval = new SparkExpressionEval[Row](sourceEventEncoder, query, groupByName, dataModel)
+ tmpEval.getOutputSchema.fields.map { field =>
```
177-189: Guard GenericRowHandler for non-GenericRow inputs. Prevent a MatchError with a safe fallback case.

```diff
  val func: Any => Array[Any] = {
    case x: GenericRowWithSchema => { ... }
+   case other =>
+     throw new IllegalArgumentException(s"Unsupported row type: ${other.getClass.getName}")
  }
```

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
80-83: Standardize accessor style for Query. Mixing `query.setups` and `query.getWheres` is inconsistent. Pick one (property or getter) across the file.

Example:

```diff
- val setups = Option(query.setups).map(_.asScala).getOrElse(Seq.empty)
+ val setups = Option(query.getSetups).map(_.asScala).getOrElse(Seq.empty)
...
- val setups = Option(query.setups).map(_.asScala).getOrElse(Seq.empty)
+ val setups = Option(query.getSetups).map(_.asScala).getOrElse(Seq.empty)
```

Also applies to: 115-118
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)
229-236: UIDs: avoid spaces (nit). Flink UIDs with spaces are harder to grep/track. Consider `metrics-sink-$groupByName`.

flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (1)
85-94: Make the buffered trigger interval configurable. It is hard-coded to 100ms; expose `buffered_ms` via props/topic to tune latency/throughput.

```diff
- case "buffered" => new BufferedProcessingTimeTrigger(100L)
+ case "buffered" =>
+   val ms = FlinkUtils.getProperty("buffered_ms", props, topicInfo).map(_.toLong).getOrElse(100L)
+   new BufferedProcessingTimeTrigger(ms)
```
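The props-with-fallback lookup pattern suggested above, as a standalone sketch (not the actual `FlinkUtils` implementation; names and the default are illustrative):

```scala
object PropsLookupSketch {
  // Mimics a FlinkUtils.getProperty-style lookup: job props first, then topic params.
  def getProperty(key: String,
                  props: Map[String, String],
                  topicParams: Map[String, String]): Option[String] =
    props.get(key).orElse(topicParams.get(key))

  def main(args: Array[String]): Unit = {
    val props = Map("trigger" -> "buffered")
    val topicParams = Map("buffered_ms" -> "250")

    // Configured value wins over the hard-coded 100ms default.
    val bufferMillis = getProperty("buffered_ms", props, topicParams).map(_.toLong).getOrElse(100L)
    assert(bufferMillis == 250L)

    // Missing key falls back to the default.
    val missing = getProperty("unknown_key", props, topicParams).map(_.toLong).getOrElse(100L)
    assert(missing == 100L)
    println(bufferMillis)
  }
}
```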
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (0 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
- flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (7 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/chaining/ChainedGroupByJobIntegrationTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/chaining/JoinSourceQueryFunctionTest.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/chaining/JoinTestUtils.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala (3 hunks)
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (4 hunks)
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- online/src/main/scala/ai/chronon/online/Api.scala (1 hunks)
💤 Files with no reviewable changes (1)
- flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
🚧 Files skipped from review as they are similar to previous changes (2)
- flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEventIntegrationTest.scala
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-28T14:59:45.241Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala:107-114
Timestamp: 2025-10-28T14:59:45.241Z
Learning: In JoinEnrichmentAsyncFunction (flink/src/main/scala/ai/chronon/flink/joinrunner/JoinEnrichmentAsyncFunction.scala), events should be swallowed on join fetch errors rather than passing through original events or failing the stream. This prevents Flink app failures/restarts while avoiding incomplete data reaching downstream join source queries that expect enriched fields.
Applied to files:
- flink/src/test/scala/ai/chronon/flink/chaining/JoinSourceQueryFunctionTest.scala
- flink/src/test/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunctionTest.scala
- flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala
- flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala
- flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala
📚 Learning: 2025-10-28T15:02:05.684Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala:42-47
Timestamp: 2025-10-28T15:02:05.684Z
Learning: In Flink jobs (e.g., SparkExpressionEvalFn.scala), the metric group uses "feature_group" as the tag name, not "group_by", because existing dashboards depend on this naming convention.
Applied to files:
- flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala
- flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala
- flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala
📚 Learning: 2025-10-28T15:00:37.643Z
Learnt from: piyush-zlai
PR: zipline-ai/chronon#1247
File: flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala:99-104
Timestamp: 2025-10-28T15:00:37.643Z
Learning: In flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala, AvroCodecFn and TiledAvroCodecFn do not require explicit imports from ai.chronon.flink.deser as the code compiles and runs fine without them.
Applied to files:
- flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala
- flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala
- flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala
🧬 Code graph analysis (18)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/test/scala/ai/chronon/flink/chaining/JoinSourceQueryFunctionTest.scala (6)
api/src/main/scala/ai/chronon/api/Builders.scala (5)
Builders(28-474)Source(107-141)Query(42-67)MetaData(265-321)joinSource(133-140)api/src/main/scala/ai/chronon/api/DataType.scala (5)
DoubleType(197-197)IntType(193-193)LongType(195-195)StringType(207-207)StructField(218-218)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (3)
ProjectedEvent(95-95)open(32-44)open(122-135)online/src/main/scala/ai/chronon/online/Api.scala (1)
Api(211-288)flink/src/test/scala/ai/chronon/flink/chaining/JoinTestUtils.scala (2)
buildFetcher(60-62)buildJoinCodec(124-124)flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (2)
open(40-63)flatMap(65-90)
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (5)
Extensions(39-1350)GroupByOps(501-732)query(396-404)dataModel(379-384)dataModel(536-543)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/test/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunctionTest.scala (4)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (3)
ProjectedEvent(95-95)open(32-44)open(122-135)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (2)
Response(68-68)Request(59-62)flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (3)
open(39-60)asyncInvoke(62-116)timeout(118-126)
flink/src/test/scala/ai/chronon/flink/chaining/ChainedGroupByJobIntegrationTest.scala (5)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2)
FlinkTestUtils(109-215)MockAsyncKVStoreWriter(87-96)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (1)
SparkConversions(56-159)flink/src/test/scala/ai/chronon/flink/chaining/JoinTestUtils.scala (4)
JoinTestUtils(156-223)build(132-153)buildJoinSourceTerminalGroupBy(163-222)get(66-68)flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (1)
runTiledGroupByJob(105-220)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)streamingSource(578-584)cleanName(158-158)api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)
flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (4)
flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (1)
DirectExecutionContext(21-29)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(95-95)online/src/main/scala/ai/chronon/online/Api.scala (1)
Api(211-288)online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala (1)
Request(59-62)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)
flink/src/test/scala/ai/chronon/flink/chaining/JoinTestUtils.scala (7)
api/src/main/scala/ai/chronon/api/DataType.scala (3)
DoubleType(197-197)StringType(207-207)StructField(218-218)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(95-95)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)online/src/main/scala/ai/chronon/online/fetcher/FetchContext.scala (1)
FetchContext(9-27)online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala (1)
MetadataStore(57-421)online/src/main/scala/ai/chronon/online/serde/AvroConversions.scala (1)
AvroConversions(33-281)
flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)api/src/main/scala/ai/chronon/api/Extensions.scala (1)
query(396-404)
flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (6)
api/src/main/scala/ai/chronon/api/Constants.scala (1)
Constants(23-100)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(95-95)online/src/main/scala/ai/chronon/online/Api.scala (1)
Api(211-288)online/src/main/scala/ai/chronon/online/serde/SparkConversions.scala (2)
SparkConversions(56-159)toChrononType(61-87)online/src/main/scala/ai/chronon/online/JoinCodec.scala (1)
toMap(51-51)online/src/main/scala/ai/chronon/online/CatalystUtil.scala (1)
getOutputSparkSchema(153-153)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (3)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (6)
flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (2)
AsyncKVStoreWriter(27-52)AsyncKVStoreWriter(58-158)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (2)
FlinkGroupByStreamingJob(40-219)runTiledGroupByJob(127-206)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(95-95)projectedSchema(114-120)flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (2)
ChainedGroupByJob(38-241)runTiledGroupByJob(105-220)flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
FlinkSourceProvider(6-31)build(7-18)online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (4)
DataStream(72-124)TopicInfo(33-33)TopicInfo(34-70)parse(45-69)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (4)
api/src/main/scala/ai/chronon/api/Extensions.scala (2)
GroupByOps(501-732)query(396-404)flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (1)
FlinkGroupByStreamingJob(40-219)flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
SparkExpressionEval(34-192)SparkExpressionEval(194-243)queryFromGroupBy(195-200)getOutputSchema(115-118)flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
SparkExpressionEvalFn(22-73)
flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (9)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3)
BaseFlinkJob(33-45)FlinkJob(47-371)map(70-73)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)
TiledAvroCodecFn(110-175)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(95-95)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (2)
FlinkRowAggProcessFunction(172-260)FlinkRowAggregationFunction(38-169)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/chaining/JoinSourceQueryFunction.scala (5)
flatMap(65-90)JoinSourceQueryFunction(27-91)JoinSourceQueryFunction(99-178)buildJoinSchema(144-177)buildCatalystUtil(102-139)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1)
withUnorderedWaits(31-49)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (2)
api/src/main/scala/ai/chronon/api/Builders.scala (1)
Query(42-67)api/src/main/scala/ai/chronon/api/Extensions.scala (3)
query(396-404)dataModel(379-384)dataModel(536-543)
flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (2)
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (3)
DeserializationSchemaBuilder(31-62)buildSourceProjectionDeserSchema(44-53)buildSourceProjectionDeserSchema(55-61)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (2)
ProjectedEvent(95-95)SourceProjectionDeserializationSchema(100-175)
flink/src/main/scala/ai/chronon/flink/FlinkGroupByStreamingJob.scala (11)
aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala (2)
ResolutionUtils(65-86)getSmallestTailHopMillis(70-85)flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
FlinkJob(47-371)map(70-73)flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala (1)
ProjectedEvent(95-95)flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
FlinkSource(6-21)flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
AvroCodecOutput(89-115)TimestampedTile(57-84)WriteResponse(119-148)flink/src/main/scala/ai/chronon/flink/window/Trigger.scala (2)
AlwaysFireOnElementTrigger(12-50)BufferedProcessingTimeTrigger(88-177)flink/src/main/scala/ai/chronon/flink/window/FlinkRowAggregators.scala (2)
FlinkRowAggProcessFunction(172-260)FlinkRowAggregationFunction(38-169)flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
KeySelectorBuilder(16-43)flink/src/main/scala/ai/chronon/flink/FlinkUtils.scala (2)
FlinkUtils(7-17)getProperty(9-16)flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (2)
AvroCodecFn(70-102)TiledAvroCodecFn(110-175)flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala (1)
withUnorderedWaits(31-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: stats_tests
- GitHub Check: join_tests
- GitHub Check: udafs_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: aggregator_tests
- GitHub Check: online_tests
- GitHub Check: api_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (21)
flink/src/test/scala/ai/chronon/flink/chaining/JoinSourceQueryFunctionTest.scala (1)
186-229: Schema wiring check is good. Verifying `JoinCodec.valueSchema` access catches regressions in join schema integration.

flink/src/test/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunctionTest.scala (1)
104-140: Error/timeout swallowing behavior matches the design. Tests assert empty outputs on timeout/failure/empty joins, exactly as intended.
Based on learnings
Also applies to: 142-183, 185-226
flink/src/main/scala/ai/chronon/flink/deser/ChrononDeserializationSchema.scala (1)
44-53: Good: derive the query from GroupBy here to keep callsites simple. Using `SparkExpressionEval.queryFromGroupBy` centralizes validation (time column, etc.).

flink/src/test/scala/ai/chronon/flink/test/deser/CatalystUtilComplexAvroTest.scala (1)
68-76: Builder usage LGTM. Switching to `DeserializationSchemaBuilder.buildSourceProjectionDeserSchema` aligns with the new API and keeps tests resilient to constructor changes.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (4)
3-3: LGTM! Import updated to support the query-based constructor.

13-26: LGTM! Constructor refactored to accept `Query` and `groupByName` instead of `GroupBy`, aligning with the query-based evaluation pattern.

39-40: LGTM! Evaluator instantiation updated to match the new constructor signature.

42-46: LGTM! Metric group correctly uses the `groupByName` variable. The "feature_group" tag name is maintained for dashboard compatibility.

Based on learnings.
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
3-3: LGTM! Import enables GroupBy extension methods used in the test.

78-79: LGTM! Query derivation and constructor usage correctly updated to the new pattern.
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (2)
5-5: LGTM! Import added to access the `queryFromGroupBy` helper.

132-136: LGTM! Query derivation and constructor parameters correctly updated for the validation job.
flink/src/test/scala/ai/chronon/flink/test/SparkExpressionEvalFnTest.scala (4)
3-6: LGTM! Imports updated to support query derivation and GroupBy extensions.

23-30: LGTM! First test correctly updated to derive the query and use the new constructor.

53-61: LGTM! Second test updated consistently with the new pattern.

81-89: LGTM! Entity-based test correctly includes the `dataModel` parameter.

flink/src/test/scala/ai/chronon/flink/chaining/ChainedGroupByJobIntegrationTest.scala (1)
101-151: LGTM! Helper method correctly constructs `SparkExpressionEvalFn` with query and groupByName. The default `DataModel.EVENTS` is appropriate for the join's left source.

flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala (1)
106-115: Good: swallows events on join fetch failure to protect downstream.

Based on learnings
flink/src/test/scala/ai/chronon/flink/test/FlinkJobEntityIntegrationTest.scala (1)
80-105: Refactor to FlinkGroupByStreamingJob looks good. Construction, schema derivation, and execution wiring are correct.
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1)
34-39: Constructor/API migration LGTM. Query/DataModel-driven eval, transforms/filters, and schema propagation look sound.
Please confirm Query exposes ‘setups/selects/wheres’ with the property-style accessors used here across the codebase.
Also applies to: 44-45, 80-83, 115-118, 194-204
flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala (1)
32-47: End-to-end chaining job LGTM. Clear validations, async enrichment, optional query transform, tiling, and the KV write path look good.

Please confirm JoinSourceQueryFunction drops records on errors (no passthrough of unenriched events) to keep schemas aligned with `postTransformationSchema`.

Also applies to: 96-121, 181-197, 206-220
```scala
// Apply join source query transformations only if there are transformations to apply
val processedStream =
  if (joinSource.query != null && joinSource.query.selects != null && !joinSource.query.selects.isEmpty) {
    logger.info("Applying join source query transformations")
    val queryFunction = new JoinSourceQueryFunction(
      joinSource,
      inputSchema,
      groupByName,
      api,
      enableDebug
    )

    enrichedStream
      .flatMap(queryFunction)
      .uid(s"join-source-query-$groupByName")
      .name(s"Join Source Query for $groupByName")
      .setParallelism(sourceSparkProjectedStream.getParallelism)
  } else {
    logger.info("No join source query transformations to apply - using enriched stream directly")
    enrichedStream
  }
```
Guard query presence consistently.
You already gate on non-empty selects; also check joinSource.isSetQuery to avoid null deref in rare configs.
```diff
- if (joinSource.query != null && joinSource.query.selects != null && !joinSource.query.selects.isEmpty) {
+ if (joinSource.isSetQuery &&
+     joinSource.query != null &&
+     joinSource.query.selects != null &&
+     !joinSource.query.selects.isEmpty) {
```

🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/chaining/ChainedGroupByJob.scala around
lines 141 to 161, the current guard checks joinSource.query and
joinSource.query.selects but can still NPE if query isn't set; update the
condition to first verify joinSource.isSetQuery (or equivalent API) before
inspecting query.selects, i.e., combine isSetQuery && query != null && selects
!= null && !selects.isEmpty, and keep the existing branch behavior (create
JoinSourceQueryFunction and apply flatMap) only when that full guard passes;
otherwise log the "no transformations" message and use enrichedStream.
```scala
if (enableDebug) {
  logger.info(s"Join request: ${request.keys}, ts: ${request.atMillis}")
}
```
Avoid logging PII; redact event contents (even on debug/warn).
Don’t log full keys/fields. Log field names or counts only; redact values.
```diff
- if (enableDebug) {
-   logger.info(s"Join request: ${request.keys}, ts: ${request.atMillis}")
- }
+ if (enableDebug) {
+   logger.info(s"Join request keys: ${request.keys.keySet}, ts: ${request.atMillis}")
+ }
@@
- logger.warn(s"Join enrichment timeout for event: ${event.fields}")
+ val keys = if (enableDebug) event.fields.keySet.mkString(",") else "redacted"
+ logger.warn(s"Join enrichment timeout; input keys=[$keys]")
```

Based on learnings
Also applies to: 118-125
🤖 Prompt for AI Agents
In
flink/src/main/scala/ai/chronon/flink/chaining/JoinEnrichmentAsyncFunction.scala
around lines 68-71 (and likewise lines 118-125), the code logs full request
keys/fields which may contain PII; change the logging to avoid printing
values—log only safe metadata such as the number of keys, field names, or a
redacted placeholder. Replace logger.info(s"Join request: ${request.keys}, ts:
${request.atMillis}") with a message that reports request.keys.size or a
comma-separated list of key names (not values) or a constant "<redacted>" and
keep the timestamp; apply the same redaction approach to the log statements at
118-125 so no raw field values are emitted.
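The redaction suggested above can live in one small helper so every log site behaves the same way. A sketch, assuming `fields` is the event's column map (`loggableKeys` is a hypothetical name, not an existing Chronon API):

```scala
// Only key names (or a constant placeholder) ever reach the log, never values,
// so PII in field values cannot leak even when debug logging is enabled.
def loggableKeys(fields: Map[String, Any], enableDebug: Boolean): String =
  if (enableDebug) fields.keySet.toSeq.sorted.mkString(",") else "redacted"
```

A warn line would then read `logger.warn(s"Join enrichment timeout; input keys=[${loggableKeys(event.fields, enableDebug)}]")`.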
```diff
  .addGroup("chronon")
- .addGroup("group_by", groupBy.getMetaData.getName)
+ .addGroup("group_by", groupByName)
  deserializationErrorCounter = metricsGroup.counter("deserialization_errors")
  deserTimeHistogram = metricsGroup.histogram(
```
Unify metrics tag to "feature_group".
Dashboards expect "feature_group", not "group_by". Base and subclasses should match.
```diff
- .addGroup("group_by", groupByName)
+ .addGroup("feature_group", groupByName)
```

Keep SourceProjectionDeserializationSchema using "feature_group" as it already does. Based on learnings.
Also applies to: 124-128
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/deser/DeserializationSchema.scala
around lines 35-38 (and similarly around 124-128), the metrics tag "group_by"
should be renamed to "feature_group" to match dashboards and
SourceProjectionDeserializationSchema; replace any
metricsGroup.addGroup("group_by", groupByName) (and any counters/histogram tags
referencing "group_by") with metricsGroup.addGroup("feature_group", groupByName)
so base class and subclasses use the same tag.
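One way to keep the tag from drifting between the base class and subclasses is a shared constant for the group path. A minimal sketch; `ChrononMetricTags` and `metricsGroupPath` are hypothetical names, and the flat path shape simply mirrors Flink's `metricGroup.addGroup(...)` chaining:

```scala
// Single source of truth for the tag name dashboards expect.
object ChrononMetricTags {
  val FeatureGroup: String = "feature_group"
}

// A group path as a flat list of segments, in the order they would be
// passed to successive addGroup calls on a Flink MetricGroup.
def metricsGroupPath(groupByName: String): List[String] =
  List("chronon", ChrononMetricTags.FeatureGroup, groupByName)
```

Any schema class then builds its metrics group from `metricsGroupPath`, so renaming the tag is a one-line change.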
```scala
val logger = LoggerFactory.getLogger(getClass)

val topicInfo = TopicInfo.parse(servingInfo.groupBy.streamingSource.get.topic)
val schemaProvider = FlinkSerDeProvider.build(topicInfo)
```
🧩 Analysis chain
Also avoid .get here.
Same Optional streamingSource pattern; reuse the guarded value from buildFlinkJob or pattern-match again.
Confirm no callsites reach this with streamingSource = None.
---

🏁 Script executed:

```shell
#!/bin/bash
# Find the function containing lines 320-324 and surrounding context
rg -n "LoggerFactory.getLogger" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala -A 5 -B 10
```

Length of output: 1983

🏁 Script executed:

```shell
#!/bin/bash
# Search for buildFlinkJob function definition
rg -n "def buildFlinkJob" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala -A 30
```

Length of output: 1544

🏁 Script executed:

```shell
#!/bin/bash
# Search for streamingSource guards and None checks
rg -n "streamingSource" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala -B 3 -A 3
```

Length of output: 1196

---

🏁 Script executed:

```shell
#!/bin/bash
# Find all callers of buildFlinkJob
rg -n "buildFlinkJob\(" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala -B 5 -A 2
```

Length of output: 574

🏁 Script executed:

```shell
#!/bin/bash
# Search for where these functions are called from (look for the main entry point)
rg -n "buildGroupByStreamingJob\|buildJoinSourceFlinkJob" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala -B 2 -A 2
```

Length of output: 0

🏁 Script executed:

```shell
#!/bin/bash
# Check if streamingSource is validated/checked before buildFlinkJob usage
rg -n "streamingSource\|isDefined\|isEmpty" flink/src/main/scala/ai/chronon/flink/FlinkJob.scala | head -30
```

Length of output: 0
Unguarded .get on streamingSource will crash if it is None; add a guard before the buildFlinkJob call.
At line 183, buildFlinkJob is invoked without checking streamingSource, and line 244 immediately calls .get on it. If getGroupByServingInfo returns a servingInfo whose streamingSource is None, the code throws.
Guard at the call site (line 183) with servingInfo.groupBy.streamingSource.isDefined, or restructure to extract streamingSource once and pass it as a parameter through buildFlinkJob, buildJoinSourceFlinkJob, and buildGroupByStreamingJob, avoiding the repeated .get calls at lines 244, 258, and 322.
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/FlinkJob.scala around lines 320 to 324,
the code calls servingInfo.groupBy.streamingSource.get without checking for None
which will throw if streamingSource is absent; update the call site where
buildFlinkJob is invoked (around line 183) to only call it when
servingInfo.groupBy.streamingSource.isDefined, or refactor buildFlinkJob,
buildJoinSourceFlinkJob, and buildGroupByStreamingJob to accept streamingSource
as an explicit parameter (extract it once with pattern matching or .toOption)
and use that parameter instead of repeated .get calls at lines 244, 258, and 322
so all code paths safely handle missing streamingSource.
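The suggested restructuring can be sketched as extracting the optional source once and threading the result through; `Source` and `requireStreamingSource` below are hypothetical stand-ins for Chronon's Thrift types, not its actual API:

```scala
// Hypothetical slimmed-down streaming source.
final case class Source(topic: String)

// Extract the streaming source once, up front, instead of calling .get at
// several sites. Either carries a readable error when the source is absent.
def requireStreamingSource(groupByName: String, streamingSource: Option[Source]): Either[String, Source] =
  streamingSource.toRight(s"GroupBy $groupByName has no streaming source configured")
```

Callers pattern-match or map over the `Either`, so a misconfigured GroupBy fails with a clear message at job-build time rather than a NoSuchElementException mid-pipeline.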
Summary
This PR ports the existing join chaining implementation in JoinSourceRunner over to Flink. A few things to call out:
Testing
Kicked off the chained join job:
Confirm the job is up and processing events, and that we can curl to get the enriched features:
Checklist
Summary by CodeRabbit
New Features
Refactor