@@ -14,7 +14,7 @@ import ai.chronon.online.KVStore.ListRequest
import ai.chronon.online.KVStore.ListResponse
import ai.chronon.online.KVStore.ListValue
import ai.chronon.online.metrics.Metrics
import com.google.api.core.ApiFuture
import com.google.api.core.{ApiFuture, ApiFutures}
import com.google.cloud.RetryOption
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryErrorMessages
@@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory
import org.threeten.bp.Duration

import java.nio.charset.Charset
import java.util
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer
import scala.compat.java8.FutureConverters
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -88,6 +91,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,

protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("bigtable")

protected val tableToContext = new TrieMap[String, Metrics.Context]()

override def create(dataset: String): Unit = create(dataset, Map.empty)

override def create(dataset: String, props: Map[String, Any]): Unit = {
@@ -114,8 +119,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,

case e: Exception =>
logger.error("Error creating table", e)
metricsContext.increment("create.failures", s"exception:${e.getClass.getName}")

metricsContext.increment("create.failures", Map("exception" -> e.getClass.getName))
}
}
.orElse(throw new IllegalStateException("Missing BigTable admin client. Is the ENABLE_UPLOAD_CLIENTS flag set?"))
@@ -136,9 +140,15 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
private def readRowsMultiGet(
requestsByDataset: Map[String, Seq[KVStore.GetRequest]]): Seq[Future[Seq[KVStore.GetResponse]]] = {
requestsByDataset.map { case (dataset, datasetRequests) =>
val targetId = mapDatasetToTable(dataset)
val datasetMetricsContext = tableToContext.getOrElseUpdate(
targetId.toString,
metricsContext.copy(dataset = targetId.toString)
)

// Create a single query for all requests in this dataset
val query = Query
.create(mapDatasetToTable(dataset))
.create(targetId)
.filter(Filters.FILTERS.family().exactMatch(ColumnFamilyString))
.filter(Filters.FILTERS.qualifier().exactMatch(ColumnFamilyQualifierString))

@@ -184,8 +194,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
// Process all results at once
scalaResultFuture
.map { rows =>
metricsContext.distribution("multiGet.latency", System.currentTimeMillis() - startTs, s"dataset:$dataset")
metricsContext.increment("multiGet.successes", s"dataset:$dataset")
datasetMetricsContext.distribution("multiGet.latency", System.currentTimeMillis() - startTs)
datasetMetricsContext.increment("multiGet.successes")

// Create a map for quick lookup by row key
val rowKeyToRowMap = rows.asScala.map(row => row.getKey() -> row).toMap
@@ -206,7 +216,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
}
.recover { case e: Exception =>
logger.error("Error getting values", e)
metricsContext.increment("multiGet.bigtable_errors", s"exception:${e.getClass.getName},dataset:$dataset")
datasetMetricsContext.increment("multiGet.bigtable_errors", Map("exception" -> e.getClass.getName))
// If the batch fails, return failures for all requests in the batch
datasetRequests.map { request =>
KVStore.GetResponse(request, Failure(e))
@@ -255,8 +265,13 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
val maybeListEntityType = request.props.get(ListEntityType)
val maybeStartKey = request.props.get(ContinuationKey)

val targetId = mapDatasetToTable(request.dataset)
val datasetMetricsContext = tableToContext.getOrElseUpdate(
targetId.toString,
metricsContext.copy(dataset = targetId.toString)
)
val query = Query
.create(mapDatasetToTable(request.dataset))
.create(targetId)
.filter(Filters.FILTERS.family().exactMatch(ColumnFamilyString))
.filter(Filters.FILTERS.qualifier().exactMatch(ColumnFamilyQualifierString))
// we also limit to the latest cell per row as we don't want clients to iterate over all prior edits
@@ -280,8 +295,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,

rowsScalaFuture
.map { rows =>
metricsContext.distribution("list.latency", System.currentTimeMillis() - startTs, s"dataset:${request.dataset}")
metricsContext.increment("list.successes", s"dataset:${request.dataset}")
datasetMetricsContext.distribution("list.latency", System.currentTimeMillis() - startTs)
datasetMetricsContext.increment("list.successes")

val listValues = rows.asScala.flatMap { row =>
row.getCells(ColumnFamilyString, ColumnFamilyQualifier).asScala.map { cell =>
@@ -300,7 +315,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
}
.recover { case e: Exception =>
logger.error("Error listing values", e)
metricsContext.increment("list.bigtable_errors", s"exception:${e.getClass.getName},dataset:${request.dataset}")
datasetMetricsContext.increment("list.bigtable_errors", Map("exception" -> e.getClass.getName))

ListResponse(request, Failure(e), Map.empty)

@@ -315,6 +330,10 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
val resultFutures = {
requests.map { request =>
val tableId = mapDatasetToTable(request.dataset)
val datasetMetricsContext = tableToContext.getOrElseUpdate(
tableId.toString,
metricsContext.copy(dataset = tableId.toString)
)

val tableType = getTableType(request.dataset)
val timestampInPutRequest = request.tsMillis.getOrElse(System.currentTimeMillis())
@@ -345,15 +364,13 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
val scalaFuture = googleFutureToScalaFuture(mutateApiFuture)
scalaFuture
.map { _ =>
metricsContext.distribution("multiPut.latency",
System.currentTimeMillis() - startTs,
s"dataset:${request.dataset}")
metricsContext.increment("multiPut.successes", s"dataset:${request.dataset}")
datasetMetricsContext.distribution("multiPut.latency", System.currentTimeMillis() - startTs)
datasetMetricsContext.increment("multiPut.successes")
true
}
.recover { case e: Exception =>
logger.error("Error putting data", e)
metricsContext.increment("multiPut.failures", s"exception:${e.getClass.getName},dataset:${request.dataset}")
datasetMetricsContext.increment("multiPut.failures", Map("exception" -> e.getClass.getName))
false
}
}
@@ -364,7 +381,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = {
if (maybeBigQueryClient.isEmpty || maybeAdminClient.isEmpty) {
logger.error("Need the BigTable admin and BigQuery available to export data to BigTable")
metricsContext.increment("bulkPut.failures", "exception:missinguploadclients")
metricsContext.increment("bulkPut.failures", Map("exception" -> "missinguploadclients"))
throw new RuntimeException("BigTable admin and BigQuery clients are needed to export data to BigTable")
}

@@ -436,11 +453,12 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
if (completedJob == null) {
// job no longer exists
logger.error(s"Job corresponding to $jobId no longer exists")
metricsContext.increment("bulkPut.failures", "exception:missingjob")
metricsContext.increment("bulkPut.failures", Map("exception" -> "missingjob"))
throw new RuntimeException(s"Export job corresponding to $jobId no longer exists")
} else if (completedJob.getStatus.getError != null) {
logger.error(s"Job failed with error: ${completedJob.getStatus.getError}")
metricsContext.increment("bulkPut.failures", s"exception:${completedJob.getStatus.getError.getReason}")
metricsContext.increment("bulkPut.failures",
Map("exception" -> s"${completedJob.getStatus.getError.getReason}"))
throw new RuntimeException(s"Export job failed with error: ${completedJob.getStatus.getError}")
} else {
logger.info("Export job completed successfully")
53 changes: 53 additions & 0 deletions docker/fetcher/Dockerfile
@@ -0,0 +1,53 @@
# Start from a Debian base image
FROM openjdk:17-jdk-slim

# We expect jars to be copied to the build_output directory as docker can't read from bazel-bin as that's a symlink
# https://stackoverflow.com/questions/31881904/docker-follow-symlink-outside-context
ENV CLOUD_AWS_JAR_PATH=build_output/cloud_aws_lib_deploy.jar
ENV CLOUD_GCP_JAR_PATH=build_output/cloud_gcp_lib_deploy.jar
ENV FETCHER_SVC_JAR_PATH=build_output/service_assembly_deploy.jar
ENV FETCHER_LAUNCH_SCRIPT=docker/fetcher/start.sh
ENV GCP_ONLINE_CLASS=ai.chronon.integrations.cloud_gcp.GcpApiImpl
ENV AWS_ONLINE_CLASS=ai.chronon.integrations.aws.AwsApiImpl

# Update package lists and install necessary tools
RUN apt-get update && apt-get install -y \
curl \
python3 \
python3-dev \
python3-setuptools \
vim \
wget \
procps \
python3-pip

ENV SCALA_VERSION 2.12.18

RUN curl https://downloads.lightbend.com/scala/${SCALA_VERSION}/scala-${SCALA_VERSION}.deb -k -o scala.deb && \
apt install -y ./scala.deb && \
rm -rf scala.deb /var/lib/apt/lists/*

ENV SCALA_HOME="/usr/bin/scala"
ENV PATH=${PATH}:${SCALA_HOME}/bin

WORKDIR /srv/zipline

ENV CLOUD_AWS_JAR=${CLOUD_AWS_JAR:-"/srv/zipline/cloud_aws/cloud_aws.jar"}
ENV CLOUD_GCP_JAR=${CLOUD_GCP_JAR:-"/srv/zipline/cloud_gcp/cloud_gcp.jar"}
ENV FETCHER_JAR=${FETCHER_JAR:-"/srv/zipline/fetcher/service.jar"}
ENV LOG_PATH=${LOG_PATH:-"/srv/zipline/fetcher/logs"}

COPY $CLOUD_AWS_JAR_PATH "$CLOUD_AWS_JAR"
COPY $CLOUD_GCP_JAR_PATH "$CLOUD_GCP_JAR"
COPY $FETCHER_SVC_JAR_PATH "$FETCHER_JAR"
COPY $FETCHER_LAUNCH_SCRIPT /srv/zipline/fetcher/start.sh

ENV FETCHER_PORT=9000

HEALTHCHECK --start-period=2m --retries=4 CMD curl --fail http://localhost:$FETCHER_PORT/ping || exit 1

RUN mkdir -p $LOG_PATH && \
chmod 755 $LOG_PATH

CMD /srv/zipline/fetcher/start.sh
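
The jar staging that the Dockerfile comment above refers to has to happen before the image is built. A minimal sketch of that flow, assuming hypothetical bazel-bin source paths and an arbitrary image tag (neither is defined by this PR):

# Copy the deploy jars out of bazel-bin (a symlink Docker can't follow) into build_output/.
# NOTE: the bazel-bin paths below are assumptions -- adjust them to the real build targets.
mkdir -p build_output
cp bazel-bin/cloud_aws/cloud_aws_lib_deploy.jar build_output/
cp bazel-bin/cloud_gcp/cloud_gcp_lib_deploy.jar build_output/
cp bazel-bin/service/service_assembly_deploy.jar build_output/

# Build from the repo root so build_output/ and docker/fetcher/start.sh are inside the context.
docker build -f docker/fetcher/Dockerfile -t zipline-fetcher:local .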

41 changes: 41 additions & 0 deletions docker/fetcher/start.sh
@@ -0,0 +1,41 @@
#!/bin/bash
set -e

# Required environment variables
required_vars=("FETCHER_JAR" "FETCHER_PORT")
for var in "${required_vars[@]}"; do
if [ -z "${!var}" ]; then
echo "Error: Required environment variable $var is not set"
exit 1
fi
done

if [[ $USE_AWS == true ]]; then
ONLINE_JAR=$CLOUD_AWS_JAR
ONLINE_CLASS=$AWS_ONLINE_CLASS
else
ONLINE_JAR=$CLOUD_GCP_JAR
ONLINE_CLASS=$GCP_ONLINE_CLASS
fi

if [ -z "$EXPORTER_OTLP_ENDPOINT" ]; then
echo "OpenTelemetry endpoint not configured. Disabling metrics reporting"
METRICS_ENABLED="false"
else
METRICS_ENABLED="true"
fi

JMX_OPTS="-XX:MaxMetaspaceSize=1g -XX:MaxRAMPercentage=70.0 -XX:MinRAMPercentage=70.0 -XX:InitialRAMPercentage=70.0 -XX:MaxHeapFreeRatio=100 -XX:MinHeapFreeRatio=0"

echo "Starting Fetcher service with online jar $ONLINE_JAR and online class $ONLINE_CLASS"

if ! java -jar $FETCHER_JAR run ai.chronon.service.FetcherVerticle \
$JMX_OPTS \
-Dserver.port=$FETCHER_PORT \
-Donline.jar=$ONLINE_JAR \
-Dai.chronon.metrics.enabled=$METRICS_ENABLED \
-Dai.chronon.metrics.exporter.url=$EXPORTER_OTLP_ENDPOINT \
-Donline.class=$ONLINE_CLASS; then
echo "Error: Fetcher service failed to start"
exit 1
fi
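
For completeness, a minimal sketch of running the resulting container; the image tag and OTLP endpoint below are assumptions rather than values defined by this PR, and leaving EXPORTER_OTLP_ENDPOINT unset simply disables metrics reporting per the script above:

# Run the fetcher image built earlier (USE_AWS unset, so the GCP online class is used).
docker run --rm -p 9000:9000 \
  -e EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 \
  zipline-fetcher:local

# The HEALTHCHECK in the Dockerfile probes the same endpoint:
curl --fail http://localhost:9000/ping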