Update metadata export logic for join derivation #879


Merged: 32 commits, Dec 6, 2024
19 changes: 18 additions & 1 deletion spark/src/main/scala/ai/chronon/spark/Analyzer.scala
@@ -407,8 +407,25 @@ class Analyzer(tableUtils: TableUtils,
)
}
}
val aggMetadata: ListBuffer[AggregationMetadata] = if (joinConf.hasDerivations) {
val keyAndPartitionFields =
leftDf.schema.fields ++ Seq(org.apache.spark.sql.types.StructField(tableUtils.partitionColumn, StringType))
val sparkSchema = {
val schema: Seq[(String, DataType)] = leftSchema.toSeq ++ rightSchema.toSeq
StructType(SparkConversions.fromChrononSchema(schema))
}
val dummyOutputDf = tableUtils.sparkSession
.createDataFrame(tableUtils.sparkSession.sparkContext.parallelize(immutable.Seq[Row]()), sparkSchema)
val finalOutputColumns = joinConf.derivationsScala.finalOutputColumn(dummyOutputDf.columns).toSeq
val derivedDummyOutputDf = dummyOutputDf.select(finalOutputColumns: _*)
val columns = SparkConversions.toChrononSchema(
StructType(derivedDummyOutputDf.schema.filterNot(f => keyAndPartitionFields.map(_.name).contains(f.name))))
Collaborator:
Is the filterNot necessary here? I think we should keep everything that users included in derivations. For example, we should allow key columns in the output if users explicitly included them in derivations.

Collaborator (author):
The aggMetadata entries are actually features, so the purpose of this line is to exclude the keys from the features list.
But this may not apply when a user renames an external feature to a key name; in that case the key (which is actually an external feature) will be filtered out here 🤔
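The edge case described above can be shown with a minimal, Spark-free sketch (hypothetical column names, not the Chronon API): the feature list is built by filtering derived columns against key/partition names, so a derivation that renames an external feature to a key column's name is dropped as if it were a key.

```scala
// Sketch of the edge case: "event_id" below is really a renamed external
// feature, but because it collides with a key column's name, the
// filterNot treats it as a key and drops it from the features list.
val keyAndPartitionColumns = Set("event_id", "ds")
val derivedColumns = Seq("time_spent_ms_average", "event_id")
val features = derivedColumns.filterNot(keyAndPartitionColumns.contains)
// features == Seq("time_spent_ms_average") — the renamed feature is gone
```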

ListBuffer(columns.map { tup => toAggregationMetadata(tup._1, tup._2, joinConf.hasDerivations) }: _*)
} else {
aggregationsMetadata
Collaborator @hzding621, Nov 20, 2024:
nit: rename agg because agg is specific to group_by, but now we have external parts and derivations

  • aggMetadata => joinOutputFieldsMetadata
  • aggregationsMetadata => joinIntermediateFieldsMetadata

Collaborator (author):
The concept of this aggMetadata is more like output features (not including keys). It will be used as the source data for features in the MLI tool. I am thinking of renaming this to featuresMetadata or joinOutputValuesMetadata.

Collaborator:

@yuli-han rename?

  • aggMetadata => joinOutputValuesMetadata
  • aggregationsMetadata => joinIntermediateValuesMetadata

}
// (schema map showing the names and datatypes, right side feature aggregations metadata for metadata upload)
(leftSchema ++ rightSchema, aggregationsMetadata)
(leftSchema ++ rightSchema, aggMetadata)
}
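The diff above computes the post-derivation output schema without materializing any data: it builds an empty DataFrame with the joined schema, applies the derivation expressions, and reads the resulting schema back, filtering out key and partition columns. A simplified, Spark-free sketch of the same column-level logic (hypothetical names, not the actual Chronon API):

```scala
// Simplified model of the PR's output-metadata logic: a "*" derivation
// passes every input column through, a named derivation emits one column,
// and key/partition columns are filtered out of the final feature list.
object DerivedSchemaSketch {
  case class Derivation(name: String, expression: String)

  def deriveOutputFields(inputColumns: Seq[String],
                         derivations: Seq[Derivation],
                         keyAndPartitionColumns: Set[String]): Seq[String] = {
    val derived = derivations.flatMap {
      case Derivation("*", _)  => inputColumns // wildcard keeps all inputs
      case Derivation(name, _) => Seq(name)    // named derivation adds/renames one column
    }
    // Mirrors the filterNot in Analyzer.scala: keys are not features
    derived.distinct.filterNot(keyAndPartitionColumns.contains)
  }
}
```

With inputs `Seq("user_id", "time_spent_ms_average", "ds")`, derivations `* -> *` plus `test_feature_name`, and keys `Set("user_id", "ds")`, this yields `Seq("time_spent_ms_average", "test_feature_name")`, matching what the new test below expects.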

// validate the schema of the left and right side of the join and make sure the types match
40 changes: 28 additions & 12 deletions spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala
@@ -18,18 +18,8 @@ package ai.chronon.spark.test

import ai.chronon.aggregator.test.Column
import ai.chronon.api
import ai.chronon.api.{
Accuracy,
Builders,
Constants,
JoinPart,
LongType,
Operation,
PartitionSpec,
StringType,
TimeUnit,
Window
}
import ai.chronon.api.Builders.Derivation
import ai.chronon.api.{Accuracy, Builders, Constants, JoinPart, LongType, Operation, PartitionSpec, StringType, TimeUnit, Window}
import ai.chronon.api.Extensions._
import ai.chronon.spark.Extensions._
import ai.chronon.spark.GroupBy.renderDataSourceQuery
@@ -1547,4 +1537,30 @@ class JoinTest {
assert(
thrown2.getMessage.contains("Table or view not found") && thrown3.getMessage.contains("Table or view not found"))
}

def testJoinDerivationAnalyzer(): Unit = {
lazy val spark: SparkSession = SparkSessionBuilder.build("JoinTest" + "_" + Random.alphanumeric.take(6).mkString, local = true)
val tableUtils = TableUtils(spark)
val namespace = "test_join_derivation" + "_" + Random.alphanumeric.take(6).mkString
tableUtils.createDatabase(namespace)
val viewsGroupBy = getViewsGroupBy(suffix = "cumulative", makeCumulative = true, namespace)
val joinConf = getEventsEventsTemporal("cumulative", namespace)
joinConf.setJoinParts(Seq(Builders.JoinPart(groupBy = viewsGroupBy)).asJava)
Collaborator:
can you add a test case that contains external parts?

joinConf.setDerivations(Seq(
Derivation(
name = "*",
expression = "*"
), Derivation(
name = "test_feature_name",
expression = f"${viewsGroupBy.metaData.name}_time_spent_ms_average"
Collaborator:
Can you add some derivations that use ts and ds as inputs?

Collaborator @hzding621, Nov 20, 2024:
Similarly, can we add a test case for key columns as output, such as:

Derivation(
  name = "event_id",
  expression = "ext_contextual_event_id"
)

Collaborator @pengyu-hou, Dec 4, 2024:

Plus one on the ts and ds expressions. Those values will be derived from the request time.

)
).asJava)


val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
val (_, aggregationsMetadata) =
new Analyzer(tableUtils, joinConf, monthAgo, today).analyzeJoin(joinConf, enableHitter = false)
aggregationsMetadata.foreach(agg => assertTrue(agg.operation == "Derivation"))
assertTrue(aggregationsMetadata.exists(_.name == "test_feature_name"))
}
}