Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hub/src/main/java/ai/chronon/hub/HubVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected void startHttpServer(int port, String configJsonString, Api api, Promi
router.get("/api/v1/:name/job/type/:type").handler(RouteHandlerWrapper.createHandler(JobTracker::handle, JobTrackerRequest.class));

// hacked up in mem kv store bulkPut
KVStore inMemoryKVStore = InMemoryKvStore.build("hub", () -> null);
KVStore inMemoryKVStore = InMemoryKvStore.build("hub", () -> null, false);
// create relevant datasets in kv store
inMemoryKVStore.create(Constants.MetadataDataset());
inMemoryKVStore.create(Constants.TiledSummaryDataset());
Expand Down
53 changes: 20 additions & 33 deletions online/src/main/scala/ai/chronon/online/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,31 @@ trait KVStore {

// helper method to blocking read a string - used for fetching metadata & not in hotpath.
def getString(key: String, dataset: String, timeoutMillis: Long): Try[String] = {

getResponse(key, dataset, timeoutMillis).values
.recoverWith { case ex =>
// wrap with more info
Failure(new RuntimeException(s"Request for key $key in dataset $dataset failed", ex))
}
.flatMap { values =>
if (values.isEmpty)
Failure(new RuntimeException(s"Empty response from KVStore for key=$key in dataset=$dataset."))
else
Success(new String(values.maxBy(_.millis).bytes, Constants.UTF8))
}
val bytesTry = getResponse(key, dataset, timeoutMillis)
bytesTry.map(bytes => new String(bytes, Constants.UTF8))
}

def getStringArray(key: String, dataset: String, timeoutMillis: Long): Try[Seq[String]] = {
val response = getResponse(key, dataset, timeoutMillis)

response.values
.map { values =>
val latestBytes = values.maxBy(_.millis).bytes
StringArrayConverter.bytesToStrings(latestBytes)
}
.recoverWith { case ex =>
// Wrap with more info
Failure(new RuntimeException(s"Request for key $key in dataset $dataset failed", ex))
}

val bytesTry = getResponse(key, dataset, timeoutMillis)
bytesTry.map(bytes => StringArrayConverter.bytesToStrings(bytes))
}

private def getResponse(key: String, dataset: String, timeoutMillis: Long): GetResponse = {
try {
val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
val responseFutureOpt = get(fetchRequest)
Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))
} catch {
case ex: Exception =>
ex.printStackTrace()
throw ex
private def getResponse(key: String, dataset: String, timeoutMillis: Long): Try[Array[Byte]] = {
val fetchRequest = KVStore.GetRequest(key.getBytes(Constants.UTF8), dataset)
val responseFutureOpt = get(fetchRequest)

def buildException(e: Throwable) =
new RuntimeException(s"Request for key ${key} in dataset ${dataset} failed", e)

Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
case Failure(e) =>
Failure(buildException(e))
case Success(resp) =>
if (resp.values.isFailure) {
Failure(buildException(resp.values.failed.get))
} else {
Success(resp.latest.get.bytes)
}
Comment on lines +92 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

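resp.latest.get may blow up on empty data.
If values is Success(Seq()), latest becomes a Failure (taking the latest element of an empty Seq throws), and .get re-throws that exception outside the Try, bypassing your error path. Mapping over latest instead keeps the result a Try. Note that the Failure is then returned without the buildException wrapper used by the other branches; append .recoverWith { case e => Failure(buildException(e)) } if the key/dataset message should be preserved.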

-          Success(resp.latest.get.bytes)
+          resp.latest.map(_.bytes)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
case Failure(e) =>
Failure(buildException(e))
case Success(resp) =>
if (resp.values.isFailure) {
Failure(buildException(resp.values.failed.get))
} else {
Success(resp.latest.get.bytes)
}
Try(Await.result(responseFutureOpt, Duration(timeoutMillis, MILLISECONDS))) match {
case Failure(e) =>
Failure(buildException(e))
case Success(resp) =>
if (resp.values.isFailure) {
Failure(buildException(resp.values.failed.get))
} else {
resp.latest.map(_.bytes)
}
}

}
}

Expand Down
3 changes: 2 additions & 1 deletion online/src/main/scala/ai/chronon/online/JoinCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ case class JoinCodec(conf: JoinOps,
keySchema: StructType,
baseValueSchema: StructType,
keyCodec: AvroCodec,
baseValueCodec: AvroCodec)
baseValueCodec: AvroCodec,
hasPartialFailure: Boolean = false)
extends Serializable {

@transient lazy val valueSchema: StructType = {
Expand Down
46 changes: 33 additions & 13 deletions online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,16 @@ class Fetcher(val kvStore: KVStore,
ctx.distribution("derivation.latency.millis", requestEndTs - derivationStartTs)
ctx.distribution("request.latency.millis", requestEndTs - ts)

ResponseWithContext(request, finalizedDerivedMap, baseMap)
val response = ResponseWithContext(request, finalizedDerivedMap, baseMap)
// Refresh joinCodec if it has partial failure
if (joinCodec.hasPartialFailure) {
joinCodecCache.refresh(joinName)
}
response

case Failure(exception) =>
// more validation logic will be covered in compile.py to avoid this case
joinCodecCache.refresh(joinName)
ctx.incrementException(exception)
ResponseWithContext(request, Map("join_codec_fetch_exception" -> exception.traceString), Map.empty)

Expand Down Expand Up @@ -293,14 +299,15 @@ class Fetcher(val kvStore: KVStore,

val joinCodecTry = joinCodecCache(resp.request.name)

val loggingTry: Try[Unit] = joinCodecTry.map(codec => {
val metaData = codec.conf.join.metaData
val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0
val loggingTry: Try[Unit] = joinCodecTry
.map(codec => {
val metaData = codec.conf.join.metaData
val samplePercent = if (metaData.isSetSamplePercent) metaData.getSamplePercent else 0

if (samplePercent > 0)
encodeAndPublishLog(resp, ts, codec, samplePercent)
if (samplePercent > 0)
encodeAndPublishLog(resp, ts, codec, samplePercent)

})
})

loggingTry.failed.map { exception =>
// to handle GroupByServingInfo staleness that results in encoding failure
Expand All @@ -310,6 +317,10 @@ class Fetcher(val kvStore: KVStore,
_.incrementException(new RuntimeException(s"Logging failed due to: ${exception.traceString}", exception)))
}

if (joinCodecTry.isSuccess && joinCodecTry.get.hasPartialFailure) {
joinCodecCache.refresh(resp.request.name)
}

Response(resp.request, Success(resp.derivedValues))
}

Expand Down Expand Up @@ -390,6 +401,7 @@ class Fetcher(val kvStore: KVStore,
val joinName = request.name
val joinConfTry: Try[JoinOps] = metadataStore.getJoinConf(request.name)
if (joinConfTry.isFailure) {
metadataStore.getJoinConf.refresh(request.name)
resultMap.update(
request,
Failure(
Expand All @@ -412,6 +424,10 @@ class Fetcher(val kvStore: KVStore,
// step-2 dedup external requests across joins
val externalToJoinRequests: Seq[ExternalToJoinRequest] = validRequests
.flatMap { joinRequest =>
val joinConf = metadataStore.getJoinConf(joinRequest.name)
if (joinConf.isFailure) {
metadataStore.getJoinConf.refresh(joinRequest.name)
}
val parts =
metadataStore
.getJoinConf(joinRequest.name)
Expand Down Expand Up @@ -498,18 +514,22 @@ class Fetcher(val kvStore: KVStore,

val joinSchemaResponse = joinCodecTry
.map { joinCodec =>
JoinSchemaResponse(joinName,
joinCodec.keyCodec.schemaStr,
joinCodec.valueCodec.schemaStr,
joinCodec.loggingSchemaHash)
val response = JoinSchemaResponse(joinName,
joinCodec.keyCodec.schemaStr,
joinCodec.valueCodec.schemaStr,
joinCodec.loggingSchemaHash)
if (joinCodec.hasPartialFailure) {
joinCodecCache.refresh(joinName)
}
ctx.distribution("response.latency.millis", System.currentTimeMillis() - startTime)
response
}
.recover { case exception: Throwable =>
.recover { case exception =>
logger.error(s"Failed to fetch join schema for $joinName", exception)
ctx.incrementException(exception)
throw exception
}

joinSchemaResponse.foreach(_ => ctx.distribution("response.latency.millis", System.currentTimeMillis() - startTime))
joinSchemaResponse
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
*/
private def toLambdaKvRequest(request: Fetcher.Request): Try[LambdaKvRequest] = metadataStore
.getGroupByServingInfo(request.name)
.recover { case ex: Throwable =>
metadataStore.getGroupByServingInfo.refresh(request.name)
logger.error(s"Couldn't fetch GroupByServingInfo for ${request.name}", ex)
request.context.foreach(_.incrementException(ex))
throw ex
}
.map { groupByServingInfo =>
val context =
request.context.getOrElse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
val joinDecomposed: Seq[(Request, Try[Seq[Either[PrefixedRequest, KeyMissingException]]])] =
requests.map { request =>
// use passed-in join or fetch one
import ai.chronon.online.metrics
val joinTry: Try[JoinOps] = joinConf
.map(conf => Success(JoinOps(conf)))
.getOrElse(metadataStore.getJoinConf(request.name))
val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
val joinConfTry = metadataStore.getJoinConf(request.name)
if (joinConfTry.isFailure) {
metadataStore.getJoinConf.refresh(request.name)
}
joinConfTry
} else {
logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
Success(JoinOps(joinConf.get))
Comment on lines +64 to +72
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refresh without retry can leak failures.
After refresh, the code still uses the original failed joinConfTry. Consider re-fetching or returning the refreshed value to avoid needless exceptions.

- if (joinConfTry.isFailure) {
-   metadataStore.getJoinConf.refresh(request.name)
- }
- joinConfTry
+ val freshTry = if (joinConfTry.isFailure) {
+   metadataStore.getJoinConf.refresh(request.name)
+   metadataStore.getJoinConf(request.name)
+ } else joinConfTry
+ freshTry
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
val joinConfTry = metadataStore.getJoinConf(request.name)
if (joinConfTry.isFailure) {
metadataStore.getJoinConf.refresh(request.name)
}
joinConfTry
} else {
logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
Success(JoinOps(joinConf.get))
val joinTry: Try[JoinOps] = if (joinConf.isEmpty) {
val joinConfTry = metadataStore.getJoinConf(request.name)
val freshTry = if (joinConfTry.isFailure) {
metadataStore.getJoinConf.refresh(request.name)
metadataStore.getJoinConf(request.name)
} else joinConfTry
freshTry
} else {
logger.debug(s"Using passed in join configuration: ${joinConf.get.metaData.getName}")
Success(JoinOps(joinConf.get))
}

}

var joinContext: Option[metrics.Metrics.Context] = None

Expand Down Expand Up @@ -163,7 +169,7 @@ class JoinPartFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
if (fetchContext.debug || Math.random() < 0.001) {
println(s"Failed to fetch $groupByRequest with \n${ex.traceString}")
}
Map(groupByRequest.name + "_exception" -> ex.traceString)
Map(prefix + "_exception" -> ex.traceString)
}
.get
}
Expand Down
105 changes: 69 additions & 36 deletions online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ class MetadataStore(fetchContext: FetchContext) {
if (result.isSuccess) metrics.Metrics.Context(metrics.Metrics.Environment.MetaDataFetching, result.get.join)
else metrics.Metrics.Context(metrics.Metrics.Environment.MetaDataFetching, join = name)
// Throw exception after metrics. No join metadata is bound to be a critical failure.
// This will ensure that a Failure is never cached in the getJoinConf TTLCache
if (result.isFailure) {
import ai.chronon.online.metrics
context.withSuffix("join").increment(metrics.Metrics.Name.Exception)
context.withSuffix("join").incrementException(result.failed.get)
throw result.failed.get
}
context
Expand Down Expand Up @@ -239,20 +239,56 @@ class MetadataStore(fetchContext: FetchContext) {
doRetrieveAllListConfs(new mutable.ArrayBuffer[String]())
}

private def buildJoinPartCodec(
joinPart: JoinPartOps,
servingInfo: GroupByServingInfoParsed): (Iterable[StructField], Iterable[StructField]) = {
val keySchema = servingInfo.keyCodec.chrononSchema.asInstanceOf[StructType]
val joinKeyFields = joinPart.leftToRight
.map { case (leftKey, rightKey) =>
StructField(leftKey, keySchema.fields.find(_.name == rightKey).get.fieldType)
}

Comment on lines +247 to +250
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unsafe .get on missing right-key.
keySchema.fields.find(...).get will throw if the join mapping is out of sync with the group-by schema. Prefer safe lookup with explicit failure.

-        StructField(leftKey, keySchema.fields.find(_.name == rightKey).get.fieldType)
+        val fType = keySchema.fields.find(_.name == rightKey)
+          .getOrElse(throw new IllegalStateException(
+            s"Key $rightKey absent in schema for groupBy ${servingInfo.groupBy.metaData.getName}"))
+        StructField(leftKey, fType.fieldType)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.map { case (leftKey, rightKey) =>
StructField(leftKey, keySchema.fields.find(_.name == rightKey).get.fieldType)
}
.map { case (leftKey, rightKey) =>
val fType = keySchema.fields.find(_.name == rightKey)
.getOrElse(throw new IllegalStateException(
s"Key $rightKey absent in schema for groupBy ${servingInfo.groupBy.metaData.getName}"))
StructField(leftKey, fType.fieldType)
}

val baseValueSchema: StructType = if (servingInfo.groupBy.aggregations == null) {
servingInfo.selectedChrononSchema
} else {
servingInfo.outputChrononSchema
}
val valueFields = if (!servingInfo.groupBy.hasDerivations) {
baseValueSchema.fields
} else {
buildDerivedFields(servingInfo.groupBy.derivationsScala, keySchema, baseValueSchema).toArray
}
val joinValueFields = valueFields.map(joinPart.constructJoinPartSchema)

(joinKeyFields, joinValueFields)
}

// key and value schemas
def buildJoinCodecCache(onCreateFunc: Option[Try[JoinCodec] => Unit]): TTLCache[String, Try[JoinCodec]] = {

val codecBuilder = { joinName: String =>
getJoinConf(joinName)
.map(_.join)
.map(buildJoinCodec)
.recoverWith { case th: Throwable =>
Failure(
new RuntimeException(
s"Couldn't fetch joinName = ${joinName} or build join codec due to ${th.traceString}",
th
))
val startTimeMs = System.currentTimeMillis()
val result: Try[JoinCodec] =
try {
getJoinConf(joinName)
.map(_.join)
.map(join => buildJoinCodec(join, refreshOnFail = true))
} catch {
case th: Throwable =>
getJoinConf.refresh(joinName)
Failure(
new RuntimeException(
s"Couldn't fetch joinName = ${joinName} or build join codec due to ${th.traceString}",
th
))
}
val context = Metrics.Context(Metrics.Environment.MetaDataFetching, join = joinName).withSuffix("join_codec")
if (result.isFailure) {
context.incrementException(result.failed.get)
} else {
context.distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs)
}
result
}

new TTLCache[String, Try[JoinCodec]](
Expand All @@ -265,38 +301,32 @@ class MetadataStore(fetchContext: FetchContext) {
)
}

def buildJoinCodec(joinConf: Join): JoinCodec = {
def buildJoinCodec(joinConf: Join, refreshOnFail: Boolean): JoinCodec = {
val keyFields = new mutable.LinkedHashSet[StructField]
val valueFields = new mutable.ListBuffer[StructField]
var hasPartialFailure = false
// collect keyFields and valueFields from joinParts/GroupBys
joinConf.joinPartOps.foreach { joinPart =>
val servingInfoTry = getGroupByServingInfo(joinPart.groupBy.metaData.getName)
servingInfoTry
getGroupByServingInfo(joinPart.groupBy.metaData.getName)
.map { servingInfo =>
val keySchema = servingInfo.keyCodec.chrononSchema.asInstanceOf[StructType]
joinPart.leftToRight
.mapValues(right => keySchema.fields.find(_.name == right).get.fieldType)
.foreach { case (name, dType) =>
val keyField = StructField(name, dType)
keyFields.add(keyField)
}
val groupBySchemaBeforeDerivation: StructType =
if (servingInfo.groupBy.aggregations == null) {
servingInfo.selectedChrononSchema
val (keys, values) = buildJoinPartCodec(joinPart, servingInfo)
keys.foreach(k => keyFields.add(k))
values.foreach(v => valueFields.append(v))
}
.recoverWith {
case exception: Throwable => {
if (refreshOnFail) {
getGroupByServingInfo.refresh(joinPart.groupBy.metaData.getName)
hasPartialFailure = true
Success(())
} else {
servingInfo.outputChrononSchema
Failure(new Exception(
s"Failure to build join codec for join ${joinConf.metaData.name} due to bad groupBy serving info for ${joinPart.groupBy.metaData.name}",
exception))
}
val baseValueSchema: StructType = if (!servingInfo.groupBy.hasDerivations) {
groupBySchemaBeforeDerivation
} else {
val fields =
buildDerivedFields(servingInfo.groupBy.derivationsScala, keySchema, groupBySchemaBeforeDerivation)
StructType(s"groupby_derived_${servingInfo.groupBy.metaData.cleanName}", fields.toArray)
}
baseValueSchema.fields.foreach { sf =>
valueFields.append(joinPart.constructJoinPartSchema(sf))
}
}
.get
}

// gather key schema and value schema from external sources.
Expand Down Expand Up @@ -325,8 +355,7 @@ class MetadataStore(fetchContext: FetchContext) {
val keyCodec = AvroCodec.of(AvroConversions.fromChrononSchema(keySchema).toString)
val baseValueSchema = StructType(s"${joinName.sanitize}_value", valueFields.toArray)
val baseValueCodec = serde.AvroCodec.of(AvroConversions.fromChrononSchema(baseValueSchema).toString)
val joinCodec = JoinCodec(joinConf, keySchema, baseValueSchema, keyCodec, baseValueCodec)
joinCodec
JoinCodec(joinConf, keySchema, baseValueSchema, keyCodec, baseValueCodec, hasPartialFailure)
}

def getSchemaFromKVStore(dataset: String, key: String): serde.AvroCodec = {
Expand Down Expand Up @@ -366,6 +395,10 @@ class MetadataStore(fetchContext: FetchContext) {
}
logger.info(s"Fetched ${Constants.GroupByServingInfoKey} from : $batchDataset")
if (metaData.isFailure) {
Metrics
.Context(Metrics.Environment.MetaDataFetching, groupBy = name)
.withSuffix("group_by")
.incrementException(metaData.failed.get)
Failure(
new RuntimeException(s"Couldn't fetch group by serving info for $batchDataset, " +
"please make sure a batch upload was successful",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
)

val result = baseFetcher.parseGroupByResponse("prefix", request, response)
result.keySet shouldBe Set("name_exception")
result.keySet shouldBe Set("prefix_exception")
}

it should "check late batch data is handled correctly" in {
Expand Down
Loading