@@ -65,15 +65,15 @@ object DailyResolution extends Resolution {
object ResolutionUtils {

/**
- * Find the smallest tail window resolution in a GroupBy. Returns None if the GroupBy does not define any windows.
+ * Find the smallest tail window resolution in a GroupBy. Returns 1D if the GroupBy does not define any windows (all-time aggregates).
Collaborator: can you expand on this change?

Contributor Author: Yeah, so if a user creates a GroupBy with no windows, we end up defaulting to all-time windows in Chronon. Currently the Flink code throws an error in this scenario (we return None here, and when FlinkJob calls .get on it, it errors out). Instead of failing, we go with a 1D tile size, as that's the best option to help us compute the all-time window (batch covers the rest).

* The window resolutions are: 5 min for a GroupBy window < 12 hrs, 1 hr for < 12 days, 1 day for > 12 days.
* */
- def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Option[Long] =
+ def getSmallestWindowResolutionInMillis(groupBy: GroupBy): Long =
Option(
groupBy.aggregations.toScala.toArray
.flatMap(aggregation =>
if (aggregation.windows != null) aggregation.windows.toScala
else None)
.map(FiveMinuteResolution.calculateTailHop)
-   ).filter(_.nonEmpty).map(_.min)
+   ).filter(_.nonEmpty).map(_.min).getOrElse(WindowUtils.Day.millis)
}
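For context, here is a minimal sketch of the behavior change (imports and the windowless-GroupBy helper are assumed for illustration; they are not part of this PR):

```scala
import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api.GroupBy

// A GroupBy whose aggregations define no windows, i.e. all-time aggregates.
// buildWindowlessGroupBy() is a hypothetical test helper.
val allTimeGroupBy: GroupBy = buildWindowlessGroupBy()

// Before this PR: Option[Long] = None, and FlinkJob's `.get` threw at runtime.
// After: falls back to 1-day tiles so Flink can serve the streaming head of the
// all-time window (batch covers the rest).
val tileSizeMs: Long = ResolutionUtils.getSmallestWindowResolutionInMillis(allTimeGroupBy)
assert(tileSizeMs == WindowUtils.Day.millis) // 86,400,000 ms
```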
7 changes: 7 additions & 0 deletions api/src/main/scala/ai/chronon/api/Extensions.scala
@@ -99,6 +99,13 @@ object Extensions {
s"${millis}ms"
}
}

+ // Returns the start of the window that contains the timestamp
+ // As an example consider a 1hr window: 3600 * 1000. If the timestamp is 1735733820000 (2025-01-01 12:17:00)
+ // the start of the window is 1735732800000 (2025-01-01 12:00:00)
+ def windowStartMillis(timestampMs: Long, windowSizeMs: Long): Long = {
+   timestampMs - (timestampMs % windowSizeMs)
+ }
}

implicit class MetadataOps(metaData: MetaData) {
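As a quick sanity check, the floor arithmetic in windowStartMillis can be reproduced standalone (a sketch mirroring the worked example in the new comment):

```scala
val windowSizeMs = 3600L * 1000 // a 1 hr window
val ts = 1735733820000L         // 2025-01-01 12:17:00 UTC

// Floor the timestamp down to the nearest window boundary.
val windowStart = ts - (ts % windowSizeMs)
assert(windowStart == 1735732800000L) // 2025-01-01 12:00:00 UTC
```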
21 changes: 17 additions & 4 deletions flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -3,8 +3,11 @@ package ai.chronon.flink
import ai.chronon.api.Constants
import ai.chronon.api.DataModel
import ai.chronon.api.Extensions.GroupByOps
+ import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api.Query
+ import ai.chronon.api.TilingUtils
import ai.chronon.api.{StructType => ChrononStructType}
+ import ai.chronon.fetcher.TileKey
import ai.chronon.flink.types.AvroCodecOutput
import ai.chronon.flink.types.TimestampedTile
import ai.chronon.online.AvroConversions
@@ -126,9 +129,10 @@ case class AvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
* that can be written out to the KV store (PutRequest object).
*
* @param groupByServingInfoParsed The GroupBy we are working with
+ * @param tilingWindowSizeMs The size of the tiling window in milliseconds
* @tparam T The input data type
*/
- case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed)
+ case class TiledAvroCodecFn[T](groupByServingInfoParsed: GroupByServingInfoParsed, tilingWindowSizeMs: Long)
extends BaseAvroCodecFn[TimestampedTile, AvroCodecOutput] {
override def open(configuration: Configuration): Unit = {
super.open(configuration)
@@ -158,18 +162,27 @@ case class TiledAvroCodecFn[T](groupByServingInfoParse
// 'keys' is a map of (key name in schema -> key value), e.g. Map("card_number" -> "4242-4242-4242-4242")
// We convert to AnyRef because Chronon expects an AnyRef (for scala <> java interoperability reasons).
val keys: Map[String, AnyRef] = keyColumns.zip(in.keys.map(_.asInstanceOf[AnyRef])).toMap
- val keyBytes = keyToBytes(in.keys.toArray)
+ val entityKeyBytes = keyToBytes(in.keys.toArray)

+ val tileKey = new TileKey()
+ val tileStart = WindowUtils.windowStartMillis(tsMills, tilingWindowSizeMs)
+ tileKey.setDataset(streamingDataset)
+ tileKey.setKeyBytes(entityKeyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]])
Collaborator: might be better to box individually? e.g.

keyBytes.map(java.lang.Byte.valueOf).toList.asJava

Contributor Author: I went with this as I thought it would be better to skip iterating over all the bytes in the key list; since Scala and Java bytes are equivalent at runtime, we can just do a constant-time cast of the top-level type.
Seems to work in unit tests. Wdyt?

Collaborator: Okay, I thought it would have thrown an exception, but if it works in UTs that's good.
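For reference, a standalone sketch of the two conversions discussed above (collection converters assumed): the top-level cast is sound at runtime because a Scala List[Byte] already stores its elements boxed as java.lang.Byte, so no per-element pass is needed.

```scala
import scala.jdk.CollectionConverters._

val keyBytes: Array[Byte] = Array[Byte](1, 2, 3)

// Approach taken in the PR: a single cast of the wrapped collection.
val castList: java.util.List[java.lang.Byte] =
  keyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]]

// Reviewer's alternative: box each element explicitly, at the cost of one extra pass.
val boxedList: java.util.List[java.lang.Byte] =
  keyBytes.map(java.lang.Byte.valueOf).toList.asJava

assert(castList == boxedList)
```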

+ tileKey.setTileSizeMillis(tilingWindowSizeMs)
+ tileKey.setTileStartTimestampMillis(tileStart)

val valueBytes = in.tileBytes

logger.debug(
s"""
|Avro converting tile to PutRequest - tile=${in}
|groupBy=${groupByServingInfoParsed.groupBy.getMetaData.getName} tsMills=$tsMills keys=$keys
- |keyBytes=${java.util.Base64.getEncoder.encodeToString(keyBytes)}
+ |keyBytes=${java.util.Base64.getEncoder.encodeToString(entityKeyBytes)}
|valueBytes=${java.util.Base64.getEncoder.encodeToString(valueBytes)}
|streamingDataset=$streamingDataset""".stripMargin
)

- new AvroCodecOutput(keyBytes, valueBytes, streamingDataset, tsMills)
+ val tileKeyBytes = TilingUtils.serializeTileKey(tileKey)
+ new AvroCodecOutput(tileKeyBytes, valueBytes, streamingDataset, tsMills)
}
}
6 changes: 3 additions & 3 deletions flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
@@ -152,7 +152,7 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
f"Running Flink job for groupByName=${groupByName}, Topic=${topic}. " +
"Tiling is enabled.")

- val tilingWindowSizeInMillis: Option[Long] =
+ val tilingWindowSizeInMillis: Long =
ResolutionUtils.getSmallestWindowResolutionInMillis(groupByServingInfoParsed.groupBy)

// we expect parallelism on the source stream to be set by the source provider
@@ -180,7 +180,7 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
.toSeq

val window = TumblingEventTimeWindows
- .of(Time.milliseconds(tilingWindowSizeInMillis.get))
+ .of(Time.milliseconds(tilingWindowSizeInMillis))
.asInstanceOf[WindowAssigner[Map[String, Any], TimeWindow]]

// An alternative to AlwaysFireOnElementTrigger can be used: BufferedProcessingTimeTrigger.
@@ -225,7 +225,7 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
.setParallelism(sourceStream.getParallelism)

val putRecordDS: DataStream[AvroCodecOutput] = tilingDS
- .flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed))
+ .flatMap(new TiledAvroCodecFn[T](groupByServingInfoParsed, tilingWindowSizeInMillis))
.uid(s"avro-conversion-01-$groupByName")
.name(s"Avro conversion for $groupByName")
.setParallelism(sourceStream.getParallelism)
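One detail worth noting (a sketch, not part of the diff): with the default zero offset, Flink's tumbling event-time windows compute their start with the same floor arithmetic as WindowUtils.windowStartMillis, so the tile start stamped into each TileKey lines up with the tumbling window that produced the tile.

```scala
// Mirrors Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps.
def flinkWindowStart(ts: Long, windowSizeMs: Long, offsetMs: Long = 0L): Long =
  ts - (ts - offsetMs + windowSizeMs) % windowSizeMs

val tileSizeMs = 5 * 60 * 1000L // 5 min tiles
val ts = 1735733820000L         // 2025-01-01 12:17:00 UTC

// Both formulas land on the same boundary.
assert(flinkWindowStart(ts, tileSizeMs) == ts - (ts % tileSizeMs))
```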
@@ -1,5 +1,6 @@
package ai.chronon.flink.test

+ import ai.chronon.api.TilingUtils
import ai.chronon.flink.FlinkJob
import ai.chronon.flink.SparkExpressionEvalFn
import ai.chronon.flink.types.TimestampedIR
@@ -34,7 +35,10 @@ class FlinkJobIntegrationTest extends AnyFlatSpec with BeforeAndAfter {
): TimestampedTile = {
// Decode the key bytes into a GenericRecord
val tileBytes = in.valueBytes
- val record = groupByServingInfoParsed.keyCodec.decode(in.keyBytes)
+ // Deserialize the TileKey object and pull out the entity key bytes
+ val tileKey = TilingUtils.deserializeTileKey(in.keyBytes)
+ val keyBytes = tileKey.keyBytes.asScala.toArray.map(_.asInstanceOf[Byte])
+ val record = groupByServingInfoParsed.keyCodec.decode(keyBytes)

// Get all keys we expect to be in the GenericRecord
val decodedKeys: List[String] =
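And a small sketch of the round trip this test relies on (imports and the TileKey accessors are assumed to match the generated class used in the diff):

```scala
import scala.jdk.CollectionConverters._
import ai.chronon.api.TilingUtils
import ai.chronon.fetcher.TileKey

val entityKeyBytes: Array[Byte] = Array[Byte](4, 2, 42)

// Write path, as in TiledAvroCodecFn: box the entity key bytes into the TileKey.
val tileKey = new TileKey()
tileKey.setKeyBytes(entityKeyBytes.toList.asJava.asInstanceOf[java.util.List[java.lang.Byte]])
tileKey.setTileSizeMillis(5 * 60 * 1000L)
tileKey.setTileStartTimestampMillis(1735732800000L)

// Read path, as in this test: deserialize and recover the raw entity key bytes.
val roundTripped = TilingUtils.deserializeTileKey(TilingUtils.serializeTileKey(tileKey))
val recovered: Array[Byte] = roundTripped.keyBytes.asScala.toArray.map(_.asInstanceOf[Byte])
assert(recovered.sameElements(entityKeyBytes))
```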