Merged
32 changes: 16 additions & 16 deletions build.sbt
@@ -140,7 +140,7 @@ lazy val api = project
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
"org.scalatestplus" %% "mockito-3-4" % "3.2.10.0" % "test",
// needed by thrift
"org.slf4j" % "slf4j-api" % slf4jApiVersion,
"org.slf4j" % "slf4j-api" % slf4jApiVersion
)
)

@@ -217,10 +217,10 @@ lazy val flink = project
libraryDependencies ++= flink_all,
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
case "reference.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case _ => MergeStrategy.first
case "reference.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case _ => MergeStrategy.first
},
// Exclude Hadoop & Guava from the assembled JAR
// Else we hit an error - IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its
@@ -229,7 +229,10 @@ lazy val flink = project
// Or: 'com/google/protobuf/MapField' is not assignable to 'com/google/protobuf/MapFieldReflectionAccessor'
assembly / assemblyExcludedJars := {
val cp = (assembly / fullClasspath).value
cp filter { jar => jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName.startsWith("protobuf")}
cp filter { jar =>
jar.data.getName.startsWith("hadoop-") || jar.data.getName.startsWith("guava") || jar.data.getName
.startsWith("protobuf")
}
},
libraryDependencies += "org.apache.flink" % "flink-test-utils" % flink_1_17 % Test excludeAll (
ExclusionRule(organization = "org.apache.logging.log4j", name = "log4j-api"),
@@ -261,16 +264,16 @@ lazy val cloud_gcp = project
// assembly merge settings to allow Flink jobs to kick off
assembly / assemblyMergeStrategy := {
case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat // Add to include channel provider
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case "application.conf" => MergeStrategy.concat
case _ => MergeStrategy.first
},
libraryDependencies += "org.mockito" % "mockito-core" % "5.12.0" % Test,
libraryDependencies += "com.google.cloud" % "google-cloud-bigtable-emulator" % "0.178.0" % Test,
// force a newer version of reload4j to sidestep: https://security.snyk.io/vuln/SNYK-JAVA-CHQOSRELOAD4J-5731326
dependencyOverrides ++= Seq(
"ch.qos.reload4j" % "reload4j" % "1.2.25",
"ch.qos.reload4j" % "reload4j" % "1.2.25"
)
)

@@ -450,22 +453,19 @@ lazy val hub = (project in file("hub"))
}
)


// orchestrator
lazy val orchestration = project
.dependsOn(online.%("compile->compile;test->test"))
.settings(
assembly / mainClass := Some("ai.chronon.orchestration.RepoParser"),

Compile / run / mainClass := Some("ai.chronon.orchestration.RepoParser"),
Compile / unmanagedResourceDirectories += baseDirectory.value / "src" / "main" / "resources",

libraryDependencies ++= Seq(
"org.apache.logging.log4j" %% "log4j-api-scala" % "13.1.0",
"org.apache.logging.log4j" % "log4j-core" % "2.20.0",
// "org.slf4j" % "slf4j-api" % slf4jApiVersion,
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
),
"org.scalatest" %% "scalatest" % "3.2.19" % "test"
)
)

ThisBuild / assemblyMergeStrategy := {
@@ -43,9 +43,9 @@ case class GcpFormatProvider(sparkSession: SparkSession) extends FormatProvider
val sparkOptions: Map[String, String] = Map(
// todo(tchow): No longer needed after https://github.com/GoogleCloudDataproc/spark-bigquery-connector/pull/1320
"temporaryGcsBucket" -> sparkSession.conf.get("spark.chronon.table.gcs.temporary_gcs_bucket"),
"writeMethod" -> "indirect",
"materializationProject" -> sparkSession.conf.get("spark.chronon.table.gcs.connector_output_project"),
"materializationDataset" -> sparkSession.conf.get("spark.chronon.table.gcs.connector_output_dataset")
"writeMethod" -> "direct",
@tchow-zlai (Collaborator, Author) commented on Jan 16, 2025:
In later versions of the spark-bigquery connector, direct writes do support partitioned tables.
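
For context, here is a minimal sketch of what a direct-mode write to a date-partitioned table looks like with the connector; the project, dataset, table, and column names are placeholders for illustration, not values from this repo:

import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumes an existing SparkSession and a DataFrame `df` holding the rows to write.
val spark = SparkSession.builder().getOrCreate()
val df = spark.table("some_source_table") // placeholder source

df.write
  .format("bigquery")
  .option("writeMethod", "direct")  // Storage Write API path; no temporary GCS staging
  .option("partitionField", "ds")   // placeholder partition column
  .option("partitionType", "DAY")
  .mode(SaveMode.Append)
  .save("my-project.my_dataset.my_table") // placeholder table id

Because the direct method writes through the BigQuery Storage Write API, the temporaryGcsBucket option that indirect writes rely on is not required.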

"materializationProject" -> tableId.getProject,
"materializationDataset" -> tableId.getDataset
) ++ partitionColumnOption

BigQueryFormat(tableId.getProject, sparkOptions)
2 changes: 0 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/TableUtils.scala
@@ -118,7 +118,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
df
}

// Needs provider
def tableReachable(tableName: String): Boolean = {
try {
loadTable(tableName)
@@ -133,7 +132,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
}
}

// Needs provider
def loadTable(tableName: String): DataFrame = {
sparkSession.read.load(DataPointer.from(tableName, sparkSession))
}