Add PubSub Flink source #794
Conversation
Walkthrough
This update introduces full support for Google Pub/Sub as a streaming source in Flink jobs. It adds new Scala classes, build targets, Python integration, and deployment scripts for Pub/Sub connectors, refactors source creation to be message-bus agnostic, and updates tests and configuration to enable and validate Pub/Sub ingestion.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Python as GcpRunner (Python)
    participant Submitter as DataprocSubmitter (Scala)
    participant Flink as FlinkJob/FlinkSourceProvider
    participant PubSub as PubSubFlinkSource
    Python->>Submitter: Submit Flink job (with optional Pub/Sub JAR, subscription)
    Submitter->>Flink: Build Flink job (includes Pub/Sub JAR if enabled)
    Flink->>Flink: Use FlinkSourceProvider to select source
    Flink->>PubSub: Instantiate PubSubFlinkSource if message bus is Pub/Sub
    PubSub-->>Flink: Provide Flink DataStream from Pub/Sub
```
Actionable comments posted: 1
🧹 Nitpick comments (7)
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala (2)
8-48: Test suite looks good but could use more specific error checks. Using assertThrows[Exception] is too generic. Consider using more specific exception types to ensure correct failure modes.
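For illustration, a minimal ScalaTest sketch of a tighter assertion; the exception type and the helper below are assumptions standing in for the real PubSubFlinkSource setup, not the actual behavior in this PR:
```scala
// Illustrative only: assumes missing required properties surface as IllegalArgumentException.
// buildPubSubSourceMissingProject() is a hypothetical helper standing in for the real test setup.
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class PubSubSourceFailureSketch extends AnyFlatSpec with Matchers {
  private def buildPubSubSourceMissingProject(): Unit =
    throw new IllegalArgumentException("GCP project must be set") // placeholder for the real builder call

  "PubSubFlinkSource construction" should "fail with a descriptive error when the GCP project is missing" in {
    val thrown = intercept[IllegalArgumentException] {
      buildPubSubSourceMissingProject()
    }
    thrown.getMessage should include("GCP project")
  }
}
```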
35-36: Add type verification for created source. Verify that src is actually a PubSubFlinkSource instance using shouldBe a [PubSubFlinkSource].
api/python/ai/chronon/repo/gcp.py (1)
78-86: Consider a more descriptive environment variable name. SUBSCRIPTION_NAME is generic. Consider using a more specific name like GCP_PUBSUB_SUBSCRIPTION_NAME to match other GCP variable naming.
flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (1)
29-39: Smart use of reflection for PubSub source. Avoids direct dependency on GCP libraries - good for environments like AWS.
Consider adding error handling for reflection failures:
```diff
 private def loadPubsubSource[T](props: Map[String, String],
                                 deserializationSchema: DeserializationSchema[T],
                                 topicInfo: TopicInfo): FlinkSource[T] = {
-  val cl = Thread.currentThread().getContextClassLoader() // Use Flink's classloader
-  val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
-  val constructor = cls.getConstructors.apply(0)
-  val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
-  onlineImpl.asInstanceOf[FlinkSource[T]]
+  try {
+    val cl = Thread.currentThread().getContextClassLoader() // Use Flink's classloader
+    val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
+    val constructor = cls.getConstructors.apply(0)
+    val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
+    onlineImpl.asInstanceOf[FlinkSource[T]]
+  } catch {
+    case e: ClassNotFoundException =>
+      throw new IllegalStateException("PubSub connector not found on classpath. Ensure PubSub connector jar is included.", e)
+    case e: Exception =>
+      throw new IllegalStateException(s"Failed to initialize PubSub source: ${e.getMessage}", e)
+  }
 }
```
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (2)
111-119: Consider alternatives to Thread.sleep. Using Thread.sleep blocks the executing thread. For production, consider using Flink's timer service or rate-limiting operators instead.
44-62: Add a parallelism configuration option. Source parallelism is hardcoded to 1. Consider allowing this to be configurable via command line arguments for handling larger datasets.
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (1)
46-53: Watermark strategy documentation needed. The comment about skipping watermarks is valid but could benefit from more context about where/how watermarks are generated downstream.
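For context, a minimal sketch of deferring watermarks past the source, with assignment happening downstream once event timestamps are parsed; the 30-second bound is an arbitrary illustration, not a value from this PR:
```scala
// Sketch: no watermarks at the source; assign them downstream after parsing event timestamps.
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

object WatermarkSketch {
  val atSource: WatermarkStrategy[String] = WatermarkStrategy.noWatermarks[String]()
  val downstream: WatermarkStrategy[String] =
    WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(30)) // tolerance is an assumption
}
```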
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- api/python/ai/chronon/repo/gcp.py (4 hunks)
- api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (5 hunks)
- flink/BUILD.bazel (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala (1 hunks)
- maven_install.json (13 hunks)
- scripts/distribution/build_and_upload_artifacts.sh (3 hunks)
- spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (3 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
- FlinkSource (6-21)
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (1)
- FlinkTestUtils (86-156)
api/python/ai/chronon/repo/gcp.py (1)
api/python/ai/chronon/repo/utils.py (1)
- get_environ_arg (49-53)
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (4)
flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1)
- FlinkSource (6-21)
flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
- FlinkSourceProvider (6-40)
- build (7-18)
flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala (2)
- KafkaFlinkSource (12-53)
- KafkaFlinkSource (55-62)
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3)
- ValidationFlinkJob (84-131)
- ValidationFlinkJob (133-183)
- run (134-182)
⏰ Context from checks skipped due to timeout of 90000ms (31)
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: streaming_tests
- GitHub Check: online_tests
- GitHub Check: join_tests
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: fetcher_tests
- GitHub Check: groupby_tests
- GitHub Check: api_tests
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: fetcher_tests
- GitHub Check: aggregator_tests
- GitHub Check: analyzer_tests
- GitHub Check: analyzer_tests
- GitHub Check: flink_tests
- GitHub Check: flink_tests
- GitHub Check: spark_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: aggregator_tests
- GitHub Check: batch_tests
- GitHub Check: python_tests
🔇 Additional comments (59)
api/python/test/canary/teams.py (1)
66-67: PubSub configuration added correctly. The addition of these environment variables enables PubSub functionality and defines the subscription name for GCP team configuration.
tools/build_rules/dependencies/maven_repository.bzl (1)
179-180: Appropriate PubSub connector dependency added. Added the required Flink connector for Google PubSub.
flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2)
4-4: Import correctly updated for new package structure. The import statement properly reflects the new package location of FlinkSource.
23-23: Added required parallelism value. The implicit parallelism parameter is correctly implemented to match the FlinkSource interface changes.
flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (2)
1-1: Package reorganization looks good. Moved to a more specific package name that better represents the source functionality.
8-11: Well-documented parallelism field added. The parallelism field is properly documented and marked as implicit. This change supports the PubSub integration, where parallelism configuration is important.
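As a rough sketch of the shape this comment describes; the names and signatures below are assumptions based on the review, not the repo's exact code:
```scala
// Rough sketch only; the actual FlinkSource signature in this repo may differ.
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

abstract class FlinkSourceSketch[T] extends Serializable {
  // Parallelism this source should run with; implicit so downstream wiring can pick it up.
  implicit val parallelism: Int

  def getDataStream(topic: String, groupByName: String)(env: StreamExecutionEnvironment,
                                                        parallelism: Int): DataStream[T]
}
```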
flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala (1)
14-16: Avoid passing null as a parameter. The null argument to FlinkSourceProvider.build() should be replaced with a proper value, or an overload without this parameter should be used.
flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2)
14-15: Import change looks good. The update to the FlinkSource import path reflects the package restructuring.
52-53: Good addition of implicit parallelism. Adding a default parallelism value standardizes test source behavior.
scripts/distribution/build_and_upload_artifacts.sh (3)
157-160: Good implementation for PubSub connector build. The changes correctly add the PubSub connector JAR build step.
173-176: Appropriate validation for PubSub JAR. The check ensures the PubSub JAR was successfully built, following the pattern used for other JARs.
204-204: Upload step correctly implemented. The PubSub JAR upload uses consistent metadata and destination path.
api/python/ai/chronon/repo/gcp.py (3)
33-33: Good constant naming for PubSub JAR. Follows the established pattern for JAR file constants.
289-301: Correctly handles PubSub JAR URI conditionally. The implementation only includes the PubSub JAR when enabled, which is proper feature flagging.
343-347: Correctly passes subscription configuration. The PubSub subscription is properly included in user arguments when enabled.
flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala (1)
1-43: LGTM! Test coverage for FlinkSourceProvider's Kafka functionality. Good test coverage for bootstrap server resolution from various sources.
spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (3)
111-111: Added constant for PubSub connector JAR URI. Consistent naming with existing constants.
139-139: Added CLI argument keyword for PubSub connector JAR. Follows existing naming pattern.
164-164: Added PubSub argument to shared internal args set. Ensures proper arg handling in the submission pipeline.
cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (4)
18-18: Added UUID import for generating unique job IDs. Required for the PubSub driver test.
31-31: Updated existing tests with PubSub connector parameter. Properly accommodates the new optional parameter.
Also applies to: 92-92
101-122: New test validating PubSub connector JAR inclusion. Verifies JARs are properly combined in submission.
879-904: Replaced Kafka test with PubSub equivalent. Both tests are ignored for CI but useful for local testing.
api/python/test/canary/group_bys/gcp/item_event_canary.py (3)
36-53: Good refactoring - extracted reusable function. Improves code organization and reusability.
58-58: Renamed variable for clarity and updated function call. Better variable naming improves readability.
Also applies to: 60-60
62-65: Added PubSub equivalent source with parallelism configuration. Using the 'tasks=4' parameter to set the PubSub parallelism level.
flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (3)
21-24: Good addition of imports. These imports support the new abstracted source creation model and enable checkpointing.
135-135: Source creation properly abstracted. Changed from direct Kafka source instantiation to FlinkSourceProvider, enabling support for PubSub.
Also applies to: 150-151
170-170: Important checkpoint configuration added. AT_LEAST_ONCE checkpointing mode is critical for PubSub, which acknowledges messages during checkpoints.
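For reference, a minimal sketch of enabling at-least-once checkpointing in a Flink job; the 10-second interval is a placeholder, not the value used in this PR:
```scala
// Minimal illustration of at-least-once checkpointing; the interval is a placeholder value.
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object CheckpointingSketch {
  def configure(env: StreamExecutionEnvironment): Unit = {
    // At-least-once keeps checkpoint barriers unaligned (lower latency) and fits Pub/Sub,
    // where messages are acknowledged on checkpoint completion and may be redelivered after failures.
    env.enableCheckpointing(10000L, CheckpointingMode.AT_LEAST_ONCE)
  }
}
```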
flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5)
10-10: Updated import for source abstraction. Now includes FlinkSourceProvider for source creation abstraction.
270-272: Good consolidation of streaming parameters. Creating a combined properties map provides a unified configuration approach supporting multiple source types.
278-278: Properly updated method call. ValidationFlinkJob.run now correctly receives the combined properties map.
292-292: Method parameter updated consistently. buildFlinkJob now accepts a props map instead of a Kafka-specific bootstrap parameter.
338-339: Source creation properly abstracted. Using FlinkSourceProvider.build enables support for multiple message bus types.
Also applies to: 355-355
flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (2)
6-18: Well-designed factory method. The build method cleanly routes to the appropriate source implementation based on message bus type.
20-27: Good property resolution helper. Properly checks props first, falls back to topic params, and filters empty values.
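A hedged sketch of the two ideas above, bus-based routing plus props/topic-param fallback; the keys, URI scheme, and default are illustrative, not the exact FlinkSourceProvider code:
```scala
// Hedged sketch of bus-based routing plus props/topic-param fallback; names and keys are illustrative.
object SourceRoutingSketch {
  // Check job-level props first, then the topic's params, and treat empty strings as absent.
  def resolve(key: String, props: Map[String, String], topicParams: Map[String, String]): Option[String] =
    props.get(key).orElse(topicParams.get(key)).filter(_.nonEmpty)

  // Route on the message bus scheme of a topic URI, e.g. "kafka://events" vs "pubsub://events".
  def messageBus(topicUri: String): String =
    topicUri.split("://", 2) match {
      case Array(bus, _) => bus.toLowerCase
      case _             => "kafka" // assumed default when no scheme is present
    }
}
```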
flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala (4)
12-23: Good refactoring of bootstrap resolution. First checks the props map, then falls back to host/port from topicInfo. Clear error message when bootstrap is missing.
28-30: Efficient lazy parallelism calculation. Properly calculates parallelism only when needed.
34-35: Moved topic check to appropriate location. The topic existence check moved into getDataStream, where it logically belongs.
55-62: Well-structured companion object. Good Scala pattern with constant definition and helper method.
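A hedged sketch of that fallback order; the key and field names are illustrative rather than the actual KafkaFlinkSource helper:
```scala
// Illustrative only; the actual KafkaFlinkSource helper and key names may differ.
object BootstrapResolutionSketch {
  def resolveBootstrap(props: Map[String, String], host: Option[String], port: Option[String]): String =
    props.get("bootstrap").filter(_.nonEmpty)
      .orElse(host.map(h => port.map(p => s"$h:$p").getOrElse(h)))
      .getOrElse(throw new IllegalArgumentException(
        "Kafka bootstrap servers must be provided via props or the topic's host/port"))
}
```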
maven_install.json (13)
3-4: Autogenerated hashes updated.
490-496: Add gRPC Pub/Sub v1 artifact metadata.
2900-2906: Add Flink Pub/Sub connector artifact metadata.
5478-5497: Declare dependencies for grpc-google-cloud-pubsub-v1.
7329-7340: Register transitive deps for Flink GCP Pub/Sub connector.
9891-9893: Map Pub/Sub proto package.
13919-13923: List Flink Pub/Sub connector packages.
24307-24308: Include Pub/Sub artifact sources.
24995-24996: Reference Flink Pub/Sub connector in sources.
25779-25780: Include Pub/Sub gRPC jar and sources.
26467-26468: Add Flink Pub/Sub connector JAR references.
27251-27252: Add Pub/Sub gRPC jar & sources entry.
27939-27940: Include Flink Pub/Sub connector in artifact lists.flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (2)
33-35: Good validation of required propertiesRequired properties are properly checked and clear error messages are provided.
11-23: Well-documented PubSub differences. Good explanation of key differences between PubSub and Kafka, particularly around topic parallelism and subscription behavior.
flink/BUILD.bazel (2)
95-144: Clean connector target isolation. Good separation of the PubSub connector into dedicated targets, preventing dependency conflicts.
3-3: Appropriate source glob narrowing. Correctly narrowed source file patterns to the specific package structure.
cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (2)
291-298: Well-implemented JAR URI handling. Correctly appends the PubSub connector JAR to the job configuration when provided.
397-408: Good optional property handling. Properly extracts the optional PubSub connector URI and conditionally adds it to job properties.
```scala
package ai.chronon.flink_connectors.pubsub

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.{AvroInputFormat, AvroSerializationSchema}
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink
import org.rogach.scallop.{ScallopConf, ScallopOption, Serialization}

// Canary test app that can point to a source data file and will emit an event to PubSub periodically with an updated timestamp
object FlinkPubSubItemEventDriver {
  // Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137
  class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization {
    val dataFileName: ScallopOption[String] =
      opt[String](required = true, descr = "Name of the file on GCS to read data from")
    val gcpProject: ScallopOption[String] =
      opt[String](required = true, descr = "Gcp project")
    val topic: ScallopOption[String] = opt[String](required = true, descr = "PubSub topic to write to")
    val parentJobId: ScallopOption[String] =
      opt[String](required = false,
                  descr = "Parent job id that invoked the Flink job. For example, the Dataproc job id.")
    val eventDelayMillis: ScallopOption[Int] =
      opt[Int](required = false,
               descr = "Delay to use between event publishes (dictates the eps)",
               default = Some(1000))

    verify()
  }

  def main(args: Array[String]): Unit = {
    val jobArgs = new JobArgs(args)
    val dataFileName = jobArgs.dataFileName()
    val gcpProject = jobArgs.gcpProject()
    val topic = jobArgs.topic()
    val parentJobId = jobArgs.parentJobId()
    val eventDelayMillis = jobArgs.eventDelayMillis()

    // Configure GCS source
    val avroFormat = new AvroInputFormat[GenericRecord](
      new Path(dataFileName),
      classOf[GenericRecord]
    )

    implicit val typeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)

    // Set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig
      .enableForceKryo() // use kryo for complex types that Flink's default ser system doesn't support (e.g case classes)
    env.getConfig.enableGenericTypes() // more permissive type checks
    env.addDefaultKryoSerializer(classOf[Schema], classOf[AvroKryoSerializerUtils.AvroSchemaSerializer])

    val stream = env
      .createInput(avroFormat)
      .setParallelism(1)

    val transformedStream: DataStream[GenericRecord] = stream
      .map(new DelayedSourceTransformFn(eventDelayMillis))
      .setParallelism(stream.getParallelism)

    // Configure PubSub sink
    val serializationSchema = AvroSerializationSchema.forGeneric(avroSchema)

    val pubSubSink = PubSubSink
      .newBuilder()
      .withSerializationSchema(serializationSchema)
      .withProjectName(gcpProject)
      .withTopicName(topic)
      .build()

    // Write to PubSub
    transformedStream
      .addSink(pubSubSink)
      .setParallelism(transformedStream.getParallelism)

    // Execute program
    env.execute("Periodic PubSub Data Producer")
  }
```
🛠️ Refactor suggestion
Add error handling and logging
No error handling for file read failures, transformation errors, or Pub/Sub publishing issues.
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala lines 1 to 84, the code lacks error handling and logging for critical operations such as reading the input file, transforming the data stream, and publishing to Pub/Sub. Add try-catch blocks or Flink-specific error handling mechanisms around file reading and stream transformations to catch and log exceptions. Also, implement logging for failures in the Pub/Sub sink setup and data publishing steps to ensure any runtime issues are captured and can be diagnosed.
88c8994 to 7552838
Actionable comments posted: 1
♻️ Duplicate comments (4)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (4)
16-35: Add error handling and logging. No error handling for file read failures, transformation errors, or Pub/Sub publishing issues.
70-76: 🛠️ Refactor suggestion: Add error handling for the Pub/Sub sink.
Implement proper error handling for Pub/Sub failures.
```diff
 val pubSubSink = PubSubSink
   .newBuilder()
   .withSerializationSchema(serializationSchema)
   .withProjectName(gcpProject)
   .withTopicName(topic)
+  .withFailOnError(false) // Consider setting this based on requirements
+  // Add retry policy if supported by the connector
   .build()
+logger.info(s"Created Pub/Sub sink for topic $topic")
```
1-15: 🛠️ Refactor suggestion: Missing imports for error handling and logging.
Add imports for proper logging and exception handling.
```diff
+import org.slf4j.{Logger, LoggerFactory}
+import scala.util.{Try, Success, Failure}
```
36-84: 🛠️ Refactor suggestion: Implement logging and exception handling.
Add logging and try-catch blocks to handle failures gracefully.
```diff
 def main(args: Array[String]): Unit = {
+  val logger = LoggerFactory.getLogger(getClass)
+  logger.info("Starting Pub/Sub event driver")
+
+  try {
     val jobArgs = new JobArgs(args)
     // ... existing code
+    logger.info(s"Reading from file: $dataFileName")
+    logger.info(s"Publishing to $topic in project $gcpProject")
     // ... existing code
     // Execute program
+    logger.info("Executing Flink job")
     env.execute("Periodic PubSub Data Producer")
+  } catch {
+    case e: Exception =>
+      logger.error(s"Failed to execute Pub/Sub event driver: ${e.getMessage}", e)
+      throw e
+  }
 }
```
🧹 Nitpick comments (2)
flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (2)
59-62: Consider making parallelism configurable. Hardcoded parallelism limits scalability.
```diff
 val stream = env
   .createInput(avroFormat)
-  .setParallelism(1)
+  .setParallelism(jobArgs.parallelism.getOrElse(1))
```
Add to JobArgs:
```scala
val parallelism: ScallopOption[Int] = opt[Int](required = false, default = Some(1))
```
86-108: Document schema fields. Add documentation explaining each field's purpose.
```diff
 lazy val avroSchema: Schema = {
+  // Define schema for event records with the following fields:
+  // - event_type: Type of event (e.g., view, click)
+  // - timestamp: Time when event occurred (in epoch milliseconds)
+  // - visitor_id: Unique identifier for the visitor
+  // - is_primary: Flag indicating primary event
+  // - Additional fields for tracking context and metadata
   new Schema.Parser().parse("""
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (21)
- api/python/ai/chronon/repo/gcp.py (4 hunks)
- api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (5 hunks)
- flink/BUILD.bazel (4 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala (1 hunks)
- maven_install.json (13 hunks)
- scripts/distribution/build_and_upload_artifacts.sh (3 hunks)
- spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (3 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- tools/build_rules/dependencies/maven_repository.bzl
🚧 Files skipped from review as they are similar to previous changes (19)
- api/python/test/canary/teams.py
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala
- flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala
- flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- scripts/distribution/build_and_upload_artifacts.sh
- spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
- flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala
- api/python/test/canary/group_bys/gcp/item_event_canary.py
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala
- api/python/ai/chronon/repo/gcp.py
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala
- flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala
- maven_install.json
- flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
- flink/BUILD.bazel
⏰ Context from checks skipped due to timeout of 90000ms (18)
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: aggregator_tests
- GitHub Check: flink_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: aggregator_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: enforce_triggered_workflows
```scala
class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
  override def map(value: GenericRecord): GenericRecord = {
    val updatedTimestamp = System.currentTimeMillis()
    // Update the timestamp field in the record
    value.put("timestamp", updatedTimestamp)
    Thread.sleep(delayMs)
    value
  }
}
```
🛠️ Refactor suggestion
Avoid Thread.sleep in transformation functions.
Blocking sleep affects performance in distributed processing.
```diff
 class DelayedSourceTransformFn(delayMs: Int) extends MapFunction[GenericRecord, GenericRecord] {
+  private val logger = LoggerFactory.getLogger(getClass)
+
   override def map(value: GenericRecord): GenericRecord = {
+    try {
       val updatedTimestamp = System.currentTimeMillis()
       // Update the timestamp field in the record
       value.put("timestamp", updatedTimestamp)
       Thread.sleep(delayMs)
       value
+    } catch {
+      case e: InterruptedException =>
+        logger.warn("Sleep interrupted", e)
+        Thread.currentThread().interrupt()
+        value
+      case e: Exception =>
+        logger.error(s"Error processing record: ${e.getMessage}", e)
+        throw e
+    }
   }
 }
```
Consider alternative rate-limiting approaches.
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala around lines 111 to 119, the map function uses Thread.sleep to introduce delay, which blocks the thread and degrades performance in distributed processing. Remove the Thread.sleep call and instead implement a non-blocking rate-limiting mechanism, such as using Flink's built-in timers, event time processing, or a custom asynchronous function to control the processing rate without blocking the thread.
```python
        )
    ) + f" --flink-main-jar-uri={flink_jar_uri}"
    if enable_pubsub:
        base_formatted_args += f" --flink-pubsub-jar-uri={flink_pubsub_connector_jar_uri}"
```
btw the batch side of things doesn't adopt this architecture but we do also have the connector pattern going there. We just bring in the deps for cloud_gcp. I guess there are pro's and cons to both, curious whether you think we should converge on the same arch between batch and streaming.
yeah I debated this a bit - ended up going with pulling in the jar optionally to cut the risks of inadvertent class conflicts. This way I don't need to worry at all about any pubsub changes affecting users that have nothing to do with pubsub in their streaming jobs as we're not loading the jars..
```diff
 scala_library(
     name = "lib",
-    srcs = glob(["src/main/**/*.scala"]),
+    srcs = glob(["src/main/scala/ai/chronon/flink/**/*.scala"]),
```
👍
```scala
env.getConfig
  .enableForceKryo() // use kryo for complex types that Flink's default ser system doesn't support (e.g case classes)
env.getConfig.enableGenericTypes() // more permissive type checks
env.addDefaultKryoSerializer(classOf[Schema], classOf[AvroKryoSerializerUtils.AvroSchemaSerializer])
```
curious, but if we don't add this, does serialization break?
yeah iirc, I hit kryo issues without
* This means if the job is down for a while, we'll have a decent sized backlog to catch up on. To start afresh, a new subscription is needed.
To start afresh, a new subscription is needed
oof interesting
868fadb to 1b59a71
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (21)
- api/python/ai/chronon/repo/gcp.py (3 hunks)
- api/python/test/canary/group_bys/gcp/item_event_canary.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala (4 hunks)
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala (5 hunks)
- flink/BUILD.bazel (3 hunks)
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala (2 hunks)
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala (5 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala (1 hunks)
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala (1 hunks)
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala (2 hunks)
- flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala (1 hunks)
- maven_install.json (13 hunks)
- scripts/distribution/build_and_upload_artifacts.sh (2 hunks)
- spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala (3 hunks)
- tools/build_rules/dependencies/maven_repository.bzl (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- flink/src/test/scala/ai/chronon/flink/test/source/KafkaFlinkSourceSpec.scala
🚧 Files skipped from review as they are similar to previous changes (19)
- flink/src/main/scala/ai/chronon/flink/source/FlinkSource.scala
- scripts/distribution/build_and_upload_artifacts.sh
- api/python/test/canary/teams.py
- tools/build_rules/dependencies/maven_repository.bzl
- flink/src/test/scala/ai/chronon/flink/validation/ValidationFlinkJobIntegrationTest.scala
- api/python/ai/chronon/repo/gcp.py
- cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
- flink/src/test/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSourceSpec.scala
- flink/src/test/scala/ai/chronon/flink/test/FlinkTestUtils.scala
- flink/src/main/scala/ai/chronon/flink/source/KafkaFlinkSource.scala
- flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
- spark/src/main/scala/ai/chronon/spark/submission/JobSubmitter.scala
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/PubSubFlinkSource.scala
- flink/src/main/scala/ai/chronon/flink/validation/ValidationFlinkJob.scala
- maven_install.json
- flink/src/main/scala/ai/chronon/flink_connectors/pubsub/FlinkPubSubItemEventDriver.scala
- api/python/test/canary/group_bys/gcp/item_event_canary.py
- flink/BUILD.bazel
- cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
⏰ Context from checks skipped due to timeout of 90000ms (18)
- GitHub Check: cloud_gcp_tests
- GitHub Check: service_commons_tests
- GitHub Check: online_tests
- GitHub Check: service_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: analyzer_tests
- GitHub Check: fetcher_tests
- GitHub Check: join_tests
- GitHub Check: batch_tests
- GitHub Check: python_tests
- GitHub Check: spark_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (3)
flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala (3)
1-5: Clean imports and package structure. Standard setup looks good.
6-18: Factory pattern implementation is solid. Good separation of concerns using pattern matching on message bus type.
20-27: Useful property resolution utility. Nice fallback mechanism from props to topicInfo params.
```scala
// Pubsub source is loaded via reflection as we don't want the Flink module to depend on the PubSub connector
// module as we don't want to pull in Gcp deps in contexts such as running in Aws
private def loadPubsubSource[T](props: Map[String, String],
                                deserializationSchema: DeserializationSchema[T],
                                topicInfo: TopicInfo): FlinkSource[T] = {
  val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
  val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
  val constructor = cls.getConstructors.apply(0)
  val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
  onlineImpl.asInstanceOf[FlinkSource[T]]
}
```
🛠️ Refactor suggestion
Reflection approach needs error handling.
The reflection loading is clever for dependency isolation but fragile - getConstructors.apply(0) assumes constructor order and lacks exception handling.
```diff
 private def loadPubsubSource[T](props: Map[String, String],
                                 deserializationSchema: DeserializationSchema[T],
                                 topicInfo: TopicInfo): FlinkSource[T] = {
-  val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader
-  val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
-  val constructor = cls.getConstructors.apply(0)
-  val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
-  onlineImpl.asInstanceOf[FlinkSource[T]]
+  try {
+    val cl = Thread.currentThread().getContextClassLoader
+    val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
+    val constructor = cls.getConstructor(classOf[Map[String, String]], classOf[DeserializationSchema[T]], classOf[TopicInfo])
+    val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
+    onlineImpl.asInstanceOf[FlinkSource[T]]
+  } catch {
+    case e: Exception => throw new RuntimeException("Failed to load PubSub source", e)
+  }
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```scala
private def loadPubsubSource[T](props: Map[String, String],
                                deserializationSchema: DeserializationSchema[T],
                                topicInfo: TopicInfo): FlinkSource[T] = {
  try {
    val cl = Thread.currentThread().getContextClassLoader
    val cls = cl.loadClass("ai.chronon.flink_connectors.pubsub.PubSubFlinkSource")
    val constructor = cls.getConstructor(
      classOf[Map[String, String]],
      classOf[DeserializationSchema[T]],
      classOf[TopicInfo]
    )
    val onlineImpl = constructor.newInstance(props, deserializationSchema, topicInfo)
    onlineImpl.asInstanceOf[FlinkSource[T]]
  } catch {
    case e: Exception =>
      throw new RuntimeException("Failed to load PubSub source", e)
  }
}
```
🤖 Prompt for AI Agents
In flink/src/main/scala/ai/chronon/flink/source/FlinkSourceProvider.scala around lines 29 to 39, the reflection code uses getConstructors.apply(0) without verifying constructor order and lacks error handling. To fix this, add proper exception handling around the reflection calls to catch and handle ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, and InvocationTargetException. Also, instead of blindly using the first constructor, explicitly find the constructor matching the expected parameter types to avoid relying on constructor order.
Summary
This PR adds a Google Pub/Sub Flink source. Some aspects to call out:
Some charts:
Publishing of messages to PubSub using the event driver Flink app:
Writes to BT from the gcp.item_event_canary.actions_pubsub Flink app:
Checklist
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests
Chores