From ae7cc6f4c708439216eafbda4cff27de0ad1ac48 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Fri, 25 Apr 2025 09:11:19 -0400 Subject: [PATCH 01/11] Swap statsd for Otel metrics with support for http / prom readers --- .../cloud_gcp/BigTableKVStoreImpl.scala | 57 +- docker/fetcher/start.sh | 41 ++ maven_install.json | 552 +++++++++++++++++- online/BUILD.bazel | 16 +- .../java/ai/chronon/online/JavaFetcher.java | 4 +- .../ai/chronon/online/fetcher/Fetcher.scala | 40 +- .../ai/chronon/online/metrics/Metrics.scala | 140 ++--- .../online/metrics/MetricsReporter.scala | 17 + .../online/metrics/OtelMetricsReporter.scala | 149 +++++ .../ai/chronon/online/test/TagsTest.scala | 45 +- service_commons/BUILD.bazel | 2 +- .../service/ChrononServiceLauncher.java | 49 +- .../dependencies/maven_repository.bzl | 9 + 13 files changed, 914 insertions(+), 207 deletions(-) create mode 100755 docker/fetcher/start.sh create mode 100644 online/src/main/scala/ai/chronon/online/metrics/MetricsReporter.scala create mode 100644 online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala index 8786199e11..e697208ef0 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala @@ -13,8 +13,8 @@ import ai.chronon.online.KVStore import ai.chronon.online.KVStore.ListRequest import ai.chronon.online.KVStore.ListResponse import ai.chronon.online.KVStore.ListValue -import ai.chronon.online.metrics.Metrics -import com.google.api.core.ApiFuture +import ai.chronon.online.metrics.{Metrics, MetricsReporter} +import com.google.api.core.{ApiFuture, ApiFutures} import com.google.cloud.RetryOption import com.google.cloud.bigquery.BigQuery import com.google.cloud.bigquery.BigQueryErrorMessages @@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory import org.threeten.bp.Duration import java.nio.charset.Charset +import java.util +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer import scala.compat.java8.FutureConverters import scala.concurrent.Future import scala.concurrent.duration._ @@ -88,6 +91,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.KVStore).withSuffix("bigtable") + protected val tableToContext = new TrieMap[String, Metrics.Context]() + override def create(dataset: String): Unit = create(dataset, Map.empty) override def create(dataset: String, props: Map[String, Any]): Unit = { @@ -114,8 +119,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, case e: Exception => logger.error("Error creating table", e) - metricsContext.increment("create.failures", s"exception:${e.getClass.getName}") - + metricsContext.increment("create.failures", Map("exception" -> e.getClass.getName)) } } .orElse(throw new IllegalStateException("Missing BigTable admin client. 
Is the ENABLE_UPLOAD_CLIENTS flag set?")) @@ -136,9 +140,15 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, private def readRowsMultiGet( requestsByDataset: Map[String, Seq[KVStore.GetRequest]]): Seq[Future[Seq[KVStore.GetResponse]]] = { requestsByDataset.map { case (dataset, datasetRequests) => + val targetId = mapDatasetToTable(dataset) + val datasetMetricsContext = tableToContext.getOrElseUpdate( + targetId.toString, + metricsContext.copy(dataset = targetId.toString) + ) + // Create a single query for all requests in this dataset val query = Query - .create(mapDatasetToTable(dataset)) + .create(targetId) .filter(Filters.FILTERS.family().exactMatch(ColumnFamilyString)) .filter(Filters.FILTERS.qualifier().exactMatch(ColumnFamilyQualifierString)) @@ -184,8 +194,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, // Process all results at once scalaResultFuture .map { rows => - metricsContext.distribution("multiGet.latency", System.currentTimeMillis() - startTs, s"dataset:$dataset") - metricsContext.increment("multiGet.successes", s"dataset:$dataset") + datasetMetricsContext.distribution("multiGet.latency", System.currentTimeMillis() - startTs) + datasetMetricsContext.increment("multiGet.successes") // Create a map for quick lookup by row key val rowKeyToRowMap = rows.asScala.map(row => row.getKey() -> row).toMap @@ -206,7 +216,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, } .recover { case e: Exception => logger.error("Error getting values", e) - metricsContext.increment("multiGet.bigtable_errors", s"exception:${e.getClass.getName},dataset:$dataset") + datasetMetricsContext.increment("multiGet.bigtable_errors", Map("exception" -> e.getClass.getName)) // If the batch fails, return failures for all requests in the batch datasetRequests.map { request => KVStore.GetResponse(request, Failure(e)) @@ -255,8 +265,13 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, val maybeListEntityType = request.props.get(ListEntityType) val maybeStartKey = request.props.get(ContinuationKey) + val targetId = mapDatasetToTable(request.dataset) + val datasetMetricsContext = tableToContext.getOrElseUpdate( + targetId.toString, + metricsContext.copy(dataset = targetId.toString) + ) val query = Query - .create(mapDatasetToTable(request.dataset)) + .create(targetId) .filter(Filters.FILTERS.family().exactMatch(ColumnFamilyString)) .filter(Filters.FILTERS.qualifier().exactMatch(ColumnFamilyQualifierString)) // we also limit to the latest cell per row as we don't want clients to iterate over all prior edits @@ -280,8 +295,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, rowsScalaFuture .map { rows => - metricsContext.distribution("list.latency", System.currentTimeMillis() - startTs, s"dataset:${request.dataset}") - metricsContext.increment("list.successes", s"dataset:${request.dataset}") + datasetMetricsContext.distribution("list.latency", System.currentTimeMillis() - startTs) + datasetMetricsContext.increment("list.successes") val listValues = rows.asScala.flatMap { row => row.getCells(ColumnFamilyString, ColumnFamilyQualifier).asScala.map { cell => @@ -300,7 +315,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, } .recover { case e: Exception => logger.error("Error listing values", e) - metricsContext.increment("list.bigtable_errors", s"exception:${e.getClass.getName},dataset:${request.dataset}") + datasetMetricsContext.increment("list.bigtable_errors", Map("exception" -> e.getClass.getName)) ListResponse(request, Failure(e), Map.empty) @@ 
-315,6 +330,10 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
     val resultFutures = {
       requests.map { request =>
         val tableId = mapDatasetToTable(request.dataset)
+        val datasetMetricsContext = tableToContext.getOrElseUpdate(
+          tableId.toString,
+          metricsContext.copy(dataset = tableId.toString)
+        )
         val tableType = getTableType(request.dataset)

         val timestampInPutRequest = request.tsMillis.getOrElse(System.currentTimeMillis())
@@ -345,15 +364,13 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
           val scalaFuture = googleFutureToScalaFuture(mutateApiFuture)
           scalaFuture
             .map { _ =>
-              metricsContext.distribution("multiPut.latency",
-                                          System.currentTimeMillis() - startTs,
-                                          s"dataset:${request.dataset}")
-              metricsContext.increment("multiPut.successes", s"dataset:${request.dataset}")
+              datasetMetricsContext.distribution("multiPut.latency", System.currentTimeMillis() - startTs)
+              datasetMetricsContext.increment("multiPut.successes")
               true
             }
             .recover { case e: Exception =>
               logger.error("Error putting data", e)
-              metricsContext.increment("multiPut.failures", s"exception:${e.getClass.getName},dataset:${request.dataset}")
+              datasetMetricsContext.increment("multiPut.failures", Map("exception" -> e.getClass.getName))
               false
             }
         }
@@ -364,7 +381,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
   override def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = {
     if (maybeBigQueryClient.isEmpty || maybeAdminClient.isEmpty) {
       logger.error("Need the BigTable admin and BigQuery available to export data to BigTable")
-      metricsContext.increment("bulkPut.failures", "exception:missinguploadclients")
+      metricsContext.increment("bulkPut.failures", Map("exception" -> "missinguploadclients"))
       throw new RuntimeException("BigTable admin and BigQuery clients are needed to export data to BigTable")
     }

@@ -436,11 +453,11 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient,
       if (completedJob == null) {
         // job no longer exists
         logger.error(s"Job corresponding to $jobId no longer exists")
-        metricsContext.increment("bulkPut.failures", "exception:missingjob")
+        metricsContext.increment("bulkPut.failures", Map("exception" -> "missingjob"))
         throw new RuntimeException(s"Export job corresponding to $jobId no longer exists")
       } else if (completedJob.getStatus.getError != null) {
         logger.error(s"Job failed with error: ${completedJob.getStatus.getError}")
-        metricsContext.increment("bulkPut.failures", s"exception:${completedJob.getStatus.getError.getReason}")
+        metricsContext.increment("bulkPut.failures", Map("exception" -> completedJob.getStatus.getError.getReason))
         throw new RuntimeException(s"Export job failed with error: ${completedJob.getStatus.getError}")
       } else {
         logger.info("Export job completed successfully")
diff --git a/docker/fetcher/start.sh b/docker/fetcher/start.sh
new file mode 100755
index 0000000000..0bc6cb9f99
--- /dev/null
+++ b/docker/fetcher/start.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+set -e
+
+# Required environment variables
+required_vars=("FETCHER_JAR" "FETCHER_PORT")
+for var in "${required_vars[@]}"; do
+  if [ -z "${!var}" ]; then
+    echo "Error: Required environment variable $var is not set"
+    exit 1
+  fi
+done
+
+if [[ $USE_AWS == true ]]; then
+  ONLINE_JAR=$CLOUD_AWS_JAR
+  ONLINE_CLASS=$AWS_ONLINE_CLASS
+else
+  ONLINE_JAR=$CLOUD_GCP_JAR
+  ONLINE_CLASS=$GCP_ONLINE_CLASS
+fi
+
+if [ -z "$EXPORTER_OTLP_ENDPOINT" ]; then
+  echo "OpenTelemetry endpoint not configured.
Disabling metrics reporting" + METRICS_ENABLED="false" +else + METRICS_ENABLED="true" +fi + +JMX_OPTS="-XX:MaxMetaspaceSize=1g -XX:MaxRAMPercentage=70.0 -XX:MinRAMPercentage=70.0 -XX:InitialRAMPercentage=70.0 -XX:MaxHeapFreeRatio=100 -XX:MinHeapFreeRatio=0" + +echo "Starting Fetcher service with online jar $ONLINE_JAR and online class $ONLINE_CLASS" + +if ! java -jar $FETCHER_JAR run ai.chronon.service.FetcherVerticle \ + $JMX_OPTS \ + -Dserver.port=$FETCHER_PORT \ + -Donline.jar=$ONLINE_JAR \ + -Dai.chronon.metrics.enabled=$METRICS_ENABLED \ + -Dai.chronon.metrics.exporter.url=$EXPORTER_OTLP_ENDPOINT \ + -Donline.class=$ONLINE_CLASS; then + echo "Error: Fetcher service failed to start" + exit 1 +fi diff --git a/maven_install.json b/maven_install.json index 5a7d07ddb5..3e6f448431 100755 --- a/maven_install.json +++ b/maven_install.json @@ -1,7 +1,7 @@ { "__AUTOGENERATED_FILE_DO_NOT_MODIFY_THIS_FILE_MANUALLY": "THERE_IS_NO_DATA_ONLY_ZUUL", - "__INPUT_ARTIFACTS_HASH": -1448107542, - "__RESOLVED_ARTIFACTS_HASH": -193250516, + "__INPUT_ARTIFACTS_HASH": -103810451, + "__RESOLVED_ARTIFACTS_HASH": 812258953, "artifacts": { "ant:ant": { "shasums": { @@ -1194,6 +1194,20 @@ }, "version": "1.3.16" }, + "com.squareup.okhttp3:okhttp": { + "shasums": { + "jar": "b1050081b14bb7a3a7e55a4d3ef01b5dcfabc453b4573a4fc019767191d5f4e0", + "sources": "d91a769a4140e542cddbac4e67fcf279299614e8bfd53bd23b85e60c2861341c" + }, + "version": "4.12.0" + }, + "com.squareup.okio:okio": { + "shasums": { + "jar": "8e63292e5c53bb93c4a6b0c213e79f15990fed250c1340f1c343880e1c9c39b5", + "sources": "64d5b6667f064511dd93100173f735b2d5052a1c926858f4b6a05b84e825ef94" + }, + "version": "3.6.0" + }, "com.squareup.okio:okio-jvm": { "shasums": { "jar": "ddc386ff14bd25d5c934167196eaf45b18de4f28e1c55a4db37ae594cbfd37e4", @@ -1761,6 +1775,13 @@ }, "version": "1.13.6" }, + "io.micrometer:micrometer-registry-otlp": { + "shasums": { + "jar": "530dc759cc077c81f5d69566b23bc05d395e5946f3af2309b7f169aeffd63d12", + "sources": "4e48ea0305fe51183c9e0ae5486d84753bf14051acb8f7c05349687f1b7971af" + }, + "version": "1.13.6" + }, "io.micrometer:micrometer-registry-statsd": { "shasums": { "jar": "0d57c12a34ab6e87d58d8b0ecde8ec592bcd2818e39a64c7314d8a0728eab5b6", @@ -2107,6 +2128,13 @@ }, "version": "1.37.0-alpha" }, + "io.opentelemetry.proto:opentelemetry-proto": { + "shasums": { + "jar": "1587f3427474de8d8ac19ea5558c2d5b4748179c0c527cc3ecb1c7595ac6e3a4", + "sources": "d8bb1f5f2e6a8a328823fb086d42b78d300813e0f0cd388099a2f0592443a79e" + }, + "version": "1.2.0-alpha" + }, "io.opentelemetry.semconv:opentelemetry-semconv": { "shasums": { "jar": "745a86a75ecb5e03f464f05ea2dc76e0f04d07273c5509fa74f393bff9b222b7", @@ -2116,10 +2144,10 @@ }, "io.opentelemetry:opentelemetry-api": { "shasums": { - "jar": "6566f1f1133d611ff4e8b8fdb8eb18577b970425620315363ee9be43843b14bf", - "sources": "1e7bac30757e9504ea737c4be341130d8f013a3290059022156bce1a06d64546" + "jar": "a813c9a92b82a4ffa3a62e88216a9e9ef9a407fcd41614fe596b2895167ed638", + "sources": "45ce4df9b8157e57e07dbd68784d3877474bd2fc2291954ef32b5666845d5c7f" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-api-incubator": { "shasums": { @@ -2130,52 +2158,94 @@ }, "io.opentelemetry:opentelemetry-context": { "shasums": { - "jar": "15b4fc4234e6dca6d54800d572694ecbd07ba52c15fc5b221b4da5517ce8d90d", - "sources": "7c959f00d13fc1698a7d37878715e6c42b5ee89b2076a324ffaf8de5b9348b2b" + "jar": "17de3c85b341240b3b216c43adbd244c9fbd39c9e5448306d56460f80599a76b", + "sources": 
"d51f02535781654be13ca245bc09d1ddddf54c19788be8b3f8d833701c292a8b" + }, + "version": "1.49.0" + }, + "io.opentelemetry:opentelemetry-exporter-common": { + "shasums": { + "jar": "06d08af00b9fec8e99b6a4fda2eb201facbe2fe38a89eb11cbbfbe4183b73141", + "sources": "fc6b8455e51702a007008b05eb3570a13aaefb5775704bf1c101044297b85444" + }, + "version": "1.49.0" + }, + "io.opentelemetry:opentelemetry-exporter-otlp": { + "shasums": { + "jar": "b69c1664dbd75e7a4c5ab2e187766e7e4fcdb208c94dc569f1453c388d595812", + "sources": "6d32e29962e51677931393b668ba7972572a48dd802854d13900d8337f81a4ca" + }, + "version": "1.49.0" + }, + "io.opentelemetry:opentelemetry-exporter-otlp-common": { + "shasums": { + "jar": "be16d6802be0eb2d08389fc9af1e801fc98f96061fe6bcda2562dcb7e2e0dd5b", + "sources": "6de2fa295ad8c362d40b6590888a7ee35d59fc8e82dd094b92e2d5cec45376c5" + }, + "version": "1.49.0" + }, + "io.opentelemetry:opentelemetry-exporter-prometheus": { + "shasums": { + "jar": "43311a2a83d8d8d808913fe17b0954b8f2b90b47bcc5c114b74bf422dfb2c0d7", + "sources": "663d2937fdb560ed2a6fb27601eec5c378e2dd693ad45b91bc3ebf335f3afe86" + }, + "version": "1.49.0-alpha" + }, + "io.opentelemetry:opentelemetry-exporter-sender-okhttp": { + "shasums": { + "jar": "1783a33e5bb241a5d6062c01b5c8cae9b0e1d296aff0b4c1fe3280b136bf4ad4", + "sources": "baabdd90f15434a5ca61c1621858c341907461b884586cf908e703d721a93a77" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk": { "shasums": { - "jar": "4a09eb2ee484769973e14218a34e6da54f35955aa02b26dc5238b0c2ed6a801d", - "sources": "b3315aecae48b992e734c57ed659c73966836fcb2224f3f3882dee47feb8affd" + "jar": "d6fdaf0f04724d5c9362db2f809fd21f36e95c9c039d22fe695692d606107bff", + "sources": "dd827c172d20f046d5bf6a9df772290b0eaafb59589b6b98fd742c6bc78c2d37" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk-common": { "shasums": { - "jar": "7ce55666aca7f2e5697a57bd4133e4508a6dc5041685f2d1ef31bb156f32e3bd", - "sources": "96268cd9f2086de5d70289f750638a78c4aa3d9289ec083e242ed11529b332e7" + "jar": "b06214ccf3cda749edcd426c1492483e201e1fcfadb9f9cba8ecb439ff5c5d0f", + "sources": "dec98f91b60152b5c17c46aa30f0d763ee78fc672e3f006ce731077c7ba563bb" }, - "version": "1.47.0" + "version": "1.49.0" + }, + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure": { + "shasums": { + "jar": "d41db554d2813b35006a67b0ec357d1848ff6b11c2a768f35a9b776233eedc05", + "sources": "d1f9d48fa9c6c152b40e8e7192208af4f7d320dd85a96ea2b455bdfcec3605a6" + }, + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi": { "shasums": { - "jar": "fe095e16871b942cae7fed6e0b3bbff462111fe62fe31ccd34d7542f8ebcfe90", - "sources": "4dd4752c749d50487ab0e5753dfbf9289a3efb5768c73fe5e575239513338a80" + "jar": "b3091033e5b4b4b49334709707858d993afb92fe5c32d4a0b27e02d2e956c5b7", + "sources": "7dd3b2e3ed4d2e91839c3f957b3a57d727284f2f1cfb6d51fbeb5e4e4db1aed0" }, - "version": "1.42.1" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk-logs": { "shasums": { - "jar": "302491984b63eebaf4b58bd3ae19d223a55f79090a3e46b40507b49c3cbe9cc5", - "sources": "06c2783b864be3483b5fa552cd3e1ce2ea5564c3a68ceb4880bcac715029cf39" + "jar": "edf7f2411d5180099efebc5a5e2f01378f647897671aa9819f2064e24b6f7e9f", + "sources": "0d6d296d7380257727935c9235a5619dc9c981700d6f18e840cc0b72ca12d906" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk-metrics": { "shasums": { - "jar": "7d1442c5ca916ba2513005205d3b8b9bc5dca4e2a84867d0550602a0dfc0bba5", - 
"sources": "4713d4b3a529266f52e69b792a61f49ceda82239d469939a12abc0049c9cf5d9" + "jar": "cf28ea29fcfd4577a4a3bda388ac08cbbb86e4b1b534822848ea5c5554bfd603", + "sources": "08868596a4e0cce486df12b09dcfd5dd13a7582f98bee75bdecbf7432df20f3e" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.opentelemetry:opentelemetry-sdk-trace": { "shasums": { - "jar": "03950efd5fa5a276769a593579d8f602742a5d52f9978569326d2a9f9e162546", - "sources": "618ec9636adf0d76fd20894c456168f148c1c1df920de855f13f2afb379dffac" + "jar": "483bf21dde1df1a2c9875ba7dff1e3c8c1a63789cd45ad58223ec64b5951318e", + "sources": "f370df177c08982ca20e0525adc94f995503e74d611c1240cd2cc616624277be" }, - "version": "1.47.0" + "version": "1.49.0" }, "io.perfmark:perfmark-api": { "shasums": { @@ -2184,6 +2254,48 @@ }, "version": "0.27.0" }, + "io.prometheus:prometheus-metrics-config": { + "shasums": { + "jar": "32e6ecf39f3ab76eee94ee69102795f8ed80024d27fda0ba41d54f25ac887fad", + "sources": "85d24864292b3b854d05ae291e42a60512d5ba49970e1d33309ca1f50e2b7e5d" + }, + "version": "1.3.6" + }, + "io.prometheus:prometheus-metrics-exporter-common": { + "shasums": { + "jar": "f2a2e0dcd101764cf2cceb57895c0cb1839f8eac17f6315112b7cb031db4e9a8", + "sources": "09e1968a812281dae25ee7601388dc0cfb28e7d36d2de909c6e73cb59b881788" + }, + "version": "1.3.6" + }, + "io.prometheus:prometheus-metrics-exporter-httpserver": { + "shasums": { + "jar": "b5db074b19cd439ad358eacdf6ca191eb744f8a14c9833f6c3c8db43ff463a0b", + "sources": "eb917f898609b261d92d3c5697e9566ab72213ab3a4333786f81fea3fa985260" + }, + "version": "1.3.6" + }, + "io.prometheus:prometheus-metrics-exposition-formats": { + "shasums": { + "jar": "527c5e21767934fa1d7fb0f4581f560254fe15fff5b86046eb3d669aec8ae000", + "sources": "5a14e821f13baeafae1deb31de59f5cf74b4e567b063999463173d2e6cc77886" + }, + "version": "1.3.6" + }, + "io.prometheus:prometheus-metrics-exposition-textformats": { + "shasums": { + "jar": "2d9e03503c2bcd1c913f2a4f82d2ebef8b9df971c6f22de7993dfabe01d575f9", + "sources": "b4305d8714ef48fe9c1022bfa68d73653262ff116524420755e955c7eea004eb" + }, + "version": "1.3.6" + }, + "io.prometheus:prometheus-metrics-model": { + "shasums": { + "jar": "51a8da74c037ddd5c94dd6bfc828e60b748efede0dc3fae6f188d4e0bbeadd75", + "sources": "c1bd6a6b9ce12d007b3d5d8570ffb79ca1c5a34d26964dd3251f25567e93cad4" + }, + "version": "1.3.6" + }, "io.swagger.core.v3:swagger-annotations": { "shasums": { "jar": "59573c4d6357c2121d40069959879cf008783cc8208dc5123f759b0e6a0077ad", @@ -4057,6 +4169,20 @@ }, "version": "1.9.21" }, + "org.jetbrains.kotlin:kotlin-stdlib-jdk7": { + "shasums": { + "jar": "33d148db0e11debd0d90677d28242bced907f9c77730000fd597867089039d86", + "sources": "ea10d3e5e6e695d8a5283cbf116321acae6ba42d0bdd3eda50f7c34a26fa25cb" + }, + "version": "1.8.21" + }, + "org.jetbrains.kotlin:kotlin-stdlib-jdk8": { + "shasums": { + "jar": "3db752a30074f06ee6c57984aa6f27da44f4d2bbc7f5442651f6988f1cb2b7d7", + "sources": "40e9a80f6b953d12389623760d438e69914098d0c4d7053f70f90533ec041259" + }, + "version": "1.8.21" + }, "org.jetbrains:annotations": { "shasums": { "jar": "195fb0da046d55bb042e91543484cf1da68b02bb7afbfe031f229e45ac84b3f2", @@ -6291,6 +6417,13 @@ "com.softwaremill.sttp.model:core_2.13", "com.softwaremill.sttp.shared:core_2.13" ], + "com.squareup.okhttp3:okhttp": [ + "com.squareup.okio:okio", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8" + ], + "com.squareup.okio:okio": [ + "com.squareup.okio:okio-jvm" + ], "com.squareup.okio:okio-jvm": [ "org.jetbrains.kotlin:kotlin-stdlib" ], @@ -6602,6 +6735,10 @@ 
"io.micrometer:micrometer-observation": [ "io.micrometer:micrometer-commons" ], + "io.micrometer:micrometer-registry-otlp": [ + "io.micrometer:micrometer-core", + "io.opentelemetry.proto:opentelemetry-proto" + ], "io.micrometer:micrometer-registry-statsd": [ "io.micrometer:micrometer-core" ], @@ -6867,6 +7004,82 @@ "com.google.guava:guava", "io.opencensus:opencensus-api" ], + "io.opentelemetry.proto:opentelemetry-proto": [ + "com.google.protobuf:protobuf-java" + ], + "io.opentelemetry:opentelemetry-api": [ + "io.opentelemetry:opentelemetry-context" + ], + "io.opentelemetry:opentelemetry-exporter-common": [ + "io.opentelemetry:opentelemetry-api", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi" + ], + "io.opentelemetry:opentelemetry-exporter-otlp": [ + "io.opentelemetry:opentelemetry-exporter-otlp-common", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi", + "io.opentelemetry:opentelemetry-sdk-logs", + "io.opentelemetry:opentelemetry-sdk-metrics", + "io.opentelemetry:opentelemetry-sdk-trace" + ], + "io.opentelemetry:opentelemetry-exporter-otlp-common": [ + "io.opentelemetry:opentelemetry-exporter-common" + ], + "io.opentelemetry:opentelemetry-exporter-prometheus": [ + "io.opentelemetry:opentelemetry-exporter-common", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi", + "io.opentelemetry:opentelemetry-sdk-metrics", + "io.prometheus:prometheus-metrics-exporter-httpserver" + ], + "io.opentelemetry:opentelemetry-exporter-sender-okhttp": [ + "com.squareup.okhttp3:okhttp", + "io.opentelemetry:opentelemetry-exporter-common", + "io.opentelemetry:opentelemetry-sdk-common" + ], + "io.opentelemetry:opentelemetry-sdk": [ + "io.opentelemetry:opentelemetry-api", + "io.opentelemetry:opentelemetry-sdk-common", + "io.opentelemetry:opentelemetry-sdk-logs", + "io.opentelemetry:opentelemetry-sdk-metrics", + "io.opentelemetry:opentelemetry-sdk-trace" + ], + "io.opentelemetry:opentelemetry-sdk-common": [ + "io.opentelemetry:opentelemetry-api" + ], + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure": [ + "io.opentelemetry:opentelemetry-sdk", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi" + ], + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi": [ + "io.opentelemetry:opentelemetry-sdk" + ], + "io.opentelemetry:opentelemetry-sdk-logs": [ + "io.opentelemetry:opentelemetry-api", + "io.opentelemetry:opentelemetry-sdk-common" + ], + "io.opentelemetry:opentelemetry-sdk-metrics": [ + "io.opentelemetry:opentelemetry-api", + "io.opentelemetry:opentelemetry-sdk-common" + ], + "io.opentelemetry:opentelemetry-sdk-trace": [ + "io.opentelemetry:opentelemetry-api", + "io.opentelemetry:opentelemetry-sdk-common" + ], + "io.prometheus:prometheus-metrics-exporter-common": [ + "io.prometheus:prometheus-metrics-exposition-formats", + "io.prometheus:prometheus-metrics-exposition-textformats", + "io.prometheus:prometheus-metrics-model" + ], + "io.prometheus:prometheus-metrics-exporter-httpserver": [ + "io.prometheus:prometheus-metrics-exporter-common" + ], + "io.prometheus:prometheus-metrics-exposition-formats": [ + "io.prometheus:prometheus-metrics-exposition-textformats" + ], + "io.prometheus:prometheus-metrics-exposition-textformats": [ + "io.prometheus:prometheus-metrics-config", + "io.prometheus:prometheus-metrics-model" + ], "io.temporal:temporal-sdk": [ "com.fasterxml.jackson.core:jackson-databind", "com.fasterxml.jackson.datatype:jackson-datatype-jdk8", @@ -8260,6 +8473,13 
@@ "org.jetbrains.kotlin:kotlin-stdlib": [ "org.jetbrains:annotations" ], + "org.jetbrains.kotlin:kotlin-stdlib-jdk7": [ + "org.jetbrains.kotlin:kotlin-stdlib" + ], + "org.jetbrains.kotlin:kotlin-stdlib-jdk8": [ + "org.jetbrains.kotlin:kotlin-stdlib", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7" + ], "org.jruby.joni:joni": [ "org.jruby.jcodings:jcodings" ], @@ -11089,6 +11309,25 @@ "sttp.ws", "sttp.ws.testing" ], + "com.squareup.okhttp3:okhttp": [ + "okhttp3", + "okhttp3.internal", + "okhttp3.internal.authenticator", + "okhttp3.internal.cache", + "okhttp3.internal.cache2", + "okhttp3.internal.concurrent", + "okhttp3.internal.connection", + "okhttp3.internal.http", + "okhttp3.internal.http1", + "okhttp3.internal.http2", + "okhttp3.internal.io", + "okhttp3.internal.platform", + "okhttp3.internal.platform.android", + "okhttp3.internal.proxy", + "okhttp3.internal.publicsuffix", + "okhttp3.internal.tls", + "okhttp3.internal.ws" + ], "com.squareup.okio:okio-jvm": [ "okio", "okio.internal" @@ -11924,6 +12163,9 @@ "io.micrometer.observation.docs", "io.micrometer.observation.transport" ], + "io.micrometer:micrometer-registry-otlp": [ + "io.micrometer.registry.otlp" + ], "io.micrometer:micrometer-registry-statsd": [ "io.micrometer.shaded.io.netty.bootstrap", "io.micrometer.shaded.io.netty.buffer", @@ -12295,6 +12537,16 @@ "io.opentelemetry.contrib:opentelemetry-gcp-resources": [ "io.opentelemetry.contrib.gcp.resource" ], + "io.opentelemetry.proto:opentelemetry-proto": [ + "io.opentelemetry.proto.collector.logs.v1", + "io.opentelemetry.proto.collector.metrics.v1", + "io.opentelemetry.proto.collector.trace.v1", + "io.opentelemetry.proto.common.v1", + "io.opentelemetry.proto.logs.v1", + "io.opentelemetry.proto.metrics.v1", + "io.opentelemetry.proto.resource.v1", + "io.opentelemetry.proto.trace.v1" + ], "io.opentelemetry.semconv:opentelemetry-semconv": [ "io.opentelemetry.semconv" ], @@ -12323,6 +12575,46 @@ "io.opentelemetry.context.propagation", "io.opentelemetry.context.propagation.internal" ], + "io.opentelemetry:opentelemetry-exporter-common": [ + "io.opentelemetry.exporter.internal", + "io.opentelemetry.exporter.internal.compression", + "io.opentelemetry.exporter.internal.grpc", + "io.opentelemetry.exporter.internal.http", + "io.opentelemetry.exporter.internal.marshal" + ], + "io.opentelemetry:opentelemetry-exporter-otlp": [ + "io.opentelemetry.exporter.otlp.all.internal", + "io.opentelemetry.exporter.otlp.http.logs", + "io.opentelemetry.exporter.otlp.http.metrics", + "io.opentelemetry.exporter.otlp.http.trace", + "io.opentelemetry.exporter.otlp.internal", + "io.opentelemetry.exporter.otlp.logs", + "io.opentelemetry.exporter.otlp.metrics", + "io.opentelemetry.exporter.otlp.trace" + ], + "io.opentelemetry:opentelemetry-exporter-otlp-common": [ + "io.opentelemetry.exporter.internal.otlp", + "io.opentelemetry.exporter.internal.otlp.logs", + "io.opentelemetry.exporter.internal.otlp.metrics", + "io.opentelemetry.exporter.internal.otlp.traces", + "io.opentelemetry.proto.collector.logs.v1.internal", + "io.opentelemetry.proto.collector.metrics.v1.internal", + "io.opentelemetry.proto.collector.profiles.v1development.internal", + "io.opentelemetry.proto.collector.trace.v1.internal", + "io.opentelemetry.proto.common.v1.internal", + "io.opentelemetry.proto.logs.v1.internal", + "io.opentelemetry.proto.metrics.v1.internal", + "io.opentelemetry.proto.profiles.v1development.internal", + "io.opentelemetry.proto.resource.v1.internal", + "io.opentelemetry.proto.trace.v1.internal" + ], + 
"io.opentelemetry:opentelemetry-exporter-prometheus": [ + "io.opentelemetry.exporter.prometheus", + "io.opentelemetry.exporter.prometheus.internal" + ], + "io.opentelemetry:opentelemetry-exporter-sender-okhttp": [ + "io.opentelemetry.exporter.sender.okhttp.internal" + ], "io.opentelemetry:opentelemetry-sdk": [ "io.opentelemetry.sdk" ], @@ -12333,6 +12625,10 @@ "io.opentelemetry.sdk.internal", "io.opentelemetry.sdk.resources" ], + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure": [ + "io.opentelemetry.sdk.autoconfigure", + "io.opentelemetry.sdk.autoconfigure.internal" + ], "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi": [ "io.opentelemetry.sdk.autoconfigure.spi", "io.opentelemetry.sdk.autoconfigure.spi.internal", @@ -12379,6 +12675,28 @@ "io.perfmark:perfmark-api": [ "io.perfmark" ], + "io.prometheus:prometheus-metrics-config": [ + "io.prometheus.metrics.config" + ], + "io.prometheus:prometheus-metrics-exporter-common": [ + "io.prometheus.metrics.exporter.common" + ], + "io.prometheus:prometheus-metrics-exporter-httpserver": [ + "io.prometheus.metrics.exporter.httpserver" + ], + "io.prometheus:prometheus-metrics-exposition-formats": [ + "io.prometheus.metrics.expositionformats.generated.com_google_protobuf_4_29_3", + "io.prometheus.metrics.expositionformats.internal", + "io.prometheus.metrics.shaded.com_google_protobuf_4_29_3", + "io.prometheus.metrics.shaded.com_google_protobuf_4_29_3.compiler" + ], + "io.prometheus:prometheus-metrics-exposition-textformats": [ + "io.prometheus.metrics.expositionformats" + ], + "io.prometheus:prometheus-metrics-model": [ + "io.prometheus.metrics.model.registry", + "io.prometheus.metrics.model.snapshots" + ], "io.swagger.core.v3:swagger-annotations": [ "io.swagger.v3.oas.annotations", "io.swagger.v3.oas.annotations.callbacks", @@ -24136,8 +24454,12 @@ "com.softwaremill.sttp.shared:ws_2.12:jar:sources", "com.softwaremill.sttp.shared:ws_2.13", "com.softwaremill.sttp.shared:ws_2.13:jar:sources", + "com.squareup.okhttp3:okhttp", + "com.squareup.okhttp3:okhttp:jar:sources", + "com.squareup.okio:okio", "com.squareup.okio:okio-jvm", "com.squareup.okio:okio-jvm:jar:sources", + "com.squareup.okio:okio:jar:sources", "com.squareup.wire:wire-runtime-jvm", "com.squareup.wire:wire-runtime-jvm:jar:sources", "com.squareup.wire:wire-schema-jvm", @@ -24298,6 +24620,8 @@ "io.micrometer:micrometer-core:jar:sources", "io.micrometer:micrometer-observation", "io.micrometer:micrometer-observation:jar:sources", + "io.micrometer:micrometer-registry-otlp", + "io.micrometer:micrometer-registry-otlp:jar:sources", "io.micrometer:micrometer-registry-statsd", "io.micrometer:micrometer-registry-statsd:jar:sources", "io.netty:netty-all", @@ -24402,6 +24726,8 @@ "io.openlineage:spark-extension-interfaces:jar:sources", "io.opentelemetry.contrib:opentelemetry-gcp-resources", "io.opentelemetry.contrib:opentelemetry-gcp-resources:jar:sources", + "io.opentelemetry.proto:opentelemetry-proto", + "io.opentelemetry.proto:opentelemetry-proto:jar:sources", "io.opentelemetry.semconv:opentelemetry-semconv", "io.opentelemetry.semconv:opentelemetry-semconv:jar:sources", "io.opentelemetry:opentelemetry-api", @@ -24410,11 +24736,23 @@ "io.opentelemetry:opentelemetry-api:jar:sources", "io.opentelemetry:opentelemetry-context", "io.opentelemetry:opentelemetry-context:jar:sources", + "io.opentelemetry:opentelemetry-exporter-common", + "io.opentelemetry:opentelemetry-exporter-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp", + 
"io.opentelemetry:opentelemetry-exporter-otlp-common", + "io.opentelemetry:opentelemetry-exporter-otlp-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp:jar:sources", + "io.opentelemetry:opentelemetry-exporter-prometheus", + "io.opentelemetry:opentelemetry-exporter-prometheus:jar:sources", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp:jar:sources", "io.opentelemetry:opentelemetry-sdk", "io.opentelemetry:opentelemetry-sdk-common", "io.opentelemetry:opentelemetry-sdk-common:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:jar:sources", "io.opentelemetry:opentelemetry-sdk-logs", "io.opentelemetry:opentelemetry-sdk-logs:jar:sources", "io.opentelemetry:opentelemetry-sdk-metrics", @@ -24424,6 +24762,18 @@ "io.opentelemetry:opentelemetry-sdk:jar:sources", "io.perfmark:perfmark-api", "io.perfmark:perfmark-api:jar:sources", + "io.prometheus:prometheus-metrics-config", + "io.prometheus:prometheus-metrics-config:jar:sources", + "io.prometheus:prometheus-metrics-exporter-common", + "io.prometheus:prometheus-metrics-exporter-common:jar:sources", + "io.prometheus:prometheus-metrics-exporter-httpserver", + "io.prometheus:prometheus-metrics-exporter-httpserver:jar:sources", + "io.prometheus:prometheus-metrics-exposition-formats", + "io.prometheus:prometheus-metrics-exposition-formats:jar:sources", + "io.prometheus:prometheus-metrics-exposition-textformats", + "io.prometheus:prometheus-metrics-exposition-textformats:jar:sources", + "io.prometheus:prometheus-metrics-model", + "io.prometheus:prometheus-metrics-model:jar:sources", "io.swagger.core.v3:swagger-annotations", "io.swagger.core.v3:swagger-annotations:jar:sources", "io.temporal:temporal-sdk", @@ -24945,6 +25295,10 @@ "org.jetbrains.kotlin:kotlin-reflect", "org.jetbrains.kotlin:kotlin-reflect:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:sources", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib:jar:sources", "org.jetbrains:annotations", "org.jetbrains:annotations:jar:sources", @@ -25568,8 +25922,12 @@ "com.softwaremill.sttp.shared:ws_2.12:jar:sources", "com.softwaremill.sttp.shared:ws_2.13", "com.softwaremill.sttp.shared:ws_2.13:jar:sources", + "com.squareup.okhttp3:okhttp", + "com.squareup.okhttp3:okhttp:jar:sources", + "com.squareup.okio:okio", "com.squareup.okio:okio-jvm", "com.squareup.okio:okio-jvm:jar:sources", + "com.squareup.okio:okio:jar:sources", "com.squareup.wire:wire-runtime-jvm", "com.squareup.wire:wire-runtime-jvm:jar:sources", "com.squareup.wire:wire-schema-jvm", @@ -25730,6 +26088,8 @@ "io.micrometer:micrometer-core:jar:sources", "io.micrometer:micrometer-observation", "io.micrometer:micrometer-observation:jar:sources", + "io.micrometer:micrometer-registry-otlp", + "io.micrometer:micrometer-registry-otlp:jar:sources", "io.micrometer:micrometer-registry-statsd", "io.micrometer:micrometer-registry-statsd:jar:sources", "io.netty:netty-all", @@ -25834,6 +26194,8 @@ "io.openlineage:spark-extension-interfaces:jar:sources", "io.opentelemetry.contrib:opentelemetry-gcp-resources", 
"io.opentelemetry.contrib:opentelemetry-gcp-resources:jar:sources", + "io.opentelemetry.proto:opentelemetry-proto", + "io.opentelemetry.proto:opentelemetry-proto:jar:sources", "io.opentelemetry.semconv:opentelemetry-semconv", "io.opentelemetry.semconv:opentelemetry-semconv:jar:sources", "io.opentelemetry:opentelemetry-api", @@ -25842,11 +26204,23 @@ "io.opentelemetry:opentelemetry-api:jar:sources", "io.opentelemetry:opentelemetry-context", "io.opentelemetry:opentelemetry-context:jar:sources", + "io.opentelemetry:opentelemetry-exporter-common", + "io.opentelemetry:opentelemetry-exporter-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp", + "io.opentelemetry:opentelemetry-exporter-otlp-common", + "io.opentelemetry:opentelemetry-exporter-otlp-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp:jar:sources", + "io.opentelemetry:opentelemetry-exporter-prometheus", + "io.opentelemetry:opentelemetry-exporter-prometheus:jar:sources", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp:jar:sources", "io.opentelemetry:opentelemetry-sdk", "io.opentelemetry:opentelemetry-sdk-common", "io.opentelemetry:opentelemetry-sdk-common:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:jar:sources", "io.opentelemetry:opentelemetry-sdk-logs", "io.opentelemetry:opentelemetry-sdk-logs:jar:sources", "io.opentelemetry:opentelemetry-sdk-metrics", @@ -25856,6 +26230,18 @@ "io.opentelemetry:opentelemetry-sdk:jar:sources", "io.perfmark:perfmark-api", "io.perfmark:perfmark-api:jar:sources", + "io.prometheus:prometheus-metrics-config", + "io.prometheus:prometheus-metrics-config:jar:sources", + "io.prometheus:prometheus-metrics-exporter-common", + "io.prometheus:prometheus-metrics-exporter-common:jar:sources", + "io.prometheus:prometheus-metrics-exporter-httpserver", + "io.prometheus:prometheus-metrics-exporter-httpserver:jar:sources", + "io.prometheus:prometheus-metrics-exposition-formats", + "io.prometheus:prometheus-metrics-exposition-formats:jar:sources", + "io.prometheus:prometheus-metrics-exposition-textformats", + "io.prometheus:prometheus-metrics-exposition-textformats:jar:sources", + "io.prometheus:prometheus-metrics-model", + "io.prometheus:prometheus-metrics-model:jar:sources", "io.swagger.core.v3:swagger-annotations", "io.swagger.core.v3:swagger-annotations:jar:sources", "io.temporal:temporal-sdk", @@ -26377,6 +26763,10 @@ "org.jetbrains.kotlin:kotlin-reflect", "org.jetbrains.kotlin:kotlin-reflect:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:sources", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib:jar:sources", "org.jetbrains:annotations", "org.jetbrains:annotations:jar:sources", @@ -27000,8 +27390,12 @@ "com.softwaremill.sttp.shared:ws_2.12:jar:sources", "com.softwaremill.sttp.shared:ws_2.13", "com.softwaremill.sttp.shared:ws_2.13:jar:sources", + "com.squareup.okhttp3:okhttp", + "com.squareup.okhttp3:okhttp:jar:sources", + "com.squareup.okio:okio", "com.squareup.okio:okio-jvm", "com.squareup.okio:okio-jvm:jar:sources", + "com.squareup.okio:okio:jar:sources", "com.squareup.wire:wire-runtime-jvm", 
"com.squareup.wire:wire-runtime-jvm:jar:sources", "com.squareup.wire:wire-schema-jvm", @@ -27162,6 +27556,8 @@ "io.micrometer:micrometer-core:jar:sources", "io.micrometer:micrometer-observation", "io.micrometer:micrometer-observation:jar:sources", + "io.micrometer:micrometer-registry-otlp", + "io.micrometer:micrometer-registry-otlp:jar:sources", "io.micrometer:micrometer-registry-statsd", "io.micrometer:micrometer-registry-statsd:jar:sources", "io.netty:netty-all", @@ -27266,6 +27662,8 @@ "io.openlineage:spark-extension-interfaces:jar:sources", "io.opentelemetry.contrib:opentelemetry-gcp-resources", "io.opentelemetry.contrib:opentelemetry-gcp-resources:jar:sources", + "io.opentelemetry.proto:opentelemetry-proto", + "io.opentelemetry.proto:opentelemetry-proto:jar:sources", "io.opentelemetry.semconv:opentelemetry-semconv", "io.opentelemetry.semconv:opentelemetry-semconv:jar:sources", "io.opentelemetry:opentelemetry-api", @@ -27274,11 +27672,23 @@ "io.opentelemetry:opentelemetry-api:jar:sources", "io.opentelemetry:opentelemetry-context", "io.opentelemetry:opentelemetry-context:jar:sources", + "io.opentelemetry:opentelemetry-exporter-common", + "io.opentelemetry:opentelemetry-exporter-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp", + "io.opentelemetry:opentelemetry-exporter-otlp-common", + "io.opentelemetry:opentelemetry-exporter-otlp-common:jar:sources", + "io.opentelemetry:opentelemetry-exporter-otlp:jar:sources", + "io.opentelemetry:opentelemetry-exporter-prometheus", + "io.opentelemetry:opentelemetry-exporter-prometheus:jar:sources", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp", + "io.opentelemetry:opentelemetry-exporter-sender-okhttp:jar:sources", "io.opentelemetry:opentelemetry-sdk", "io.opentelemetry:opentelemetry-sdk-common", "io.opentelemetry:opentelemetry-sdk-common:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi", "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi:jar:sources", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:jar:sources", "io.opentelemetry:opentelemetry-sdk-logs", "io.opentelemetry:opentelemetry-sdk-logs:jar:sources", "io.opentelemetry:opentelemetry-sdk-metrics", @@ -27288,6 +27698,18 @@ "io.opentelemetry:opentelemetry-sdk:jar:sources", "io.perfmark:perfmark-api", "io.perfmark:perfmark-api:jar:sources", + "io.prometheus:prometheus-metrics-config", + "io.prometheus:prometheus-metrics-config:jar:sources", + "io.prometheus:prometheus-metrics-exporter-common", + "io.prometheus:prometheus-metrics-exporter-common:jar:sources", + "io.prometheus:prometheus-metrics-exporter-httpserver", + "io.prometheus:prometheus-metrics-exporter-httpserver:jar:sources", + "io.prometheus:prometheus-metrics-exposition-formats", + "io.prometheus:prometheus-metrics-exposition-formats:jar:sources", + "io.prometheus:prometheus-metrics-exposition-textformats", + "io.prometheus:prometheus-metrics-exposition-textformats:jar:sources", + "io.prometheus:prometheus-metrics-model", + "io.prometheus:prometheus-metrics-model:jar:sources", "io.swagger.core.v3:swagger-annotations", "io.swagger.core.v3:swagger-annotations:jar:sources", "io.temporal:temporal-sdk", @@ -27809,6 +28231,10 @@ "org.jetbrains.kotlin:kotlin-reflect", "org.jetbrains.kotlin:kotlin-reflect:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7", + "org.jetbrains.kotlin:kotlin-stdlib-jdk7:jar:sources", + 
"org.jetbrains.kotlin:kotlin-stdlib-jdk8", + "org.jetbrains.kotlin:kotlin-stdlib-jdk8:jar:sources", "org.jetbrains.kotlin:kotlin-stdlib:jar:sources", "org.jetbrains:annotations", "org.jetbrains:annotations:jar:sources", @@ -29827,6 +30253,80 @@ "io.opentelemetry.contrib.gcp.resource.GCPResourceProvider" ] }, + "io.opentelemetry:opentelemetry-exporter-otlp": { + "io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpLogRecordExporterComponentProvider", + "io.opentelemetry.exporter.otlp.internal.OtlpMetricExporterComponentProvider", + "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterComponentProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpLogRecordExporterProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpMetricExporterProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider" + ] + }, + "io.opentelemetry:opentelemetry-exporter-otlp:jar:sources": { + "io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpLogRecordExporterComponentProvider", + "io.opentelemetry.exporter.otlp.internal.OtlpMetricExporterComponentProvider", + "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterComponentProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpLogRecordExporterProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpMetricExporterProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider": [ + "io.opentelemetry.exporter.otlp.internal.OtlpSpanExporterProvider" + ] + }, + "io.opentelemetry:opentelemetry-exporter-prometheus": { + "io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider": [ + "io.opentelemetry.exporter.prometheus.internal.PrometheusComponentProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.internal.ConfigurableMetricReaderProvider": [ + "io.opentelemetry.exporter.prometheus.internal.PrometheusMetricReaderProvider" + ] + }, + "io.opentelemetry:opentelemetry-exporter-prometheus:jar:sources": { + "io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider": [ + "io.opentelemetry.exporter.prometheus.internal.PrometheusComponentProvider" + ], + "io.opentelemetry.sdk.autoconfigure.spi.internal.ConfigurableMetricReaderProvider": [ + "io.opentelemetry.exporter.prometheus.internal.PrometheusMetricReaderProvider" + ] + }, + "io.opentelemetry:opentelemetry-exporter-sender-okhttp": { + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider": [ + "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider" + ], + "io.opentelemetry.exporter.internal.http.HttpSenderProvider": [ + "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSenderProvider" + ] + }, + "io.opentelemetry:opentelemetry-exporter-sender-okhttp:jar:sources": { + "io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider": [ + "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpGrpcSenderProvider" + ], + "io.opentelemetry.exporter.internal.http.HttpSenderProvider": [ + 
"io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSenderProvider" + ] + }, + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure": { + "io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider": [ + "io.opentelemetry.sdk.autoconfigure.EnvironmentResourceProvider" + ] + }, + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:jar:sources": { + "io.opentelemetry.sdk.autoconfigure.spi.ResourceProvider": [ + "io.opentelemetry.sdk.autoconfigure.EnvironmentResourceProvider" + ] + }, "io.vertx:vertx-auth-common": { "io.vertx.ext.auth.HashingAlgorithm": [ "io.vertx.ext.auth.impl.hash.PBKDF2", diff --git a/online/BUILD.bazel b/online/BUILD.bazel index 9b1200d856..79cacffa43 100644 --- a/online/BUILD.bazel +++ b/online/BUILD.bazel @@ -1,3 +1,13 @@ +OTEL_DEPS = [ + maven_artifact("io.opentelemetry:opentelemetry-api"), + maven_artifact("io_opentelemetry:opentelemetry-context"), + maven_artifact("io_opentelemetry:opentelemetry-sdk-common"), + maven_artifact("io.opentelemetry:opentelemetry-sdk"), + maven_artifact("io.opentelemetry:opentelemetry-sdk-metrics"), + maven_artifact("io.opentelemetry:opentelemetry-exporter-otlp"), + maven_artifact("io.opentelemetry:opentelemetry-exporter-prometheus"), +] + scala_library( name = "metrics_lib", srcs = glob(["src/main/scala/ai/chronon/online/metrics/*.scala"]), @@ -6,10 +16,9 @@ scala_library( "//conditions:default": True, # Enable for other versions }), visibility = ["//visibility:public"], - deps = [ + deps = OTEL_DEPS + [ "//api:lib", "//api:thrift_java", - maven_artifact("com.datadoghq:java-dogstatsd-client"), maven_artifact("org.slf4j:slf4j-api"), maven_artifact("org.apache.logging.log4j:log4j-api"), maven_artifact("org.apache.logging.log4j:log4j-core"), @@ -41,7 +50,7 @@ scala_library( "//conditions:default": True, # Enable for other versions }), visibility = ["//visibility:public"], - deps = [ + deps = OTEL_DEPS + [ ":metrics_lib", "//aggregator:lib", "//api:lib", @@ -89,6 +98,7 @@ test_deps = _SCALA_TEST_DEPS + [ maven_artifact("net.bytebuddy:byte-buddy-agent"), maven_artifact("org.apache.hadoop:hadoop-common"), maven_artifact("org.apache.hadoop:hadoop-client-api"), + maven_artifact("io.opentelemetry:opentelemetry-api"), ] scala_library( diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java index 4e3fe12ebf..43a9cfee95 100644 --- a/online/src/main/java/ai/chronon/online/JavaFetcher.java +++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java @@ -199,10 +199,10 @@ private void instrument(List requestNames, boolean isGroupBy, String met } private Metrics.Context getJoinContext(String joinName) { - return new Metrics.Context("join.fetch", joinName, null, null, false, null, null, null, null); + return new Metrics.Context("join.fetch", joinName, null, null, false, null, null, null, null, null); } private Metrics.Context getGroupByContext(String groupByName) { - return new Metrics.Context("group_by.fetch", null, groupByName, null, false, null, null, null, null); + return new Metrics.Context("group_by.fetch", null, groupByName, null, false, null, null, null, null, null); } } diff --git a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 25cbef2804..45d0faa3f9 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -27,10 +27,7 @@ import ai.chronon.online.fetcher.Fetcher.{JoinSchemaResponse, 
Request, Response, import ai.chronon.online.metrics.{Metrics, TTLCache} import ai.chronon.online.serde._ import com.google.gson.Gson -import com.timgroup.statsd.Event -import com.timgroup.statsd.Event.AlertType import org.apache.avro.generic.GenericRecord -import org.json4s.BuildInfo import org.slf4j.{Logger, LoggerFactory} import java.util.function.Consumer @@ -103,32 +100,22 @@ class Fetcher(val kvStore: KVStore, @transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass) private val fetchContext: FetchContext = - FetchContext(kvStore, metaDataSet, timeoutMillis, debug, flagStore, disableErrorThrows, executionContextOverride) + FetchContext(kvStore, + metaDataSet, + timeoutMillis, + debug, + flagStore, + disableErrorThrows, + executionContextOverride) implicit private val executionContext: ExecutionContext = fetchContext.getOrCreateExecutionContext val metadataStore: MetadataStore = new MetadataStore(fetchContext) private val joinPartFetcher = new JoinPartFetcher(fetchContext, metadataStore) - private def reportCallerNameFetcherVersion(): Unit = { - val message = - s"CallerName: ${Option(callerName).getOrElse("N/A")}, FetcherVersion: ${BuildInfo.version}" - val ctx = Metrics.Context(Metrics.Environment.Fetcher) - val event = Event - .builder() - .withTitle("FetcherInitialization") - .withText(message) - .withAlertType(AlertType.INFO) - .build() - ctx.recordEvent("caller_name_fetcher_version", event) - } - lazy val joinCodecCache: TTLCache[String, Try[JoinCodec]] = metadataStore.buildJoinCodecCache( Some(logControlEvent) ) - // run during initialization - reportCallerNameFetcherVersion() - private[online] def withTs(responses: Future[Seq[Response]]): Future[FetcherResponseWithTs] = { responses.map { response => FetcherResponseWithTs(response, System.currentTimeMillis()) @@ -453,8 +440,10 @@ class Fetcher(val kvStore: KVStore, .toMap val context = - Metrics.Context(environment = Metrics.Environment.JoinFetching, - join = validRequests.iterator.map(_.name.sanitize).toSeq.distinct.mkString(",")) + Metrics.Context( + environment = Metrics.Environment.JoinFetching, + join = validRequests.iterator.map(_.name.sanitize).toSeq.distinct.mkString(",") + ) context.distribution("response.external_pre_processing.latency", System.currentTimeMillis() - startTime) context.count("response.external_invalid_joins.count", invalidCount) val responseFutures = @@ -508,7 +497,8 @@ class Fetcher(val kvStore: KVStore, def fetchJoinSchema(joinName: String): Try[JoinSchemaResponse] = { val startTime = System.currentTimeMillis() - val ctx = Metrics.Context(Metrics.Environment.JoinSchemaFetching, join = joinName) + val ctx = + Metrics.Context(Metrics.Environment.JoinSchemaFetching, join = joinName) val joinCodecTry = joinCodecCache(joinName) @@ -558,6 +548,8 @@ class Fetcher(val kvStore: KVStore, part: ExternalPart) { lazy val context: Metrics.Context = - Metrics.Context(Metrics.Environment.JoinFetching, join = joinRequest.name, groupBy = part.fullName) + Metrics.Context(Metrics.Environment.JoinFetching, + join = joinRequest.name, + groupBy = part.fullName) } } diff --git a/online/src/main/scala/ai/chronon/online/metrics/Metrics.scala b/online/src/main/scala/ai/chronon/online/metrics/Metrics.scala index e0b5f38fad..5eddd05fee 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/Metrics.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/Metrics.scala @@ -19,7 +19,9 @@ package ai.chronon.online.metrics import ai.chronon.api.Extensions._ import ai.chronon.api.ScalaJavaConversions._ 
import ai.chronon.api._ -import com.timgroup.statsd.{Event, NoOpStatsDClient, NonBlockingStatsDClientBuilder, StatsDClient} +import io.opentelemetry.api.OpenTelemetry + +import scala.collection.mutable object Metrics { object Environment extends Enumeration { @@ -51,6 +53,7 @@ object Metrics { val Production = "production" val Accuracy = "accuracy" val Team = "team" + val Dataset = "dataset" } object Name { @@ -86,8 +89,6 @@ object Metrics { object Context { - val sampleRate: Double = 0.1 - def apply(environment: Environment, join: Join): Context = { Context( environment = environment, @@ -126,28 +127,29 @@ object Metrics { ) } - // Host can also be a Unix socket like: unix:///opt/datadog-agent/run/dogstatsd.sock - // In the unix socket case port is configured to be 0 - val statsHost: String = System.getProperty("ai.chronon.metrics.host", "localhost") - val statsPort: Int = System.getProperty("ai.chronon.metrics.port", "8125").toInt - // Can disable stats collection for local / dev environments - val statsEnabled: Boolean = System.getProperty("ai.chronon.metrics.enabled", "true").toBoolean - val tagCache: TTLCache[Context, String] = new TTLCache[Context, String]( - { ctx => ctx.toTags.reverse.mkString(",") }, - { ctx => ctx }, - ttlMillis = 5 * 24 * 60 * 60 * 1000 // 5 days - ) - - private val statsClient: StatsDClient = { - if (statsEnabled) { - new NonBlockingStatsDClientBuilder().prefix("ai.zipline").hostname(statsHost).port(statsPort).build() - } else { - new NoOpStatsDClient() + private val client: MetricsReporter = { + // Can disable metrics collection for local / dev environments + val metricsEnabled: Boolean = System.getProperty(MetricsEnabled, "true").toBoolean + val reporter: String = System.getProperty(MetricsReporter, "otel") + + reporter.toLowerCase match { + case "otel" | "opentelemetry" => + if (metricsEnabled) { + val metricReader = OtelMetricsReporter.buildOtelMetricReader() + val openTelemetry = OtelMetricsReporter.buildOpenTelemetryClient(metricReader) + new OtelMetricsReporter(openTelemetry) + } else { + new OtelMetricsReporter(OpenTelemetry.noop()) + } + case _ => + throw new IllegalArgumentException(s"Unknown metrics reporter: $reporter. Only opentelemetry is supported.") } } - } + val MetricsEnabled = "ai.chronon.metrics.enabled" + val MetricsReporter = "ai.chronon.metrics.reporter" + case class Context(environment: Environment, join: String = null, groupBy: String = null, @@ -156,25 +158,12 @@ object Metrics { accuracy: Accuracy = null, team: String = null, joinPartPrefix: String = null, - suffix: String = null) + suffix: String = null, + dataset: String = null) extends Serializable { def withSuffix(suffixN: String): Context = copy(suffix = (Option(suffix) ++ Seq(suffixN)).mkString(".")) - // Tagging happens to be the most expensive part(~40%) of reporting stats. - // And reporting stats is about 30% of overall fetching latency. - // So we do array packing directly instead of regular string interpolation. - // This simply creates "key:value" - // The optimization shaves about 2ms of 6ms of e2e overhead for 500 batch size. - def buildTag(key: String, value: String): String = { - val charBuf = new Array[Char](key.length + value.length + 1) - key.getChars(0, key.length, charBuf, 0) - value.getChars(0, value.length, charBuf, key.length + 1) - charBuf.update(key.length, ':') - new String(charBuf) - } - - private lazy val tags = Metrics.Context.tagCache(this) private val prefixString = environment + Option(suffix).map("." 
+ _).getOrElse("") private def prefix(s: String): String = @@ -184,11 +173,39 @@ object Metrics { .append(s) .toString - @transient private lazy val stats: StatsDClient = Metrics.Context.statsClient + def toTags: Map[String, String] = { + val joinNames: Array[String] = Option(join).map(_.split(",")).getOrElse(Array.empty[String]).map(_.sanitize) + assert( + environment != null, + "Environment needs to be set - group_by.upload, group_by.streaming, join.fetching, group_by.fetching, group_by.offline etc") + val buffer = mutable.Map[String, String]() + + def addTag(key: String, value: String): Unit = { + if (value == null) return + buffer += key -> value + } + + joinNames.foreach(addTag(Tag.Join, _)) + + val groupByName = Option(groupBy).map(_.sanitize) + groupByName.foreach(addTag(Tag.GroupBy, _)) + + addTag(Tag.StagingQuery, stagingQuery) + addTag(Tag.Production, production.toString) + addTag(Tag.Team, team) + addTag(Tag.Environment, environment) + addTag(Tag.JoinPartPrefix, joinPartPrefix) + addTag(Tag.Accuracy, if (accuracy != null) accuracy.name() else null) + addTag(Tag.Dataset, dataset) + buffer.toMap + } + + implicit val context: Context = this - def increment(metric: String): Unit = stats.increment(prefix(metric), tags) + def increment(metric: String): Unit = Context.client.count(prefix(metric), 1, Map.empty) - def increment(metric: String, tag: String): Unit = stats.increment(prefix(metric), s"$tags,$tag") + def increment(metric: String, additionalTags: Map[String, String]): Unit = + Context.client.count(prefix(metric), 1, additionalTags) def incrementException(exception: Throwable)(implicit logger: org.slf4j.Logger): Unit = { val stackTrace = exception.getStackTrace @@ -202,50 +219,19 @@ object Metrics { s"[$method@$file:$line]${exception.getClass.toString}" } logger.error(s"Exception Message: ${exception.traceString}") - stats.increment(prefix(Name.Exception), s"$tags,${Metrics.Name.Exception}:${exceptionSignature}") + Context.client.count(prefix(Name.Exception), 1, Map(Metrics.Name.Exception -> exceptionSignature)) } def distribution(metric: String, value: Long): Unit = - stats.distribution(prefix(metric), value, Context.sampleRate, tags) - - def distribution(metric: String, value: Long, tag: String): Unit = - stats.distribution(prefix(metric), value, Context.sampleRate, s"$tags,$tag") - - def count(metric: String, value: Long): Unit = stats.count(prefix(metric), value, tags) - - def gauge(metric: String, value: Long): Unit = stats.gauge(prefix(metric), value, tags) - - def gauge(metric: String, value: Double): Unit = stats.gauge(prefix(metric), value, tags) - - def recordEvent(metric: String, event: Event): Unit = stats.recordEvent(event, prefix(metric), tags) - - def toTags: Array[String] = { - val joinNames: Array[String] = Option(join).map(_.split(",")).getOrElse(Array.empty[String]).map(_.sanitize) - assert( - environment != null, - "Environment needs to be set - group_by.upload, group_by.streaming, join.fetching, group_by.fetching, group_by.offline etc") - val buffer = new Array[String](7 + joinNames.length) - var counter = 0 + Context.client.distribution(prefix(metric), value, Map.empty) - def addTag(key: String, value: String): Unit = { - if (value == null) return - assert(counter < buffer.length, "array overflow") - buffer.update(counter, buildTag(key, value)) - counter += 1 - } + def distribution(metric: String, value: Long, additionalTags: Map[String, String]): Unit = + Context.client.distribution(prefix(metric), value, additionalTags) - 
joinNames.foreach(addTag(Tag.Join, _)) + def count(metric: String, value: Long): Unit = Context.client.count(prefix(metric), value) - val groupByName = Option(groupBy).map(_.sanitize) - groupByName.foreach(addTag(Tag.GroupBy, _)) + def gauge(metric: String, value: Long): Unit = Context.client.longGauge(prefix(metric), value) - addTag(Tag.StagingQuery, stagingQuery) - addTag(Tag.Production, production.toString) - addTag(Tag.Team, team) - addTag(Tag.Environment, environment) - addTag(Tag.JoinPartPrefix, joinPartPrefix) - addTag(Tag.Accuracy, if (accuracy != null) accuracy.name() else null) - buffer - } + def gauge(metric: String, value: Double): Unit = Context.client.doubleGauge(prefix(metric), value) } } diff --git a/online/src/main/scala/ai/chronon/online/metrics/MetricsReporter.scala b/online/src/main/scala/ai/chronon/online/metrics/MetricsReporter.scala new file mode 100644 index 0000000000..99b94424a3 --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/metrics/MetricsReporter.scala @@ -0,0 +1,17 @@ +package ai.chronon.online.metrics + +import ai.chronon.online.metrics.Metrics.Context + +/** Generic interface for reporting metrics. Specific implementations of this cater to different metrics systems + * (e.g., StatsD, OpenTelemetry). + */ +trait MetricsReporter extends Serializable { + + def count(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit context: Context): Unit + + def longGauge(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit context: Context): Unit + + def doubleGauge(metric: String, value: Double, tags: Map[String, String] = Map.empty)(implicit context: Context): Unit + + def distribution(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit context: Context): Unit +} diff --git a/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala new file mode 100644 index 0000000000..19888b8d3e --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala @@ -0,0 +1,149 @@ +package ai.chronon.online.metrics + +import ai.chronon.online.metrics.Metrics.Context +import io.opentelemetry.api.OpenTelemetry +import io.opentelemetry.api.common.{AttributeKey, Attributes} +import io.opentelemetry.api.metrics.{DoubleGauge, LongCounter, LongGauge, LongHistogram, Meter} +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator +import io.opentelemetry.context.propagation.ContextPropagators +import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer +import io.opentelemetry.sdk.OpenTelemetrySdk +import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.export.{MetricReader, PeriodicMetricReader} +import io.opentelemetry.sdk.resources.Resource + +import java.time.Duration +import scala.collection.concurrent.TrieMap + +class OtelMetricsReporter(openTelemetry: OpenTelemetry) extends MetricsReporter { + + private val meter: Meter = openTelemetry.getMeterProvider + .meterBuilder("ai.chronon") + .setInstrumentationVersion("0.0.0") + .build() + + val tagCache: TTLCache[Context, Attributes] = new TTLCache[Context, Attributes]( + { ctx => + val tagMap = ctx.toTags + buildAttributes(tagMap) + }, + { ctx => ctx }, + ttlMillis = 5 * 24 * 60 * 60 * 1000 // 5 days + ) + + private val counters = new TrieMap[String, LongCounter]() + private val longGauges = new TrieMap[String, 
LongGauge]()
+  private val doubleGauges = new TrieMap[String, DoubleGauge]()
+  private val histograms = new TrieMap[String, LongHistogram]()
+
+  private def buildAttributes(tags: Map[String, String]): Attributes = {
+    val builder = Attributes.builder()
+    tags.foreach { case (k, v) => builder.put(k, v) }
+    builder.build()
+  }
+
+  private def mergeAttributes(attributes: Attributes, tags: Map[String, String]): Attributes = {
+    val builder = attributes.toBuilder
+    tags.foreach { case (k, v) => builder.put(k, v) }
+    builder.build()
+  }
+
+  override def count(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit
+      context: Context): Unit = {
+    val counter = counters.getOrElseUpdate(metric, meter.counterBuilder(metric).build())
+    val mergedAttributes = mergeAttributes(tagCache(context), tags)
+    counter.add(value, mergedAttributes)
+  }
+
+  override def longGauge(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit
+      context: Context): Unit = {
+    val gauge = longGauges.getOrElseUpdate(metric, meter.gaugeBuilder(metric).ofLongs().build())
+    val mergedAttributes = mergeAttributes(tagCache(context), tags)
+    gauge.set(value, mergedAttributes)
+  }
+
+  override def doubleGauge(metric: String, value: Double, tags: Map[String, String] = Map.empty)(implicit
+      context: Context): Unit = {
+    val gauge = doubleGauges.getOrElseUpdate(metric, meter.gaugeBuilder(metric).build())
+    val mergedAttributes = mergeAttributes(tagCache(context), tags)
+    gauge.set(value, mergedAttributes)
+  }
+
+  override def distribution(metric: String, value: Long, tags: Map[String, String] = Map.empty)(implicit
+      context: Context): Unit = {
+    val histogram = histograms.getOrElseUpdate(metric, meter.histogramBuilder(metric).ofLongs().build())
+    val mergedAttributes = mergeAttributes(tagCache(context), tags)
+    histogram.record(value, mergedAttributes)
+  }
+}
+
+object OtelMetricsReporter {
+
+  val DefaultServiceName = "ai.chronon"
+  val MetricsReader = "ai.chronon.metrics.reader"
+  val MetricsExporterUrlKey = "ai.chronon.metrics.exporter.url"
+  val MetricsExporterPrometheusPortKey = "ai.chronon.metrics.exporter.port"
+  val MetricsExporterResourceKey = "ai.chronon.metrics.exporter.resources"
+
+  val MetricsReaderDefault = "http"
+  val MetricsReaderPrometheus = "prometheus"
+  val MetricsExporterUrlDefault = "http://localhost:4318"
+  val MetricsExporterInterval = "PT15s"
+  val MetricsExporterPrometheusPortDefault = "8905"
+
+  def getExporterUrl: String = {
+    System.getProperty(MetricsExporterUrlKey, MetricsExporterUrlDefault)
+  }
+
+  def buildOtelMetricReader(): MetricReader = {
+    val metricReader = System.getProperty(MetricsReader, MetricsReaderDefault)
+    metricReader.toLowerCase match {
+      case MetricsReaderDefault =>
+        val exporterUrl = getExporterUrl + "/v1/metrics"
+
+        val metricExporter = OtlpHttpMetricExporter.builder.setEndpoint(exporterUrl).build
+        // Configure periodic metric reader
+        PeriodicMetricReader.builder(metricExporter).setInterval(Duration.parse(MetricsExporterInterval)).build
+      case MetricsReaderPrometheus =>
+        val prometheusPort = System.getProperty(MetricsExporterPrometheusPortKey, MetricsExporterPrometheusPortDefault).toInt
+        PrometheusHttpServer.builder
+          .setPort(prometheusPort)
+          .build
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown metrics reader (only http / prometheus supported): $metricReader")
+    }
+  }
+
+  def buildOpenTelemetryClient(metricReader: MetricReader): OpenTelemetry = {
+    // Create resource with service 
information + val configuredResourceKVPairs = System.getProperty(MetricsExporterResourceKey, "") + .split(",") + .map(_.split("=")) + .filter(_.length == 2) + .map { case Array(k, v) => k.trim -> v.trim } + .toMap + + val builder = Attributes.builder() + configuredResourceKVPairs.map { case (k, v) => + val key = AttributeKey.stringKey(k) + builder.put(key, v) + } + + builder.put(AttributeKey.stringKey("service.name"), DefaultServiceName) + val resource = Resource.getDefault.merge(Resource.create(builder.build())) + + val meterProvider = SdkMeterProvider + .builder + .setResource(resource) + .registerMetricReader(metricReader) + .build + + // Build the OpenTelemetry object with only meter provider + OpenTelemetrySdk + .builder + .setMeterProvider(meterProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance)) + .build + } +} \ No newline at end of file diff --git a/online/src/test/scala/ai/chronon/online/test/TagsTest.scala b/online/src/test/scala/ai/chronon/online/test/TagsTest.scala index e12ae51c1b..28bf4e67b1 100644 --- a/online/src/test/scala/ai/chronon/online/test/TagsTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/TagsTest.scala @@ -17,40 +17,17 @@ package ai.chronon.online.test import ai.chronon.api.Builders -import ai.chronon.online.metrics.Metrics.Context import ai.chronon.online.metrics.Metrics.Environment -import ai.chronon.online.metrics.TTLCache -import ai.chronon.online.metrics.Metrics +import ai.chronon.online.metrics.{Metrics, OtelMetricsReporter} +import io.opentelemetry.api.OpenTelemetry import org.junit.Assert.assertEquals import org.scalatest.flatspec.AnyFlatSpec class TagsTest extends AnyFlatSpec { // test that ttlCache of context is creates non duplicated entries - // copied from the private NonBlockingStatsDClient.tagString - def tagString(tags: Array[String], tagPrefix: String): String = { - var sb: StringBuilder = null - if (tagPrefix != null) { - if ((tags == null) || (tags.length == 0)) return tagPrefix - sb = new StringBuilder(tagPrefix) - sb.append(",") - } else { - if ((tags == null) || (tags.length == 0)) return "" - sb = new StringBuilder("|#") - } - for (n <- tags.length - 1 to 0 by -1) { - sb.append(tags(n)) - if (n > 0) sb.append(",") - } - sb.toString - } - it should "cached tags are computed tags" in { - val cache = new TTLCache[Metrics.Context, String]( - { ctx => ctx.toTags.mkString(",") }, - { ctx => ctx }, - ttlMillis = 5 * 24 * 60 * 60 * 1000 // 5 days - ) + val otelMetricsClient = new OtelMetricsReporter(OpenTelemetry.noop()) val context = Metrics.Context( Environment.JoinOffline, Builders.Join( @@ -70,19 +47,19 @@ class TagsTest extends AnyFlatSpec { ) ) ) :: Nil - ) + ), ) val copyFake = context.copy(join = "something else") val copyCorrect = copyFake.copy(join = context.join) - // add three entires to cache - two distinct contexts and one copy of the first - cache(context) - cache(copyCorrect) - cache(copyFake) - assertEquals(cache.cMap.size(), 2) + // add three entries to cache - two distinct contexts and one copy of the first + otelMetricsClient.tagCache(context) + otelMetricsClient.tagCache(copyCorrect) + otelMetricsClient.tagCache(copyFake) + assertEquals(otelMetricsClient.tagCache.cMap.size(), 2) - val slowTags = tagString(context.toTags, null) - val fastTags = tagString(Array(Context.tagCache(copyCorrect)), null) + val slowTags = otelMetricsClient.tagCache(context) + val fastTags = otelMetricsClient.tagCache(copyCorrect) assertEquals(slowTags, fastTags) } diff --git 
a/service_commons/BUILD.bazel b/service_commons/BUILD.bazel
index 7879142a94..7d5c6fb8b1 100644
--- a/service_commons/BUILD.bazel
+++ b/service_commons/BUILD.bazel
@@ -9,7 +9,7 @@ java_library(
         maven_artifact("ch.qos.logback:logback-classic"),
         maven_artifact("com.typesafe:config"),
         maven_artifact("io.netty:netty-all"),
-        maven_artifact("io.micrometer:micrometer-registry-statsd"),
+        maven_artifact("io.micrometer:micrometer-registry-otlp"),
         maven_artifact("io.micrometer:micrometer-core"),
         maven_artifact("com.fasterxml.jackson.core:jackson-databind"),
         maven_artifact("org.slf4j:slf4j-api"),
diff --git a/service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java b/service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java
index fe898e145b..5c39d30e68 100644
--- a/service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java
+++ b/service_commons/src/main/java/ai/chronon/service/ChrononServiceLauncher.java
@@ -1,23 +1,22 @@
 package ai.chronon.service;
 
 import ai.chronon.online.metrics.Metrics;
+import ai.chronon.online.metrics.OtelMetricsReporter;
 import io.micrometer.core.instrument.Clock;
 import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.statsd.StatsdConfig;
-import io.micrometer.statsd.StatsdMeterRegistry;
+import io.micrometer.registry.otlp.OtlpConfig;
+import io.micrometer.registry.otlp.OtlpMeterRegistry;
 import io.vertx.core.Launcher;
 import io.vertx.core.VertxOptions;
 import io.vertx.micrometer.Label;
 import io.vertx.micrometer.MicrometerMetricsFactory;
 import io.vertx.micrometer.MicrometerMetricsOptions;
-
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Optional;
 
 /**
  * Custom launcher to help configure the Chronon vertx feature service
- * to handle things like setting up a statsd metrics registry.
- * We use statsd here to be consistent with the rest of our project (e.g. fetcher code).
+ * to handle things like setting up an otel metrics registry.
+ * We use otel here to be consistent with the rest of our project (e.g. fetcher code).
  * This allows us to send Vertx webservice metrics along with fetcher related metrics to allow users
  * to debug performance issues and set alerts etc.
  */
@@ -25,27 +24,37 @@ public class ChrononServiceLauncher extends Launcher {
 
     @Override
     public void beforeStartingVertx(VertxOptions options) {
+        boolean enableMetrics = Optional.ofNullable(System.getProperty(Metrics.MetricsEnabled()))
+                .map(Boolean::parseBoolean)
+                .orElse(false);
 
-        StatsdConfig config = new StatsdConfig() {
-            private final String statsdHost = Metrics.Context$.MODULE$.statsHost();
-            private final String statsdPort = String.valueOf(Metrics.Context$.MODULE$.statsPort());
+        if (enableMetrics) {
+            initializeMetrics(options);
+        }
+    }
 
-            final Map statsProps = new HashMap<>(Map.of(
-                    prefix() + "." + "port", statsdPort,
-                    prefix() + "." + "host", statsdHost,
-                    prefix() + "." + "protocol", Integer.parseInt(statsdPort) == 0 ? 
"UDS_DATAGRAM" : "UDP" - )); + private void initializeMetrics(VertxOptions options) { + String serviceName = "ai.chronon"; + String exporterUrl = OtelMetricsReporter.getExporterUrl() + "/v1/metrics"; + String exportInterval = OtelMetricsReporter.MetricsExporterInterval(); - @Override - public String get(String key) { - return statsProps.get(key); + // Configure OTLP using Micrometer's built-in registry + OtlpConfig otlpConfig = key -> { + switch (key) { + case "otlp.url": + return exporterUrl; + case "otlp.step": + return exportInterval; + case "otlp.resourceAttributes": + return "service.name=" + serviceName; + default: + return null; } }; - MeterRegistry registry = new StatsdMeterRegistry(config, Clock.SYSTEM); + MeterRegistry registry = new OtlpMeterRegistry(otlpConfig, Clock.SYSTEM); MicrometerMetricsFactory metricsFactory = new MicrometerMetricsFactory(registry); - // Configure metrics via statsd MicrometerMetricsOptions metricsOptions = new MicrometerMetricsOptions() .setEnabled(true) .setJvmMetricsEnabled(true) diff --git a/tools/build_rules/dependencies/maven_repository.bzl b/tools/build_rules/dependencies/maven_repository.bzl index dabd7ea084..531cca29fe 100644 --- a/tools/build_rules/dependencies/maven_repository.bzl +++ b/tools/build_rules/dependencies/maven_repository.bzl @@ -70,6 +70,7 @@ maven_repository = repository( "ch.qos.logback:logback-classic:1.5.6", "com.typesafe:config:1.4.3", "io.micrometer:micrometer-registry-statsd:1.13.6", + "io.micrometer:micrometer-registry-otlp:1.13.6", "net.sf.py4j:py4j:0.10.9.9", "org.apache.commons:commons-lang3:3.12.0", "org.apache.commons:commons-math3:3.6.1", @@ -221,6 +222,14 @@ maven_repository = repository( # Temporal "io.temporal:temporal-sdk:1.28.0", "io.temporal:temporal-testing:1.28.0", + + # OpenTelemetry + "io.opentelemetry:opentelemetry-api:1.49.0", + "io.opentelemetry:opentelemetry-sdk:1.49.0", + "io.opentelemetry:opentelemetry-sdk-metrics:1.49.0", + "io.opentelemetry:opentelemetry-exporter-otlp:1.49.0", + "io.opentelemetry:opentelemetry-exporter-prometheus:1.49.0-alpha", + "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.49.0", ], excluded_artifacts = [ "org.apache.beam:beam-sdks-java-io-hadoop-common", From d06a426f3be968fc888948d4fac5d004b2f80f89 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Fri, 25 Apr 2025 09:26:55 -0400 Subject: [PATCH 02/11] Add instrumented threadpool --- .../metrics/FlexibleExecutionContext.scala | 15 ++-- .../InstrumentedThreadPoolExecutor.scala | 90 +++++++++++++++++++ 2 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala diff --git a/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala b/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala index 10295a758a..c5ce9a2bca 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala @@ -35,13 +35,14 @@ object FlexibleExecutionContext { } lazy val buildExecutor: ThreadPoolExecutor = { - val cores = Runtime.getRuntime.availableProcessors(); - new ThreadPoolExecutor(cores, // corePoolSize - cores * 4, // maxPoolSize - 600, // keepAliveTime - TimeUnit.SECONDS, // keep alive time units - new ArrayBlockingQueue[Runnable](1000), - threadFactory) + val cores = Runtime.getRuntime.availableProcessors() + new InstrumentedThreadPoolExecutor( + cores, // corePoolSize + cores * 4, // 
maxPoolSize + 600, // keepAliveTime + TimeUnit.SECONDS, // keep alive time units + new ArrayBlockingQueue[Runnable](10000), + threadFactory) } def buildExecutionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(buildExecutor) diff --git a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala new file mode 100644 index 0000000000..706d4ae06b --- /dev/null +++ b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala @@ -0,0 +1,90 @@ +package ai.chronon.online.metrics + +import java.util.concurrent.{BlockingQueue, Executors, ScheduledExecutorService, ThreadFactory, ThreadPoolExecutor, TimeUnit} + +class InstrumentedThreadPoolExecutor(corePoolSize: Int, + maximumPoolSize: Int, + keepAliveTime: Long, + unit: TimeUnit, + workQueue: BlockingQueue[Runnable], + threadFactory: ThreadFactory, + metricsIntervalSeconds: Int = 15) + extends ThreadPoolExecutor( + corePoolSize, + maximumPoolSize, + keepAliveTime, + unit, + workQueue, + threadFactory + ) { + // Reporter for periodic metrics + private val metricsReporter: ScheduledExecutorService = buildMetricsScheduledExecutor() + + protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.Fetcher).withSuffix("threadpool") + + // Schedule periodic metrics collection to capture sizes of the queue and the pool + private def buildMetricsScheduledExecutor(): ScheduledExecutorService = { + val reporter = Executors.newSingleThreadScheduledExecutor(r => { + val thread = new Thread(r) + thread.setDaemon(true) + thread.setName(s"metrics-reporter") + thread + }) + + reporter.scheduleAtFixedRate(() => { + // Report queue size + metricsContext.gauge("queue_size", getQueue.size()) + + // Report pool sizes directly from the executor + metricsContext.gauge("active_threads", getActiveCount) + metricsContext.gauge("pool_size", getPoolSize) + metricsContext.gauge("core_pool_size", getCorePoolSize) + metricsContext.gauge("maximum_pool_size", getMaximumPoolSize) + metricsContext.gauge("largest_pool_size", getLargestPoolSize) + + // Task counts from executor + metricsContext.gauge("completed_task_count", getCompletedTaskCount) + metricsContext.gauge("task_count", getTaskCount) + }, 0, metricsIntervalSeconds, TimeUnit.SECONDS) + + reporter + } + + // Wrapper on the Executor's execute method to capture metrics on task wait and execution times + override def execute(command: Runnable): Unit = { + val submitTime = System.currentTimeMillis() + + val instrumentedTask = new Runnable { + override def run(): Unit = { + val startTime = System.currentTimeMillis() + val waitTime = startTime - submitTime + + // Record wait time + metricsContext.distribution("wait_time_ms", waitTime) + + command.run() + val endTime = System.currentTimeMillis() + val execTime = endTime - startTime + val totalTime = endTime - submitTime + + // Record timing metrics + metricsContext.distribution("execution_time_ms", execTime) + metricsContext.distribution("total_time_ms", totalTime) + } + } + + super.execute(instrumentedTask) + } + + // Clean up resources on shutdown + override def shutdown(): Unit = { + metricsReporter.shutdown() + super.shutdown() + } + + override def shutdownNow(): java.util.List[Runnable] = { + metricsReporter.shutdownNow() + super.shutdownNow() + } + +} From 898dc0f25780629ff68fc1cb8296f24a1a4b70fb Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Fri, 25 Apr 2025 09:28:37 -0400 Subject: [PATCH 03/11] 
style: Apply scalafix and scalafmt changes --- .../ai/chronon/online/fetcher/Fetcher.scala | 12 +--- .../metrics/FlexibleExecutionContext.scala | 13 ++-- .../InstrumentedThreadPoolExecutor.scala | 60 +++++++++++-------- .../online/metrics/OtelMetricsReporter.scala | 28 ++++----- .../ai/chronon/online/test/TagsTest.scala | 2 +- 5 files changed, 59 insertions(+), 56 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 45d0faa3f9..cb4bdc968d 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -100,13 +100,7 @@ class Fetcher(val kvStore: KVStore, @transient implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass) private val fetchContext: FetchContext = - FetchContext(kvStore, - metaDataSet, - timeoutMillis, - debug, - flagStore, - disableErrorThrows, - executionContextOverride) + FetchContext(kvStore, metaDataSet, timeoutMillis, debug, flagStore, disableErrorThrows, executionContextOverride) implicit private val executionContext: ExecutionContext = fetchContext.getOrCreateExecutionContext val metadataStore: MetadataStore = new MetadataStore(fetchContext) @@ -548,8 +542,6 @@ class Fetcher(val kvStore: KVStore, part: ExternalPart) { lazy val context: Metrics.Context = - Metrics.Context(Metrics.Environment.JoinFetching, - join = joinRequest.name, - groupBy = part.fullName) + Metrics.Context(Metrics.Environment.JoinFetching, join = joinRequest.name, groupBy = part.fullName) } } diff --git a/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala b/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala index c5ce9a2bca..3521034ed7 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/FlexibleExecutionContext.scala @@ -36,13 +36,12 @@ object FlexibleExecutionContext { lazy val buildExecutor: ThreadPoolExecutor = { val cores = Runtime.getRuntime.availableProcessors() - new InstrumentedThreadPoolExecutor( - cores, // corePoolSize - cores * 4, // maxPoolSize - 600, // keepAliveTime - TimeUnit.SECONDS, // keep alive time units - new ArrayBlockingQueue[Runnable](10000), - threadFactory) + new InstrumentedThreadPoolExecutor(cores, // corePoolSize + cores * 4, // maxPoolSize + 600, // keepAliveTime + TimeUnit.SECONDS, // keep alive time units + new ArrayBlockingQueue[Runnable](10000), + threadFactory) } def buildExecutionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(buildExecutor) diff --git a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala index 706d4ae06b..c64e4c3ad9 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala @@ -1,6 +1,13 @@ package ai.chronon.online.metrics -import java.util.concurrent.{BlockingQueue, Executors, ScheduledExecutorService, ThreadFactory, ThreadPoolExecutor, TimeUnit} +import java.util.concurrent.{ + BlockingQueue, + Executors, + ScheduledExecutorService, + ThreadFactory, + ThreadPoolExecutor, + TimeUnit +} class InstrumentedThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, @@ -9,14 +16,14 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int, 
workQueue: BlockingQueue[Runnable],
                                      threadFactory: ThreadFactory,
                                      metricsIntervalSeconds: Int = 15)
-  extends ThreadPoolExecutor(
-    corePoolSize,
-    maximumPoolSize,
-    keepAliveTime,
-    unit,
-    workQueue,
-    threadFactory
-  ) {
+    extends ThreadPoolExecutor(
+      corePoolSize,
+      maximumPoolSize,
+      keepAliveTime,
+      unit,
+      workQueue,
+      threadFactory
+    ) {
   // Reporter for periodic metrics
   private val metricsReporter: ScheduledExecutorService = buildMetricsScheduledExecutor()
 
@@ -31,21 +38,26 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int,
       thread
     })
 
-    reporter.scheduleAtFixedRate(() => {
-      // Report queue size
-      metricsContext.gauge("queue_size", getQueue.size())
-
-      // Report pool sizes directly from the executor
-      metricsContext.gauge("active_threads", getActiveCount)
-      metricsContext.gauge("pool_size", getPoolSize)
-      metricsContext.gauge("core_pool_size", getCorePoolSize)
-      metricsContext.gauge("maximum_pool_size", getMaximumPoolSize)
-      metricsContext.gauge("largest_pool_size", getLargestPoolSize)
-
-      // Task counts from executor
-      metricsContext.gauge("completed_task_count", getCompletedTaskCount)
-      metricsContext.gauge("task_count", getTaskCount)
-    }, 0, metricsIntervalSeconds, TimeUnit.SECONDS)
+    reporter.scheduleAtFixedRate(
+      () => {
+        // Report queue size
+        metricsContext.gauge("queue_size", getQueue.size())
+
+        // Report pool sizes directly from the executor
+        metricsContext.gauge("active_threads", getActiveCount)
+        metricsContext.gauge("pool_size", getPoolSize)
+        metricsContext.gauge("core_pool_size", getCorePoolSize)
+        metricsContext.gauge("maximum_pool_size", getMaximumPoolSize)
+        metricsContext.gauge("largest_pool_size", getLargestPoolSize)
+
+        // Task counts from executor
+        metricsContext.gauge("completed_task_count", getCompletedTaskCount)
+        metricsContext.gauge("task_count", getTaskCount)
+      },
+      0,
+      metricsIntervalSeconds,
+      TimeUnit.SECONDS
+    )
 
     reporter
   }
diff --git a/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala
index 19888b8d3e..39c2248c3a 100644
--- a/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala
+++ b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala
@@ -106,7 +106,8 @@ object OtelMetricsReporter {
         // Configure periodic metric reader
         PeriodicMetricReader.builder(metricExporter).setInterval(Duration.parse(MetricsExporterInterval)).build
       case MetricsReaderPrometheus =>
-        val prometheusPort = System.getProperty(MetricsExporterPrometheusPortKey, MetricsExporterPrometheusPortDefault).toInt
+        val prometheusPort =
+          System.getProperty(MetricsExporterPrometheusPortKey, MetricsExporterPrometheusPortDefault).toInt
         PrometheusHttpServer.builder
           .setPort(prometheusPort)
           .build
@@ -117,33 +118,32 @@ def buildOpenTelemetryClient(metricReader: MetricReader): OpenTelemetry = {
     // Create resource with service information
-    val configuredResourceKVPairs = System.getProperty(MetricsExporterResourceKey, "")
-      .split(",")
-      .map(_.split("="))
-      .filter(_.length == 2)
-      .map { case Array(k, v) => k.trim -> v.trim }
-      .toMap
+    val configuredResourceKVPairs = System
+      .getProperty(MetricsExporterResourceKey, "")
+      .split(",")
+      .map(_.split("="))
+      .filter(_.length == 2)
+      .map { case Array(k, v) => k.trim -> v.trim }
+      .toMap
 
     val builder = Attributes.builder()
     configuredResourceKVPairs.map { case (k, v) =>
-      val key = AttributeKey.stringKey(k)
-      builder.put(key, v)
+      val 
key = AttributeKey.stringKey(k) + builder.put(key, v) } builder.put(AttributeKey.stringKey("service.name"), DefaultServiceName) val resource = Resource.getDefault.merge(Resource.create(builder.build())) - val meterProvider = SdkMeterProvider - .builder + val meterProvider = SdkMeterProvider.builder .setResource(resource) .registerMetricReader(metricReader) .build // Build the OpenTelemetry object with only meter provider - OpenTelemetrySdk - .builder + OpenTelemetrySdk.builder .setMeterProvider(meterProvider) .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance)) .build } -} \ No newline at end of file +} diff --git a/online/src/test/scala/ai/chronon/online/test/TagsTest.scala b/online/src/test/scala/ai/chronon/online/test/TagsTest.scala index 28bf4e67b1..de44ae087a 100644 --- a/online/src/test/scala/ai/chronon/online/test/TagsTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/TagsTest.scala @@ -47,7 +47,7 @@ class TagsTest extends AnyFlatSpec { ) ) ) :: Nil - ), + ) ) val copyFake = context.copy(join = "something else") val copyCorrect = copyFake.copy(join = context.join) From 75dd32af7288bb1957f4247506ac2849d28ee8ed Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Mon, 28 Apr 2025 08:50:59 -0700 Subject: [PATCH 04/11] Clean up import --- .../ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala index e697208ef0..3c116ebb04 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala @@ -13,7 +13,7 @@ import ai.chronon.online.KVStore import ai.chronon.online.KVStore.ListRequest import ai.chronon.online.KVStore.ListResponse import ai.chronon.online.KVStore.ListValue -import ai.chronon.online.metrics.{Metrics, MetricsReporter} +import ai.chronon.online.metrics.Metrics import com.google.api.core.{ApiFuture, ApiFutures} import com.google.cloud.RetryOption import com.google.cloud.bigquery.BigQuery From ce77a98575dd9aed8741101fbe9ba7cd857e0ed0 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Wed, 30 Apr 2025 17:28:05 -0700 Subject: [PATCH 05/11] Default to passed in svc name --- .../scala/ai/chronon/online/metrics/OtelMetricsReporter.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala index 39c2248c3a..1f75b1e19c 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/OtelMetricsReporter.scala @@ -80,7 +80,6 @@ class OtelMetricsReporter(openTelemetry: OpenTelemetry) extends MetricsReporter object OtelMetricsReporter { - val DefaultServiceName = "ai.chronon" val MetricsReader = "ai.chronon.metrics.reader" val MetricsExporterUrlKey = "ai.chronon.metrics.exporter.url" val MetricsExporterPrometheusPortKey = "ai.chronon.metrics.exporter.port" @@ -132,7 +131,6 @@ object OtelMetricsReporter { builder.put(key, v) } - builder.put(AttributeKey.stringKey("service.name"), DefaultServiceName) val resource = Resource.getDefault.merge(Resource.create(builder.build())) val meterProvider = SdkMeterProvider.builder From 2633c13bef3e1c865de32794884b0ebfe4187b42 Mon Sep 17 
00:00:00 2001 From: Piyush Narang Date: Thu, 1 May 2025 19:18:04 -0700 Subject: [PATCH 06/11] Add a delay to reporting tp metrics --- .../InstrumentedThreadPoolExecutor.scala | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala index c64e4c3ad9..8aa7eb00f3 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala @@ -1,13 +1,8 @@ package ai.chronon.online.metrics -import java.util.concurrent.{ - BlockingQueue, - Executors, - ScheduledExecutorService, - ThreadFactory, - ThreadPoolExecutor, - TimeUnit -} +import org.slf4j.LoggerFactory + +import java.util.concurrent.{BlockingQueue, Executors, ScheduledExecutorService, ThreadFactory, ThreadPoolExecutor, TimeUnit} class InstrumentedThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, @@ -27,8 +22,12 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int, // Reporter for periodic metrics private val metricsReporter: ScheduledExecutorService = buildMetricsScheduledExecutor() + private val InitialDelay = 60L + protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.Fetcher).withSuffix("threadpool") + private val logger = LoggerFactory.getLogger(classOf[InstrumentedThreadPoolExecutor]) + // Schedule periodic metrics collection to capture sizes of the queue and the pool private def buildMetricsScheduledExecutor(): ScheduledExecutorService = { val reporter = Executors.newSingleThreadScheduledExecutor(r => { @@ -40,21 +39,26 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int, reporter.scheduleAtFixedRate( () => { - // Report queue size - metricsContext.gauge("queue_size", getQueue.size()) - - // Report pool sizes directly from the executor - metricsContext.gauge("active_threads", getActiveCount) - metricsContext.gauge("pool_size", getPoolSize) - metricsContext.gauge("core_pool_size", getCorePoolSize) - metricsContext.gauge("maximum_pool_size", getMaximumPoolSize) - metricsContext.gauge("largest_pool_size", getLargestPoolSize) - - // Task counts from executor - metricsContext.gauge("completed_task_count", getCompletedTaskCount) - metricsContext.gauge("task_count", getTaskCount) + try { + // Report queue size + metricsContext.gauge("queue_size", getQueue.size()) + + // Report pool sizes directly from the executor + metricsContext.gauge("active_threads", getActiveCount) + metricsContext.gauge("pool_size", getPoolSize) + metricsContext.gauge("core_pool_size", getCorePoolSize) + metricsContext.gauge("maximum_pool_size", getMaximumPoolSize) + metricsContext.gauge("largest_pool_size", getLargestPoolSize) + + // Task counts from executor + metricsContext.gauge("completed_task_count", getCompletedTaskCount) + metricsContext.gauge("task_count", getTaskCount) + } catch { + case e: Exception => + logger.warn("Error reporting fetcher threadpool metrics", e) + } }, - 0, + InitialDelay, metricsIntervalSeconds, TimeUnit.SECONDS ) From b20ed68e7be11e69f92f7781aa9a73a88a8a27ba Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Thu, 1 May 2025 19:21:32 -0700 Subject: [PATCH 07/11] Reintroduce docker fetcher file --- docker/fetcher/Dockerfile | 53 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 docker/fetcher/Dockerfile diff --git 
a/docker/fetcher/Dockerfile b/docker/fetcher/Dockerfile new file mode 100644 index 0000000000..8d61c526d9 --- /dev/null +++ b/docker/fetcher/Dockerfile @@ -0,0 +1,53 @@ +# Start from a Debian base image +FROM openjdk:17-jdk-slim + +# We expect jars to be copied to the build_output directory as docker can't read from bazel-bin as that's a symlink +# https://stackoverflow.com/questions/31881904/docker-follow-symlink-outside-context +ENV CLOUD_AWS_JAR_PATH=build_output/cloud_aws_lib_deploy.jar +ENV CLOUD_GCP_JAR_PATH=build_output/cloud_gcp_lib_deploy.jar +ENV FETCHER_SVC_JAR_PATH=build_output/service_assembly_deploy.jar +ENV FETCHER_LAUNCH_SCRIPT=docker/fetcher/start.sh +ENV GCP_ONLINE_CLASS=ai.chronon.integrations.cloud_gcp.GcpApiImpl +ENV AWS_ONLINE_CLASS=ai.chronon.integrations.aws.AwsApiImpl + +# Update package lists and install necessary tools +RUN apt-get update && apt-get install -y \ + curl \ + python3 \ + python3-dev \ + python3-setuptools \ + vim \ + wget \ + procps \ + python3-pip + +ENV SCALA_VERSION 2.12.18 + +RUN curl https://downloads.lightbend.com/scala/${SCALA_VERSION}/scala-${SCALA_VERSION}.deb -k -o scala.deb && \ + apt install -y ./scala.deb && \ + rm -rf scala.deb /var/lib/apt/lists/* + +ENV SCALA_HOME="/usr/bin/scala" +ENV PATH=${PATH}:${SCALA_HOME}/bin + +WORKDIR /srv/zipline + +ENV CLOUD_AWS_JAR=${CLOUD_AWS_JAR:-"/srv/zipline/cloud_aws/cloud_aws.jar"} +ENV CLOUD_GCP_JAR=${CLOUD_GCP_JAR:-"/srv/zipline/cloud_gcp/cloud_gcp.jar"} +ENV FETCHER_JAR=${FETCHER_JAR:-"/srv/zipline/fetcher/service.jar"} +ENV LOG_PATH=${LOG_PATH:-"/srv/zipline/fetcher/logs"} + +COPY $CLOUD_AWS_JAR_PATH "$CLOUD_AWS_JAR" +COPY $CLOUD_GCP_JAR_PATH "$CLOUD_GCP_JAR" +COPY $FETCHER_SVC_JAR_PATH "$FETCHER_JAR" +COPY $FETCHER_LAUNCH_SCRIPT /srv/zipline/fetcher/start.sh + +ENV FETCHER_PORT=9000 + +HEALTHCHECK --start-period=2m --retries=4 CMD curl --fail http://localhost:$FETCHER_PORT/ping || exit 1 + +RUN mkdir -p $LOG_PATH && \ + chmod 755 $LOG_PATH + +CMD /srv/zipline/fetcher/start.sh + From 7f9b6534c0212b754601180b12160196713d5544 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Thu, 1 May 2025 19:22:44 -0700 Subject: [PATCH 08/11] style: Apply scalafix and scalafmt changes --- .../online/metrics/InstrumentedThreadPoolExecutor.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala index 8aa7eb00f3..9d15205a29 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala @@ -2,7 +2,14 @@ package ai.chronon.online.metrics import org.slf4j.LoggerFactory -import java.util.concurrent.{BlockingQueue, Executors, ScheduledExecutorService, ThreadFactory, ThreadPoolExecutor, TimeUnit} +import java.util.concurrent.{ + BlockingQueue, + Executors, + ScheduledExecutorService, + ThreadFactory, + ThreadPoolExecutor, + TimeUnit +} class InstrumentedThreadPoolExecutor(corePoolSize: Int, maximumPoolSize: Int, From b8f7931e7b14020718a8e783f728a191376eaf80 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Thu, 1 May 2025 19:46:12 -0700 Subject: [PATCH 09/11] Fix order --- .../metrics/InstrumentedThreadPoolExecutor.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala 
b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala index 9d15205a29..b7ab0ff8e7 100644 --- a/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala +++ b/online/src/main/scala/ai/chronon/online/metrics/InstrumentedThreadPoolExecutor.scala @@ -26,13 +26,11 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int, workQueue, threadFactory ) { + protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.Fetcher).withSuffix("threadpool") + // Reporter for periodic metrics private val metricsReporter: ScheduledExecutorService = buildMetricsScheduledExecutor() - private val InitialDelay = 60L - - protected val metricsContext: Metrics.Context = Metrics.Context(Metrics.Environment.Fetcher).withSuffix("threadpool") - private val logger = LoggerFactory.getLogger(classOf[InstrumentedThreadPoolExecutor]) // Schedule periodic metrics collection to capture sizes of the queue and the pool @@ -62,10 +60,10 @@ class InstrumentedThreadPoolExecutor(corePoolSize: Int, metricsContext.gauge("task_count", getTaskCount) } catch { case e: Exception => - logger.warn("Error reporting fetcher threadpool metrics", e) + logger.warn(s"Error reporting fetcher threadpool metrics - $e") } }, - InitialDelay, + 60, metricsIntervalSeconds, TimeUnit.SECONDS ) From 80d91210e06c20cf3f906ea36622a81251a81733 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Fri, 2 May 2025 09:22:43 -0700 Subject: [PATCH 10/11] Fix bug --- .../ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala index 3c116ebb04..870cd13d76 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala @@ -457,7 +457,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, throw new RuntimeException(s"Export job corresponding to $jobId no longer exists") } else if (completedJob.getStatus.getError != null) { logger.error(s"Job failed with error: ${completedJob.getStatus.getError}") - metricsContext.increment("bulkPut.failures", Map("exception" -> "completedJob.getStatus.getError.getReason")) + metricsContext.increment("bulkPut.failures", Map("exception" -> s"${completedJob.getStatus.getError.getReason}")) throw new RuntimeException(s"Export job failed with error: ${completedJob.getStatus.getError}") } else { logger.info("Export job completed successfully") From 6df3660d22b6c73a70e24b4ffe86141720697e03 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Fri, 2 May 2025 09:22:59 -0700 Subject: [PATCH 11/11] style: Apply scalafix and scalafmt changes --- .../chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala index 870cd13d76..44cbfdd529 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala @@ -457,7 +457,8 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, throw new RuntimeException(s"Export job corresponding to $jobId no 
longer exists") } else if (completedJob.getStatus.getError != null) { logger.error(s"Job failed with error: ${completedJob.getStatus.getError}") - metricsContext.increment("bulkPut.failures", Map("exception" -> s"${completedJob.getStatus.getError.getReason}")) + metricsContext.increment("bulkPut.failures", + Map("exception" -> s"${completedJob.getStatus.getError.getReason}")) throw new RuntimeException(s"Export job failed with error: ${completedJob.getStatus.getError}") } else { logger.info("Export job completed successfully")