@@ -271,7 +271,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
// Using the individual mutate calls allows us to easily return fine-grained success/failure information in the form
// our callers expect.
override def multiPut(requests: Seq[KVStore.PutRequest]): Future[Seq[Boolean]] = {
logger.info(s"Performing multi-put for ${requests.size} requests")
logger.debug(s"Performing multi-put for ${requests.size} requests")
val resultFutures = {
requests.map { request =>
val tableId = mapDatasetToTable(request.dataset)
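The comment above is the key design point: issuing one Bigtable mutation per request is what lets multiPut report a per-key Boolean rather than an all-or-nothing result. Below is a minimal sketch of that pattern against the Bigtable Java client; the column family, qualifier and (tableId, key, value) request shape are simplifying assumptions, not the store's actual encoding.

```scala
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.bigtable.data.v2.BigtableDataClient
import com.google.cloud.bigtable.data.v2.models.RowMutation
import com.google.common.util.concurrent.MoreExecutors
import com.google.protobuf.ByteString

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, Promise}

object MultiPutSketch {
  // One mutateRowAsync call per request so each key gets its own Boolean outcome.
  def multiPutSketch(dataClient: BigtableDataClient,
                     requests: Seq[(String, Array[Byte], Array[Byte])]): Future[Seq[Boolean]] = {
    val perRequest: Seq[Future[Boolean]] = requests.map { case (tableId, key, value) =>
      // Hypothetical column family / qualifier; the real store derives these from the dataset.
      val mutation = RowMutation
        .create(tableId, ByteString.copyFrom(key))
        .setCell("cf", ByteString.copyFromUtf8("value"), ByteString.copyFrom(value))

      val promise = Promise[Boolean]()
      ApiFutures.addCallback(
        dataClient.mutateRowAsync(mutation),
        new ApiFutureCallback[Void] {
          override def onSuccess(result: Void): Unit = promise.success(true)
          override def onFailure(t: Throwable): Unit = promise.success(false) // a failed put becomes false, not an exception
        },
        MoreExecutors.directExecutor()
      )
      promise.future
    }
    Future.sequence(perRequest) // result order matches the incoming requests
  }
}
```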
@@ -2,6 +2,7 @@ package ai.chronon.integrations.cloud_gcp
import ai.chronon.spark.JobAuth
import ai.chronon.spark.JobSubmitter
import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
import ai.chronon.spark.JobSubmitterConstants.FlinkStateUri
import ai.chronon.spark.JobSubmitterConstants.JarURI
import ai.chronon.spark.JobSubmitterConstants.MainClass
import ai.chronon.spark.JobSubmitterConstants.SavepointUri
@@ -66,8 +67,10 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
case TypeFlinkJob =>
val mainJarUri =
jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI"))
val flinkStateUri =
jobProperties.getOrElse(FlinkStateUri, throw new RuntimeException(s"Missing expected $FlinkStateUri"))
val maybeSavepointUri = jobProperties.get(SavepointUri)
buildFlinkJob(mainClass, mainJarUri, jarUri, maybeSavepointUri, args: _*)
buildFlinkJob(mainClass, mainJarUri, jarUri, flinkStateUri, maybeSavepointUri, args: _*)
}

val jobPlacement = JobPlacement
@@ -104,12 +107,10 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
private def buildFlinkJob(mainClass: String,
mainJarUri: String,
jarUri: String,
flinkStateUri: String,
maybeSavePointUri: Option[String],
args: String*): Job.Builder = {

// TODO leverage a setting in teams.json when that's wired up
val checkpointsDir = "gs://zl-warehouse/flink-state"

// JobManager is primarily responsible for coordinating the job (task slots, checkpoint triggering) and not much else
// so 4G should suffice.
// We go with 64G TM containers (4 task slots per container)
@@ -127,10 +128,14 @@
"taskmanager.memory.process.size" -> "64G",
"taskmanager.memory.network.min" -> "1G",
"taskmanager.memory.network.max" -> "2G",
// explicitly set the number of task slots as otherwise it defaults to the number of cores
"taskmanager.numberOfTaskSlots" -> "4",
"taskmanager.memory.managed.fraction" -> "0.5f",
// default is 256m, we seem to be close to the limit so we give ourselves some headroom
"taskmanager.memory.jvm-metaspace.size" -> "512m",
"yarn.classpath.include-user-jar" -> "FIRST",
"state.savepoints.dir" -> checkpointsDir,
"state.checkpoints.dir" -> checkpointsDir,
"state.savepoints.dir" -> flinkStateUri,
"state.checkpoints.dir" -> flinkStateUri,
// override the local dir for rocksdb as the default results in file names that are too long
"state.backend.rocksdb.localdir" -> "/tmp/flink-state",
"state.checkpoint-storage" -> "filesystem"
@@ -246,8 +251,13 @@ object DataprocSubmitter {
val (dataprocJobType, jobProps) = jobTypeValue.toLowerCase match {
case "spark" => (TypeSparkJob, Map(MainClass -> mainClass, JarURI -> jarUri))
case "flink" => {
val flinkStateUri = sys.env.getOrElse("FLINK_STATE_URI", throw new Exception("FLINK_STATE_URI not set"))

val flinkMainJarUri = args.filter(_.startsWith(FLINK_MAIN_JAR_URI_ARG_PREFIX))(0).split("=")(1)
val baseJobProps = Map(MainClass -> mainClass, JarURI -> jarUri, FlinkMainJarURI -> flinkMainJarUri)
val baseJobProps = Map(MainClass -> mainClass,
JarURI -> jarUri,
FlinkMainJarURI -> flinkMainJarUri,
FlinkStateUri -> flinkStateUri)
if (args.exists(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))) {
val savepointUri = args.filter(_.startsWith(FLINK_SAVEPOINT_URI_ARG_PREFIX))(0).split("=")(1)
(TypeFlinkJob, baseJobProps + (SavepointUri -> savepointUri))
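The block above reads the state location from the FLINK_STATE_URI environment variable and the main-jar / savepoint URIs from --key=value style arguments. A hedged sketch of that parsing pattern follows; the flag spellings are assumptions (the real FLINK_MAIN_JAR_URI_ARG_PREFIX / FLINK_SAVEPOINT_URI_ARG_PREFIX constants live elsewhere in this object), and find/stripPrefix is used instead of filter(...)(0).split("=")(1) so a missing or malformed flag fails with a clear message rather than an index error.

```scala
import ai.chronon.spark.JobSubmitterConstants.{FlinkMainJarURI, FlinkStateUri, JarURI, MainClass, SavepointUri}

object FlinkArgParsingSketch {
  // Hypothetical flag spellings for illustration only.
  val MainJarPrefix = "--flink-main-jar-uri="
  val SavepointPrefix = "--savepoint-uri="

  def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new Exception(s"$name not set"))

  def argValue(args: Array[String], prefix: String): Option[String] =
    args.find(_.startsWith(prefix)).map(_.stripPrefix(prefix))

  def flinkJobProps(args: Array[String], mainClass: String, jarUri: String): Map[String, String] = {
    val base = Map(
      MainClass -> mainClass,
      JarURI -> jarUri,
      FlinkMainJarURI -> argValue(args, MainJarPrefix)
        .getOrElse(throw new RuntimeException(s"Missing $MainJarPrefix argument")),
      FlinkStateUri -> requiredEnv("FLINK_STATE_URI")
    )
    // The savepoint is optional: only resume from one when the caller passes the flag explicitly.
    argValue(args, SavepointPrefix).fold(base)(uri => base + (SavepointUri -> uri))
  }
}
```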
@@ -2,6 +2,7 @@ package ai.chronon.integrations.cloud_gcp

import ai.chronon.spark
import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
import ai.chronon.spark.JobSubmitterConstants.FlinkStateUri
import ai.chronon.spark.JobSubmitterConstants.JarURI
import ai.chronon.spark.JobSubmitterConstants.MainClass
import com.google.api.gax.rpc.UnaryCallable
@@ -61,7 +62,9 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
// Include savepoint / checkpoint Uri to resume from where a job left off
// SavepointUri -> "gs://zl-warehouse/flink-state/93686c72c3fd63f58d631e8388d8180d/chk-12",
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar",
// This is where we write out checkpoints / persist state while the job is running
FlinkStateUri -> "gs://zl-warehouse/flink-state"
),
List.empty,
"--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
@@ -81,7 +84,9 @@ class DataprocSubmitterTest extends AnyFlatSpec with MockitoSugar {
Map(
MainClass -> "ai.chronon.flink.FlinkKafkaBeaconEventDriver",
FlinkMainJarURI -> "gs://zipline-jars/flink_kafka_ingest-assembly-0.1.0-SNAPSHOT.jar",
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"
JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar",
// This is where we write out checkpoints / persist state while the job is running
FlinkStateUri -> "gs://zl-warehouse/flink-state"
),
List.empty,
"--kafka-bootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
5 changes: 3 additions & 2 deletions flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
@@ -7,6 +7,7 @@ import ai.chronon.api.DataType
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.Extensions.SourceOps
import ai.chronon.flink.FlinkJob.watermarkStrategy
import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
import ai.chronon.flink.types.AvroCodecOutput
import ai.chronon.flink.types.TimestampedTile
import ai.chronon.flink.types.WriteResponse
@@ -318,11 +319,11 @@ object FlinkJob {
val topicInfo = TopicInfo.parse(topicUri)

val schemaProvider =
topicInfo.params.get("registry_url") match {
topicInfo.params.get(RegistryHostKey) match {
case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
case None =>
throw new IllegalArgumentException(
"We only support schema registry based schema lookups. Missing registry_url in topic config")
s"We only support schema registry based schema lookups. Missing $RegistryHostKey in topic config")
}

val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
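To make the lookup above concrete, a topic configured for this path might look like the following snippet (the topic name and registry host are placeholders); anything without a registry_host param fails fast, since schema-registry lookups are the only supported schema source.

```scala
import ai.chronon.flink.SchemaRegistrySchemaProvider
import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
import ai.chronon.online.TopicInfo

// Illustrative topic URI following the format documented on SchemaRegistrySchemaProvider.
val topicUri = "kafka://traffic-events/registry_host=schema-registry.internal/registry_port=8081"
val topicInfo = TopicInfo.parse(topicUri)

val schemaProvider = topicInfo.params.get(RegistryHostKey) match {
  case Some(_) => new SchemaRegistrySchemaProvider(topicInfo.params)
  case None    => throw new IllegalArgumentException(s"Missing $RegistryHostKey in topic config")
}
val (encoder, deserializationSchema) = schemaProvider.buildEncoderAndDeserSchema(topicInfo)
```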
@@ -9,19 +9,45 @@ import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.AvroDeserializationSupport

/**
* SchemaProvider that uses the Confluent Schema Registry to fetch schemas for topics.
* Can be configured as: topic = "kafka://topic-name/registry_host=host/[registry_port=port]/[registry_scheme=http]/[subject=subject]"
* Port, scheme and subject are optional. If the port is missing, we assume the host points to an LB address that
* forwards to the right host + port. Scheme defaults to http. Subject defaults to the topic name + "-value" (based on schema
* registry conventions).
*/
class SchemaRegistrySchemaProvider(conf: Map[String, String]) extends SchemaProvider(conf) {
import SchemaRegistrySchemaProvider._

private val schemaRegistryHost: String =
conf.getOrElse(RegistryHostKey, throw new IllegalArgumentException(s"$RegistryHostKey not set"))

// port is optional as many folks configure just the host as it's behind an LB
private val schemaRegistryPortString: Option[String] = conf.get(RegistryPortKey)

// default to http if not set
private val schemaRegistrySchemeString: String = conf.getOrElse(RegistrySchemeKey, "http")

private val schemaRegistryUrl: String =
conf.getOrElse("registry_url", throw new IllegalArgumentException("registry_url not set"))
private val CacheCapacity: Int = 10

private val schemaRegistryClient: SchemaRegistryClient = buildSchemaRegistryClient(schemaRegistryUrl)
private val schemaRegistryClient: SchemaRegistryClient =
buildSchemaRegistryClient(schemaRegistrySchemeString, schemaRegistryHost, schemaRegistryPortString)

private[flink] def buildSchemaRegistryClient(registryUrl: String): SchemaRegistryClient =
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
private[flink] def buildSchemaRegistryClient(schemeString: String,
registryHost: String,
maybePortString: Option[String]): SchemaRegistryClient = {
maybePortString match {
case Some(portString) =>
val registryUrl = s"$schemeString://$registryHost:$portString"
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
case None =>
val registryUrl = s"$schemeString://$registryHost"
new CachedSchemaRegistryClient(registryUrl, CacheCapacity)
}
}

override def buildEncoderAndDeserSchema(topicInfo: TopicInfo): (Encoder[Row], DeserializationSchema[Row]) = {
val subject = topicInfo.params.getOrElse("subject", s"${topicInfo.name}-value")
val subject = topicInfo.params.getOrElse(RegistrySubjectKey, s"${topicInfo.name}-value")
val parsedSchema =
try {
val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
@@ -43,3 +69,10 @@ class SchemaRegistrySchemaProvider(conf: Map[String, String]) extends SchemaProv
}
}
}

object SchemaRegistrySchemaProvider {
val RegistryHostKey = "registry_host"
val RegistryPortKey = "registry_port"
val RegistrySchemeKey = "registry_scheme"
val RegistrySubjectKey = "subject"
}
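A short sketch of what the conf shapes above resolve to in practice (hostnames are illustrative):

```scala
import ai.chronon.flink.SchemaRegistrySchemaProvider
import ai.chronon.flink.SchemaRegistrySchemaProvider.{RegistryHostKey, RegistryPortKey, RegistrySchemeKey}

// Host behind a load balancer: no port, scheme defaults to http -> http://schema-registry.internal
val lbProvider = new SchemaRegistrySchemaProvider(Map(RegistryHostKey -> "schema-registry.internal"))

// Explicit port and scheme -> https://schema-registry.internal:8081
val directProvider = new SchemaRegistrySchemaProvider(
  Map(RegistryHostKey -> "schema-registry.internal",
      RegistryPortKey -> "8081",
      RegistrySchemeKey -> "https"))

// Subject resolution happens per topic: it defaults to "<topic-name>-value" unless a
// subject param is set in the topic config.
```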
@@ -1,6 +1,7 @@
package ai.chronon.flink.test

import ai.chronon.flink.SchemaRegistrySchemaProvider
import ai.chronon.flink.SchemaRegistrySchemaProvider.RegistryHostKey
import ai.chronon.online.TopicInfo
import io.confluent.kafka.schemaregistry.SchemaProvider
import io.confluent.kafka.schemaregistry.avro.AvroSchema
@@ -14,7 +15,7 @@ import scala.jdk.CollectionConverters._

class MockSchemaRegistrySchemaProvider(conf: Map[String, String], mockSchemaRegistryClient: MockSchemaRegistryClient)
extends SchemaRegistrySchemaProvider(conf) {
override def buildSchemaRegistryClient(registryUrl: String): MockSchemaRegistryClient = mockSchemaRegistryClient
override def buildSchemaRegistryClient(schemeString: String, registryHost: String, maybePortString: Option[String]): MockSchemaRegistryClient = mockSchemaRegistryClient
}

class SchemaRegistrySchemaProviderSpec extends AnyFlatSpec {
@@ -23,7 +24,7 @@ class SchemaRegistrySchemaProviderSpec extends AnyFlatSpec {
val protoSchemaProvider: SchemaProvider = new ProtobufSchemaProvider
val schemaRegistryClient = new MockSchemaRegistryClient(Seq(avroSchemaProvider, protoSchemaProvider).asJava)
val schemaRegistrySchemaProvider =
new MockSchemaRegistrySchemaProvider(Map("registry_url" -> "http://localhost:8081"), schemaRegistryClient)
new MockSchemaRegistrySchemaProvider(Map(RegistryHostKey -> "localhost"), schemaRegistryClient)

it should "fail if the schema subject is not found" in {
val topicInfo = new TopicInfo("test-topic", "kafka", Map.empty)
1 change: 1 addition & 0 deletions spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala
@@ -22,4 +22,5 @@ object JobSubmitterConstants {
val JarURI = "jarUri"
val FlinkMainJarURI = "flinkMainJarUri"
val SavepointUri = "savepointUri"
val FlinkStateUri = "flinkStateUri"
}