@@ -3,6 +3,7 @@ package ai.chronon.integrations.aws
 import ai.chronon.online._
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient
+import ai.chronon.online.serde._

 import java.net.URI
@@ -8,7 +8,7 @@ import ai.chronon.online.FlagStoreConstants
 import ai.chronon.online.GroupByServingInfoParsed
 import ai.chronon.online.KVStore
 import ai.chronon.online.LoggableResponse
-import ai.chronon.online.Serde
+import ai.chronon.online.serde.Serde
 import ai.chronon.online.serde.AvroSerde
 import com.google.api.gax.core.{InstantiatingExecutorProvider, NoCredentialsProvider}
 import com.google.cloud.bigquery.BigQueryOptions
2 changes: 1 addition & 1 deletion flink/src/main/scala/ai/chronon/flink/AvroCodecFn.scala
@@ -10,7 +10,7 @@ import ai.chronon.api.TilingUtils
 import ai.chronon.api.{StructType => ChrononStructType}
 import ai.chronon.flink.types.AvroCodecOutput
 import ai.chronon.flink.types.TimestampedTile
-import ai.chronon.online.AvroConversions
+import ai.chronon.online.serde.AvroConversions
 import ai.chronon.online.GroupByServingInfoParsed
 import org.apache.flink.api.common.functions.RichFlatMapFunction
 import org.apache.flink.configuration.Configuration
@@ -8,7 +8,7 @@ import ai.chronon.api.Query
 import ai.chronon.api.{StructType => ChrononStructType}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.online.CatalystUtil
-import ai.chronon.online.SparkConversions
+import ai.chronon.online.serde.SparkConversions
 import com.codahale.metrics.ExponentiallyDecayingReservoir
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper
 import org.apache.flink.metrics.{Counter, Histogram, MetricGroup}
@@ -2,7 +2,7 @@ package org.apache.spark.sql.avro

 import ai.chronon.api.{DataType, GroupBy}
 import ai.chronon.flink.{ChrononDeserializationSchema, SourceProjection, SparkExpressionEval}
-import ai.chronon.online.SparkConversions
+import ai.chronon.online.serde.SparkConversions
 import org.apache.flink.api.common.serialization.DeserializationSchema
 import org.apache.flink.metrics.Counter
 import org.apache.flink.util.Collector
@@ -6,7 +6,8 @@ import ai.chronon.flink.{FlinkJob, SparkExpressionEval, SparkExpressionEvalFn}
 import ai.chronon.flink.types.TimestampedIR
 import ai.chronon.flink.types.TimestampedTile
 import ai.chronon.flink.types.WriteResponse
-import ai.chronon.online.{Api, GroupByServingInfoParsed, SparkConversions}
+import ai.chronon.online.{Api, GroupByServingInfoParsed}
+import ai.chronon.online.serde.SparkConversions
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.test.util.MiniClusterWithClientResource
@@ -2,7 +2,7 @@ package org.apache.spark.sql.avro

 import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.flink.test.UserAvroSchema
-import ai.chronon.online.SparkConversions
+import ai.chronon.online.serde.SparkConversions
 import org.scalatest.flatspec.AnyFlatSpec
 import org.apache.flink.api.common.functions.util.ListCollector

16 changes: 16 additions & 0 deletions online/BUILD.bazel
@@ -16,6 +16,22 @@ scala_library(
     ],
 )

+scala_library(
+    name = "serde_lib",
+    srcs = glob(["src/main/scala/ai/chronon/online/serde/*.scala"]),
+    format = select({
+        "//tools/config:scala_2_13": False,  # Disable for 2.13
+        "//conditions:default": True,  # Enable for other versions
+    }),
+    visibility = ["//visibility:public"],
+    deps = [
+        "//api:lib",
+        "//api:thrift_java",
+        maven_artifact("org.apache.avro:avro"),
+        "//tools/build_rules/spark:spark-exec",
+    ],
+)
+
 scala_library(
     name = "lib",
     srcs = glob(["src/main/**/*.scala"]) + glob(["src/main/**/*.java"]),
59 changes: 5 additions & 54 deletions online/src/main/scala/ai/chronon/online/Api.scala
@@ -17,25 +17,19 @@
 package ai.chronon.online

 import ai.chronon.api.Constants
-import ai.chronon.api.StructType
 import ai.chronon.online.KVStore._
 import ai.chronon.online.fetcher.Fetcher
 import org.apache.spark.sql.SparkSession
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.slf4j.{Logger, LoggerFactory}
+import ai.chronon.online.serde._

 import java.nio.charset.StandardCharsets
 import java.util.Base64
 import java.util.function.Consumer
 import scala.collection.Seq
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.MILLISECONDS
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
+import scala.concurrent.duration.{Duration, MILLISECONDS}
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}

 object KVStore {
   // a scan request essentially for the keyBytes
@@ -145,40 +139,6 @@ object StringArrayConverter {
     encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8))
   }
 }
-
-/** ==== MUTATION vs. EVENT ====
-  * Mutation is the general case of an Event.
-  * Imagine a user impression/view stream - impressions/views are immutable events.
-  * Imagine a stream of changes to a credit card transaction stream:
-  *   - transactions can be "corrected"/updated & deleted, besides being "inserted"
-  *   - this is one of the core differences between entity and event sources. Events are insert-only.
-  *   - (the other difference is that entities are typically stored in the warehouse as snapshots of the table as of midnight)
-  * In case of an update - one must produce both before and after values.
-  * In case of a delete - only before is populated & after is left as null.
-  * In case of an insert - only after is populated & before is left as null.
-  *
-  * ==== TIME ASSUMPTIONS ====
-  * The schema needs to contain a `ts` (milliseconds as a java Long).
-  * For the entities case, `mutation_ts` when absent will use `ts` as a replacement.
-  *
-  * ==== TYPE CONVERSIONS ====
-  * Java types corresponding to the schema types. [[Serde]] should produce mutations that comply.
-  * NOTE: everything is nullable (hence boxed).
-  * IntType     java.lang.Integer
-  * LongType    java.lang.Long
-  * DoubleType  java.lang.Double
-  * FloatType   java.lang.Float
-  * ShortType   java.lang.Short
-  * BooleanType java.lang.Boolean
-  * ByteType    java.lang.Byte
-  * StringType  java.lang.String
-  * BinaryType  Array[Byte]
-  * ListType    java.util.List[Byte]
-  * MapType     java.util.Map[Byte]
-  * StructType  Array[Any]
-  */
-case class Mutation(schema: StructType = null, before: Array[Any] = null, after: Array[Any] = null)
-
 case class LoggableResponse(keyBytes: Array[Byte],
                             valueBytes: Array[Byte],
                             joinName: String,
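Note: the Mutation contract above is deleted here and re-homed under ai.chronon.online.serde (pulled in by the new serde._ imports). To illustrate the before/after semantics its doc comment describes, here is a minimal sketch, assuming chronon's api.StructType/StructField constructors; schema name and field values are illustrative only:

    import ai.chronon.api.{LongType, StringType, StructField, StructType}
    import ai.chronon.online.serde.Mutation

    // hypothetical two-column schema: (user: String, ts: Long)
    val schema = StructType("user_txn", Array(StructField("user", StringType), StructField("ts", LongType)))

    // insert: only `after` is populated
    val insert = Mutation(schema, before = null, after = Array("alice", 1700000000000L))
    // update: both sides are populated
    val update = Mutation(schema, before = Array("alice", 1700000000000L), after = Array("alice", 1700000100000L))
    // delete: only `before` is populated
    val delete = Mutation(schema, before = Array("alice", 1700000100000L), after = null)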
@@ -191,15 +151,6 @@ case class LoggableResponseBase64(keyBase64: String,
                                   tsMillis: Long,
                                   schemaHash: String)

-abstract class Serde extends Serializable {
-  def fromBytes(bytes: Array[Byte]): Mutation
-  def schema: StructType
-  def toBytes(mutation: Mutation): Array[Byte] = {
-    // not implemented
-    throw new UnsupportedOperationException("toBytes not implemented")
-  }
-}
-
 trait StreamBuilder {
   def from(topicInfo: TopicInfo)(implicit session: SparkSession, props: Map[String, String]): DataStream
 }
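The Serde base class likewise moves to ai.chronon.online.serde unchanged. Implementing it only requires mapping raw bytes to a Mutation; below is a minimal sketch against the interface shown above (the CSV wire format, schema, and class name are hypothetical):

    import ai.chronon.api.{LongType, StringType, StructField, StructType}
    import ai.chronon.online.serde.{Mutation, Serde}

    // hypothetical serde for an insert-only "user,ts" CSV event stream
    class CsvEventSerde extends Serde {
      override def schema: StructType =
        StructType("csv_event", Array(StructField("user", StringType), StructField("ts", LongType)))

      override def fromBytes(bytes: Array[Byte]): Mutation = {
        val Array(user, ts) = new String(bytes, "UTF-8").split(",")
        // events are insert-only, so `before` stays null
        Mutation(schema, before = null, after = Array(user, ts.toLong))
      }
    }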
13 changes: 5 additions & 8 deletions online/src/main/scala/ai/chronon/online/CatalystUtil.scala
@@ -16,19 +16,16 @@

 package ai.chronon.online

-import ai.chronon.api.DataType
-import ai.chronon.api.StructType
-import ai.chronon.online.CatalystUtil.PoolKey
-import ai.chronon.online.CatalystUtil.poolMap
+import ai.chronon.api.{DataType, StructType}
+import ai.chronon.online.CatalystUtil.{PoolKey, poolMap}
 import ai.chronon.online.Extensions.StructTypeOps
-import org.apache.spark.sql.SparkSession
+import ai.chronon.online.serde._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.FunctionAlreadyExistsException
-import org.apache.spark.sql.types
+import org.apache.spark.sql.{SparkSession, types}
 import org.slf4j.LoggerFactory

-import java.util.concurrent.ArrayBlockingQueue
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
 import java.util.function
 import scala.collection.Seq

1 change: 1 addition & 0 deletions online/src/main/scala/ai/chronon/online/Extensions.scala
@@ -17,6 +17,7 @@
 package ai.chronon.online

 import ai.chronon.api
+import ai.chronon.online.serde._
 import org.apache.avro.Schema
 import org.apache.spark.sql.types.StructType

@@ -17,19 +17,14 @@
 package ai.chronon.online

 import ai.chronon.aggregator.windowing.SawtoothOnlineAggregator
-import ai.chronon.api.Constants.ReversalField
-import ai.chronon.api.Constants.TimeField
-import ai.chronon.api.Extensions.GroupByOps
-import ai.chronon.api.Extensions.MetadataOps
+import ai.chronon.api.Constants.{ReversalField, TimeField}
+import ai.chronon.api.Extensions.{GroupByOps, MetadataOps}
 import ai.chronon.api.ScalaJavaConversions.ListOps
 import ai.chronon.api._
-import ai.chronon.online.OnlineDerivationUtil.DerivationFunc
-import ai.chronon.online.OnlineDerivationUtil.buildDerivationFunction
-import ai.chronon.online.serde.AvroCodec
-
+import ai.chronon.online.OnlineDerivationUtil.{DerivationFunc, buildDerivationFunction}
+import ai.chronon.online.serde._
 import org.apache.avro.Schema

-import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.collection.Seq

 // mixin class - with schema
2 changes: 1 addition & 1 deletion online/src/main/scala/ai/chronon/online/JoinCodec.scala
@@ -27,7 +27,7 @@ import ai.chronon.online.OnlineDerivationUtil.DerivationFunc
 import ai.chronon.online.OnlineDerivationUtil.buildDerivationFunction
 import ai.chronon.online.OnlineDerivationUtil.buildDerivedFields
 import ai.chronon.online.OnlineDerivationUtil.buildRenameOnlyDerivationFunction
-import ai.chronon.online.serde.AvroCodec
+import ai.chronon.online.serde._

 import com.google.gson.Gson

2 changes: 1 addition & 1 deletion online/src/main/scala/ai/chronon/online/TileCodec.scala
@@ -25,7 +25,7 @@ import ai.chronon.api.Extensions.WindowUtils
 import ai.chronon.api.GroupBy
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api.StructType
-import ai.chronon.online.serde.AvroCodec
+import ai.chronon.online.serde._
 import org.apache.avro.generic.GenericData

 import scala.collection.JavaConverters._
5 changes: 3 additions & 2 deletions online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala
@@ -25,6 +25,7 @@ import ai.chronon.online.OnlineDerivationUtil.applyDeriveFunc
 import ai.chronon.online._
 import ai.chronon.online.fetcher.Fetcher.{JoinSchemaResponse, Request, Response, ResponseWithContext}
 import ai.chronon.online.metrics.{Metrics, TTLCache}
+import ai.chronon.online.serde._
 import com.google.gson.Gson
 import com.timgroup.statsd.Event
 import com.timgroup.statsd.Event.AlertType
@@ -221,12 +222,12 @@
   }

   private def encode(schema: StructType,
-                     codec: serde.AvroCodec,
+                     codec: AvroCodec,
                      dataMap: Map[String, AnyRef],
                      cast: Boolean = false,
                      tries: Int = 3): Array[Byte] = {
     def encodeOnce(schema: StructType,
-                   codec: serde.AvroCodec,
+                   codec: AvroCodec,
                    dataMap: Map[String, AnyRef],
                    cast: Boolean = false): Array[Byte] = {
       val data = schema.fields.map { case StructField(name, typ) =>
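Only the signatures change in this hunk; the tries parameter drives the (unchanged) body, which re-attempts encoding via encodeOnce. As a rough, illustrative sketch of that bounded-retry pattern, not the Fetcher's actual code:

    import scala.annotation.tailrec
    import scala.util.{Failure, Success, Try}

    // bounded retry-on-exception wrapper, mirroring the `tries` parameter above
    @tailrec
    def retry[T](tries: Int)(attempt: => T): T =
      Try(attempt) match {
        case Success(value)           => value
        case Failure(e) if tries <= 1 => throw e
        case Failure(_)               => retry(tries - 1)(attempt)
      }

    // usage: retry(tries = 3) { encodeOnce(schema, codec, dataMap, cast) }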
@@ -4,7 +4,8 @@ import ai.chronon.aggregator.windowing.{FinalBatchIr, SawtoothOnlineAggregator,
 import ai.chronon.api.Extensions.WindowOps
 import ai.chronon.api.ScalaJavaConversions.{IteratorOps, JMapOps}
 import ai.chronon.api.{DataModel, Row, Window}
-import ai.chronon.online.{AvroConversions, GroupByServingInfoParsed}
+import ai.chronon.online.serde.AvroConversions
+import ai.chronon.online.GroupByServingInfoParsed
 import ai.chronon.online.KVStore.TimedValue
 import ai.chronon.online.metrics.Metrics.Name
 import ai.chronon.online.fetcher.FetcherCache.{BatchResponses, CachedBatchResponse, KvStoreBatchResponse}
@@ -25,7 +25,7 @@ import ai.chronon.online.KVStore.{ListRequest, ListResponse, PutRequest}
 import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName
 import ai.chronon.online.OnlineDerivationUtil.buildDerivedFields
 import ai.chronon.online._
-import ai.chronon.online.serde.AvroCodec
+import ai.chronon.online.serde._
 import ai.chronon.online.metrics.{Metrics, TTLCache}
 import org.slf4j.{Logger, LoggerFactory}

@@ -18,7 +18,6 @@ package ai.chronon.online.serde

 import ai.chronon.api.{DataType, Row}
 import ai.chronon.api.ScalaJavaConversions._
-import ai.chronon.online.AvroConversions
 import org.apache.avro.Schema
 import org.apache.avro.Schema.Field
 import org.apache.avro.file.SeekableByteArrayInput
@@ -14,22 +14,19 @@
  * limitations under the License.
  */

-package ai.chronon.online
+package ai.chronon.online.serde

 import ai.chronon.api._
-import org.apache.avro.LogicalTypes
-import org.apache.avro.Schema
+import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Schema.Field
-import org.apache.avro.generic.GenericData
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.util.Utf8

 import java.nio.ByteBuffer
 import java.util
 import scala.annotation.tailrec
-import scala.collection.AbstractIterator
 import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.{AbstractIterator, mutable}

 object AvroConversions {

@@ -183,7 +180,7 @@
   }

   def encodeBytes(schema: StructType, extraneousRecord: Any => Array[Any] = null): Any => Array[Byte] = {
-    val codec: serde.AvroCodec = new serde.AvroCodec(fromChrononSchema(schema).toString(true));
+    val codec: AvroCodec = new AvroCodec(fromChrononSchema(schema).toString(true));
     { data: Any =>
       val record =
         fromChrononRow(data, codec.chrononSchema, codec.schema, extraneousRecord).asInstanceOf[GenericData.Record]
@@ -193,7 +190,7 @@
   }

   def encodeJson(schema: StructType, extraneousRecord: Any => Array[Any] = null): Any => String = {
-    val codec: serde.AvroCodec = new serde.AvroCodec(fromChrononSchema(schema).toString(true));
+    val codec: AvroCodec = new AvroCodec(fromChrononSchema(schema).toString(true));
     { data: Any =>
       val record =
         fromChrononRow(data, codec.chrononSchema, codec.schema, extraneousRecord).asInstanceOf[GenericData.Record]
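With AvroConversions now living in ai.chronon.online.serde, the serde. qualifier on AvroCodec becomes redundant. For context, a minimal usage sketch of the encodeBytes closure above (schema name, fields, and row values are hypothetical):

    import ai.chronon.api.{LongType, StringType, StructField, StructType}
    import ai.chronon.online.serde.AvroConversions

    val schema = StructType("user_event", Array(StructField("user", StringType), StructField("ts", LongType)))

    // build the encoder once and reuse it; rows are Array[Any] in schema field order
    val toAvroBytes: Any => Array[Byte] = AvroConversions.encodeBytes(schema)
    val payload: Array[Byte] = toAvroBytes(Array[Any]("alice", 1700000000000L))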