diff --git a/build.sbt b/build.sbt
index e597909518..d51b25b53e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -109,7 +109,8 @@ val circe = Seq(
 val flink_all = Seq(
   "org.apache.flink" %% "flink-streaming-scala",
   "org.apache.flink" % "flink-metrics-dropwizard",
-  "org.apache.flink" % "flink-clients"
+  "org.apache.flink" % "flink-clients",
+  "org.apache.flink" % "flink-yarn"
 ).map(_ % flink_1_17)
 
 val vertx_java = Seq(
@@ -213,6 +214,22 @@ lazy val flink = project
   .settings(
     libraryDependencies ++= spark_all,
     libraryDependencies ++= flink_all,
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
+      case "reference.conf" => MergeStrategy.concat
+      case "application.conf" => MergeStrategy.concat
+      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+      case _ => MergeStrategy.first
+    },
+    // Exclude Hadoop & Guava from the assembled JAR
+    // Else we hit an error - IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its
+    // superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
+    // Or: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(...)
+    // Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
+    assembly / assemblyExcludedJars := {
+      val cp = (assembly / fullClasspath).value
+      cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")}
+    },
     libraryDependencies += "org.apache.flink" % "flink-test-utils" % flink_1_17 % Test excludeAll (
       ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-api"),
       ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-core"),
@@ -236,13 +253,24 @@ lazy val cloud_gcp = project
     libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M11",
     libraryDependencies += "org.json4s" %% "json4s-core" % "3.7.0-M11",
     libraryDependencies += "org.yaml" % "snakeyaml" % "2.3",
+    libraryDependencies += "io.grpc" % "grpc-netty-shaded" % "1.62.2",
     libraryDependencies ++= avro,
     libraryDependencies ++= spark_all_provided,
     dependencyOverrides ++= jackson,
+    // assembly merge settings to allow Flink jobs to kick off
+    assembly / assemblyMergeStrategy := {
+      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat // Add to include channel provider
+      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+      case "reference.conf" => MergeStrategy.concat
+      case "application.conf" => MergeStrategy.concat
+      case _ => MergeStrategy.first
+    },
     libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
     libraryDependencies += "com.google.cloud" % "google-cloud-bigtable-emulator" % "0.178.0" % Test,
     // force a newer version of reload4j to sidestep: https://security.snyk.io/vuln/SNYK-JAVA-CHQOSRELOAD4J-5731326
-    dependencyOverrides += "ch.qos.reload4j" % "reload4j" % "1.2.25"
+    dependencyOverrides ++= Seq(
+      "ch.qos.reload4j" % "reload4j" % "1.2.25",
+    )
   )
 
 lazy val cloud_gcp_submitter = project
diff --git a/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml b/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
index 45220b88ad..46088bed5b 100644
--- a/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
+++ b/cloud_gcp/src/main/resources/dataproc-submitter-conf.yaml
@@ -2,5 +2,3 @@
 projectId: "canary-443022"
 region: "us-central1"
 clusterName: "canary-2"
-jarUri: "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"
-mainClass: "ai.chronon.spark.Driver"
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
index 2fde26a499..aa8f0c35f5 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitter.scala
@@ -1,6 +1,12 @@
 package ai.chronon.integrations.cloud_gcp
 import ai.chronon.spark.JobAuth
 import ai.chronon.spark.JobSubmitter
+import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.JarURI
+import ai.chronon.spark.JobSubmitterConstants.MainClass
+import ai.chronon.spark.JobType
+import ai.chronon.spark.{FlinkJob => TypeFlinkJob}
+import ai.chronon.spark.{SparkJob => TypeSparkJob}
 import com.google.api.gax.rpc.ApiException
 import com.google.cloud.dataproc.v1._
 import org.json4s._
@@ -14,9 +20,7 @@ import collection.JavaConverters._
 case class SubmitterConf(
     projectId: String,
     region: String,
-    clusterName: String,
-    jarUri: String,
-    mainClass: String
+    clusterName: String
 ) {
 
   def endPoint: String = s"${region}-dataproc.googleapis.com:443"
@@ -49,15 +53,20 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
     job.getDone
   }
 
-  override def submit(files: List[String], args: String*): String = {
-
-    val sparkJob = SparkJob
-      .newBuilder()
-      .setMainClass(conf.mainClass)
-      .addJarFileUris(conf.jarUri)
-      .addAllFileUris(files.asJava)
-      .addAllArgs(args.toIterable.asJava)
-      .build()
+  override def submit(jobType: JobType,
+                      jobProperties: Map[String, String],
+                      files: List[String],
+                      args: String*): String = {
+    val mainClass = jobProperties.getOrElse(MainClass, throw new RuntimeException("Main class not found"))
+    val jarUri = jobProperties.getOrElse(JarURI, throw new RuntimeException("Jar URI not found"))
+
+    val jobBuilder = jobType match {
+      case TypeSparkJob => buildSparkJob(mainClass, jarUri, files, args: _*)
+      case TypeFlinkJob =>
+        val mainJarUri =
+          jobProperties.getOrElse(FlinkMainJarURI, throw new RuntimeException(s"Missing expected $FlinkMainJarURI"))
+        buildFlinkJob(mainClass, mainJarUri, jarUri, args: _*)
+    }
 
     val jobPlacement = JobPlacement
       .newBuilder()
@@ -65,11 +74,9 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
       .build()
 
     try {
-      val job = Job
-        .newBuilder()
+      val job = jobBuilder
         .setReference(jobReference)
         .setPlacement(jobPlacement)
-        .setSparkJob(sparkJob)
         .build()
 
       val submittedJob = jobControllerClient.submitJob(conf.projectId, conf.region, job)
@@ -77,10 +84,36 @@ class DataprocSubmitter(jobControllerClient: JobControllerClient, conf: Submitte
     } catch {
       case e: ApiException =>
-        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}")
+        throw new RuntimeException(s"Failed to submit job: ${e.getMessage}", e)
     }
   }
 
+  private def buildSparkJob(mainClass: String, jarUri: String, files: List[String], args: String*): Job.Builder = {
+    val sparkJob = SparkJob
+      .newBuilder()
+      .setMainClass(mainClass)
+      .addJarFileUris(jarUri)
+      .addAllFileUris(files.asJava)
+      .addAllArgs(args.toIterable.asJava)
+      .build()
+    Job.newBuilder().setSparkJob(sparkJob)
+  }
+
+  private def buildFlinkJob(mainClass: String, mainJarUri: String, jarUri: String, args: String*): Job.Builder = {
+    val envProps =
+      Map("jobmanager.memory.process.size" -> "4G", "yarn.classpath.include-user-jar" -> "FIRST")
+
+    val flinkJob = FlinkJob
+      .newBuilder()
+      .setMainClass(mainClass)
+      .setMainJarFileUri(mainJarUri)
+      .putAllProperties(envProps.asJava)
+      .addJarFileUris(jarUri)
+      .addAllArgs(args.toIterable.asJava)
+      .build()
+    Job.newBuilder().setFlinkJob(flinkJob)
+  }
+
   def jobReference: JobReference = JobReference.newBuilder().build()
 }
 
@@ -146,14 +179,14 @@ object DataprocSubmitter {
     val submitterConf = SubmitterConf(
       projectId,
       region,
-      clusterName,
-      chrononJarUri,
-      "ai.chronon.spark.Driver"
+      clusterName
     )
 
     val a = DataprocSubmitter(submitterConf)
 
     val jobId = a.submit(
+      TypeSparkJob,
+      Map(MainClass -> "ai.chronon.spark.Driver", JarURI -> chrononJarUri),
       gcsFiles.toList,
       userArgs: _*
     )
diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
index 94afd02767..3639887b2f 100644
--- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
+++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/DataprocSubmitterTest.scala
@@ -1,5 +1,9 @@
 package ai.chronon.integrations.cloud_gcp
 
+import ai.chronon.spark
+import ai.chronon.spark.JobSubmitterConstants.FlinkMainJarURI
+import ai.chronon.spark.JobSubmitterConstants.JarURI
+import ai.chronon.spark.JobSubmitterConstants.MainClass
 import com.google.api.gax.rpc.UnaryCallable
 import com.google.cloud.dataproc.v1._
 import com.google.cloud.dataproc.v1.stub.JobControllerStub
@@ -37,9 +41,9 @@ class DataprocSubmitterTest extends AnyFunSuite with MockitoSugar {
 
     val submitter = new DataprocSubmitter(
       mockJobControllerClient,
-      SubmitterConf("test-project", "test-region", "test-cluster", "test-jar-uri", "test-main-class"))
+      SubmitterConf("test-project", "test-region", "test-cluster"))
 
-    val submittedJobId = submitter.submit(List.empty)
+    val submittedJobId = submitter.submit(spark.SparkJob, Map(MainClass -> "test-main-class", JarURI -> "test-jar-uri"), List.empty)
     assertEquals(submittedJobId, jobId)
   }
 
@@ -47,11 +51,29 @@
     BigQueryUtilScala.validateScalaVersionCompatibility()
   }
 
+  ignore("test flink job locally") {
+    val submitter = DataprocSubmitter()
+    val submittedJobId =
+      submitter.submit(spark.FlinkJob,
+                       Map(MainClass -> "ai.chronon.flink.FlinkJob",
+                           FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
+                           JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
+                       List.empty,
+                       "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
+                       "--groupby-name=e2e-count",
+                       "-ZGCP_PROJECT_ID=bigtable-project-id",
+                       "-ZGCP_INSTANCE_ID=bigtable-instance-id")
+    println(submittedJobId)
+  }
+
   ignore("Used to iterate locally. Do not enable this in CI/CD!") {
 
     val submitter = DataprocSubmitter()
     val submittedJobId =
       submitter.submit(
+        spark.SparkJob,
+        Map(MainClass -> "ai.chronon.spark.Driver",
+            JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
        List("gs://zipline-jars/training_set.v1",
             "gs://zipline-jars/dataproc-submitter-conf.yaml",
             "gs://zipline-jars/additional-confs.yaml"),
@@ -67,7 +89,11 @@
     val submitter = DataprocSubmitter()
 
     val submittedJobId =
-      submitter.submit(List.empty,
+      submitter.submit(
+        spark.SparkJob,
+        Map(MainClass -> "ai.chronon.spark.Driver",
+            JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
+        List.empty,
         "groupby-upload-bulk-load",
         "-ZGCP_PROJECT_ID=bigtable-project-id",
         "-ZGCP_INSTANCE_ID=bigtable-instance-id",
diff --git a/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala b/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
index e8997d96fe..1f7d79519d 100644
--- a/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
+++ b/flink/src/main/scala/ai/chronon/flink/AsyncKVStoreWriter.scala
@@ -119,7 +119,7 @@ class AsyncKVStoreWriter(onlineImpl: Api, featureGroupName: String)
       // in the KVStore - we log the exception and skip the object to
       // not fail the app
       errorCounter.inc()
-      logger.error(s"Caught exception writing to KVStore for object: $input - $exception")
+      logger.error(s"Caught exception writing to KVStore for object: $input", exception)
       resultFuture.complete(util.Arrays.asList[WriteResponse](WriteResponse(input, status = false)))
     }
   }
diff --git a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
index 5d690c817d..20bb43a558 100644
--- a/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
+++ b/flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -108,7 +108,7 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
       case e: Exception =>
         // To improve availability, we don't rethrow the exception. We just drop the event
         // and track the errors in a metric. Alerts should be set up on this metric.
- logger.error(s"Error converting to Avro bytes - $e") + logger.error("Error converting to Avro bytes", e) eventProcessingErrorCounter.inc() avroConversionErrorCounter.inc() } diff --git a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala index 419024b4e0..a86bcae0c2 100644 --- a/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala +++ b/flink/src/main/scala/ai/chronon/flink/FlinkJob.scala @@ -9,6 +9,7 @@ import ai.chronon.flink.window.FlinkRowAggProcessFunction import ai.chronon.flink.window.FlinkRowAggregationFunction import ai.chronon.flink.window.KeySelector import ai.chronon.flink.window.TimestampedTile +import ai.chronon.online.Api import ai.chronon.online.GroupByServingInfoParsed import ai.chronon.online.KVStore.PutRequest import ai.chronon.online.SparkConversions @@ -22,6 +23,9 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.spark.sql.Encoder +import org.rogach.scallop.ScallopConf +import org.rogach.scallop.ScallopOption +import org.rogach.scallop.Serialization import org.slf4j.LoggerFactory /** @@ -196,3 +200,56 @@ class FlinkJob[T](eventSrc: FlinkSource[T], ) } } + +object FlinkJob { + // Pull in the Serialization trait to sidestep: https://github.com/scallop/scallop/issues/137 + class JobArgs(args: Seq[String]) extends ScallopConf(args) with Serialization { + val onlineClass: ScallopOption[String] = + opt[String](required = true, + descr = "Fully qualified Online.Api based class. We expect the jar to be on the class path") + val groupbyName: ScallopOption[String] = + opt[String](required = true, descr = "The name of the groupBy to process") + val mockSource: ScallopOption[Boolean] = + opt[Boolean](required = false, descr = "Use a mocked data source instead of a real source", default = Some(true)) + + val apiProps: Map[String, String] = props[String]('Z', descr = "Props to configure API / KV Store") + + verify() + } + + def main(args: Array[String]): Unit = { + val jobArgs = new JobArgs(args) + jobArgs.groupbyName() + val onlineClassName = jobArgs.onlineClass() + val props = jobArgs.apiProps.map(identity) + val useMockedSource = jobArgs.mockSource() + + val api = buildApi(onlineClassName, props) + val flinkJob = + if (useMockedSource) { + // We will yank this conditional block when we wire up our real sources etc. + TestFlinkJob.buildTestFlinkJob(api) + } else { + // TODO - what we need to do when we wire this up for real + // lookup groupByServingInfo by groupByName from the kv store + // based on the topic type (e.g. kafka / pubsub) and the schema class name: + // 1. lookup schema object using SchemaProvider (e.g SchemaRegistry / Jar based) + // 2. Create the appropriate Encoder for the given schema type + // 3. 
Invoke the appropriate source provider to get the source, encoder, parallelism + throw new IllegalArgumentException("We don't support non-mocked sources like Kafka / PubSub yet!") + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + // TODO add useful configs + flinkJob.runGroupByJob(env).addSink(new PrintSink) // TODO wire up a metrics sink / such + env.execute(s"${flinkJob.groupByName}") + } + + def buildApi(onlineClass: String, props: Map[String, String]): Api = { + val cl = Thread.currentThread().getContextClassLoader // Use Flink's classloader + val cls = cl.loadClass(onlineClass) + val constructor = cls.getConstructors.apply(0) + val onlineImpl = constructor.newInstance(props) + onlineImpl.asInstanceOf[Api] + } +} diff --git a/flink/src/main/scala/ai/chronon/flink/SourceProvider.scala b/flink/src/main/scala/ai/chronon/flink/SourceProvider.scala new file mode 100644 index 0000000000..8e3281e413 --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/SourceProvider.scala @@ -0,0 +1,23 @@ +package ai.chronon.flink + +import ai.chronon.online.GroupByServingInfoParsed +import org.apache.spark.sql.Encoder + +/** + * SourceProvider is an abstract class that provides a way to build a source for a Flink job. + * It takes the groupByServingInfo as an argument and based on the configured GB details, configures + * the Flink source (e.g. Kafka or PubSub) with the right parallelism etc. + */ +abstract class SourceProvider[T](maybeGroupByServingInfoParsed: Option[GroupByServingInfoParsed]) { + // Returns a tuple of the source, parallelism + def buildSource(): (FlinkSource[T], Int) +} + +/** + * EncoderProvider is an abstract class that provides a way to build an Spark encoder for a Flink job. + * These encoders are used in the SparkExprEval Flink function to convert the incoming stream into types + * that are amenable for tiled / untiled processing. + */ +abstract class EncoderProvider[T] { + def buildEncoder(): Encoder[T] +} diff --git a/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala b/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala index 0bdb941f16..bad31d0e28 100644 --- a/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala +++ b/flink/src/main/scala/ai/chronon/flink/SparkExpressionEvalFn.scala @@ -109,7 +109,7 @@ class SparkExpressionEvalFn[T](encoder: Encoder[T], groupBy: GroupBy) extends Ri case e: Exception => // To improve availability, we don't rethrow the exception. We just drop the event // and track the errors in a metric. Alerts should be set up on this metric. 
- logger.error(s"Error evaluating Spark expression - $e") + logger.error("Error evaluating Spark expression", e) exprEvalErrorCounter.inc() } } diff --git a/flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala b/flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala new file mode 100644 index 0000000000..83d1cde90f --- /dev/null +++ b/flink/src/main/scala/ai/chronon/flink/TestFlinkJob.scala @@ -0,0 +1,168 @@ +package ai.chronon.flink + +import ai.chronon.api.Accuracy +import ai.chronon.api.Builders +import ai.chronon.api.Extensions.WindowOps +import ai.chronon.api.Extensions.WindowUtils +import ai.chronon.api.GroupBy +import ai.chronon.api.GroupByServingInfo +import ai.chronon.api.Operation +import ai.chronon.api.PartitionSpec +import ai.chronon.api.TimeUnit +import ai.chronon.api.Window +import ai.chronon.online.Api +import ai.chronon.online.Extensions.StructTypeOps +import ai.chronon.online.GroupByServingInfoParsed +import org.apache.flink.api.scala.createTypeInformation +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.types.StructType +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import scala.jdk.CollectionConverters.asScalaBufferConverter + +// +// This file contains utility classes to spin up a TestFlink job to test end to end submission of Flink +// jobs to Flink clusters (e.g. DataProc). We mock things out at the moment to go with an in-memory +// datastream source as well as created a mocked GroupByServingInfo. The job does write out data to +// the configured KV store. + +case class E2ETestEvent(id: String, int_val: Int, double_val: Double, created: Long) + +class E2EEventSource(mockEvents: Seq[E2ETestEvent]) extends FlinkSource[E2ETestEvent] { + + override def getDataStream(topic: String, groupName: String)(env: StreamExecutionEnvironment, + parallelism: Int): DataStream[E2ETestEvent] = { + env.fromCollection(mockEvents) + } +} + +class PrintSink extends SinkFunction[WriteResponse] { + @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) + + override def invoke(value: WriteResponse, context: SinkFunction.Context): Unit = { + val elapsedTime = System.currentTimeMillis() - value.putRequest.tsMillis.get + logger.info(s"Received write response with status ${value.status}; elapsedTime = $elapsedTime ms") + } +} + +class MockedEncoderProvider extends EncoderProvider[E2ETestEvent] { + override def buildEncoder(): Encoder[E2ETestEvent] = Encoders.product[E2ETestEvent] +} + +class MockedSourceProvider extends SourceProvider[E2ETestEvent](None) { + import TestFlinkJob._ + + override def buildSource(): (FlinkSource[E2ETestEvent], Int) = { + val eventSrc = makeSource() + val parallelism = 2 // TODO - take parallelism as a job param + + (eventSrc, parallelism) + } +} + +object TestFlinkJob { + def makeSource(): FlinkSource[E2ETestEvent] = { + val startTs = System.currentTimeMillis() + val elements = (0 until 10).map(i => E2ETestEvent(s"test$i", i, i.toDouble, startTs)) + new E2EEventSource(elements) + } + + def makeTestGroupByServingInfoParsed(groupBy: GroupBy, + inputSchema: StructType, + outputSchema: StructType): GroupByServingInfoParsed = { + val groupByServingInfo = new GroupByServingInfo() + groupByServingInfo.setGroupBy(groupBy) + + // Set input avro schema for groupByServingInfo + 
groupByServingInfo.setInputAvroSchema( + inputSchema.toAvroSchema("Input").toString(true) + ) + + // Set key avro schema for groupByServingInfo + groupByServingInfo.setKeyAvroSchema( + StructType( + groupBy.keyColumns.asScala.map { keyCol => + val keyColStructType = outputSchema.fields.find(field => field.name == keyCol) + keyColStructType match { + case Some(col) => col + case None => + throw new IllegalArgumentException(s"Missing key col from output schema: $keyCol") + } + } + ).toAvroSchema("Key") + .toString(true) + ) + + // Set value avro schema for groupByServingInfo + val aggInputColNames = groupBy.aggregations.asScala.map(_.inputColumn).toList + groupByServingInfo.setSelectedAvroSchema( + StructType(outputSchema.fields.filter(field => aggInputColNames.contains(field.name))) + .toAvroSchema("Value") + .toString(true) + ) + + new GroupByServingInfoParsed( + groupByServingInfo, + PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis) + ) + } + + def makeGroupBy(keyColumns: Seq[String], filters: Seq[String] = Seq.empty): GroupBy = + Builders.GroupBy( + sources = Seq( + Builders.Source.events( + table = "events.my_stream_raw", + topic = "events.my_stream", + query = Builders.Query( + selects = Map( + "id" -> "id", + "int_val" -> "int_val", + "double_val" -> "double_val" + ), + wheres = filters, + timeColumn = "created", + startPartition = "20231106" + ) + ) + ), + keyColumns = keyColumns, + aggregations = Seq( + Builders.Aggregation( + operation = Operation.SUM, + inputColumn = "double_val", + windows = Seq( + new Window(1, TimeUnit.DAYS) + ) + ) + ), + metaData = Builders.MetaData( + name = "e2e-count" + ), + accuracy = Accuracy.TEMPORAL + ) + + def buildTestFlinkJob(api: Api): FlinkJob[E2ETestEvent] = { + val encoderProvider = new MockedEncoderProvider() + val encoder = encoderProvider.buildEncoder() + + val groupBy = makeGroupBy(Seq("id")) + val sourceProvider = new MockedSourceProvider() + val (eventSrc, parallelism) = sourceProvider.buildSource() + + val outputSchema = new SparkExpressionEvalFn(encoder, groupBy).getOutputSchema + val groupByServingInfoParsed = makeTestGroupByServingInfoParsed(groupBy, encoder.schema, outputSchema) + + new FlinkJob( + eventSrc = eventSrc, + sinkFn = new AsyncKVStoreWriter(api, groupBy.metaData.name), + groupByServingInfoParsed = groupByServingInfoParsed, + encoder = encoder, + parallelism = parallelism + ) + } +} diff --git a/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala b/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala index 4efadaef3d..2f7e159d83 100644 --- a/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala +++ b/spark/src/main/scala/ai/chronon/spark/JobSubmitter.scala @@ -1,8 +1,12 @@ package ai.chronon.spark +sealed trait JobType +case object SparkJob extends JobType +case object FlinkJob extends JobType + trait JobSubmitter { - def submit(files: List[String], args: String*): String + def submit(jobType: JobType, jobProperties: Map[String, String], files: List[String], args: String*): String def status(jobId: String): Unit @@ -12,3 +16,9 @@ trait JobSubmitter { abstract class JobAuth { def token(): Unit = {} } + +object JobSubmitterConstants { + val MainClass = "mainClass" + val JarURI = "jarUri" + val FlinkMainJarURI = "flinkMainJarUri" +}
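
Usage note (not part of the patch): with this change the caller, not SubmitterConf, decides what kind of job runs — the job type and the main class / jar locations travel through jobProperties, keyed by JobSubmitterConstants. Below is a minimal Scala sketch of driving the reworked API, assuming the no-arg DataprocSubmitter() still loads project/region/cluster from dataproc-submitter-conf.yaml, and reusing the same GCS jar paths and arguments as the ignored tests above (adjust them for a real environment).

import ai.chronon.integrations.cloud_gcp.DataprocSubmitter
import ai.chronon.spark.JobSubmitterConstants.{FlinkMainJarURI, JarURI, MainClass}
import ai.chronon.spark.{FlinkJob => FlinkJobType, SparkJob => SparkJobType}

object SubmitterSketch {
  def main(args: Array[String]): Unit = {
    val submitter = DataprocSubmitter() // project / region / cluster come from the yaml conf

    // Spark jobs only need the main class and the job jar.
    val sparkJobId = submitter.submit(
      SparkJobType,
      Map(MainClass -> "ai.chronon.spark.Driver",
          JarURI -> "gs://zipline-jars/cloud_gcp-assembly-0.1.0-SNAPSHOT.jar"),
      List.empty,
      "groupby-upload-bulk-load"
    )

    // Flink jobs additionally need FlinkMainJarURI (the flink assembly Dataproc runs on YARN);
    // JarURI ships the cloud_gcp jar so the Online Api implementation is on the job classpath.
    val flinkJobId = submitter.submit(
      FlinkJobType,
      Map(MainClass -> "ai.chronon.flink.FlinkJob",
          FlinkMainJarURI -> "gs://zipline-jars/flink-assembly-0.1.0-SNAPSHOT.jar",
          JarURI -> "gs://zipline-jars/cloud_gcp_bigtable.jar"),
      List.empty,
      "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
      "--groupby-name=e2e-count"
    )

    println(s"spark=$sparkJobId flink=$flinkJobId")
  }
}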
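
On the Flink side, FlinkJob.main is the new cluster entrypoint: Scallop parses --online-class and --groupby-name, the -Zkey=value pairs become the props map handed to buildApi (which reflectively constructs the Online Api implementation), and with the default mock-source setting the job runs the in-memory TestFlinkJob pipeline. A sketch of invoking it locally with the same flags the ignored Dataproc test passes — the -Z values are placeholders for a real Bigtable project and instance:

object RunFlinkJobLocally {
  def main(cliArgs: Array[String]): Unit =
    ai.chronon.flink.FlinkJob.main(
      Array(
        "--online-class=ai.chronon.integrations.cloud_gcp.GcpApiImpl",
        "--groupby-name=e2e-count",
        "-ZGCP_PROJECT_ID=bigtable-project-id", // forwarded to the Api constructor via apiProps
        "-ZGCP_INSTANCE_ID=bigtable-instance-id"
      ))
}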