Commit af20b80
Flink submit updates to allow our clients' jobs to run (#315)
## Summary

Some updates to get our Flink jobs running on our clients' side:

* Configure the schema registry via host/port/scheme instead of a full URL
* Explicitly set the task slots per task manager
* Configure the checkpoint directory based on teams.json

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [X] Integration tested - kicked off the job on our clients' cluster and confirmed it's up and running
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Introduced a state URI parameter for Flink job submissions to improve job state management.
  - Expanded schema registry configuration, enabling greater flexibility with host, port, and scheme settings.
- **Chores**
  - Adjusted logging levels and refined error messaging to support better troubleshooting.
- **Documentation**
  - Updated configuration guidance to aid in setting up schema registry integration.
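To illustrate how the pieces in the diffs below fit together, here is a minimal, script-style sketch of a Flink submission using the new knobs. The bucket, jar paths, topic, and registry host are hypothetical; only the `FLINK_STATE_URI` environment variable and the `JobSubmitterConstants` keys come from this change:

```scala
import ai.chronon.spark.JobSubmitterConstants.{FlinkMainJarURI, FlinkStateUri, JarURI, MainClass}

// The submitter now reads the Flink state root from the environment,
// e.g. export FLINK_STATE_URI=gs://<warehouse-bucket>/flink-state
val flinkStateUri = sys.env.getOrElse("FLINK_STATE_URI", throw new Exception("FLINK_STATE_URI not set"))

// Job properties handed to the Dataproc submitter for a Flink job (paths are illustrative).
val flinkJobProps = Map(
  MainClass -> "ai.chronon.flink.FlinkJob",
  JarURI -> "gs://<jar-bucket>/cloud_gcp_bigtable.jar",
  FlinkMainJarURI -> "gs://<jar-bucket>/flink-assembly.jar",
  FlinkStateUri -> flinkStateUri
)

// The schema registry is now configured per topic via host/port/scheme params instead of a URL:
val topicUri = "kafka://events/registry_host=schema-registry.internal/registry_port=8081/registry_scheme=http"
```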
1 parent 050b17c commit af20b80

File tree

7 files changed: +71 / -20 lines


cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala

Lines changed: 1 addition & 1 deletion

@@ -271,7 +271,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
   // Using the individual mutate calls allows us to easily return fine-grained success/failure information in the form
   // our callers expect.
   override def multiPut(requests: Seq[KVStore.PutRequest]): Future[Seq[Boolean]] = {
-    logger.info(s"Performing multi-put for ${requests.size} requests")
+    logger.debug(s"Performing multi-put for ${requests.size} requests")
     val resultFutures = {
       requests.map { request =>
         val tableId = mapDatasetToTable(request.dataset)

cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala

Lines changed: 17 additions & 7 deletions

@@ -2,6 +2,7 @@ package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.JobAuth
 import ai.chronon.spark.JobSubmitter
 import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.FlinkStateUri
 import ai.chronon.spark.JobSubmitterConstants.JarURI
 import ai.chronon.spark.JobSubmitterConstants.MainClass
 import ai.chronon.spark.JobSubmitterConstants.SavepointUri
@@ -66,8 +67,10 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
       case TypeFlinkJob =>
         val mainJarUri =
           jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI"))
+        val flinkStateUri =
+          jobProperties.getOrElse(FlinkStateUri, throw new RuntimeException(s"Missing expected $FlinkStateUri"))
         val maybeSavepointUri = jobProperties.get(SavepointUri)
-        buildFlinkJob(mainClass, mainJarUri, jarUri, maybeSavepointUri, args: _*)
+        buildFlinkJob(mainClass, mainJarUri, jarUri, flinkStateUri, maybeSavepointUri, args: _*)
     }

     val jobPlacement = JobPlacement
@@ -104,12 +107,10 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
   private def buildFlinkJob(mainClass: String,
                             mainJarUri: String,
                             jarUri: String,
+                            flinkStateUri: String,
                             maybeSavePointUri: Option[String],
                             args: String*): Job.Builder = {

-    // TODO leverage a setting in teams.json when that's wired up
-    val checkpointsDir = "gs://zl-warehouse/flink-state"
-
     // JobManager is primarily responsible for coordinating the job (task slots, checkpoint triggering) and not much else
     // so 4G should suffice.
     // We go with 64G TM containers (4 task slots per container)
@@ -127,10 +128,14 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
       "taskmanager.memory.process.size" -> "64G",
       "taskmanager.memory.network.min" -> "1G",
       "taskmanager.memory.network.max" -> "2G",
+      // explicitly set the number of task slots as otherwise it defaults to the number of cores
+      "taskmanager.numberOfTaskSlots" -> "4",
       "taskmanager.memory.managed.fraction" -> "0.5f",
+      // default is 256m, we seem to be close to the limit so we give ourselves some headroom
+      "taskmanager.memory.jvm-metaspace.size" -> "512m",
       "yarn.classpath.include-user-jar" -> "FIRST",
-      "state.savepoints.dir" -> checkpointsDir,
-      "state.checkpoints.dir" -> checkpointsDir,
+      "state.savepoints.dir" -> flinkStateUri,
+      "state.checkpoints.dir" -> flinkStateUri,
       // override the local dir for rocksdb as the default ends up being too large file name size wise
       "state.backend.rocksdb.localdir" -> "/tmp/flink-state",
       "state.checkpoint-storage" -> "filesystem"
@@ -246,8 +251,13 @@ object DataprocSubmitter {
     val (dataprocJobType, jobProps) = jobTypeValue.toLowerCase match {
       case "spark" => (TypeSparkJob, Map(MainClass -> mainClass, JarURI -> jarUri))
       case "flink" => {
+        val flinkStateUri = sys.env.getOrElse("FLINK_STATE_URI", throw new Exception("FLINK_STATE_URI not set"))
+
         val flinkMainJarUri = args.filter(_.startsWith(FLINK_MAIN_JAR_URI_ARG_PREFIX))(0).split("=")(1)
-        val baseJobProps = Map(MainClass -> mainClass, JarURI -> jarUri, FlinkMainJarURI -> flinkMainJarUri)
+        val baseJobProps = Map(MainClass -> mainClass,
+                               JarURI -> jarUri,
+                               FlinkMainJarURI -> flinkMainJarUri,
+                               FlinkStateUri -> flinkStateUri)
         if (args.exists(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))) {
           val savepointUri = args.filter(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))(0).split("=")(1)
           (TypeFlinkJob, baseJobProps + (SavepointUri -> savepointUri))
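For reference, a small sketch (not the submitter code itself) of how the state URI now drives the Flink properties that were previously hard-coded; the example bucket value is the one used in the tests and is purely illustrative:

```scala
// Sketch: derive the state-related Flink properties from the job's FlinkStateUri.
def statePropsSketch(flinkStateUri: String): Map[String, String] = Map(
  "state.savepoints.dir" -> flinkStateUri,    // where savepoints are written
  "state.checkpoints.dir" -> flinkStateUri,   // where periodic checkpoints land
  "state.checkpoint-storage" -> "filesystem", // persist state to the filesystem URI above
  "taskmanager.numberOfTaskSlots" -> "4"      // pinned explicitly; the default is the core count
)

// e.g. statePropsSketch("gs://zl-warehouse/flink-state")("state.checkpoints.dir")
//   returns "gs://zl-warehouse/flink-state"
```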

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala

Lines changed: 7 additions & 2 deletions

@@ -2,6 +2,7 @@ package ai.chronon.integrations.cloud_gcp

 import ai.chronon.spark
 import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.FlinkStateUri
 import ai.chronon.spark.JobSubmitterConstants.JarURI
 import ai.chronon.spark.JobSubmitterConstants.MainClass
 import com.google.api.gax.rpc.UnaryCallable
@@ -61,7 +62,9 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
         FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
         // Include savepoint / checkpoint Uri to resume from where a job left off
         // SavepointUri -> "gs://zl-warehouse/flink-state/93686c72c3fd63f58d631e8388d8180d/chk-12",
-        JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
+        JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar",
+        // This is where we write out checkpoints / persist state while the job is running
+        FlinkStateUri -> "gs://zl-warehouse/flink-state"
       ),
       List.empty,
       "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
@@ -81,7 +84,9 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
       Map(
         MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
         FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
-        JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
+        JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar",
+        // This is where we write out checkpoints / persist state while the job is running
+        FlinkStateUri -> "gs://zl-warehouse/flink-state"
       ),
       List.empty,
       "--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",

flink/src/main/scala/ai/chronon/flink/FlinkJob.scala

Lines changed: 3 additions & 2 deletions

@@ -7,6 +7,7 @@ import ai.chronon.api.DataType
 import ai.chronon.api.Extensions.GroupByOps
 import ai.chronon.api.Extensions.SourceOps
 import ai.chronon.flink.FlinkJob.watermarkStrategy
+import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
 import ai.chronon.flink.types.AvroCodecOutput
 import ai.chronon.flink.types.TimestampedTile
 import ai.chronon.flink.types.WriteResponse
@@ -318,11 +319,11 @@ object FlinkJob {
     val topicInfo = TopicInfo.parse(topicUri)

     val schemaProvider =
-      topicInfo.params.get("registry_url") match {
+      topicInfo.params.get(RegistryHostKey) match {
         case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
         case None =>
           throw new IllegalArgumentException(
-            "We only support schema registry based schema lookups. Missing registry_url in topic config")
+            s"We only support schema registry based schema lookups. Missing $RegistryHostKey in topic config")
       }

     val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
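A hedged sketch of how a topic would be configured so `FlinkJob` picks the schema registry provider; the topic name and host are made up, and only `TopicInfo.parse`, `RegistryHostKey`, and the provider constructor come from the code above:

```scala
import ai.chronon.flink.SchemaRegistrySchemaProvider
import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
import ai.chronon.online.TopicInfo

// Hypothetical topic URI carrying registry host/port params instead of registry_url.
val topicUri = "kafka://events/registry_host=schema-registry.internal/registry_port=8081"
val topicInfo = TopicInfo.parse(topicUri)

// Mirrors the selection logic above: the provider is only built when registry_host is present.
val schemaProvider = topicInfo.params.get(RegistryHostKey) match {
  case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
  case None    => throw new IllegalArgumentException(s"Missing $RegistryHostKey in topic config")
}
```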

flink/src/main/scala/ai/chronon/flink/SchemaRegistrySchemaProvider.scala

Lines changed: 39 additions & 6 deletions

@@ -9,19 +9,45 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.avro.AvroDeserializationSupport

+/**
+ * SchemaProvider that uses the Confluent Schema Registry to fetch schemas for topics.
+ * Can be configured as: topic = "kafka://topic-name/registry_host=host/[registry_port=port]/[registry_scheme=http]/[subject=subject]"
+ * Port, scheme and subject are optional. If port is missing, we assume the host points to an LB address that
+ * forwards to the right host + port. Scheme defaults to http. Subject defaults to the topic name + "-value" (based on schema
+ * registry conventions).
+ */
 class SchemaRegistrySchemaProvider(conf: Map[String, String]) extends SchemaProvider(conf) {
+  import SchemaRegistrySchemaProvider._
+
+  private val schemaRegistryHost: String =
+    conf.getOrElse(RegistryHostKey, throw new IllegalArgumentException(s"$RegistryHostKey not set"))
+
+  // port is optional as many folks configure just the host as it's behind an LB
+  private val schemaRegistryPortString: Option[String] = conf.get(RegistryPortKey)
+
+  // default to http if not set
+  private val schemaRegistrySchemeString: String = conf.getOrElse(RegistrySchemeKey, "http")

-  private val schemaRegistryUrl: String =
-    conf.getOrElse("registry_url", throw new IllegalArgumentException("registry_url not set"))
   private val CacheCapacity: Int = 10

-  private val schemaRegistryClient: SchemaRegistryClient = buildSchemaRegistryClient(schemaRegistryUrl)
+  private val schemaRegistryClient: SchemaRegistryClient =
+    buildSchemaRegistryClient(schemaRegistrySchemeString, schemaRegistryHost, schemaRegistryPortString)

-  private[flink] def buildSchemaRegistryClient(registryUrl: String): SchemaRegistryClient =
-    new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
+  private[flink] def buildSchemaRegistryClient(schemeString: String,
+                                               registryHost: String,
+                                               maybePortString: Option[String]): SchemaRegistryClient = {
+    maybePortString match {
+      case Some(portString) =>
+        val registryUrl = s"$schemeString://$registryHost:$portString"
+        new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
+      case None =>
+        val registryUrl = s"$schemeString://$registryHost"
+        new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
+    }
+  }

   override def buildEncoderAndDeserSchema(topicInfo: TopicInfo): (Encoder[Row], DeserializationSchema[Row]) = {
-    val subject = topicInfo.params.getOrElse("subject", s"${topicInfo.name}-value")
+    val subject = topicInfo.params.getOrElse(RegistrySubjectKey, s"${topicInfo.name}-value")
     val parsedSchema =
       try {
         val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
@@ -43,3 +69,10 @@ class SchemaRegistrySchemaProvider(conf: Map[String, String]) extends SchemaProv
     }
   }
 }
+
+object SchemaRegistrySchemaProvider {
+  val RegistryHostKey = "registry_host"
+  val RegistryPortKey = "registry_port"
+  val RegistrySchemeKey = "registry_scheme"
+  val RegistrySubjectKey = "subject"
+}
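A minimal sketch of the URL the provider ends up building from these keys, with illustrative host values (the actual construction lives in `buildSchemaRegistryClient` above):

```scala
// Same host/port/scheme resolution as the provider, written as a standalone helper.
def registryUrlSketch(conf: Map[String, String]): String = {
  val scheme = conf.getOrElse("registry_scheme", "http") // scheme defaults to http
  val host = conf("registry_host")                       // host is required
  conf.get("registry_port") match {                      // port is optional (e.g. host is behind an LB)
    case Some(port) => s"$scheme://$host:$port"
    case None       => s"$scheme://$host"
  }
}

// registryUrlSketch(Map("registry_host" -> "schema-registry.internal"))
//   == "http://schema-registry.internal"
// registryUrlSketch(Map("registry_host" -> "sr.internal", "registry_port" -> "8081", "registry_scheme" -> "https"))
//   == "https://sr.internal:8081"
```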

flink/src/test/scala/ai/chronon/flink/test/SchemaRegistrySchemaProviderSpec.scala

Lines changed: 3 additions & 2 deletions

@@ -1,6 +1,7 @@
 package ai.chronon.flink.test

 import ai.chronon.flink.SchemaRegistrySchemaProvider
+import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
 import ai.chronon.online.TopicInfo
 import io.confluent.kafka.schemaregistry.SchemaProvider
 import io.confluent.kafka.schemaregistry.avro.AvroSchema
@@ -14,7 +15,7 @@ import scala.jdk.CollectionConverters._

 class MockSchemaRegistrySchemaProvider(conf: Map[String, String], mockSchemaRegistryClient: MockSchemaRegistryClient)
   extends SchemaRegistrySchemaProvider(conf) {
-  override def buildSchemaRegistryClient(registryUrl: String): MockSchemaRegistryClient = mockSchemaRegistryClient
+  override def buildSchemaRegistryClient(schemeString: String, registryHost: String, maybePortString: Option[String]): MockSchemaRegistryClient = mockSchemaRegistryClient
 }

 class SchemaRegistrySchemaProviderSpec extends AnyFlatSpec {
@@ -23,7 +24,7 @@ class SchemaRegistrySchemaProviderSpec extends AnyFlatSpec {
   val protoSchemaProvider: SchemaProvider = new ProtobufSchemaProvider
   val schemaRegistryClient = new MockSchemaRegistryClient(Seq(avroSchemaProvider, protoSchemaProvider).asJava)
   val schemaRegistrySchemaProvider =
-    new MockSchemaRegistrySchemaProvider(Map("registry_url" -> "http://localhost:8081"), schemaRegistryClient)
+    new MockSchemaRegistrySchemaProvider(Map(RegistryHostKey -> "localhost"), schemaRegistryClient)

   it should "fail if the schema subject is not found" in {
     val topicInfo = new TopicInfo("test-topic", "kafka", Map.empty)

spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala

Lines changed: 1 addition & 0 deletions

@@ -22,4 +22,5 @@ object JobSubmitterConstants {
   val JarURI = "jarUri"
   val FlinkMainJarURI = "flinkMainJarUri"
   val SavepointUri = "savepointUri"
+  val FlinkStateUri = "flinkStateUri"
 }
