Merged

Changes from 26 of 28 commits
fd93108
add another error log
david-zlai Feb 18, 2025
55f099c
println
david-zlai Feb 18, 2025
3cf5e19
make conf_type plural
david-zlai Feb 18, 2025
91dd3cb
log the dataset
david-zlai Feb 18, 2025
da9009a
print key
david-zlai Feb 18, 2025
94d102d
bytestring instead
david-zlai Feb 18, 2025
d4daf13
force usage of metadata name
david-zlai Feb 19, 2025
b7b9b6d
println error
david-zlai Feb 19, 2025
70b7e71
add more error handling
david-zlai Feb 19, 2025
beaf474
fix attempt.
david-zlai Feb 19, 2025
82bd214
log the key
david-zlai Feb 19, 2025
9a5da22
cleanup
david-zlai Feb 19, 2025
9eac5a7
scalafmt
david-zlai Feb 19, 2025
4f4e699
test fix attempt
david-zlai Feb 19, 2025
69f00b1
removing logs
david-zlai Feb 19, 2025
f52d526
add constants
david-zlai Feb 19, 2025
fb4a569
format
david-zlai Feb 19, 2025
c62420f
address Piyush comments
david-zlai Feb 19, 2025
23e8815
test changes
david-zlai Feb 20, 2025
909466c
cleanup
david-zlai Feb 20, 2025
a6d9e31
reuse code
david-zlai Feb 20, 2025
6890cf6
fix for when confPath is provided
david-zlai Feb 20, 2025
73fc856
stop using nameToFilePath
david-zlai Feb 20, 2025
ff3a913
Remove extensions from online
david-zlai Feb 20, 2025
637831e
remvoe confkeywordsconstants and use constants file in api
david-zlai Feb 20, 2025
59b36a3
Migrate the rest of nameToFilePath out and use . instead of /
david-zlai Feb 21, 2025
ae91478
Add explanation of what's going on
david-zlai Feb 21, 2025
9a90663
Merge branch 'main' into davidhan/debug_fetch
david-zlai Feb 21, 2025
5 changes: 5 additions & 0 deletions api/src/main/scala/ai/chronon/api/Constants.scala
@@ -80,4 +80,9 @@ object Constants {
// A negative integer within the safe range for both long and double in JavaScript, Java, Scala, Python
val magicNullLong: java.lang.Long = -1234567890L
val magicNullDouble: java.lang.Double = -1234567890.0

val JoinKeyword = "joins"
val GroupByKeyword = "group_bys"
val StagingQueryKeyword = "staging_queries"
val ModelKeyword = "models"
}
27 changes: 27 additions & 0 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -16,6 +16,7 @@

package ai.chronon.api

import ai.chronon.api.Constants._
import ai.chronon.api.DataModel._
import ai.chronon.api.Operation._
import ai.chronon.api.QueryUtils.buildSelects
@@ -39,6 +40,10 @@ import scala.util.Try

object Extensions {

private def _keyNameForKvStore(metaData: MetaData, keywordType: String): String = {
s"$keywordType/" + metaData.name
}

implicit class TimeUnitOps(timeUnit: TimeUnit) {
def str: String =
timeUnit match {
@@ -144,6 +149,7 @@ object Extensions {
.map(_.toScala.toMap)
.orNull

@deprecated("Use `name` instead.")
def nameToFilePath: String = metaData.name.replaceFirst("\\.", "/")

// helper function to extract values from customJson
@@ -438,6 +444,11 @@
}

implicit class GroupByOps(groupBy: GroupBy) extends GroupBy(groupBy) {

def keyNameForKvStore: String = {
_keyNameForKvStore(groupBy.metaData, GroupByKeyword)
}

def maxWindow: Option[Window] = {
val allWindowsOpt = Option(groupBy.aggregations)
.flatMap(_.toScala.toSeq.allWindowsOpt)
@@ -822,6 +833,10 @@
}

implicit class JoinOps(val join: Join) extends Serializable {
def keyNameForKvStore: String = {
_keyNameForKvStore(join.metaData, JoinKeyword)
}

@transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
// all keys as they should appear in left that are being used on right
def leftKeyCols: Array[String] = {
@@ -1213,4 +1228,16 @@ object Extensions {
result
}
}

implicit class StagingQueryOps(stagingQuery: StagingQuery) {
def keyNameForKvStore: String = {
_keyNameForKvStore(stagingQuery.metaData, StagingQueryKeyword)
}
}

implicit class ModelOps(model: Model) {
def keyNameForKvStore: String = {
_keyNameForKvStore(model.metaData, ModelKeyword)
}
}
}
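Together with the new Constants keywords, these helpers produce KV-store keys of the form <conf_type>/<metaData.name>. A minimal sketch of the expected behavior (the conf name is illustrative, and the Thrift-style setters are assumed):

import ai.chronon.api.{GroupBy, MetaData}
import ai.chronon.api.Extensions._

val metaData = new MetaData()
metaData.setName("quickstart.purchases.v1") // hypothetical conf name

val groupBy = new GroupBy()
groupBy.setMetaData(metaData)

// keyNameForKvStore prefixes the conf-type keyword from Constants:
assert(groupBy.keyNameForKvStore == "group_bys/quickstart.purchases.v1")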
7 changes: 6 additions & 1 deletion online/src/main/scala/ai/chronon/online/Api.scala
@@ -82,7 +82,12 @@ trait KVStore {
if (response.values.isFailure) {
Failure(new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", response.values.failed.get))
} else {
Success(new String(response.latest.get.bytes, Constants.UTF8))
response.values.get.length match {
case 0 => {
Failure(new RuntimeException(s"Empty response from KVStore for key=${key} in dataset=${dataset}."))
}
case _ => Success(new String(response.latest.get.bytes, Constants.UTF8))
}
}
}

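With this change, a lookup that succeeds but returns zero values surfaces as a Failure instead of a NoSuchElementException from response.latest.get. A hypothetical caller sketch (key, dataset, and logger are illustrative assumptions):

import scala.util.{Failure, Success}

kvStore.getString("joins/quickstart.training_set.v1", "CHRONON_METADATA", 10000L) match {
  case Success(json) => logger.info(s"found conf: $json")
  // the Failure branch is now also hit when the key simply has no values:
  case Failure(e) => logger.error("conf lookup failed or returned empty", e)
}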
2 changes: 1 addition & 1 deletion online/src/main/scala/ai/chronon/online/Fetcher.scala
@@ -174,7 +174,7 @@ class Fetcher(val kvStore: KVStore,
}
}

val joinName = joinConf.metaData.nameToFilePath
val joinName = joinConf.metaData.name
val keySchema = StructType(s"${joinName.sanitize}_key", keyFields.toArray)
val keyCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString)
val baseValueSchema = StructType(s"${joinName.sanitize}_value", valueFields.toArray)
12 changes: 9 additions & 3 deletions online/src/main/scala/ai/chronon/online/FetcherMain.scala
@@ -3,6 +3,7 @@ package ai.chronon.online
import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.Join
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api.Constants._
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.api.thrift.TBase
import com.fasterxml.jackson.databind.ObjectMapper
@@ -39,7 +40,12 @@ object FetcherMain {
val keyJson: ScallopOption[String] = opt[String](required = false, descr = "json of the keys to fetch")
val name: ScallopOption[String] = opt[String](required = false, descr = "name of the join/group-by to fetch")
val confType: ScallopOption[String] =
opt[String](required = false, descr = "Type of the conf to run. ex: join, group-by, etc")
choice(
Seq(JoinKeyword, GroupByKeyword),
required = false,
descr = "the type of conf to fetch",
default = Some(JoinKeyword)
)

val keyJsonFile: ScallopOption[String] = opt[String](
required = false,
@@ -185,7 +191,7 @@ object FetcherMain {
if (keyMapList.length > 1) {
logger.info(s"Plan to send ${keyMapList.length} fetches with ${args.interval()} seconds interval")
}
val fetcher = args.api.buildFetcher(true, "FetcherCLI")
val fetcher = args.api.buildFetcher(debug = true, "FetcherCLI")
def iterate(): Unit = {
keyMapList.foreach(keyMap => {
logger.info(s"--- [START FETCHING for ${keyMap}] ---")
@@ -199,7 +205,7 @@
args.confPath.toOption.map(confPath => parseConf[Join](confPath))
val startNs = System.nanoTime
val requests = Seq(Fetcher.Request(featureName, keyMap, args.atMillis.toOption))
val resultFuture = if (args.confType() == "join") {
val resultFuture = if (args.confType() == JoinKeyword) {
fetcher.fetchJoin(requests, joinConfOption)
} else {
fetcher.fetchGroupBys(requests)
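Switching confType from a free-form opt[String] to Scallop's choice() means an invalid conf type now fails at argument parsing instead of reaching the if/else above. A sketch of the resulting behavior (flag spelling assumes Scallop's default kebab-case mapping of confType):

// --conf-type joins      -> fetcher.fetchJoin(...)
// --conf-type group_bys  -> fetcher.fetchGroupBys(...)
// (flag omitted)         -> defaults to "joins"
// anything else, e.g. the old singular "join", is rejected by Scallop validation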
33 changes: 17 additions & 16 deletions online/src/main/scala/ai/chronon/online/MetadataDirWalker.scala
@@ -4,6 +4,8 @@ import ai.chronon.api
import ai.chronon.api.Constants
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.api.thrift.TBase
import ai.chronon.api.Constants._
import ai.chronon.api.Extensions._
import com.google.gson.Gson
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -17,11 +19,6 @@ import scala.util.Try

class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String], maybeConfType: Option[String] = None) {

val JoinKeyword = "joins"
val GroupByKeyword = "group_bys"
val StagingQueryKeyword = "staging_queries"
val ModelKeyword = "models"

@transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)
private def loadJsonToConf[T <: TBase[_, _]: Manifest: ClassTag](file: String): Option[T] = {
try {
@@ -78,25 +75,29 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String], ma
nonEmptyFileList.foldLeft(Map.empty[String, Map[String, List[String]]]) { (acc, file) =>
// For each end point we apply the extractFn to the file path to extract the key value pair
val filePath = file.getPath
val optConf =
val (optConf, confKeyName) =
try {
filePath match {
case value if value.contains(s"$JoinKeyword/") || maybeConfType.contains(JoinKeyword) =>
loadJsonToConf[api.Join](filePath)
val conf = loadJsonToConf[api.Join](filePath)
(conf, conf.map(_.keyNameForKvStore))
case value if value.contains(s"$GroupByKeyword/") || maybeConfType.contains(GroupByKeyword) =>
loadJsonToConf[api.GroupBy](filePath)
val conf = loadJsonToConf[api.GroupBy](filePath)
(conf, conf.map(a => a.keyNameForKvStore))
case value if value.contains(s"$StagingQueryKeyword/") || maybeConfType.contains(StagingQueryKeyword) =>
loadJsonToConf[api.StagingQuery](filePath)
val conf = loadJsonToConf[api.StagingQuery](filePath)
(conf, conf.map(_.keyNameForKvStore))
case value if value.contains(s"$ModelKeyword/") || maybeConfType.contains(ModelKeyword) =>
loadJsonToConf[api.Model](filePath)
val conf = loadJsonToConf[api.Model](filePath)
(conf, conf.map(_.keyNameForKvStore))
}
} catch {
case e: Throwable =>
logger.error(s"Failed to parse compiled team from file path: $filePath, \nerror=${e.getMessage}")
None
(None, None)
}

if (optConf.isDefined) {
if (optConf.isDefined && confKeyName.isDefined) {
val kvPairToEndPoint: List[(String, (String, String))] = metadataEndPointNames
.map { endPointName =>
val conf = optConf.get
@@ -105,22 +106,22 @@ class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String], ma
case value if value.contains(s"$JoinKeyword/") || maybeConfType.contains(JoinKeyword) =>
MetadataEndPoint
.getEndPoint[api.Join](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.Join])
.extractFn(confKeyName.get, conf.asInstanceOf[api.Join])

case value if value.contains(s"$GroupByKeyword/") || maybeConfType.contains(GroupByKeyword) =>
MetadataEndPoint
.getEndPoint[api.GroupBy](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.GroupBy])
.extractFn(confKeyName.get, conf.asInstanceOf[api.GroupBy])

case value if value.contains(s"$StagingQueryKeyword/") || maybeConfType.contains(StagingQueryKeyword) =>
MetadataEndPoint
.getEndPoint[api.StagingQuery](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.StagingQuery])
.extractFn(confKeyName.get, conf.asInstanceOf[api.StagingQuery])

case value if value.contains(s"$ModelKeyword/") || maybeConfType.contains(ModelKeyword) =>
MetadataEndPoint
.getEndPoint[api.Model](endPointName)
.extractFn(filePath, conf.asInstanceOf[api.Model])
.extractFn(confKeyName.get, conf.asInstanceOf[api.Model])
}

(endPointName, kVPair)
@@ -51,7 +51,7 @@ object MetadataEndPoint {
// value: entity config in json format
private def confByKeyEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag] =
new MetadataEndPoint[Conf](
extractFn = (path, conf) => (path.confPathToKey, ThriftJsonCodec.toJsonStr(conf)),
extractFn = (metadataName, conf) => (metadataName, ThriftJsonCodec.toJsonStr(conf)),
Contributor Author:

Changing this to metadataName because metadataName is sourced from the actual conf:

{
  "metaData": {
    "name": "quickstart.training_set.v1",
....

Previously, path.confPathToKey extracted the key from the conf path. It's better to be consistent and use the name in the actual conf (not the conf path).

name = ConfByKeyEndPointName
)

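The net effect on the stored key, sketched with an illustrative compiled-conf path and the metaData.name from the comment above:

// Before: the key was derived from the file path via path.confPathToKey, e.g.
//   production/joins/quickstart/training_set.v1 -> "joins/quickstart/training_set.v1"
// After: the key is derived from the conf's own metadata via keyNameForKvStore, e.g.
//   metaData.name = "quickstart.training_set.v1" -> "joins/quickstart.training_set.v1"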
41 changes: 35 additions & 6 deletions online/src/main/scala/ai/chronon/online/MetadataStore.scala
@@ -18,12 +18,12 @@ package ai.chronon.online

import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.JoinOps
import ai.chronon.api.Extensions.MetadataOps
import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api._
import ai.chronon.api.thrift.TBase
import ai.chronon.api.Constants._
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName
import org.slf4j.Logger
@@ -41,6 +41,22 @@ import scala.util.Try
// [timestamp -> {metric name -> metric value}]
case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])

case class ConfPathOrName(confPath: Option[String] = None, confName: Option[String] = None) {

if (confPath.isEmpty && confName.isEmpty) {
throw new IllegalArgumentException("confPath and confName cannot be both empty")
}

def computeConfKey(confKeyword: String): String = {
if (confName.isDefined) {
s"$confKeyword/" + confName.get

} else {
s"$confKeyword/" + confPath.get.split("/").takeRight(1).head
}
}
}

class MetadataStore(kvStore: KVStore,
val dataset: String = MetadataDataset,
timeoutMillis: Long,
@@ -62,9 +78,18 @@
implicit val executionContext: ExecutionContext =
Option(executionContextOverride).getOrElse(FlexibleExecutionContext.buildExecutionContext)

def getConf[T <: TBase[_, _]: Manifest](confPathOrName: String): Try[T] = {
def getConf[T <: TBase[_, _]: Manifest](confPathOrName: ConfPathOrName): Try[T] = {
val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
val confKey = confPathOrName.confPathToKey

val confTypeKeyword = clazz match {
case j if j == classOf[Join] => JoinKeyword
case g if g == classOf[GroupBy] => GroupByKeyword
case sq if sq == classOf[StagingQuery] => StagingQueryKeyword
case m if m == classOf[Model] => ModelKeyword
case _ => throw new IllegalArgumentException(s"Unsupported conf type: $clazz")
}

val confKey = confPathOrName.computeConfKey(confTypeKeyword)
kvStore
.getString(confKey, dataset, timeoutMillis)
.map(conf => ThriftJsonCodec.fromJsonStr[T](conf, false, clazz))
@@ -122,7 +147,7 @@ class MetadataStore(kvStore: KVStore,
lazy val getJoinConf: TTLCache[String, Try[JoinOps]] = new TTLCache[String, Try[JoinOps]](
{ name =>
val startTimeMs = System.currentTimeMillis()
val result = getConf[Join](s"joins/$name")
Contributor Author:

So I made this change only because it's not compatible with how I've done the run.py + DataprocSubmitter at the moment.

To explain, when we do a metadata upload for a join, we:

  • first take the join conf file, like production/joins/quickstart/training_set.v1, and upload it to GCS at gs://zipline-warehouse-canary/metadata/production/joins/quickstart/training_set.v1
  • next, add that as a fileUri to Dataproc so that Dataproc copies the GCS file over to the Spark working directory
    [screenshot]

Dataproc will copy the file, but that only copies training_set.v1. Dataproc won't (or from what I can see, won't) preserve any part of the original path, so the training_set.v1 file sits at the top of the working directory.
(Chronon would traditionally have expected production/joins/quickstart/training_set.v1 and then called confPathToKey, returning joins/quickstart/training_set.v1.)

  • later in the code, we upload the join conf to BigTable's CHRONON_METADATA, where we set the key based on the path of the conf. Since the path of the conf from Dataproc is just training_set.v1, it looks like this in BigTable:
    [screenshot]

This breaks when we ultimately try to fetch the join, since on this line of code we hardcode s"joins/$name".

val result = getConf[Join](ConfPathOrName(confName = Some(name)))
.recover { case e: java.util.NoSuchElementException =>
logger.error(
s"Failed to fetch conf for join $name at joins/$name, please check metadata upload to make sure the join metadata for $name has been uploaded")
@@ -143,9 +168,10 @@
{ join => Metrics.Context(environment = "join.meta.fetch", join = join) })

def putJoinConf(join: Join): Unit = {
logger.info(s"uploading join conf to dataset: $dataset by key: joins/${join.metaData.nameToFilePath}")
val joinConfKeyForKvStore = join.keyNameForKvStore
logger.info(s"uploading join conf to dataset: $dataset by key:${joinConfKeyForKvStore}")
kvStore.put(
PutRequest(s"joins/${join.metaData.nameToFilePath}".getBytes(Constants.UTF8),
PutRequest(joinConfKeyForKvStore.getBytes(Constants.UTF8),
ThriftJsonCodec.toJsonStr(join).getBytes(Constants.UTF8),
dataset))
}
@@ -178,6 +204,9 @@
logger.error(
s"Failed to fetch metadata for $batchDataset, is it possible Group By Upload for $name has not succeeded?")
throw e
case e: Throwable =>
logger.error(s"Failed to fetch metadata for $batchDataset", e)
throw e
}
logger.info(s"Fetched ${Constants.GroupByServingInfoKey} from : $batchDataset")
if (metaData.isFailure) {
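Putting the MetadataStore pieces together, an end-to-end sketch of a conf lookup (names are illustrative; metadataStore is an assumed instance):

import ai.chronon.api.Join
import ai.chronon.online.ConfPathOrName

// confName takes precedence and is used verbatim after the type keyword:
ConfPathOrName(confName = Some("quickstart.training_set.v1"))
  .computeConfKey("joins") // "joins/quickstart.training_set.v1"

// with only a confPath, the key falls back to the file's basename:
ConfPathOrName(confPath = Some("production/joins/quickstart/training_set.v1"))
  .computeConfKey("joins") // "joins/training_set.v1"

// getConf derives the keyword from the requested type, so a Join lookup
// reads "joins/quickstart.training_set.v1" from the metadata dataset:
val joinConf = metadataStore.getConf[Join](ConfPathOrName(confName = Some("quickstart.training_set.v1")))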
@@ -159,7 +159,7 @@ class DriftStore(kvStore: KVStore,
responseContexts.map { responseContext =>
val tileSeriesKey = new TileSeriesKey()
tileSeriesKey.setSlice(responseContext.tileKey.getSlice)
tileSeriesKey.setNodeName(joinConf.getMetaData.nameToFilePath)
tileSeriesKey.setNodeName(joinConf.getMetaData.name)
tileSeriesKey.setGroupName(responseContext.groupName)
tileSeriesKey.setColumn(responseContext.tileKey.getColumn)

5 changes: 3 additions & 2 deletions spark/src/main/scala/ai/chronon/spark/Driver.scala
@@ -25,6 +25,7 @@ import ai.chronon.api.Extensions.SourceOps
import ai.chronon.api.ThriftJsonCodec
import ai.chronon.api.thrift.TBase
import ai.chronon.online.Api
import ai.chronon.online.ConfPathOrName
import ai.chronon.online.FetcherMain
import ai.chronon.online.MetadataDirWalker
import ai.chronon.online.MetadataEndPoint
@@ -90,7 +91,7 @@ object Driver {
opt[String](required = false, descr = "GCP BigTable instance id")

val confType: ScallopOption[String] =
opt[String](required = false, descr = "Type of the conf to run. ex: join, group-by, etc")
opt[String](required = false, descr = "Type of the conf to run. ex: joins, group_bys, models, staging_queries")
}

trait OfflineSubcommand extends SharedSubCommandArgs {
Expand Down Expand Up @@ -789,7 +790,7 @@ object Driver {
val confFile = findFile(args.confPath())
val groupByConf = confFile
.map(ThriftJsonCodec.fromJsonFile[api.GroupBy](_, check = false))
.getOrElse(args.metaDataStore.getConf[api.GroupBy](args.confPath()).get)
.getOrElse(args.metaDataStore.getConf[api.GroupBy](ConfPathOrName(confPath = Some(args.confPath()))).get)

val onlineJar = findFile(args.onlineJar())
if (args.debug())
6 changes: 3 additions & 3 deletions spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala
@@ -67,7 +67,7 @@
val metrics: Metrics.Context = Metrics.Context(Metrics.Environment.JoinLogFlatten, joinConf)

private def getUnfilledRanges(inputTable: String, outputTable: String): Seq[PartitionRange] = {
val partitionName: String = joinConf.metaData.nameToFilePath.replace("/", "%2F")
Contributor Author:

This one line killed me. I'm not sure why we did this, but I think it's because later on, when we do SHOW PARTITIONS in Hive and collect the partition values, values stored with / in them get encoded as %2F. So I guess we wanted to be consistent and did this hack.

I'm deleting this line and instead making sure we just use metaData.name, which should come without any /'s in it.
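A sketch of the difference (conf name illustrative):

// Old: nameToFilePath introduced a slash that then had to be escaped for Hive:
//   "quickstart.training_set.v1".replaceFirst("\\.", "/") // "quickstart/training_set.v1"
//   "quickstart/training_set.v1".replace("/", "%2F")      // "quickstart%2Ftraining_set.v1"
// New: metaData.name is used directly; it contains no slash, so nothing gets encoded:
//   "quickstart.training_set.v1"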

val partitionName: String = joinConf.metaData.name
val unfilledRangeTry = Try(
tableUtils.unfilledRanges(
outputTable,
@@ -80,7 +80,7 @@
val ranges = unfilledRangeTry match {
case Failure(_: AssertionError) =>
logger.info(s"""
|The join name ${joinConf.metaData.nameToFilePath} does not have available logged data yet.
|The join name ${joinConf.metaData.name} does not have available logged data yet.
|Please double check your logging status""".stripMargin)
Seq()
case Success(None) =>
@@ -208,7 +208,7 @@
}
val unfilledRanges = getUnfilledRanges(logTable, joinConf.metaData.loggedTable)
if (unfilledRanges.isEmpty) return
val joinName = joinConf.metaData.nameToFilePath
val joinName = joinConf.metaData.name

val start = System.currentTimeMillis()
val columnBeforeCount = columnCount()