Conversation

@piyush-zlai piyush-zlai commented Jan 28, 2025

Summary

Wire up support to look up schemas from the schema registry and set up the appropriate Avro Spark encoders to support SparkExprEval.
There are a few things to call out:

  • Had to move some TopicChecker code around (out of the spark module) to be able to reuse the topic presence checks and partition count lookups. We set parallelism based on partition count.
  • Schemas are currently looked up in the schema registry; we can add an implementation later that resolves them from jar artifacts. Schema registry support can also mean the Kafka wire format is the schema registry format (this is the Etsy default), in which case every Kafka message consists of a magic byte followed by a schema ID int (see the sketch after this list). There are other ways of using the registry (e.g. the way we had it at Stripe) where the wire format is the raw Avro / Thrift / Proto bytes and the schema is looked up by topic + subject only.
  • Add support for a Flink Kafka source which plugs in a DeserializationSchema. For us this is currently Avro only, and it essentially allows us to crack open Avro Array[Byte] payloads into a Spark Row that we use over the rest of the Flink job (and that makes it easier to run expr eval).
  • Assign watermarks post Spark expr eval: this allows us to use the timestamp column configured on the user's GroupBy Source to accurately set watermarks and timestamps. This is not needed in the untiled setup but will be essential for correctness when we turn on tiling.
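
For reference, here is a minimal Scala sketch of cracking open the schema-registry wire format described above; the method and variable names are illustrative and not part of this PR:

import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0x0][4-byte schema id][Avro-encoded payload].
// Splits a Kafka message into the schema id and the remaining Avro bytes.
def splitWireFormat(messageBytes: Array[Byte]): (Int, Array[Byte]) = {
  require(messageBytes.length > 5, "message too short for schema registry wire format")
  val buffer = ByteBuffer.wrap(messageBytes)
  val magic = buffer.get()
  require(magic == 0, s"unexpected magic byte: $magic")
  val schemaId = buffer.getInt()
  (schemaId, messageBytes.drop(5)) // the remaining bytes are the Avro record, decoded using the registry schema
}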

Checklist

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested - Kicked off on the cluster - Job
  • Documentation update

Summary by CodeRabbit

Based on the comprehensive changes across multiple files, here are the updated release notes:

  • New Features

    • Added Kafka Schema Registry support for Flink and Spark.
    • Introduced Avro deserialization support for Row-based data processing.
    • Enhanced event stream processing with watermarking capabilities.
    • Added support for dynamic topic and message bus configuration.
    • New KafkaFlinkSource class for integrating Kafka with Flink.
    • New SchemaRegistrySchemaProvider class for managing Avro schemas.
    • New TopicCheckerApp for command-line topic management.
    • New User Avro schema for structured user data representation.
  • Improvements

    • Refactored package and import structures.
    • Improved error handling for schema and topic management.
    • Added more flexible configuration options for streaming sources.
    • Enhanced metric computation with edit distance functionality.
  • Bug Fixes

    • Updated topic and stream handling to support more dynamic configurations.
    • Resolved potential issues with schema deserialization.
  • Chores

    • Reorganized project structure.
    • Updated dependencies to include Confluent Schema Registry client.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 28, 2025

Walkthrough

This pull request involves a package name change for the EditDistance class and its associated test, along with significant updates to the build.sbt file for Kafka integration. New classes for handling Kafka sources and Avro schema management are introduced, enhancing data processing capabilities within the Flink framework. The TopicInfo case class is modified to reflect a shift from topicType to messageBus, and various methods across multiple files have been updated to accommodate these changes.
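
For orientation, the reshaped case class looks roughly like this (paraphrased from the walkthrough and the parse method shown later; the real definition lives in DataStreamBuilder.scala):

case class TopicInfo(name: String, messageBus: String, params: Map[String, String])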

Changes

  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala: package name changed from ai.chronon.spark.stats to ai.chronon.aggregator.stats
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala: package name updated from ai.chronon.spark.test to ai.chronon.aggregator.test, import statement modified
  • build.sbt: added Confluent schema registry resolver and Kafka-related dependencies
  • flink/src/main/scala/ai/chronon/flink/*: added new classes KafkaFlinkSource, SchemaProvider, SchemaRegistrySchemaProvider; enhanced FlinkJob with watermarking
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala: updated E2EEventSource class to use Row objects
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala: updated TopicInfo to use messageBus instead of topicType
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala: package changed, method signatures updated to include additionalProps, added mapToJavaProperties method
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala: added edit_distance UDF method
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala: updated argument in buildStream method from topic.topicType to topic.messageBus
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala: import added for TopicChecker from ai.chronon.online
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala: new object added for managing command-line arguments and Kafka topic validation

Suggested Reviewers

  • nikhil-zlai
  • tchow-zlai
  • david-zlai

Poem

🌊 Streams of data, flowing free
Kafka's whispers, Flink's decree
Schemas dance, encoders sing
A code symphony on the wing! 🎵
Chronon's magic, ever bright 🌟

Warning

Review ran into problems

🔥 Problems

GitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.




@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🔭 Outside diff range comments (1)
spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1)

Line range hint 35-35: Handle potential missing host in bootstrap server construction.

The getOrElse chain might fail if 'host' is missing.

-    val bootstrap = conf.getOrElse("bootstrap", conf("host") + conf.get("port").map(":" + _).getOrElse(""))
+    val bootstrap = conf.get("bootstrap").getOrElse {
+      conf.get("host").map { host =>
+        host + conf.get("port").map(":" + _).getOrElse("")
+      }.getOrElse(throw new IllegalArgumentException("Missing bootstrap or host configuration"))
+    }
🧹 Nitpick comments (21)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (2)

14-15: Ensure descriptive config key for clarity.
Naming it schema_registry_url instead of registry_url might be clearer.


23-36: Add schema logging.
Consider logging the fetched schema ID and subject for easier debugging.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (2)

15-28: Consolidate bootstrap logic.
Simplify to reduce nesting if possible.


35-52: Add watermarks if needed.
Watermarks are disabled. Check if event-time processing is required.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

Line range hint 43-80: Refine error message.
The thrown RuntimeException includes large text. Keep it concise for logs.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (4)

18-25: Possible overhead with multiple calls to AvroDataToCatalyst
Reuse the same instance to reduce object creation.


27-30: Initialization timing
Mark them lazy if only used after open().


44-80: Return null on failure
We skip data. Consider partial recovery.


83-90: Naming
“build” method is good. Might rename to “create” for clarity.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

80-92: Local Avro schema
Consider allowing dynamic overrides.


93-104: Repeated encode-decode
We do Avro encode then immediately deserialize. Possibly skip re-deser for performance.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

258-265: Out-of-order window
5-minute buffer might be large for some use cases.

flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1)

15-17: Consider adding validation for required config keys.

Add config validation in constructor to fail fast if required keys are missing.
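
A minimal sketch of the fail-fast idea, assuming the provider receives its topic params as a Map[String, String]; the class and key names here are illustrative:

class IllustrativeSchemaProvider(conf: Map[String, String]) {
  // Resolve required keys eagerly so misconfiguration fails at construction time, not on first lookup.
  private def required(key: String): String =
    conf.getOrElse(key, throw new IllegalArgumentException(s"Missing required config key: $key"))

  val registryUrl: String = required("registry_url")
}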

flink/src/test/resources/user.avsc (2)

34-34: Consider adding pattern constraint for postalCode.

Add a pattern regex to validate postal code format.


48-51: Consider using logical type for timestamp.

Replace long with timestamp-millis logical type for better semantics.

-      "type": "long",
+      "type": {"type": "long", "logicalType": "timestamp-millis"},
flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1)

33-40: Consider enhancing schema validation.

Add assertions to verify the schema structure and field types.
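
A hedged sketch of the kind of assertions meant here, using plain Avro schema parsing; the concrete field expectations would come from the test fixture:

import org.apache.avro.Schema

val parsed = new Schema.Parser().parse(schemaStr)
assert(parsed.getType == Schema.Type.RECORD, "expected a record schema")
assert(!parsed.getFields.isEmpty, "expected at least one field")
// e.g. assert(parsed.getField("id") != null) once the fixture's fields are pinned down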

online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (1)

Line range hint 33-52: Add input validation in parse method.

Consider adding validation for:

  • Malformed key-value pairs
  • Empty or null parameters
  • Invalid message bus types
 def parse(topic: String): TopicInfo = {
   assert(topic.nonEmpty, s"invalid topic: $topic")
+  val validMessageBuses = Set("kafka", "pulsar") // Add supported types
   val (messageBus, rest) = if (topic.contains("://")) {
     val tokens = topic.split("://", 2)
+    assert(validMessageBuses.contains(tokens.head), s"unsupported message bus: ${tokens.head}")
     tokens.head -> tokens.last
   } else {
     "kafka" -> topic
   }
   assert(rest.nonEmpty, s"invalid topic: $topic")
   val fields = rest.split("/")
   val topicName = fields.head
   val params = fields.tail.map { f =>
+    assert(f.contains("="), s"malformed parameter: $f")
     val kv = f.split("=", 2); kv.head -> kv.last
   }.toMap
+  assert(params.values.forall(_.nonEmpty), "empty parameter values not allowed")
   TopicInfo(topicName, messageBus, params)
 }
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1)

63-66: Extract test configuration to constants.

Move hardcoded values to constants or configuration file for better maintainability.

+  private val TestConfig = Map(
+    "groupby_name" -> "etsy.listing_canary.actions_v1",
+    "kafka_bootstrap" -> "bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
+    "gcp_project_id" -> "canary-443022",
+    "gcp_bigtable_instance" -> "zipline-canary-instance"
+  )
+
   it should "test flink job locally" ignore {
     val submitter = DataprocSubmitter()
     submitter.submit(spark.FlinkJob,
       Map(MainClass -> "ai.chronon.flink.FlinkJob",
         FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
         JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
       List.empty,
       "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
-      "--groupby-name=etsy.listing_canary.actions_v1",
-      "-kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
-      "-ZGCP_PROJECT_ID=canary-443022",
-      "-ZGCP_BIGTABLE_INSTANCE_ID=zipline-canary-instance")
+      s"--groupby-name=${TestConfig("groupby_name")}",
+      s"-kafka-bootstrap=${TestConfig("kafka_bootstrap")}",
+      s"-ZGCP_PROJECT_ID=${TestConfig("gcp_project_id")}",
+      s"-ZGCP_BIGTABLE_INSTANCE_ID=${TestConfig("gcp_bigtable_instance")}")
flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (3)

23-43: Refactor schema loading and improve resource handling.

Extract schema loading to a helper method and use try-with-resources pattern.

+  private def loadSchema(): String = {
+    val schemaSrc = Source.fromURL(getClass.getClassLoader.getResource("user.avsc"))
+    try schemaSrc.mkString finally schemaSrc.close()
+  }

   it should "deserialize avro data" in {
-    val schemaSrc = Source.fromURL(getClass.getClassLoader.getResource("user.avsc"))
-    val schemaStr =
-      try {
-        schemaSrc.mkString
-      } finally {
-        schemaSrc.close()
-      }
+    val schemaStr = loadSchema()

86-124: Extract test data to constants and split method.

Break down the record creation into smaller methods for better readability.

 object AvroObjectCreator {
+  private val TestData = Map(
+    "id" -> 12345,
+    "username" -> "johndoe",
+    "street" -> "123 Main St",
+    "city" -> "San Francisco"
+  )
+
+  private def createAddress(schema: Schema): GenericRecord = {
+    val address = new GenericData.Record(schema)
+    address.put("street", TestData("street"))
+    address.put("city", TestData("city"))
+    address.put("country", "USA")
+    address.put("postalCode", "94105")
+    address
+  }

66-83: Enhance corruption testing scenarios.

Add more test cases for different types of data corruption.

+  it should "handle various corruption scenarios" in {
+    val schemaStr = loadSchema()
+    val (_, deserSchema) = AvroDeserializationSupport.build("test-topic", schemaStr)
+    deserSchema.open(new DummyInitializationContext)
+    val recordBytes = AvroObjectCreator.createDummyRecordBytes(schemaStr)
+
+    // Test truncated data
+    assert(deserSchema.deserialize(recordBytes.take(10)) == null)
+
+    // Test invalid magic byte
+    val invalidMagicBytes = Array[Byte](1, 0, 0, 0, 1) ++ recordBytes
+    assert(deserSchema.deserialize(invalidMagicBytes) == null)
+  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 6ba45d9 and ead69ac.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
✅ Files skipped from review due to trivial changes (3)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
🔇 Additional comments (27)
flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (2)

20-22: Optional private method.
buildSchemaRegistryClient is only used internally; consider making it private if feasible.


38-43: Guard for unexpected schema types.
Strictly Avro-based logic is fine, but highlight in docs that non-Avro is unsupported.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

33-33: Parallelism overrides.
Implicit parallelism might conflict if the job sets a different value. Confirm the design.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

82-85: Use standard Java property keys.
Validate that the property keys conform to Kafka's standard config.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (3)

3-16: Imports look fine
No issues in these dependencies.


31-33: Logger and counter usage
Seems fine for concurrency.


34-42: Metrics naming
Ensure “topic” group doesn't clash with other topics.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

Line range hint 43-66: Mock source parallelism
Good approach for testing. Matches partition count.


181-193: Parallelism=1
Acceptable for tests; watch performance in production.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (9)

Line range hint 4-42: Imported items
No concerns.


113-119: Watermark assignment
Excellent to handle event time right after spark eval.


168-173: Consistent approach
Matches runGroupByJob watermark usage.


201-201: Aggregator usage
Looks standard. No concurrency issues seen.


266-276: WatermarkStrategy
Ensure presence of time column to avoid null timestamps.


Line range hint 281-304: Optional Kafka config
Handle empty or invalid input carefully.


311-343: Strict bus check
Only “kafka” supported. Fine for now.


369-377: Sink parallelism
Matches upstream operator. No issues.


381-381: Private buildApi
Good scope reduction.

flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1)

8-14: Documentation clearly explains the purpose and responsibilities.

spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1)

21-21: Import looks good.

flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (3)

15-17: LGTM! Clean mock implementation.


26-31: LGTM! Good negative test case.


51-58: LGTM! Good validation of unsupported schema type.

spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1)

20-20: LGTM!

Import statement updated to reflect the new package structure.

spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1)

293-293: LGTM! Verify message bus usage.

Changed to use messageBus instead of topicType for stream building.

✅ Verification successful

✓ Verified: messageBus field exists and is used correctly

TopicInfo case class defines messageBus as a String parameter, confirming proper usage in JoinSourceRunner.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of messageBus in TopicInfo
ast-grep --pattern 'class TopicInfo {
  $$$
  messageBus: $_
  $$$
}'

Length of output: 71


Script:

#!/bin/bash
# Find TopicInfo
rg "case class TopicInfo|class TopicInfo" -A 5

Length of output: 804

build.sbt (2)

220-227: LGTM! Schema registry dependencies added.

Added Confluent resolver and schema registry client for Kafka integration.


251-251: LGTM! Test dependency added.

Added Kafka protobuf provider for testing.

Comment on lines +30 to +32
// confirm the topic exists
TopicChecker.topicShouldExist(topicInfo.name, bootstrap, topicInfo.params)


⚠️ Potential issue

Close AdminClient to avoid leaks.
Topic existence is checked, but no final close is called on AdminClient.

+ try {
+   TopicChecker.topicShouldExist(topicInfo.name, bootstrap, topicInfo.params)
+ } finally {
+   // ensure resources are closed
+ }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +36 to 42
def getPartitions(topic: String, bootstrap: String, additionalProps: Map[String, String] = Map.empty): Int = {
val props = mapToJavaProperties(additionalProps ++ Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap))
val adminClient = AdminClient.create(props)
val topicDescription = adminClient.describeTopics(util.Arrays.asList(topic)).values().get(topic);
topicDescription.get().partitions().size()
}


⚠️ Potential issue

Handle adminClient resources.
AdminClient isn't closed. Wrap in try-with-resources.

Comment on lines +39 to +41
logger.info(getPartitions(topic, bootstrap).toString)
System.exit(0)
}

🛠️ Refactor suggestion

Consider returning exit code instead of System.exit.

Direct System.exit calls make testing difficult and can leave resources unclosed.

-    logger.info(getPartitions(topic, bootstrap).toString)
-    System.exit(0)
+    logger.info(getPartitions(topic, bootstrap).toString)
+    // Return exit code instead
+    0
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info(getPartitions(topic, bootstrap).toString)
System.exit(0)
}
logger.info(getPartitions(topic, bootstrap).toString)
// Return exit code instead
0

Comment on lines +32 to +34
lazy val host = tokens.get("host")
lazy val port = tokens.get("port")
lazy val hostPort = s"${host.get}:${port.get}"

⚠️ Potential issue

Add null checks for host/port retrieval.

get calls on Option might throw NPE. Use pattern matching or getOrElse.

-      lazy val host = tokens.get("host")
-      lazy val port = tokens.get("port")
-      lazy val hostPort = s"${host.get}:${port.get}"
+      lazy val hostPort = for {
+        host <- tokens.get("host")
+        port <- tokens.get("port")
+      } yield s"$host:$port"
+      lazy val finalHostPort = hostPort.getOrElse(throw new IllegalArgumentException("Missing host/port in config"))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
lazy val host = tokens.get("host")
lazy val port = tokens.get("port")
lazy val hostPort = s"${host.get}:${port.get}"
lazy val hostPort = for {
host <- tokens.get("host")
port <- tokens.get("port")
} yield s"$host:$port"
lazy val finalHostPort = hostPort.getOrElse(throw new IllegalArgumentException("Missing host/port in config"))


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

Add @volatile to ensure thread visibility

-  private var isRunning = true
+  @volatile private var isRunning = true

Without volatile, changes to isRunning might not be visible across threads.

🔗 Analysis chain

Line range hint 43-67: Verify thread safety of event collection.

The source function's shared state isRunning should be volatile.

-        private var isRunning = true
+        @volatile private var isRunning = true
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Search for similar source implementations with mutable flags
ast-grep --pattern 'class $_ extends $_ {
  $$$
  private var isRunning = $_
  $$$
}'

# Search for volatile usage patterns
rg "@volatile.*private.*var.*isRunning" -t scala

Length of output: 138


Script:

#!/bin/bash
# Search for SourceFunction implementations
ast-grep --pattern 'new SourceFunction[$_] {
  $$$
}'

# Search for any volatile usage
rg "@volatile.*private.*var" -t scala

# Search for similar cancel patterns
ast-grep --pattern 'override def cancel(): Unit = {
  $$$
}'

Length of output: 416

♻️ Duplicate comments (1)
online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

36-41: ⚠️ Potential issue

Close AdminClient resources.

AdminClient should be closed to prevent resource leaks.

-    val adminClient = AdminClient.create(props)
-    val topicDescription = adminClient.describeTopics(util.Arrays.asList(topic)).values().get(topic);
-    topicDescription.get().partitions().size()
+    val adminClient = AdminClient.create(props)
+    try {
+      val topicDescription = adminClient.describeTopics(util.Arrays.asList(topic)).values().get(topic)
+      topicDescription.get().partitions().size()
+    } finally {
+      adminClient.close()
+    }
🧹 Nitpick comments (8)
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1)

Line range hint 89-100: Consider adding error handling for large sequences.

Edit distance computation can be expensive for very long sequences.

 private def edit_distance: UserDefinedFunction =
-    functions.udf((left: Object, right: Object) => EditDistance.between(left, right))
+    functions.udf((left: Object, right: Object) => {
+      val maxLength = 1000 // configurable threshold
+      if (left == null || right == null) return null
+      val leftStr = left.toString
+      val rightStr = right.toString
+      if (leftStr.length > maxLength || rightStr.length > maxLength) {
+        throw new IllegalArgumentException(s"Sequence length exceeds $maxLength characters")
+      }
+      EditDistance.between(left, right)
+    })
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

33-33: Implicit parallelism.
Consider storing in a local val to avoid implicit shadowing.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1)

18-19: Schema usage note.
No dynamic handling of differing schema IDs. If needed, consider caching by ID.

flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (2)

14-16: Validate registry URL and make cache capacity configurable.

Add URL format validation and make cache capacity configurable via conf.

-  private val schemaRegistryUrl: String =
-    conf.getOrElse("registry_url", throw new IllegalArgumentException("registry_url not set"))
-  private val CacheCapacity: Int = 10
+  private val schemaRegistryUrl: String = validateRegistryUrl(
+    conf.getOrElse("registry_url", throw new IllegalArgumentException("registry_url not set")))
+  private val cacheCapacity: Int = conf.get("cache_capacity").map(_.toInt).getOrElse(10)

23-36: Enhance error handling and add schema caching.

Improve error messages and consider caching retrieved schemas to reduce registry load.
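
One way to cache, as a sketch; this assumes schemas are keyed by the registry schema id and is not the PR's implementation:

import org.apache.avro.Schema
import scala.collection.concurrent.TrieMap

class CachingSchemaLookup(fetchSchemaString: Int => String) {
  private val schemaCache = TrieMap.empty[Int, Schema]

  // Parses and memoizes the schema for a given registry schema id, so repeated ids skip the registry call.
  def schemaFor(schemaId: Int): Schema =
    schemaCache.getOrElseUpdate(schemaId, new Schema.Parser().parse(fetchSchemaString(schemaId)))
}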

build.sbt (1)

220-227: Define Confluent version as a variable.

Move version to a val for easier maintenance.

+lazy val confluentVersion = "7.6.0"
 resolvers += "Confluent" at "https://packages.confluent.io/maven/",
-libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % "7.6.0",
+libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion,
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

93-104: Use deterministic timestamps for testing.

Replace System.currentTimeMillis() with a fixed timestamp for reproducible tests.

-    val startTs = System.currentTimeMillis()
+    val startTs = 1704067200000L  // 2024-01-01 00:00:00 UTC

181-195: Consider configurable parallelism.

Hard-coded parallelism of 1 may not be suitable for all test scenarios.

-    val parallelism = 1
+    val parallelism = sys.env.getOrElse("TEST_PARALLELISM", "1").toInt
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between ead69ac and 31fcbba.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
🚧 Files skipped from review as they are similar to previous changes (11)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/test/resources/user.avsc
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: fetcher_spark_tests
🔇 Additional comments (19)
spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (2)

20-20: LGTM: Package reorganization.

Clean move of EditDistance to aggregator.stats package.


Line range hint 46-47: LGTM: Well-defined UDF.

Proper UDF wrapper for EditDistance.between.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (4)

15-19: Class signature is straightforward.
Clean definition.


20-29: Validate host/port parameters.
Ensure config keys "host" and "port" are always present to avoid KeyErrors.


30-31: Close AdminClient after topic check.
Same as prior suggestion.


35-53: Validate offset strategy and watermark approach.
Using LATEST offset plus no watermarks may skip old data and event-time logic.
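
A sketch of the trade-off, assuming the source is built on Flink's KafkaSource builder; topic and group names are illustrative:

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.spark.sql.Row

def buildKafkaSource(bootstrap: String, deser: DeserializationSchema[Row]): KafkaSource[Row] =
  KafkaSource
    .builder[Row]()
    .setTopics("events-topic")
    .setGroupId("chronon-flink-consumer")
    .setBootstrapServers(bootstrap)
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(deser))
    // LATEST only reads data produced after startup; committed offsets with an EARLIEST
    // fallback replay history when the consumer group is new.
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .build()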

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (4)

21-25: Encoder logic.
Looks concise and correct.


34-42: Initialization flow is clear.
Metrics and deserializer setup are standard.


44-80: Ensure partial messages are handled.
Exception is caught, but watch out for very short payloads.


83-90: Object build method is neat.
Simple interface returning both encoder and schema is good.

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (3)

15-21: Command-line parsing is minimal but effective.
No immediate issues.


32-34: Host/port null checks.
Same as prior suggestion.


39-41: Return exit code instead of System.exit.
Same as prior suggestion.

flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1)

38-43: LGTM! Good schema type validation.

Clear schema type check with descriptive error message.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

258-275: LGTM! Well-implemented watermark strategy.

Good handling of late events and low traffic scenarios.


311-338: LGTM! Clean schema provider integration.

Good error handling and clear validation logic.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

82-85: LGTM! Clean utility method.

Simple and effective implementation.

build.sbt (1)

Line range hint 251-256: LGTM! Well-managed test dependencies.

Good handling of exclusions to prevent conflicts.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

80-92: Validate schema field types.

Add runtime validation for field type compatibility with Avro.

Comment on lines +18 to +21
private val schemaRegistryClient: SchemaRegistryClient = buildSchemaRegistryClient(schemaRegistryUrl)

private[flink] def buildSchemaRegistryClient(registryUrl: String): SchemaRegistryClient =
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)

🛠️ Refactor suggestion

Add error handling for client initialization.

Catch and wrap initialization errors, consider adding retry mechanism.

-  private val schemaRegistryClient: SchemaRegistryClient = buildSchemaRegistryClient(schemaRegistryUrl)
+  private val schemaRegistryClient: SchemaRegistryClient = try {
+    buildSchemaRegistryClient(schemaRegistryUrl)
+  } catch {
+    case e: Exception => throw new IllegalStateException(s"Failed to initialize schema registry client: ${e.getMessage}", e)
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private val schemaRegistryClient: SchemaRegistryClient = buildSchemaRegistryClient(schemaRegistryUrl)
private[flink] def buildSchemaRegistryClient(registryUrl: String): SchemaRegistryClient =
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
private val schemaRegistryClient: SchemaRegistryClient = try {
buildSchemaRegistryClient(schemaRegistryUrl)
} catch {
case e: Exception => throw new IllegalStateException(s"Failed to initialize schema registry client: ${e.getMessage}", e)
}
private[flink] def buildSchemaRegistryClient(registryUrl: String): SchemaRegistryClient =
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)


// We wire up the watermark strategy post the spark expr eval to be able to leverage the user's timestamp column (which is
// ETLed to Contants.TimeColumn) as the event timestamp and watermark
val watermarkStrategy: WatermarkStrategy[Map[String, Any]] = WatermarkStrategy

@nikhil-zlai nikhil-zlai Jan 28, 2025


wonder if we should do Array[Any] & schema separately - instead of Map[String, Any]. That we can do when we need to optimize later - since it is backwards compatible I think

piyush-zlai (Contributor, Author) replied:

So we plug the watermark strategy downstream of the Spark expr eval so that we can pick up the timestamp as the user defined it in their GroupBy source. Our Spark eval output is Map[String, Any], which is the type here (we just reach in and pull out the 'ts' column). One thing to call out: the Map[String, Any] type pretty much stays for one operator hop. Immediately after the spark + watermark op we have an Avro codec operator which converts this to the appropriate event Avro, ready for writing to the KV store.

Comment on lines 314 to 347
val topicUri = servingInfo.groupBy.streamingSource.get.topic
val topicInfo = TopicInfo.parse(topicUri)
val schemaProvider =
topicInfo.params.get("registry_url") match {
case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
case None =>
throw new IllegalArgumentException(
"We only support schema registry based schema lookups. Missing registry_url in topic config")
}
val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
val source =
topicInfo.messageBus match {
case "kafka" =>
new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo)
case _ =>
throw new IllegalArgumentException(s"Unsupported message bus: ${topicInfo.messageBus}")
}
new FlinkJob(
eventSrc = source,
sinkFn = new AsyncKVStoreWriter(api, servingInfo.groupBy.metaData.name),
groupByServingInfoParsed = servingInfo,
encoder = encoder,
parallelism = source.parallelism
)
}
.recover {
case e: Exception =>
throw new IllegalArgumentException(s"Unable to lookup serving info for GroupBy: '$groupByName'", e)
}

sprinkle in new lines - adds nicely to readability

Suggested change
val topicUri = servingInfo.groupBy.streamingSource.get.topic
val topicInfo = TopicInfo.parse(topicUri)
val schemaProvider =
topicInfo.params.get("registry_url") match {
case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
case None =>
throw new IllegalArgumentException(
"We only support schema registry based schema lookups. Missing registry_url in topic config")
}
val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
val source =
topicInfo.messageBus match {
case "kafka" =>
new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo)
case _ =>
throw new IllegalArgumentException(s"Unsupported message bus: ${topicInfo.messageBus}")
}
new FlinkJob(
eventSrc = source,
sinkFn = new AsyncKVStoreWriter(api, servingInfo.groupBy.metaData.name),
groupByServingInfoParsed = servingInfo,
encoder = encoder,
parallelism = source.parallelism
)
}
.recover {
case e: Exception =>
throw new IllegalArgumentException(s"Unable to lookup serving info for GroupBy: '$groupByName'", e)
}
val topicUri = servingInfo.groupBy.streamingSource.get.topic
val topicInfo = TopicInfo.parse(topicUri)
val schemaProvider =
topicInfo.params.get("registry_url") match {
case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
case None =>
throw new IllegalArgumentException(
"We only support schema registry based schema lookups. Missing registry_url in topic config")
}
val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
val source =
topicInfo.messageBus match {
case "kafka" =>
new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo)
case _ =>
throw new IllegalArgumentException(s"Unsupported message bus: ${topicInfo.messageBus}")
}
new FlinkJob(
eventSrc = source,
sinkFn = new AsyncKVStoreWriter(api, servingInfo.groupBy.metaData.name),
groupByServingInfoParsed = servingInfo,
encoder = encoder,
parallelism = source.parallelism
)
}
.recover {
case e: Exception =>
throw new IllegalArgumentException(s"Unable to lookup serving info for GroupBy: '$groupByName'", e)
}


would also break this out into its own function actually

piyush-zlai (Contributor, Author) replied:

done

Comment on lines +269 to +277
.forBoundedOutOfOrderness[Map[String, Any]](AllowedOutOfOrderness)
.withIdleness(IdlenessTimeout)
.withTimestampAssigner(new SerializableTimestampAssigner[Map[String, Any]] {
override def extractTimestamp(element: Map[String, Any], recordTimestamp: Long): Long = {
element.get(Constants.TimeColumn).map(_.asInstanceOf[Long]).getOrElse(recordTimestamp)
}
})


very cool!

nikhil-zlai previously approved these changes Jan 28, 2025

@nikhil-zlai nikhil-zlai left a comment


Very cool!

@piyush-zlai piyush-zlai force-pushed the piyush/schema_registry branch from 31fcbba to a381571 on January 28, 2025 at 19:17

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

35-52: getDataStream method is straightforward.
No immediate issues, but ensure watermarks are configured if event-time is needed.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1)

34-42: Add error handling in open method.

Wrap initialization in try-catch to prevent silent failures.

 override def open(context: DeserializationSchema.InitializationContext): Unit = {
   super.open(context)
-  val metricsGroup = context.getMetricGroup
-    .addGroup("chronon")
-    .addGroup("topic", topicName)
-  deserializationErrorCounter = metricsGroup.counter("avro_deserialization_errors")
-  avroDeserializer = AvroDataToCatalyst(null, jsonSchema, Map.empty)
-  sparkRowDeser = encoder.asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+  try {
+    val metricsGroup = context.getMetricGroup
+      .addGroup("chronon")
+      .addGroup("topic", topicName)
+    deserializationErrorCounter = metricsGroup.counter("avro_deserialization_errors")
+    avroDeserializer = AvroDataToCatalyst(null, jsonSchema, Map.empty)
+    sparkRowDeser = encoder.asInstanceOf[ExpressionEncoder[Row]].resolveAndBind().createDeserializer()
+  } catch {
+    case e: Exception =>
+      logger.error("Failed to initialize deserializer", e)
+      throw e
+  }
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 31fcbba and a381571.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
🚧 Files skipped from review as they are similar to previous changes (13)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • flink/src/test/resources/user.avsc
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (25)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (4)

15-18: Constructor looks good.
The constructor properly receives Kafka params and extends FlinkSource with a clean signature.


20-28: Verify presence of 'host' key in topicInfo.params.
This code calls topicInfo.params("host"), which can throw a NoSuchElementException if it's missing.


30-31: Topic existence check is fine.
Please note that TopicChecker does not close the AdminClient. This is tracked in the TopicChecker file.


33-33: Smart approach to derive parallelism from partition count.
Using the number of partitions for parallelism is logical.

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (2)

32-34: Risk of None.get on host/port.
This repeats a previous comment about handling potential None values.


39-40: Consider returning a status code instead of System.exit(0).
This repeats a previous suggestion to avoid direct system termination.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (3)

36-37: Close the admin client.
Same concern from an earlier review. The client is never closed and can cause resource leaks.


43-44: Close the admin client in topicShouldExist too.
Same resource leak concern applies here.


82-85: mapToJavaProperties is concise.
It clearly maps extra props to Java Properties.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (4)

Line range hint 43-67: LGTM! Good improvements to the test source.

The changes to use Row type and mockPartitionCount parameter enhance testing flexibility.


80-91: LGTM! Well-structured schema definition.

The schema definition and Avro conversion are clear and type-safe.


93-104: LGTM! Clean implementation of source creation.

Good use of AvroCodec for serialization/deserialization pipeline.


181-195: LGTM! Consistent with Row-based processing changes.

Good integration with SparkExpressionEvalFn and proper encoder setup.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4)

258-276: LGTM! Robust watermark strategy implementation.

Well-configured for handling late events and low traffic scenarios.


285-288: LGTM! Good transition to real Kafka sources.

Proper handling of optional Kafka configuration.


311-347: LGTM! Well-structured source creation logic.

Good error handling and schema registry integration.


373-381: LGTM! Clean job execution setup.

Good metrics sink configuration with matching parallelism.

build.sbt (5)

220-220: LGTM!

Confluent Maven repository is required for schema registry dependencies.


Line range hint 229-236: LGTM!

Assembly merge strategy correctly handles META-INF services and configuration files.


Line range hint 237-246: LGTM!

Assembly exclusions prevent runtime errors with proper documentation.


251-251: LGTM!

Protobuf provider in test scope matches schema registry client version.


227-228: Verify schema registry client version compatibility.

Dependencies look good, but let's verify the version compatibility.

flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (3)

27-33: LGTM! Good use of @transient and lazy initialization.


83-90: LGTM! Clean factory method implementation.


21-25: Add schema validation in encoder method.

Validate jsonSchema format before creating AvroDataToCatalyst to fail fast on invalid schemas.

Comment on lines +56 to +69
val maybeMessage = if (schemaRegistryWireFormat) {
// schema id is set, we skip the first byte and read the schema id based on the wire format:
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#messages-wire-format
val buffer = ByteBuffer.wrap(messageBytes)
buffer.get
val messageSchemaId = buffer.getInt

// unfortunately we need to drop the first 5 bytes (and thus copy the rest of the byte array) as the AvroDataToCatalyst
// interface takes a byte array and the methods to do the Row conversion etc are all private so we can't reach in
doDeserialize(messageBytes.drop(5),
s"Failed to deserialize message from Avro Bytes to InternalRow. Message schema id $messageSchemaId")
} else {
doDeserialize(messageBytes, "Failed to deserialize message from Avro Bytes to InternalRow")
}

🛠️ Refactor suggestion

Add wire format validation and optimize memory usage.

  1. Validate magic byte value
  2. Add buffer bounds check
  3. Consider using ByteBuffer.slice() to avoid array copy
 val maybeMessage = if (schemaRegistryWireFormat) {
   val buffer = ByteBuffer.wrap(messageBytes)
+  if (buffer.remaining() < 5) {
+    throw new IllegalArgumentException("Message too short for schema registry format")
+  }
-  buffer.get
+  val magicByte = buffer.get()
+  if (magicByte != 0) {
+    throw new IllegalArgumentException(s"Invalid magic byte: $magicByte")
+  }
   val messageSchemaId = buffer.getInt
-  doDeserialize(messageBytes.drop(5),
+  val messageBuffer = buffer.slice()
+  doDeserialize(messageBuffer.array(),

Committable suggestion skipped: line range outside the PR's diff.

@piyush-zlai piyush-zlai force-pushed the piyush/schema_registry branch 2 times, most recently from 6b13299 to 6b0a888 on January 28, 2025 at 20:47

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (7)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)

44-53: Revisit null on parse failure.

Swallowing errors by returning null may mask corrupted data trends. Consider an alternative strategy to handle corrupted messages (e.g., send to a DLQ).
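
One hedged alternative (not from the PR): wrap the outcome instead of returning null, so a downstream operator can count failures and route them to a side output or dead-letter topic. The types below are illustrative:

import org.apache.spark.sql.Row

sealed trait DeserResult
case class DeserOk(row: Row) extends DeserResult
// Keeps the raw payload so a downstream ProcessFunction can publish it to a DLQ topic or an OutputTag.
case class DeserFailed(rawBytes: Array[Byte], error: String) extends DeserResult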


83-90: Encapsulation enhancement.

Building and returning both encoder and deserializer is convenient but might encourage misuse. Return separate builders for clarity and maintain single responsibility.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

33-33: Implicit parallelism can be confusing.
Consider explicit assignment for clarity.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

93-104: Consider parameterizing test data range.

The hardcoded range (0 until 10) limits test flexibility.

-    val elements: Seq[Map[String, AnyRef]] = (0 until 10).map(i =>
+    def makeSource(mockPartitionCount: Int, numElements: Int = 10): FlinkSource[Row] = {
+    val elements: Seq[Map[String, AnyRef]] = (0 until numElements).map(i =>

181-195: Make parallelism configurable.

Hardcoded parallelism=1 limits testing scenarios.

-  def buildTestFlinkJob(api: Api): FlinkJob[Row] = {
+  def buildTestFlinkJob(api: Api, parallelism: Int = 1): FlinkJob[Row] = {
     val groupBy = makeGroupBy(Seq("id"))
-    val parallelism = 1
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

317-323: Enhance error message clarity.

Current message doesn't list supported alternatives.

-                    "We only support schema registry based schema lookups. Missing registry_url in topic config")
+                    "Schema registry based lookups require 'registry_url' parameter in topic config. Example: 'registry_url=http://schema-registry:8081'")

304-304: Make metadata store timeout configurable.

Hardcoded timeout limits flexibility.

-    val metadataStore = new MetadataStore(api.genKvStore, MetadataDataset, timeoutMillis = 10000)
+    val metadataStore = new MetadataStore(api.genKvStore, MetadataDataset, 
+      timeoutMillis = props.getOrElse("metadata.timeout.millis", "10000").toLong)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a381571 and 6b13299.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
🚧 Files skipped from review as they are similar to previous changes (14)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
  • build.sbt
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala
  • flink/src/test/resources/user.avsc
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: enforce_triggered_workflows
🔇 Additional comments (15)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)

21-25: Clarify nullable fields in schema.

AvroDataToCatalyst usage with null for the original schema might cause confusion if the Avro message contains nullable fields. Ensure correct handling of optional fields to avoid potential NullPointerExceptions.


56-69: Add wire format validation and avoid array copying.

You are skipping 5 bytes for the Confluent wire format but no length check is performed. Also, consider using ByteBuffer.slice() instead of drop() to save memory.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (3)

15-29: Handle missing host key gracefully.
Accessing topicInfo.params("host") directly could throw an exception if the key is absent. Use safer lookups.


30-31: Close AdminClient to avoid leaks.
TopicChecker.topicShouldExist never closes the AdminClient resource.


35-52: Verify no-watermark usage.
Currently uses noWatermarks(). Confirm if event-time processing is unnecessary.
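If event-time processing does turn out to be needed at this point, one option is a bounded-out-of-orderness strategy with idleness handling; the timestamp column name and durations below are illustrative assumptions:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.spark.sql.Row

object EventTimeSketch {
  // assumes the Row carries an epoch-millis timestamp column named "ts"
  val watermarkStrategy: WatermarkStrategy[Row] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[Row](Duration.ofSeconds(30))
      .withTimestampAssigner(new SerializableTimestampAssigner[Row] {
        override def extractTimestamp(row: Row, recordTs: Long): Long = row.getAs[Long]("ts")
      })
      .withIdleness(Duration.ofMinutes(1))

  // applied downstream: rowStream.assignTimestampsAndWatermarks(watermarkStrategy)
}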

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (3)

14-21: Args class is fine.
No issues found.


32-34: Null check host and port.
Avoid host.get or port.get. Use pattern matching or safe defaults.


39-41: Prefer returning exit code.
System.exit makes testing harder.
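One common shape for keeping the CLI testable, sketched with illustrative stubs (the real argument parsing is elided):

object TopicCheckerAppSketch {
  // does the actual work and reports status via a return code (argument parsing elided)
  def run(args: Array[String]): Int = {
    // ... look up partitions / verify the topic, log the result ...
    0 // 0 on success, non-zero on failure
  }

  // only the outermost entry point terminates the JVM
  def main(args: Array[String]): Unit = sys.exit(run(args))
}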

online/src/main/scala/ai/chronon/online/TopicChecker.scala (3)

36-42: Close AdminClient after partition check.
Lack of closure can leak resources.


Line range hint 43-80: Close AdminClient in topicShouldExist.
Wrap AdminClient creation in a try-finally block.


82-85: mapToJavaProperties looks good.
No concerns here.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (2)

Line range hint 43-67: LGTM: Generic Row type enables schema flexibility.

The transition from E2ETestEvent to Row type aligns with schema registry integration.


80-92: LGTM: Well-structured schema definition.

Schema includes essential fields and proper Avro conversion.
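For context, a minimal sketch of the Avro-schema-to-Spark-encoder conversion this refers to; the record name and fields are illustrative, not the contents of user.avsc:

import org.apache.avro.SchemaBuilder
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

object AvroEncoderSketch {
  // Build an Avro record schema, then derive a Spark Row encoder from it.
  val avroSchema = SchemaBuilder
    .record("E2ETestEvent").namespace("ai.chronon.flink.test")
    .fields()
    .requiredString("id")
    .requiredLong("created")
    .endRecord()

  val sparkType: StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  val rowEncoder: Encoder[Row] = Encoders.row(sparkType)
}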

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

258-276: LGTM: Robust watermark strategy implementation.

Properly handles out-of-order events and idleness with configurable timeouts.


285-288: LGTM: Production-ready argument defaults.

Appropriate transition from mock to real sources.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (4)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

30-31: ⚠️ Potential issue

**Topic existence check.**
AdminClient is never explicitly closed.

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1)

33-34: ⚠️ Potential issue

**Avoid potential NPE.**
Use option checks or pattern matching for host/port.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

36-37: ⚠️ Potential issue

**Close AdminClient to prevent resource leak.**
Wrap it in a try-finally or use try-with-resources.

 def getPartitions(topic: String, bootstrap: String, additionalProps: Map[String, String] = Map.empty): Int = {
   val props = mapToJavaProperties(...)
   val adminClient = AdminClient.create(props)
+  try {
     val topicDescription = adminClient.describeTopics(...).values().get(topic)
     topicDescription.get().partitions().size()
+  } finally {
+    adminClient.close()
+  }
 }
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1)

56-69: ⚠️ Potential issue

Add wire format validation.

Previous review comment still applies.

 val maybeMessage = if (schemaRegistryWireFormat) {
   val buffer = ByteBuffer.wrap(messageBytes)
+  if (buffer.remaining() < 5) {
+    throw new IllegalArgumentException("Message too short for schema registry format")
+  }
-  buffer.get
+  val magicByte = buffer.get()
+  if (magicByte != 0) {
+    throw new IllegalArgumentException(s"Invalid magic byte: $magicByte")
+  }
   val messageSchemaId = buffer.getInt
-  doDeserialize(messageBytes.drop(5),
+  val messageBuffer = buffer.slice()
+  doDeserialize(messageBuffer.array(),
🧹 Nitpick comments (3)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

20-29: Validate host & port presence.
The nested, multi-line host/port parsing is hard to follow; consider a clearer construction of host + ":" + port and an explicit error when either is missing.

val bootstrap: String =
-  kafkaBootstrap.getOrElse(
-    topicInfo.params.getOrElse(
-      "bootstrap",
-      topicInfo.params("host") + topicInfo.params
-        .get("port")
-        .map(":" + _)
-        .getOrElse(...)
-    ))
+  kafkaBootstrap.getOrElse(
+    topicInfo.params.getOrElse("bootstrap",
+      s"${topicInfo.params("host")}:${topicInfo.params.getOrElse("port",
+        throw new IllegalArgumentException("No bootstrap servers provided"))}"
+    ))
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)

18-25: Consider making schemaRegistryWireFormat private.

The field is only used internally.

-class AvroDeserializationSchema(topicName: String, jsonSchema: String, schemaRegistryWireFormat: Boolean)
+class AvroDeserializationSchema(topicName: String, jsonSchema: String, private val schemaRegistryWireFormat: Boolean)

71-80: Consider using Option instead of null.

Using null for error cases is not idiomatic Scala.

-    val deserTry = maybeMessage.map(m => sparkRowDeser(m)).recover {
+    val deserTry = maybeMessage.map(m => Option(sparkRowDeser(m))).recover {
       case e: Exception =>
         logger.error("Failed to deserialize InternalRow to Row", e)
         deserializationErrorCounter.inc()
-        null
+        None
     }
 
-    deserTry.get
+    deserTry.get.orNull  // Only convert to null at the boundary
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 6b13299 and 6b0a888.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
🚧 Files skipped from review as they are similar to previous changes (14)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • flink/src/test/resources/user.avsc
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
  • build.sbt
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: fetcher_spark_tests
🔇 Additional comments (14)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (3)

15-17: Good constructor design.
Well-structured constructor parameters for Kafka and topic info.


33-33: Parallelism looks good.
Automatically matching the partition count is sensible.


35-52: Sufficient Kafka source initialization.
No watermarks used; verify if your use case requires them.

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1)

39-39: **Prefer returning exit code.**
Direct System.exit complicates testing.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (1)

82-85: mapToJavaProperties looks concise.
Implementation is straightforward and clear.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (4)

5-6: LGTM!

Clean import organization and well-structured schema definition.

Also applies to: 11-12, 15-15, 18-18, 29-31, 80-89


Line range hint 43-67: LGTM!

Clean transition to Row-based data handling with proper parallelism configuration.


93-104: LGTM!

Clean implementation of Avro-based Row conversion pipeline.


181-195: LGTM!

Clean integration with Avro-based schema handling.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (3)

259-277: LGTM!

Well-structured watermark strategy with proper handling of out-of-order events and idleness.


286-289: LGTM!

Clean addition of Kafka configuration with proper optionality.


312-348: Extract source creation logic into a separate method.

The source creation logic is well-implemented but would be more maintainable as a separate method.

Consider extracting the logic into a method like:

+  private def createSource(servingInfo: GroupByServingInfoParsed, kafkaBootstrap: Option[String]): (FlinkSource[Row], Encoder[Row]) = {
+    val topicUri = servingInfo.groupBy.streamingSource.get.topic
+    val topicInfo = TopicInfo.parse(topicUri)
+
+    val schemaProvider =
+      topicInfo.params.get("registry_url") match {
+        case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
+        case None =>
+          throw new IllegalArgumentException(
+            "We only support schema registry based schema lookups. Missing registry_url in topic config")
+      }
+
+    val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
+    val source =
+      topicInfo.messageBus match {
+        case "kafka" =>
+          new KafkaFlinkSource(kafkaBootstrap, deserializationSchema, topicInfo)
+        case _ =>
+          throw new IllegalArgumentException(s"Unsupported message bus: ${topicInfo.messageBus}")
+      }
+
+    (source, encoder)
+  }
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (2)

83-90: LGTM: Companion object implementation looks good.

Clean and straightforward factory method.


21-25: Validate jsonSchema in encoder.

Missing schema validation could cause runtime errors.

 def encoder: Encoder[Row] = {
+  require(jsonSchema != null && jsonSchema.nonEmpty, "JSON schema cannot be null or empty")
   val avroDeserializer = AvroDataToCatalyst(null, jsonSchema, Map.empty)
   val catalystType = avroDeserializer.dataType.asInstanceOf[StructType]
   Encoders.row(catalystType)
 }
✅ Verification successful

Add schema validation to prevent runtime errors

 def encoder: Encoder[Row] = {
+  require(jsonSchema != null && jsonSchema.nonEmpty, "JSON schema cannot be null or empty")
   val avroDeserializer = AvroDataToCatalyst(null, jsonSchema, Map.empty)
   val catalystType = avroDeserializer.dataType.asInstanceOf[StructType]
   Encoders.row(catalystType)
 }
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find the class definition and related validation patterns
ast-grep --pattern 'class AvroDeserializationSupport {
  $$$
}'

# Search for similar validation patterns
rg "require.*schema.*null" -A 2

# Find usages of the encoder
rg "encoder.*Row" -A 2

Length of output: 2002

@tchow-zlai
Collaborator

@piyush-zlai there's a test failure - not sure if it's new

@piyush-zlai piyush-zlai force-pushed the piyush/schema_registry branch from 6b0a888 to de868fd Compare January 29, 2025 01:11
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

Line range hint 43-67: Add backpressure handling to prevent resource exhaustion.

The infinite loop with fixed delay could cause issues under load.

   override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
+    val maxRetries = 3
+    var retryCount = 0
     while (isRunning) {
-      mockEvents.foreach { event =>
-        ctx.collect(event)
+      try {
+        mockEvents.foreach { event =>
+          ctx.collect(event)
+        }
+        retryCount = 0
+        Thread.sleep(1000)
+      } catch {
+        case e: Exception if retryCount < maxRetries =>
+          retryCount += 1
+          Thread.sleep(1000 * retryCount)
+        case e: Exception =>
+          isRunning = false
+          throw e
       }
-      // Add some delay between event batches
-      Thread.sleep(1000)
     }
   }
♻️ Duplicate comments (3)
spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (2)

32-34: ⚠️ Potential issue

Add null checks for host/port retrieval.

Direct use of .get on Option can throw NPE.

-      lazy val host = tokens.get("host")
-      lazy val port = tokens.get("port")
-      lazy val hostPort = s"${host.get}:${port.get}"
+      lazy val hostPort = for {
+        host <- tokens.get("host")
+        port <- tokens.get("port")
+      } yield s"$host:$port"
+      lazy val finalHostPort = hostPort.getOrElse(throw new IllegalArgumentException("Missing host/port in config"))

39-41: 🛠️ Refactor suggestion

Consider returning exit code instead of System.exit.

Direct System.exit calls make testing difficult.

-    logger.info(getPartitions(topic, bootstrap).toString)
-    System.exit(0)
+    logger.info(getPartitions(topic, bootstrap).toString)
+    // Return exit code instead
+    0
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

30-31: ⚠️ Potential issue

Close AdminClient to avoid leaks.

Topic existence check doesn't ensure AdminClient is closed.

-  TopicChecker.topicShouldExist(topicInfo.name, bootstrap, topicInfo.params)
+  try {
+    TopicChecker.topicShouldExist(topicInfo.name, bootstrap, topicInfo.params)
+  } catch {
+    case e: Exception =>
+      throw new IllegalArgumentException(s"Failed to verify topic existence: ${topicInfo.name}", e)
+  }
🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

20-28: Simplify bootstrap server configuration logic.

The nested getOrElse and map operations make error scenarios hard to trace.

-  val bootstrap: String =
-    kafkaBootstrap.getOrElse(
-      topicInfo.params.getOrElse(
-        "bootstrap",
-        topicInfo.params("host") + topicInfo.params
-          .get("port")
-          .map(":" + _)
-          .getOrElse(throw new IllegalArgumentException("No bootstrap servers provided"))
-      ))
+  val bootstrap: String = {
+    def fromParams = for {
+      host <- topicInfo.params.get("host")
+      port <- topicInfo.params.get("port")
+    } yield s"$host:$port"
+    
+    kafkaBootstrap
+      .orElse(topicInfo.params.get("bootstrap"))
+      .orElse(fromParams)
+      .getOrElse(throw new IllegalArgumentException(
+        "No bootstrap servers provided. Set via kafkaBootstrap option or topic params (bootstrap or host:port)"))
+  }
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (1)

318-326: Add schema registry URL validation.

Ensure registry URL is well-formed before attempting connection.

   val schemaProvider =
     topicInfo.params.get("registry_url") match {
-      case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
+      case Some(url) if url.startsWith("http") => new SchemaRegistrySchemaProvider(topicInfo.params)
+      case Some(url) => throw new IllegalArgumentException(s"Invalid schema registry URL format: $url")
       case None =>
         throw new IllegalArgumentException(
           "We only support schema registry based schema lookups. Missing registry_url in topic config")
     }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 6b0a888 and de868fd.

📒 Files selected for processing (21)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala (1 hunks)
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala (1 hunks)
  • build.sbt (2 hunks)
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (10 hunks)
  • flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala (1 hunks)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala (0 hunks)
  • flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (5 hunks)
  • flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (1 hunks)
  • flink/src/test/resources/user.avsc (1 hunks)
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala (1 hunks)
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala (1 hunks)
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala (2 hunks)
  • online/src/main/scala/ai/chronon/online/TopicChecker.scala (3 hunks)
  • spark/src/main/scala/ai/chronon/spark/Driver.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala (1 hunks)
  • spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1 hunks)
💤 Files with no reviewable changes (1)
  • flink/src/main/scala/ai/chronon/flink/SourceProvider.scala
🚧 Files skipped from review as they are similar to previous changes (13)
  • aggregator/src/main/scala/ai/chronon/aggregator/stats/EditDistance.scala
  • aggregator/src/test/scala/ai/chronon/aggregator/test/EditDistanceTest.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala
  • spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala
  • spark/src/main/scala/ai/chronon/spark/streaming/KafkaStreamBuilder.scala
  • online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala
  • cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
  • flink/src/test/resources/user.avsc
  • flink/src/main/scala/ai/chronon/flink/SchemaProvider.scala
  • flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala
  • spark/src/main/scala/ai/chronon/spark/Driver.scala
  • flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala
  • flink/src/test/scala/org/apache/spark/sql/avro/AvroDeSerializationSupportSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms (5)
  • GitHub Check: no_spark_scala_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: mutation_spark_tests
🔇 Additional comments (15)
flink/src/main/scala/org/apache/spark/sql/avro/AvroDeserializationSupport.scala (5)

56-69: Add wire format validation.

Missing magic byte validation and bounds check.


44-54: LGTM! Robust error handling.

Good use of Try, logging, and metrics.


34-42: LGTM! Clean initialization.

Well-structured metrics setup.


83-90: LGTM! Clean factory method.

Good use of companion object pattern.


21-25: ⚠️ Potential issue

Validate schema and handle null cases.

AvroDataToCatalyst(null, ...) looks suspicious.

spark/src/main/scala/ai/chronon/spark/streaming/TopicCheckerApp.scala (1)

1-21: Clean implementation of command-line argument parsing.

The Args class is well-structured with proper logging setup.

flink/src/main/scala/ai/chronon/flink/KafkaFlinkSource.scala (1)

35-52: Well-structured DataStream configuration.

Proper error handling and parallelism settings.

online/src/main/scala/ai/chronon/online/TopicChecker.scala (2)

82-86: Clean utility method implementation.

Good separation of concerns for properties conversion.


36-41: ⚠️ Potential issue

Improve AdminClient resource management.

Missing proper cleanup and error handling.

   def getPartitions(topic: String, bootstrap: String, additionalProps: Map[String, String] = Map.empty): Int = {
     val props = mapToJavaProperties(additionalProps ++ Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrap))
-    val adminClient = AdminClient.create(props)
-    val topicDescription = adminClient.describeTopics(util.Arrays.asList(topic)).values().get(topic);
-    topicDescription.get().partitions().size()
+    var adminClient: AdminClient = null
+    try {
+      adminClient = AdminClient.create(props)
+      val topicDescription = adminClient.describeTopics(util.Arrays.asList(topic)).values().get(topic)
+      topicDescription.get().partitions().size()
+    } catch {
+      case e: Exception =>
+        throw new IllegalArgumentException(s"Failed to get partitions for topic: $topic", e)
+    } finally {
+      if (adminClient != null) {
+        adminClient.close()
+      }
+    }
   }

Likely invalid or redundant comment.

flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala (1)

93-104: Well-structured source creation logic.

Good use of functional transformations and error handling.

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (2)

259-277: Well-configured watermark strategy.

Good handling of out-of-order events and idleness.


377-385: Well-structured job execution setup.

Good error handling and metrics configuration.

build.sbt (3)

220-220: LGTM!

Confluent resolver is required for schema registry dependencies.


Line range hint 251-255: LGTM!

Test dependency and exclusions are properly configured.


227-228: Verify Confluent schema registry client version compatibility.

Dependencies look good, but verify version compatibility.

✅ Verification successful

Version compatibility verified successfully

The Confluent Schema Registry Client 7.6.0 is compatible with Kafka 3.8.1 client being used in the project.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check compatibility between Confluent Schema Registry and Kafka versions
curl -s https://docs.confluent.io/platform/current/installation/versions-interoperability.html | grep -A 5 "7.6.0"

Length of output: 1151


Script:

#!/bin/bash
# Find Kafka version in build.sbt
rg "kafka" build.sbt -A 1 -B 1

# Try getting compatibility info in a more parseable format
curl -s https://raw.githubusercontent.com/confluentinc/schema-registry/v7.6.0/pom.xml | grep -A 2 "<kafka.version>"

Length of output: 1772
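For reference, the sbt wiring this compatibility check implies looks roughly like the following sketch (the authoritative lines are in build.sbt; versions taken from the check above):

resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "io.confluent" % "kafka-schema-registry-client" % "7.6.0",
  "org.apache.kafka" % "kafka-clients" % "3.8.1"
)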

@piyush-zlai piyush-zlai merged commit 38ea292 into main Jan 29, 2025
11 checks passed
@piyush-zlai piyush-zlai deleted the piyush/schema_registry branch January 29, 2025 01:28
nikhil-zlai pushed a commit that referenced this pull request Feb 4, 2025
…chema (#283)

kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
…chema (#283)

kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
…chema (#283)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
…chema (#283)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
…chema (#283)

chewy-zlai pushed a commit that referenced this pull request May 16, 2025
…chema (#283)
