Skip to content

Commit 9499ce5

Browse files
chore: slim down TableUtils (#719)
## Summary - Remove latest label view since it depends on some partition methods that are lightly used. We don't use this Label Join anyway anymore so it's fine to deprecate. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Removed Features** - Removed support for creating and managing "latest label" views and their associated mapping logic. - Eliminated utility methods for checking and retrieving all table partitions. - **Bug Fixes** - Improved partition presence checks to include table reachability and more explicit partition retrieval. - **Breaking Changes** - Updated the return type of partition parsing to preserve order and allow duplicate keys. - **Tests** - Removed tests related to partition utilities and latest label mapping. <!-- end of auto-generated comment: release notes by coderabbit.ai --> <!-- av pr metadata This information is embedded by the av CLI when creating PRs to track the status of stacks when using Aviator. Please do not delete or edit this section of the PR. ``` {"parent":"main","parentHead":"","trunk":"main"} ``` --> --------- Co-authored-by: thomaschow <[email protected]>
1 parent 123f1bc commit 9499ce5

File tree

11 files changed

+18
-258
lines changed

11 files changed

+18
-258
lines changed

api/src/main/scala/ai/chronon/api/Extensions.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ object Extensions {
131131
def outputTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}"
132132
def outputLabelTable: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labels"
133133
def outputFinalView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled"
134-
def outputLatestLabelView: String = s"${metaData.outputNamespace}.${metaData.cleanName}_labeled_latest"
135134
def outputLabelTableV2: String =
136135
s"${metaData.outputNamespace}.${metaData.cleanName}_with_labels" // Used for the LabelJoinV2 flow
137136
def loggedTable: String = s"${outputTable}_logged"

cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigQueryCatalogTest.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@ import ai.chronon.spark.catalog.{FormatProvider, Iceberg, TableUtils}
44
import ai.chronon.spark.submission.SparkSessionBuilder
55
import com.esotericsoftware.kryo.Kryo
66
import com.esotericsoftware.kryo.io.{Input, Output}
7-
import com.google.cloud.hadoop.fs.gcs.{
8-
GoogleHadoopFS,
9-
GoogleHadoopFileSystem,
10-
GoogleHadoopFileSystemConfiguration,
11-
HadoopConfigurationProperty
12-
}
7+
import com.google.cloud.hadoop.fs.gcs.{GoogleHadoopFS, GoogleHadoopFileSystem, GoogleHadoopFileSystemConfiguration, HadoopConfigurationProperty}
138
import com.google.cloud.spark.bigquery.SparkBigQueryUtil
149
import org.apache.iceberg.gcp.bigquery.{BigQueryMetastoreCatalog => BQMSCatalog}
1510
import org.apache.iceberg.gcp.gcs.GCSFileIO
@@ -106,7 +101,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
106101
val table = tableUtils.loadTable(nativeTable)
107102
table.show
108103
// val database = tableUtils.createDatabase("test_database")
109-
val allParts = tableUtils.allPartitions(nativeTable)
104+
val allParts = tableUtils.partitions(nativeTable)
110105
println(allParts)
111106
}
112107

@@ -116,7 +111,7 @@ class BigQueryCatalogTest extends AnyFlatSpec with MockitoSugar {
116111
val table = tableUtils.loadTable(externalTable)
117112
table.show
118113
// val database = tableUtils.createDatabase("test_database")
119-
val allParts = tableUtils.allPartitions(externalTable)
114+
val allParts = tableUtils.partitions(externalTable)
120115
println(allParts)
121116
}
122117

spark/src/main/scala/ai/chronon/spark/Driver.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,13 +1013,19 @@ object Driver {
10131013
val tableUtils = args.buildTableUtils()
10141014

10151015
val isAllPartitionsPresent = tablesToPartitionSpec.forall { case (tbl, spec) =>
1016-
val result = tableUtils.containsPartitions(tbl, spec)
1017-
if (result) {
1016+
val containsSpec = if (tableUtils.tableReachable(tbl)) {
1017+
val partList = tableUtils.partitions(tbl, spec.tail.toMap, partitionColumnName = spec.head._1)
1018+
partList.nonEmpty
1019+
} else {
1020+
logger.info(s"Table ${tbl} is not reachable.")
1021+
false
1022+
}
1023+
if (containsSpec) {
10181024
logger.info(s"Table ${tbl} has partition ${spec} present.")
10191025
} else {
10201026
logger.info(s"Table ${tbl} does not have partition ${spec} present.")
10211027
}
1022-
result
1028+
containsSpec
10231029
}
10241030
if (isAllPartitionsPresent) {
10251031
logger.info(s"All partitions ${partitionNames} are present.")

spark/src/main/scala/ai/chronon/spark/JoinUtils.scala

Lines changed: 0 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -252,86 +252,6 @@ object JoinUtils {
252252
tableUtils.sql(sqlStatement)
253253
}
254254

255-
/** *
256-
* Method to create a view with latest available label_ds for a given ds. This view is built
257-
* on top of final label view which has all label versions available.
258-
* This view will inherit the final label view properties as well.
259-
*/
260-
def createLatestLabelView(viewName: String,
261-
baseView: String,
262-
tableUtils: TableUtils,
263-
propertiesOverride: Map[String, String] = null): Unit = {
264-
val baseViewProperties: Map[String, String] = tableUtils.getTableProperties(baseView).getOrElse(Map.empty)
265-
val labelTableName = baseViewProperties.getOrElse(Constants.LabelViewPropertyKeyLabelTable.toString, "")
266-
assert(labelTableName.nonEmpty, "Not able to locate underlying label table for partitions")
267-
268-
val labelMapping: Map[String, Seq[PartitionRange]] = getLatestLabelMapping(labelTableName, tableUtils)
269-
val caseDefinitions = labelMapping.flatMap { case (ds: String, ranges: Seq[PartitionRange]) =>
270-
ranges
271-
.map(range =>
272-
"WHEN " + range.betweenClauses(
273-
tableUtils.partitionColumn) + s" THEN ${Constants.LabelPartitionColumn} = '$ds'")
274-
.toList
275-
}
276-
277-
val createFragment = s"""CREATE OR REPLACE VIEW $viewName"""
278-
val queryFragment =
279-
s"""
280-
| AS SELECT *
281-
| FROM $baseView
282-
| WHERE (
283-
| CASE
284-
| ${caseDefinitions.mkString("\n ")}
285-
| ELSE true
286-
| END
287-
| )
288-
| """.stripMargin
289-
290-
val mergedProperties =
291-
if (propertiesOverride != null) baseViewProperties ++ propertiesOverride
292-
else baseViewProperties
293-
val propertiesFragment = if (mergedProperties.nonEmpty) {
294-
s"""TBLPROPERTIES (
295-
| ${mergedProperties.map { case (k, v) => s"'$k'='$v'" }.mkString(",\n ")}
296-
|)""".stripMargin
297-
} else {
298-
""
299-
}
300-
val sqlStatement = Seq(createFragment, propertiesFragment, queryFragment).mkString("\n")
301-
tableUtils.sql(sqlStatement)
302-
}
303-
304-
/** compute the mapping label_ds -> PartitionRange of ds which has this label_ds as latest version
305-
* - Get all partitions from table
306-
* - For each ds, find the latest available label_ds
307-
* - Reverse the mapping and get the ds partition range for each label version(label_ds)
308-
*
309-
* @return Mapping of the label ds -> partition ranges of ds which has this label available as latest
310-
*/
311-
def getLatestLabelMapping(tableName: String, tableUtils: TableUtils): Map[String, collection.Seq[PartitionRange]] = {
312-
val partitions = tableUtils.allPartitions(tableName)
313-
assert(
314-
partitions.head.keys.equals(Set(tableUtils.partitionColumn, Constants.LabelPartitionColumn)),
315-
s""" Table must have label partition columns for latest label computation: `${tableUtils.partitionColumn}`
316-
| & `${Constants.LabelPartitionColumn}`
317-
|inputView: $tableName
318-
|""".stripMargin
319-
)
320-
321-
val labelMap = collection.mutable.Map[String, String]()
322-
partitions.foreach(par => {
323-
val ds_value = par(tableUtils.partitionColumn)
324-
val label_value: String = par(Constants.LabelPartitionColumn)
325-
if (!labelMap.contains(ds_value)) {
326-
labelMap.put(ds_value, label_value)
327-
} else {
328-
labelMap.put(ds_value, Seq(labelMap(ds_value), label_value).max)
329-
}
330-
})
331-
332-
labelMap.groupBy(_._2).map { case (v, kvs) => (v, tableUtils.chunk(kvs.keySet.toSet)) }
333-
}
334-
335255
/** Generate a Bloom filter for 'joinPart' when the row count to be backfilled falls below a specified threshold.
336256
* This method anticipates that there will likely be a substantial number of rows on the right side that need to be filtered out.
337257
* @return bloomfilter map option for right part

spark/src/main/scala/ai/chronon/spark/LabelJoin.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,6 @@ class LabelJoin(joinConf: api.Join, tableUtils: TableUtils, labelDS: String) {
111111
viewProperties = Map(Constants.LabelViewPropertyKeyLabelTable -> outputLabelTable,
112112
Constants.LabelViewPropertyFeatureTable -> joinConf.metaData.outputTable)
113113
)
114-
logger.info(s"Final labeled view created: ${joinConf.metaData.outputFinalView}")
115-
JoinUtils.createLatestLabelView(joinConf.metaData.outputLatestLabelView,
116-
baseView = joinConf.metaData.outputFinalView,
117-
tableUtils)
118-
logger.info(s"Final view with latest label created: ${joinConf.metaData.outputLatestLabelView}")
119114
labelDf
120115
}
121116
}

spark/src/main/scala/ai/chronon/spark/catalog/Format.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ trait Format {
8787

8888
object Format {
8989

90-
def parseHiveStylePartition(pstring: String): Map[String, String] = {
90+
def parseHiveStylePartition(pstring: String): List[(String, String)] = {
9191
pstring
9292
.split("/")
9393
.map { part =>
9494
val p = part.split("=", 2)
9595
p(0) -> p(1)
9696
}
97-
.toMap
97+
.toList
9898
}
9999

100100
}

spark/src/main/scala/ai/chronon/spark/catalog/Hive.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ case object Hive extends Format {
2020
sparkSession.sqlContext
2121
.sql(s"SHOW PARTITIONS $tableName")
2222
.collect()
23-
.map(row => Format.parseHiveStylePartition(row.getString(0)))
23+
.map(row => Format.parseHiveStylePartition(row.getString(0)).toMap)
2424
.toList
2525
}
2626

spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -137,49 +137,6 @@ class TableUtils(@transient val sparkSession: SparkSession) extends Serializable
137137
}
138138
}
139139

140-
def containsPartitions(tableName: String, partitionSpec: Map[String, String]): Boolean = {
141-
if (!tableReachable(tableName)) return false
142-
143-
val format = tableFormatProvider
144-
.readFormat(tableName)
145-
.getOrElse(
146-
throw new IllegalStateException(
147-
s"Could not determine read format of table ${tableName}. It is no longer reachable."))
148-
149-
format match {
150-
case Iceberg => {
151-
partitionSpec.values.toSet.subsetOf(this.partitions(tableName).toSet)
152-
}
153-
case _ => this.allPartitions(tableName).contains(partitionSpec)
154-
}
155-
156-
}
157-
158-
// return all specified partition columns in a table in format of Map[partitionName, PartitionValue]
159-
def allPartitions(tableName: String, partitionColumnsFilter: List[String] = List.empty): List[Map[String, String]] = {
160-
161-
if (!tableReachable(tableName)) return List.empty[Map[String, String]]
162-
163-
val format = tableFormatProvider
164-
.readFormat(tableName)
165-
.getOrElse(
166-
throw new IllegalStateException(
167-
s"Could not determine read format of table ${tableName}. It is no longer reachable."))
168-
val partitionSeq = format.partitions(tableName, "")(sparkSession)
169-
170-
if (partitionColumnsFilter.isEmpty) {
171-
172-
partitionSeq
173-
174-
} else {
175-
176-
partitionSeq.map { partitionMap =>
177-
partitionMap.filterKeys(key => partitionColumnsFilter.contains(key)).toMap
178-
}
179-
180-
}
181-
}
182-
183140
def partitions(tableName: String,
184141
subPartitionsFilter: Map[String, String] = Map.empty,
185142
partitionRange: Option[PartitionRange] = None,

spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -439,49 +439,6 @@ class TableUtilsTest extends AnyFlatSpec {
439439
)))
440440
}
441441

442-
it should "all partitions and get latest label mapping" in {
443-
val tableName = "db.test_show_partitions"
444-
spark.sql("CREATE DATABASE IF NOT EXISTS db")
445-
446-
val columns1 = Array(
447-
StructField("long_field", LongType),
448-
StructField("int_field", IntType),
449-
StructField("ds", StringType),
450-
StructField("label_ds", StringType)
451-
)
452-
val df1 = makeDf(
453-
spark,
454-
StructType(
455-
tableName,
456-
columns1
457-
),
458-
List(
459-
Row(1L, 2, "2022-10-01", "2022-11-01"),
460-
Row(2L, 2, "2022-10-02", "2022-11-02"),
461-
Row(3L, 8, "2022-10-05", "2022-11-05"),
462-
Row(1L, 2, "2022-10-01", "2022-11-09"),
463-
Row(2L, 2, "2022-10-02", "2022-11-09"),
464-
Row(3L, 8, "2022-10-05", "2022-11-09")
465-
)
466-
)
467-
tableUtils.insertPartitions(df1,
468-
tableName,
469-
partitionColumns = List(tableUtils.partitionColumn, Constants.LabelPartitionColumn))
470-
val par = tableUtils.allPartitions(tableName)
471-
assertTrue(par.size == 6)
472-
assertEquals(par.head.keys, Set(tableUtils.partitionColumn, Constants.LabelPartitionColumn))
473-
474-
// filter subset of partitions
475-
val filtered = tableUtils.allPartitions(tableName, List(Constants.LabelPartitionColumn))
476-
assertTrue(filtered.size == 6)
477-
assertEquals(filtered.head.keys, Set(Constants.LabelPartitionColumn))
478-
479-
// verify the latest label version
480-
val labels = JoinUtils.getLatestLabelMapping(tableName, tableUtils)
481-
assertEquals(labels("2022-11-09"),
482-
List(PartitionRange("2022-10-01", "2022-10-02"), PartitionRange("2022-10-05", "2022-10-05")))
483-
}
484-
485442
private def prepareTestDataWithSubPartitions(tableName: String): Unit = {
486443
spark.sql("CREATE DATABASE IF NOT EXISTS db")
487444
val columns1 = Array(

spark/src/test/scala/ai/chronon/spark/test/join/FeatureWithLabelJoinTest.scala

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,6 @@ class FeatureWithLabelJoinTest extends AnyFlatSpec {
9494
.select("label_ds")
9595
.first()
9696
.get(0))
97-
98-
//validate the latest label view
99-
val latest = tableUtils.sql(s"select * from ${joinConf.metaData.outputLatestLabelView} order by label_ds")
100-
latest.show()
101-
// latest label should be all same "2022-11-11"
102-
assertEquals(latest.agg(max("label_ds")).first().getString(0), latest.agg(min("label_ds")).first().getString(0))
103-
assertEquals("2022-11-11", latest.agg(max("label_ds")).first().getString(0))
10497
}
10598

10699
it should "final views with agg label" in {
@@ -164,22 +157,6 @@ class FeatureWithLabelJoinTest extends AnyFlatSpec {
164157
val runner2 = new LabelJoin(joinConf, tableUtils, "2022-10-07")
165158
val updatedLabelDf = runner2.computeLabelJoin()
166159
updatedLabelDf.show()
167-
168-
//validate the label view
169-
val latest = tableUtils.sql(s"select * from ${joinConf.metaData.outputLatestLabelView} order by label_ds")
170-
latest.show()
171-
assertEquals(2,
172-
latest
173-
.where(latest("listing") === "3" && latest("ds") === "2022-10-03")
174-
.select("label_listing_labels_agg_is_active_max_5d")
175-
.first()
176-
.get(0))
177-
assertEquals("2022-10-07",
178-
latest
179-
.where(latest("listing") === "1" && latest("ds") === "2022-10-03")
180-
.select("label_ds")
181-
.first()
182-
.get(0))
183160
}
184161

185162
private def assertResult(computed: DataFrame, expected: DataFrame): Unit = {

0 commit comments

Comments
 (0)