diff --git a/build.sbt b/build.sbt
index b41b8363b8..60cb1419fe 100644
--- a/build.sbt
+++ b/build.sbt
@@ -140,7 +140,7 @@ lazy val api = project
       "org.scalatest" %% "scalatest" % "3.2.19" % "test",
       "org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
       // needed by thrift
-      "org.slf4j" % "slf4j-api" % slf4jApiVersion,
+      "org.slf4j" % "slf4j-api" % slf4jApiVersion
     )
   )
 
@@ -217,10 +217,10 @@ lazy val flink = project
     libraryDependencies ++= flink_all,
     assembly / assemblyMergeStrategy := {
       case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
-      case "reference.conf" => MergeStrategy.concat
-      case "application.conf" => MergeStrategy.concat
-      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
-      case _ => MergeStrategy.first
+      case "reference.conf"                          => MergeStrategy.concat
+      case "application.conf"                        => MergeStrategy.concat
+      case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
+      case _                                         => MergeStrategy.first
     },
     // Exclude Hadoop & Guava from the assembled JAR
     // Else we hit an error - IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its
@@ -229,7 +229,10 @@ lazy val flink = project
     // Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
     assembly / assemblyExcludedJars := {
       val cp = (assembly / fullClasspath).value
-      cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")}
+      cp filter { jar =>
+        jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName
+          .startsWith("protobuf")
+      }
     },
     libraryDependencies += "org.apache.flink" % "flink-test-utils" % flink_1_17 % Test excludeAll (
       ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-api"),
@@ -261,16 +264,16 @@ lazy val cloud_gcp = project
     // assembly merge settings to allow Flink jobs to kick off
     assembly / assemblyMergeStrategy := {
       case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat // Add to include channel provider
-      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
-      case "reference.conf" => MergeStrategy.concat
-      case "application.conf" => MergeStrategy.concat
-      case _ => MergeStrategy.first
+      case PathList("META-INF", xs @ _*)             => MergeStrategy.discard
+      case "reference.conf"                          => MergeStrategy.concat
+      case "application.conf"                        => MergeStrategy.concat
+      case _                                         => MergeStrategy.first
     },
     libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
     libraryDependencies += "com.google.cloud" % "google-cloud-bigtable-emulator" % "0.178.0" % Test,
     // force a newer version of reload4j to sidestep: https://security.snyk.io/vuln/SNYK-JAVA-CHQOSRELOAD4J-5731326
     dependencyOverrides ++= Seq(
-      "ch.qos.reload4j" % "reload4j" % "1.2.25",
+      "ch.qos.reload4j" % "reload4j" % "1.2.25"
     )
   )
 
@@ -450,22 +453,19 @@ lazy val hub = (project in file("hub"))
   }
 )
 
-// orchestrator
 lazy val orchestration = project
   .dependsOn(online.%("compile->compile;test->test"))
   .settings(
     assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),
-    Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),
     Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources",
-
     libraryDependencies ++= Seq(
       "org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0",
       "org.apache.logging.log4j" % "log4j-core" % "2.20.0",
       // "org.slf4j" % "slf4j-api" % slf4jApiVersion,
-      "org.scalatest" %% "scalatest" % "3.2.19" % "test",
-    ),
+      "org.scalatest" %% "scalatest" % "3.2.19" % "test"
+    )
   )
 
 ThisBuild / assemblyMergeStrategy := {
diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
index 6ed5ac6314..82ae8490c3 100644
--- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
+++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/GcpFormatProvider.scala
@@ -43,9 +43,9 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
     val sparkOptions: Map[String, String] = Map(
       // todo(tchow): No longer needed after https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1320
       "temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
-      "writeMethod" -> "indirect",
-      "materializationProject" -> sparkSession.conf.get("spark.chronon.table.gcs.connector_output_project"),
-      "materializationDataset" -> sparkSession.conf.get("spark.chronon.table.gcs.connector_output_dataset")
+      "writeMethod" -> "direct",
+      "materializationProject" -> tableId.getProject,
+      "materializationDataset" -> tableId.getDataset
     ) ++ partitionColumnOption
 
     BigQueryFormat(tableId.getProject, sparkOptions)
diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
index b16d87aadd..b4f46dbf56 100644
--- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
+++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -118,7 +118,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     df
   }
 
-  // Needs provider
   def tableReachable(tableName: String): Boolean = {
     try {
       loadTable(tableName)
@@ -133,7 +132,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
     }
   }
 
-  // Needs provider
   def loadTable(tableName: String): DataFrame = {
     sparkSession.read.load(DataPointer.from(tableName, sparkSession))
   }