@@ -1,41 +1,20 @@
package ai.chronon.integrations.aws

import ai.chronon.api.Constants
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.GetResponse
import ai.chronon.online.KVStore.ListRequest
import ai.chronon.online.KVStore.ListResponse
import ai.chronon.online.KVStore.ListValue
import ai.chronon.online.KVStore.TimedValue
import ai.chronon.online.Metrics
import ai.chronon.online.KVStore._
import ai.chronon.online.{KVStore, Metrics}
import ai.chronon.online.Metrics.Context
import com.google.common.util.concurrent.RateLimiter
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
import software.amazon.awssdk.services.dynamodb.model.KeyType
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest
import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType
import software.amazon.awssdk.services.dynamodb.model.ScanRequest
import software.amazon.awssdk.services.dynamodb.model.ScanResponse
import software.amazon.awssdk.services.dynamodb.model._

import java.time.Instant
import java.util
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.util.Success
import scala.util.Try
import scala.util.{Success, Try}

object DynamoDBKVStoreConstants {
// Read capacity units to configure DynamoDB table with
@@ -1,46 +1,27 @@
package ai.chronon.integrations.cloud_gcp

import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.Extensions.StringOps
import ai.chronon.api.Extensions.WindowOps
import ai.chronon.api.Extensions.WindowUtils
import ai.chronon.api.GroupBy
import ai.chronon.api.MetaData
import ai.chronon.api.PartitionSpec
import ai.chronon.online.KVStore
import ai.chronon.online.KVStore.ListRequest
import ai.chronon.online.KVStore.ListResponse
import ai.chronon.online.KVStore.ListValue
import ai.chronon.online.Metrics
import ai.chronon.api.{GroupBy, MetaData, PartitionSpec}
import ai.chronon.api.Extensions.{GroupByOps, StringOps, WindowOps, WindowUtils}
import ai.chronon.fetcher.TileKey
import ai.chronon.online.{KVStore, Metrics}
import ai.chronon.online.KVStore.{GetRequest, ListRequest, ListResponse, ListValue}
import com.google.cloud.RetryOption
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryErrorMessages
import com.google.cloud.bigquery.BigQueryRetryConfig
import com.google.cloud.bigquery.Job
import com.google.cloud.bigquery.JobId
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery._
import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest
import com.google.cloud.bigtable.admin.v2.models.GCRules
import com.google.cloud.bigtable.admin.v2.models.{CreateTableRequest, GCRules}
import com.google.cloud.bigtable.data.v2.BigtableDataClient
import com.google.cloud.bigtable.data.v2.models.Filters
import com.google.cloud.bigtable.data.v2.models.Query
import com.google.cloud.bigtable.data.v2.models.{Filters, Query, RowMutation, TableId => BTTableId}
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange
import com.google.cloud.bigtable.data.v2.models.RowMutation
import com.google.cloud.bigtable.data.v2.models.{TableId => BTTableId}
import com.google.protobuf.ByteString
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}
import org.threeten.bp.Duration

import java.nio.charset.Charset
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Failure
import scala.util.Success
import scala.util.{Failure, Success}

/**
* BigTable based KV store implementation. We store a few kinds of data in our KV store:
@@ -170,17 +151,6 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
Future.sequence(resultFutures)
}

// we currently use only one batch table for all batch time series datasets and one streaming table for all streaming ts datasets
private def mapDatasetToTable(dataset: String): BTTableId = {
if (dataset.endsWith("_BATCH")) {
BTTableId.of("GROUPBY_BATCH")
} else if (dataset.endsWith("_STREAMING")) {
BTTableId.of("GROUPBY_STREAMING")
} else {
BTTableId.of(dataset)
}
}

// time series datasets have a day timestamp appended to the row key to ensure that time series points across different
// days are split across rows
private def isTimeSeriesDataset(dataset: String): Boolean =
@@ -278,7 +248,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,

val rowKey = request.tsMillis match {
case Some(ts) if isTimeSeriesDataset(request.dataset) =>
buildRowKey(request.keyBytes, request.dataset, Some(ts))
buildRowKey(request.keyBytes, request.dataset, Option(ts))
case _ =>
buildRowKey(request.keyBytes, request.dataset)
}
@@ -398,6 +368,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
}

object BigTableKVStore {

// continuation key to help with list pagination
val continuationKey: String = "continuationKey"

@@ -407,6 +378,24 @@ object BigTableKVStore {
// Default list limit
val defaultListLimit: Int = 100

/**
* row key (with tiling) convention:
* <dataset>#<entity_key>#<start_timestamp_ms>#<tile_size_ms>
*
* row key (without tiling) convention:
* <dataset>#<entity_key>#<start_date>
*
* @param dataset dataset the row key belongs to
* @param baseKeyBytes serialized entity key bytes
* @param startTimestampMs tile start timestamp in millis
* @param tileSizeMs tile size in millis
* @return encoded row key bytes
*/
def buildTiledRowKey(dataset: String,
baseKeyBytes: Array[Byte],
startTimestampMs: Long,
tileSizeMs: Long): Array[Byte] = {

s"${dataset}#".getBytes(Charset.forName("UTF-8")) ++ baseKeyBytes ++ s"#${startTimestampMs}#${tileSizeMs}"
.getBytes(Charset.forName("UTF-8"))
}
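
// Illustrative example (hypothetical values, not from this PR): for dataset "MY_GROUPBY_STREAMING",
// entity key bytes for "user123", startTimestampMs = 1700006400000 and a 1-day tile (86400000 ms),
// the resulting row key is the UTF-8 byte concatenation of:
//   MY_GROUPBY_STREAMING#user123#1700006400000#86400000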

// We prefix the dataset name to the key to ensure we can have multiple datasets in the same table
def buildRowKey(baseKeyBytes: Array[Byte], dataset: String, maybeTs: Option[Long] = None): Array[Byte] = {
val baseRowKey = s"$dataset#".getBytes(Charset.forName("UTF-8")) ++ baseKeyBytes
@@ -420,6 +409,50 @@ object BigTableKVStore {
}
}

def tileQueryColumnBased(query: Query, startTs: Long, endTs: Long) = {}

def tileQueryCellBased(query: Query, startTs: Long, endTs: Long) = {}
Comment on lines +412 to +414
Contributor

⚠️ Potential issue

Empty method implementations

tileQueryColumnBased and tileQueryCellBased are empty. Either implement them or mark as TODO.
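
A minimal sketch of one possible cell-based variant, assuming tiles are distinguished by Bigtable cell timestamps (micros) the same way requestToTiledQuery filters them; the name and filter composition are illustrative, not this PR's intended design:

def tileQueryCellBased(query: Query, startTs: Long, endTs: Long): Query =
  // narrow the read to cells whose (microsecond) timestamps fall in [startTs, endTs]
  query.filter(
    Filters.FILTERS
      .timestamp()
      .range()
      .startClosed(startTs * 1000)
      .endClosed(endTs * 1000))

A column-based variant would presumably filter on column qualifiers rather than cell timestamps.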


def mapDatasetToTable(dataset: String): BTTableId = {
if (dataset.endsWith("_BATCH")) {
BTTableId.of("GROUPBY_BATCH")
} else if (dataset.endsWith("_STREAMING")) {
BTTableId.of("GROUPBY_STREAMING")
} else {
BTTableId.of(dataset)
}
}

def requestToTiledQuery(getRequest: GetRequest): Query = {
val keyBytes = getRequest.keyBytes
val dataset = getRequest.dataset
val startTs = getRequest.startTsMillis.getOrElse(0L)
val endTs = getRequest.endTsMillis.getOrElse(System.currentTimeMillis())

val startRow = buildTiledRowKey(dataset, keyBytes, startTs, 1.day.toMillis)
val endRow = buildTiledRowKey(dataset, keyBytes, endTs, 1.day.toMillis)

Query
.create(mapDatasetToTable(dataset))
.filter(Filters.FILTERS.family().exactMatch(ColumnFamilyString))
.filter(Filters.FILTERS.qualifier().exactMatch(ColumnFamilyQualifierString))
.range(
ByteStringRange.unbounded().startClosed(ByteString.copyFrom(startRow)).endOpen(ByteString.copyFrom(endRow)))
.filter(
Filters.FILTERS
.timestamp()
.range()
.startClosed((endTs - (endTs % 1.day.toMillis)) * 1000)
.endClosed(endTs * 1000))

}
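
// Hedged usage sketch (values hypothetical; assumes the remaining GetRequest fields have defaults,
// and that dataClient is the BigtableDataClient held by BigTableKVStoreImpl):
//   val nowMs = System.currentTimeMillis()
//   val request = GetRequest(keyBytes, "MY_GROUPBY_STREAMING",
//     startTsMillis = Some(nowMs - 1.day.toMillis), endTsMillis = Some(nowMs))
//   val rows = dataClient.readRows(requestToTiledQuery(request)) // bounded row-range scan + cell-timestamp filter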

def computeTiles(dataset: String, keyBytes: Array[Byte], startTsInDay: Long, endTsInDay: Long): Seq[TileKey] = {
val dummyTile =
new TileKey().setDataset(dataset).setKeyBytes(keyBytes.map(java.lang.Byte.valueOf).toList.asJava)
List(dummyTile)
}

val ColumnFamilyString: String = "cf"
val ColumnFamilyQualifierString: String = "value"
val ColumnFamilyQualifier: ByteString = ByteString.copyFromUtf8(ColumnFamilyQualifierString)
1 change: 1 addition & 0 deletions flink/src/main/scala/ai/chronon/flink/FlinkJob.scala
@@ -96,6 +96,7 @@ class FlinkJob[T](eventSrc: FlinkSource[T],
* (with the above 4 operators and parallelism as injected by the user).
*/
def runGroupByJob(env: StreamExecutionEnvironment): DataStream[WriteResponse] = {

logger.info(
f"Running Flink job for groupByName=${groupByName}, Topic=${topic}. " +
"Tiling is disabled.")
25 changes: 7 additions & 18 deletions online/src/main/scala/ai/chronon/online/Api.scala
@@ -16,33 +16,22 @@

package ai.chronon.online

import ai.chronon.api.Constants
import ai.chronon.api.StructType
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.KVStore.GetResponse
import ai.chronon.online.KVStore.ListRequest
import ai.chronon.online.KVStore.ListResponse
import ai.chronon.online.KVStore.PutRequest
import ai.chronon.api.{Constants, StructType}
import ai.chronon.online.KVStore._
import org.apache.spark.sql.SparkSession
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.slf4j.{Logger, LoggerFactory}

import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.function.Consumer
import scala.collection.Seq
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.MILLISECONDS
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, MILLISECONDS}
import scala.util.{Failure, Success, Try}

object KVStore {
// a scan request essentially for the keyBytes
// afterTsMillis - is used to limit the scan to more recent data
// startTsMillis - is used to limit the scan to more recent data
// endTsMillis - end range of the scan (starts from startTsMillis to endTsMillis)
case class GetRequest(keyBytes: Array[Byte],
dataset: String,
33 changes: 9 additions & 24 deletions online/src/main/scala/ai/chronon/online/FetcherBase.scala
@@ -18,37 +18,22 @@ package ai.chronon.online

import ai.chronon.aggregator.row.ColumnAggregator
import ai.chronon.aggregator.windowing
import ai.chronon.aggregator.windowing.FinalBatchIr
import ai.chronon.aggregator.windowing.SawtoothOnlineAggregator
import ai.chronon.aggregator.windowing.TiledIr
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.GroupByOps
import ai.chronon.api.Extensions.JoinOps
import ai.chronon.api.Extensions.ThrowableOps
import ai.chronon.aggregator.windowing.{FinalBatchIr, SawtoothOnlineAggregator, TiledIr}
import ai.chronon.api._
import ai.chronon.online.Fetcher.ColumnSpec
import ai.chronon.online.Fetcher.PrefixedRequest
import ai.chronon.online.Fetcher.Request
import ai.chronon.online.Fetcher.Response
import ai.chronon.online.FetcherCache.BatchResponses
import ai.chronon.online.FetcherCache.CachedBatchResponse
import ai.chronon.online.FetcherCache.KvStoreBatchResponse
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.KVStore.GetResponse
import ai.chronon.online.KVStore.TimedValue
import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.{GroupByOps, JoinOps, ThrowableOps}
import ai.chronon.online.Fetcher.{ColumnSpec, PrefixedRequest, Request, Response}
import ai.chronon.online.FetcherCache.{BatchResponses, CachedBatchResponse, KvStoreBatchResponse}
import ai.chronon.online.KVStore.{GetRequest, GetResponse, TimedValue}
import ai.chronon.online.Metrics.Name
import ai.chronon.online.OnlineDerivationUtil.applyDeriveFunc
import ai.chronon.online.OnlineDerivationUtil.buildRenameOnlyDerivationFunction
import ai.chronon.online.OnlineDerivationUtil.{applyDeriveFunc, buildRenameOnlyDerivationFunction}
import com.google.gson.Gson

import java.util
import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

// Does internal facing fetching
// 1. takes join request or groupBy requests