
Conversation

@piyush-zlai
Contributor

@piyush-zlai piyush-zlai commented Mar 26, 2025

Summary

We've been seeing that our listing.actions Flink apps are often treading water trying to keep up with the load. We squashed some of the inefficiencies due to CU (#534), but we are still not able to keep up at the chosen parallelism of 12. Flamegraphs show that we spend a decent chunk of time on Kryo ser/deser in the KafkaRecordEmitter path, which is expected as the beacon events are fairly wide (~400 fields). Since we run SparkExprEval immediately after reading Avro and converting to Row, we decided to push the Spark expression evaluation into the source operator (DeserializationSchema). These changes improve performance significantly -

![Screenshot 2025-03-26 at 7 54 18 PM](https://github.com/user-attachments/assets/e065f214-a60f-4cbd-9d15-a2b1bb1455f2)

The changes unfortunately ended up being fairly intrusive: we still need to support plain Avro deserialization for the validation Flink job, and we had to deduplicate the SparkExpressionEval code between the new source projection operator and the old rich map function (used by the validation Flink job).

  • Dropped the untiled Flink job
  • Dropped the old testing mock source Flink job (was used during early dev, we don't need it going forward)
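For context, below is a minimal sketch of the idea. This is not the actual AvroSourceProjectionDeserializationSchema from this PR (which delegates to SparkExpressionEval, understands the schema-registry wire format, and reports metrics); it assumes a plain Avro payload and takes the projection as a caller-supplied function:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.Collector

import scala.util.Try

// Sketch: run the projection inside the source's DeserializationSchema so the wide raw event
// never leaves the deserializer and never goes through Kryo between operators.
class ProjectedAvroDeserializationSchema(
    avroSchemaJson: String,
    // Stand-in for SparkExpressionEval: maps a raw event to zero or more projected rows.
    projectFn: Map[String, Any] => Seq[Map[String, Any]]
) extends DeserializationSchema[Map[String, Any]] {

  @transient private lazy val schema: Schema = new Schema.Parser().parse(avroSchemaJson)
  @transient private lazy val reader = new GenericDatumReader[GenericRecord](schema)

  // Collector variant used by the source: one raw event may yield 0..n projected rows.
  override def deserialize(message: Array[Byte], out: Collector[Map[String, Any]]): Unit = {
    val rows = Try {
      val decoder = DecoderFactory.get().binaryDecoder(message, null)
      val record = reader.read(null, decoder)
      val fields = schema.getFields
      val rawEvent = (0 until fields.size()).map { i =>
        val name = fields.get(i).name()
        name -> record.get(name)
      }.toMap
      projectFn(rawEvent) // projection + filters applied before anything is emitted downstream
    }.getOrElse(Seq.empty) // the real code counts corrupt events instead of dropping them silently
    rows.foreach(out.collect)
  }

  // Single-element variant is unused once the collector variant is overridden.
  override def deserialize(message: Array[Byte]): Map[String, Any] =
    throw new UnsupportedOperationException("Use deserialize(message, out)")

  override def isEndOfStream(nextElement: Map[String, Any]): Boolean = false

  override def getProducedType: TypeInformation[Map[String, Any]] =
    TypeInformation.of(classOf[Map[String, Any]])
}
```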

Checklist

  • [x] Added Unit Tests
  • [x] Covered by existing CI
  • [x] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced enhanced source capabilities with projected schemas.
    • Added a dedicated evaluator for processing Spark SQL expressions.
    • Provided new utilities for Avro record creation and deserialization testing.
    • Added new test classes for validating Avro source identity and projection deserialization.
  • Refactor

    • Streamlined job configuration by updating input data formats and removing legacy processing paths.
    • Refined schema management for improved extensibility and simplified deserialization.
    • Consolidated evaluation logic into a unified component for better performance.
  • Tests

    • Updated integration tests for unique event handling and new processing pipelines.
    • Enhanced test utilities to match the updated data transformation logic.
    • Removed obsolete test classes and replaced them with updated specifications for Avro deserialization.

@coderabbitai
Contributor

coderabbitai bot commented Mar 26, 2025

Walkthrough

Several core components have been refactored. The generic type parameter was removed from the TiledAvroCodecFn and FlinkJob classes, and the Flink job now accepts a Map[String, Any] stream with an updated input schema. The Kafka source is now generalized via a new base class, and schema handling was overhauled with a generic SchemaProvider and its specialized variants. A new SparkExpressionEval class replaces legacy Catalyst code for SQL expression evaluation. Test utilities have been revised, with some removed and new tests added for Avro deserialization features.
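Roughly, the reworked schema-handling abstractions can be pictured with the sketch below. The member signatures are guesses inferred from the file summaries in this review, not the actual declarations in SchemaProvider.scala:

```scala
import ai.chronon.api.GroupBy
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.spark.sql.{Encoder, Row}
import org.apache.spark.sql.types.DataType

// Generic provider: given a GroupBy's streaming source, produce a Flink deserialization schema.
abstract class SchemaProvider[T](conf: Map[String, String]) extends Serializable {
  def buildDeserializationSchema(groupBy: GroupBy): ChrononDeserializationSchema[T]
}

// Chronon-specific deserialization schema: exposes the raw source-event encoder and whether
// the Spark projection has been pushed into deserialization.
abstract class ChrononDeserializationSchema[T] extends DeserializationSchema[T] {
  def sourceEventEncoder: Encoder[Row] // return type is an assumption
  def sourceProjectionEnabled: Boolean
}

// Mixed into projection-capable schemas so downstream operators can read the projected schema.
trait SourceProjection {
  def projectedSchema: Array[(String, DataType)] // field name -> Spark type (assumed shape)
}
```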

Changes

| File(s) | Change Summary |
|---|---|
| flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala | Removed generic type from TiledAvroCodecFn case class. |
| flink/src/main/scala/ai/chronon/flink/FlinkJob.scala, flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala | Removed generic type; updated constructor parameters and stream processing; removed unused methods. |
| flink/src/main/scala/ai/chronon/flink/FlinkSource.scala, flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala | Removed deprecated comment; changed to use Map[String, Any] and integrated SparkExpressionEvalFn transformation. |
| flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala | Introduced BaseKafkaFlinkSource[T] with generic type; added ProjectedKafkaFlinkSource; updated deserialization method signatures. |
| flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala, flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala, flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala | Made SchemaProvider generic; replaced buildEncoderAndDeserSchema with buildDeserializationSchema; split schema provider into Base, Identity and Projected variants; updated usage in the validation job. |
| flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala, flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala | Added new SparkExpressionEval class; refactored SparkExpressionEvalFn to delegate to the new evaluator and removed CatalystUtil usage. |
| flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala | Replaced legacy Avro deserialization with BaseAvroDeserializationSchema and added two subclasses for identity and projection deserialization. |
| Test files: TestFlinkJob.scala, SchemaRegistrySchemaProviderSpec.scala, ValidationFlinkJobIntegrationTest.scala, AvroDeSerTestUtils.scala, AvroDeSerializationSupportSpec.scala (deleted), AvroSourceIdentityDeSerializationSupportSpec.scala, AvroSourceProjectionDeSerializationSupportSpec.scala | Removed obsolete test utilities; updated test data types; added new test specs for Avro deserialization and refactored mock schema provider. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant FlinkJob
    participant KafkaSource as KafkaFlinkSource
    participant Eval as SparkExpressionEval
    participant Schema as SchemaProvider
    participant Sink

    Client->>FlinkJob: Instantiate job with new Map[String, Any] source
    FlinkJob->>KafkaSource: Request DataStream
    KafkaSource-->>FlinkJob: Return DataStream[Map[String, Any]]
    FlinkJob->>Eval: Evaluate expressions on events
    Eval-->>FlinkJob: Return evaluated event
    FlinkJob->>Schema: Build deserialization schema
    Schema-->>FlinkJob: Return schema and encoder
    FlinkJob->>Sink: Emit processed events
sequenceDiagram
    participant Job
    participant SchemaReg as SchemaRegistry
    Job->>SchemaReg: Call buildDeserializationSchema(groupBy)
    SchemaReg-->>Job: Return ParsedSchema with encoder


Suggested reviewers

  • nikhil-zlai
  • david-zlai
  • tchow-zlai

Poem

In the code’s dance we streamline our art,
Generic types shed to give a fresh start.
Flink jobs now process with a brand-new flow,
Kafka, Avro, and schemas in a single show.
Evaluators and tests updated with cheer 🚀,
A refactored codebase shining clear.
Cheers to progress, simple and dear!

Warning

Review ran into problems

🔥 Problems

GitHub Actions and Pipeline Checks: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (5)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (2)

113-113: Suggest coverage expansion.
It might be useful to assert the record structure in more detail (e.g., field-level validations).


126-126: Clarify the max-by logic.
Consider adding a brief comment explaining the rationale for picking the max timestamp element.

flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (1)

21-59: Parameterizing the record.
All fields appear consistent with typical user profiles. If reusability expands, consider factoring out nested record creation.

flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1)

6-6: Potential for future expansions.
AbstractDeserializationSchema allows custom handling of big data scenarios.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)

13-13: Update doc references.
Consider removing mention of CatalystUtil if it's now fully abstracted by SparkExpressionEval.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 4687bc4 and baa3edb.

📒 Files selected for processing (19)
  • flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (6 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkSource.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (4 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (3 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (4 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (4 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (3 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (3 hunks)
  • flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (0 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (1 hunks)
💤 Files with no reviewable changes (3)
  • flink/src/main/scala/ai/chronon/flink/FlinkSource.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala
🧰 Additional context used
🧬 Code Definitions (9)
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala (1)
flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (3)
  • AvroObjectCreator (21-93)
  • makeMetadataOnlyGroupBy (61-70)
  • createDummyRecordBytes (22-59)
flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (4)
flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (3)
  • AvroObjectCreator (21-93)
  • makeGroupBy (72-93)
  • createDummyRecordBytes (22-59)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (1)
  • makeGroupBy (131-164)
flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1)
  • makeGroupBy (70-91)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (8)
  • open (29-36)
  • open (80-83)
  • open (117-130)
  • deserialize (85-96)
  • deserialize (132-135)
  • deserialize (137-140)
  • projectedSchema (109-115)
  • sourceEventEncoder (27-27)
flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (3)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)
  • FlinkJob (60-171)
  • FlinkJob (173-348)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
  • SparkExpressionEval (37-186)
  • getOutputSchema (111-113)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2)
  • E2ETestEvent (39-39)
  • FlinkTestUtils (91-164)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (2)
  • SparkExpressionEval (37-186)
  • getOutputSchema (111-113)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
  • SparkExpressionEvalFn (22-67)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (5)
  • flink (37-48)
  • SourceIdentitySchemaRegistrySchemaProvider (72-85)
  • SourceIdentitySchemaRegistrySchemaProvider (106-111)
  • buildDeserializationSchema (75-84)
  • buildDeserializationSchema (92-103)
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (3)
  • AvroCodecOutput (75-95)
  • TimestampedTile (50-70)
  • WriteResponse (99-123)
flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
  • KeySelectorBuilder (15-41)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)
  • getDataStream (38-55)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (2)
  • flatMap (105-117)
  • flatMap (143-155)
flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (3)
online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2)
  • TopicInfo (33-33)
  • TopicInfo (34-53)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (3)
  • flink (37-48)
  • buildDeserializationSchema (75-84)
  • buildDeserializationSchema (92-103)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (4)
  • sourceProjectionEnabled (76-76)
  • sourceProjectionEnabled (107-107)
  • sourceEventEncoder (27-27)
  • projectedSchema (109-115)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (2)
online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (3)
  • TopicInfo (33-33)
  • TopicInfo (34-53)
  • parse (37-52)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)
  • AvroSourceIdentityDeserializationSchema (73-97)
  • AvroSourceProjectionDeserializationSchema (99-155)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)
flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1)
  • ChrononDeserializationSchema (29-33)
flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (1)
  • getMetricGroup (15-15)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (1)
flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (4)
  • initialize (65-84)
  • evaluateExpressions (95-109)
  • runSparkSQLBulk (126-172)
  • runCatalystBulk (177-185)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: non_spark_tests
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (103)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (1)

131-131: Type parameter removal looks good

Removing generic type parameter from TiledAvroCodecFn aligns with PR goals to simplify the architecture.

flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceIdentityDeSerializationSupportSpec.scala (1)

1-54: Comprehensive test coverage for identity deserialization

Tests cover key scenarios: standard deserialization, schema ID handling, and error cases.

flink/src/test/scala/org/apache/spark/sql/avro/AvroSourceProjectionDeSerializationSupportSpec.scala (3)

14-54: Good test for projection functionality

Tests data inclusion, schema validation, and type checking.


56-74: Effective filtering tests

Confirms filtering works as expected.


76-97: Error handling test is solid

Verifies graceful handling of corrupted Avro data.

flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)

5-5: Import update is correct

Added SparkExpressionEval import while keeping SparkExpressionEvalFn for backward compatibility.


74-74: Class replacement aligns with PR goals

Using SparkExpressionEval directly instead of SparkExpressionEvalFn removes redundancy.

flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (9)

3-5: Imports look fine.


17-17: Mock provider extension is correct.


26-27: Avro/Proto providers introduced cleanly.


29-29: Instance setup is straightforward.


33-36: Schema test is clear.


44-46: Simple validation of Avro schema.


54-56: Subject injection handled well.


63-66: Proto error scenario verified.


70-91: Helper method is concise.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEval.scala (10)

1-5: Package & imports create a solid foundation.


26-36: Doc is clear and purposeful.


37-64: Fields, queries, and metrics appear consistent.


65-84: Initialization of histograms and counters is good.


86-94: performSql logic is succinct.


95-109: evaluateExpressions handles exceptions nicely.


111-113: Output schema method is straightforward.


115-120: Closing resources is properly handled.


121-172: Bulk Spark SQL flow is well-structured.


174-186: Catalyst bulk method coordinates results smoothly.

flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (3)

14-14: Import for SparkExpressionEvalFn is fine.


50-51: Class now outputs a Map, nice approach.


61-65: flatMap usage ensures Spark eval integration.

flink/src/test/scala/ai/chronon/flink/test/FlinkJobIntegrationTest.scala (6)

72-76: Good move toward deterministic validation.
Using distinct IDs simplifies debugging and validation.


80-81: Validate class references.
Ensure SparkExpressionEvalFn usage aligns with SparkExpressionEval changes to avoid mismatches in transformations.


85-89: Schema extraction looks consistent.
Fetching output schema from SparkExpressionEval for downstream usage is correct and clear.


94-94: Parallelism check.
Verify the parallelism setting (2) meets throughput requirements for larger data volumes.


122-122: Key extraction is straightforward.
The composition of keys from the record decode is correct.


128-133: Final IR validation is correct.
Checking IR values matches the expected aggregation. Good job.

flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerTestUtils.scala (5)

1-2: Namespace clarity.
Keeping a dedicated package for Avro test utilities is clean.


3-10: Import usage is standard.
All required classes and conversions are neatly grouped.


12-19: Lightweight context.
DummyInitializationContext for testing is straightforward. No concurrency concerns.


61-70: Minimal groupBy creation.
Good to keep a skeleton GroupBy for various test scenarios.


72-93: Balanced approach to building GroupBy.
The method is flexible with optional filters. Good for test variations.

flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (5)

3-4: Imports look relevant.
The api.GroupBy usage is aligned with new changes.


10-15: Concise docs.
Clear explanation of SchemaProvider responsibilities.


18-20: Generic approach is good.
abstract class SchemaProvider[T] fosters reusability for multiple data types.


22-33: Clean interface.
ChrononDeserializationSchema now explicitly states encoder & projection flags.


35-39: Projection trait.
Enabling pushdown is straightforward. Good structure.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (14)

11-11: Added import looks fine.


16-21: Imports are aligned with new window classes.


52-55: Doc updates are consistent with new Map-based input.


60-64: Constructor signature matches Map-based ingestion.


102-102: Clear naming for sourceSparkProjectedStream.


108-108: Watermark assignment is straightforward.


112-112: Matching parallelism to source is logical.


149-149: Parallelism consistency is good.


157-157: Late-events side output remains aligned.


160-160: TiledAvroCodecFn call is correct.


163-163: Parallelism matches source.


260-260: maybeServingInfo usage is appropriate.


262-300: Refactor for ProjectedSchemaRegistrySchemaProvider is coherent.


330-330: Job execution call is unchanged in logic.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (4)

14-17: Base class provides good generic extension.


39-41: Generic builder usage is fine.


57-57: Closing brace is in place.


58-67: New source classes adapt schema usage well.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (19)

3-5: Imports align with new references.


8-8: Collector import is relevant for Flink custom deserialization.


19-20: Abstract base class clarifies Avro handling.


24-24: Protected counter is consistent with usage.


27-27: sourceEventEncoder override is succinct.


38-38: avroToInternalRow neatly encapsulates error handling.


64-70: Recover block logs failures gracefully.


73-97: Identity schema: does minimal transform.


76-76: sourceProjectionEnabled = false is correct here.


78-78: Transient row deserializer ensures lifecycle clarity.


80-80: open(...) sets up deserializer properly.


85-85: deserialize(...) returns Row or null on fail.


99-101: AvroSourceProjectionDeserializationSchema extends base with SourceProjection.


103-106: Eval, row serializer, counters introduced.


109-115: projectedSchema builds field list.


117-130: Initialization sets SparkExpressionEval.


132-135: deserialize(...) with collector processes multiple rows.


137-140: Disable single-arg deserialize.


142-154: doSparkExprEval handles errors politely.

flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala (8)

25-25: Transient field looks fine.
Will reinitialize on job restarts.


30-30: No functional change.


34-35: Initialization is straightforward.
No concerns.


40-40: Metrics initialization done.
Works for performance tracking.


44-44: Validate partial results or errors.
Ensure all exceptions in evaluateExpressions are properly handled.


49-49: Close method.
Graceful shutdown.


58-58: Spark SQL bulk call uses evaluator.
Looks good.


65-65: Catalyst bulk call.
Consistent approach.

flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (5)

4-11: Imports consolidated.
All references match new schema provider and SparkExprEvalFn usage.


149-149: New provider usage.
SourceIdentitySchemaRegistrySchemaProvider is correctly instantiated.


155-155: Schema construction.
Build deserialization straightforward.


156-156: No content change.


168-168: Using sourceEventEncoder.
Aligns with the new approach.

flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (8)

2-3: New imports are consistent.
No issues found.

Also applies to: 5-5, 11-11


13-13: Doc updates.
Summarize new base class approach well.

Also applies to: 18-19


20-21: Abstract class creation.
Allows specialized schema providers.


37-39: Protected builder method.
Promotes subclass reusability.


50-52: readSchema method.
Fetching schema from registry is neatly handled.

Also applies to: 66-67


70-85: SourceIdentitySchemaRegistrySchemaProvider.
Returns raw source events with Avro identity.


87-105: ProjectedSchemaRegistrySchemaProvider.
Correctly handles Avro with source projection.


106-106: Registry constants.
Clear naming for keys.

Contributor

@nikhil-zlai nikhil-zlai left a comment


Nice work!!

Not necessarily about this PR, but we should wire up the flink pipeline into the fetcher test that runs the backfill and lambda and compares results.

}
maybeServingInfo
  .map { servingInfo =>
    val topicUri = servingInfo.groupBy.streamingSource.get.topic
Contributor

move this block into its own function
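For illustration only, one shape the extracted helper might take is sketched below; the object, helper name, parameter type, and return type are assumptions, not the actual refactor that landed:

```scala
import ai.chronon.flink.FlinkJob                    // existing class in this repo
import ai.chronon.online.GroupByServingInfoParsed   // assumed element type of maybeServingInfo

object FlinkJobBuilderSketch {
  // Hypothetical extraction of the inline block above into its own function.
  def buildJobFromServingInfo(servingInfo: GroupByServingInfoParsed): FlinkJob = {
    val topicUri = servingInfo.groupBy.streamingSource.get.topic
    // ... the rest of the original block (topic parsing, schema provider, source wiring) moves here ...
    ???
  }
  // Call site then reduces to: maybeServingInfo.map(buildJobFromServingInfo)
}
```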

Contributor Author

done

val metricsGroup = context.getMetricGroup
  .addGroup("chronon")
  .addGroup("topic", topicName)
  .addGroup("feature_group", groupBy.getMetaData.getName)
Contributor

Let's call it group_by? I generally try to remove any mention of "feature" in the ML sense from across the repo.

Suggested change
.addGroup("feature_group", groupBy.getMetaData.getName)
.addGroup("group_by", groupBy.getMetaData.getName)

Contributor Author

sure, updated

}

override def deserialize(messageBytes: Array[Byte]): Row = {
protected def avroToInternalRow(messageBytes: Array[Byte]): Try[InternalRow] = {
Contributor

very cool!

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

325-328: Runtime type validation

Consider using pattern matching instead of isInstanceOf and asInstanceOf for better type safety.

-require(
-  deserializationSchema.isInstanceOf[SourceProjection],
-  s"Expect created deserialization schema for groupBy: $groupByName with $topicInfo to mixin SourceProjection. " +
-    s"We got: ${deserializationSchema.getClass.getSimpleName}"
-)
-val projectedSchema = deserializationSchema.asInstanceOf[SourceProjection].projectedSchema
+val projectedSchema = deserializationSchema match {
+  case projection: SourceProjection => projection.projectedSchema
+  case _ =>
+    throw new IllegalArgumentException(
+      s"Expect created deserialization schema for groupBy: $groupByName with $topicInfo to mixin SourceProjection. " +
+        s"We got: ${deserializationSchema.getClass.getSimpleName}")
+}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between baa3edb and c4a6de0.

📒 Files selected for processing (2)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (7 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala
🧰 Additional context used
🧬 Code Definitions (1)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (5)
  • flink (37-48)
  • SourceIdentitySchemaRegistrySchemaProvider (72-85)
  • SourceIdentitySchemaRegistrySchemaProvider (106-111)
  • buildDeserializationSchema (75-84)
  • buildDeserializationSchema (92-103)
flink/src/main/scala/ai/chronon/flink/types/FlinkTypes.scala (2)
  • AvroCodecOutput (75-95)
  • WriteResponse (99-123)
flink/src/main/scala/ai/chronon/flink/window/KeySelectorBuilder.scala (1)
  • KeySelectorBuilder (15-41)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)
  • getDataStream (38-55)
flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala (2)
  • flatMap (105-117)
  • flatMap (143-155)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: non_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: non_spark_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (10)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10)

52-62: Class signature updated to use Map instead of generic type

Class now accepts Map[String, Any] instead of generic type T and includes a new inputSchema parameter. This aligns with the source projection optimization mentioned in PR objectives.


102-103: Variable renamed for clarity

Renamed sourceStream to sourceSparkProjectedStream to better reflect that this stream now contains projected data.


108-112: Watermark assignment simplified

Watermark assignment now applied directly to the projected stream, removing the need for separate expression evaluation.


144-145: Updated aggregation to use input schema

The FlinkRowAggregationFunction and FlinkRowAggProcessFunction now receive the inputSchema parameter, ensuring they work with the projected data structure.


149-150: Consistent parallelism setting

Parallelism is now consistently derived from sourceSparkProjectedStream across all operators.


157-158: Consistent parallelism for late event tracking

Same parallelism setting approach applied to late event tracking.


160-161: Simplified TiledAvroCodecFn usage

TiledAvroCodecFn no longer requires a generic type parameter, simplifying its usage.


260-270: Simplified FlinkJob initialization

Job initialization logic is now more straightforward with better error handling.


297-298: Removed untiled job path

Only tiled job implementation is now supported, as mentioned in PR objectives.


308-346: New buildFlinkJob method with source projection

This new method:

  1. Creates appropriate schema provider
  2. Builds deserialization schema with projection capability
  3. Validates schema implements SourceProjection
  4. Initializes new ProjectedKafkaFlinkSource

Code effectively implements the source projection optimization mentioned in PR objectives.
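To make those four steps concrete, here is a self-contained sketch. The stub traits stand in for the repo's real ChrononDeserializationSchema / SourceProjection / schema provider types, and all names, signatures, and the canned schema are assumptions rather than the actual FlinkJob.scala code:

```scala
object BuildFlinkJobSketch {

  // Stand-ins for the repo's real types; actual signatures in FlinkJob.scala differ.
  trait DeserSchema
  trait SourceProjection { def projectedSchema: Seq[(String, String)] } // field name -> type name
  case class ProjectedDeserSchema(projectedSchema: Seq[(String, String)])
      extends DeserSchema
      with SourceProjection

  class ProjectedSchemaProvider(conf: Map[String, String]) {
    // The real provider consults the schema registry; here we return a canned projected schema.
    def buildDeserializationSchema(groupByName: String): DeserSchema =
      ProjectedDeserSchema(Seq("listing_id" -> "long", "ts" -> "long"))
  }

  case class JobHandle(groupByName: String, projectedSchema: Seq[(String, String)])

  def buildFlinkJob(groupByName: String, conf: Map[String, String]): JobHandle = {
    val provider = new ProjectedSchemaProvider(conf)                   // 1. schema provider
    val deserSchema = provider.buildDeserializationSchema(groupByName) // 2. projection-capable schema

    // 3. Fail fast unless the schema mixes in SourceProjection; projectedSchema is needed downstream.
    val projectedSchema = deserSchema match {
      case p: SourceProjection => p.projectedSchema
      case other =>
        throw new IllegalArgumentException(
          s"Expected SourceProjection for $groupByName, got ${other.getClass.getSimpleName}")
    }

    // 4. The real code builds a ProjectedKafkaFlinkSource here and hands it to FlinkJob.
    JobHandle(groupByName, projectedSchema)
  }
}
```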

@piyush-zlai piyush-zlai merged commit 221353f into main Mar 31, 2025
7 checks passed
@piyush-zlai piyush-zlai deleted the piyush/flink_source_projection branch March 31, 2025 14:21
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 15, 2025
chewy-zlai pushed a commit that referenced this pull request May 16, 2025
@piyush-zlai piyush-zlai mentioned this pull request Jun 2, 2025
4 tasks
piyush-zlai added a commit that referenced this pull request Jun 3, 2025
## Summary
Revive the untiled Flink job from #545. We thought we didn't need it at that point, but while discussing the CDC work it's turning out to be a lot easier to build on top of raw events, given that mutations can come fairly late and handling tile updates across long time ranges is painful.

## Checklist
- [ ] Added Unit Tests
- [X] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Introduced a new Flink job mode without shuffling or windowing,
enabling streamlined data processing in a single node.

- **Tests**
	- Added an end-to-end integration test for the new Flink job mode.
- Refactored existing tests to improve maintainability and reduce
duplication.

- **Refactor**
- Simplified class definitions and improved code clarity by removing
unused type parameters.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
