From c614277166b4cee219ca55217ff5cca3afd53656 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Mon, 24 Feb 2025 15:52:36 -0500 Subject: [PATCH 1/9] Add join list endpoint --- .../main/scala/ai/chronon/api/Constants.scala | 10 ++ cloud_aws/BUILD.bazel | 1 + .../aws/DynamoDBKVStoreImpl.scala | 14 +- .../aws/DynamoDBKVStoreTest.scala | 9 +- .../cloud_gcp/BigTableKVStoreImpl.scala | 26 ++-- .../cloud_gcp/BigTableKVStoreTest.scala | 58 +++++++- .../java/ai/chronon/online/JavaFetcher.java | 8 ++ .../online/fetcher/MetadataStore.scala | 53 ++++++++ .../ai/chronon/service/FetcherVerticle.java | 9 +- .../chronon/service/handlers/FetchRouter.java | 3 +- .../service/handlers/JoinListHandler.java | 47 +++++++ .../service/handlers/JoinListHandlerTest.java | 127 ++++++++++++++++++ 12 files changed, 331 insertions(+), 34 deletions(-) create mode 100644 service/src/main/java/ai/chronon/service/handlers/JoinListHandler.java create mode 100644 service/src/test/java/ai/chronon/service/handlers/JoinListHandlerTest.java diff --git a/api/src/main/scala/ai/chronon/api/Constants.scala b/api/src/main/scala/ai/chronon/api/Constants.scala index 7c5dfba38f..02d1584a19 100644 --- a/api/src/main/scala/ai/chronon/api/Constants.scala +++ b/api/src/main/scala/ai/chronon/api/Constants.scala @@ -85,4 +85,14 @@ object Constants { val GroupByKeyword = "group_bys" val StagingQueryKeyword = "staging_queries" val ModelKeyword = "models" + + // KV store related constants + // continuation key to help with list pagination + val ContinuationKey: String = "continuation-key" + + // Limit of max number of entries to return in a list call + val ListLimit: String = "limit" + + // List entity type + val ListEntityType: String = "entity_type" } diff --git a/cloud_aws/BUILD.bazel b/cloud_aws/BUILD.bazel index 9d5347ee9a..fe6dffe219 100644 --- a/cloud_aws/BUILD.bazel +++ b/cloud_aws/BUILD.bazel @@ -24,6 +24,7 @@ scala_library( test_deps = [ ":cloud_aws_lib", + "//api:lib", "//online:lib", 
maven_artifact("software.amazon.awssdk:dynamodb"), maven_artifact("software.amazon.awssdk:regions"), diff --git a/cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala b/cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala index 5f92e06112..d288e8e656 100644 --- a/cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala +++ b/cloud_aws/src/main/scala/ai/chronon/integrations/aws/DynamoDBKVStoreImpl.scala @@ -1,6 +1,7 @@ package ai.chronon.integrations.aws import ai.chronon.api.Constants +import ai.chronon.api.Constants.{ContinuationKey, ListLimit} import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.KVStore import ai.chronon.online.KVStore.GetResponse @@ -36,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap import scala.concurrent.Future import scala.util.Success import scala.util.Try - import scala.collection.Seq object DynamoDBKVStoreConstants { @@ -49,12 +49,6 @@ object DynamoDBKVStoreConstants { // Optional field that indicates if this table is meant to be time sorted in Dynamo or not val isTimedSorted = "is-time-sorted" - // Limit of max number of entries to return in a list call - val listLimit = "limit" - - // continuation key to help with list pagination - val continuationKey = "continuation-key" - // Name of the partition key column to use val partitionKeyColumn = "keyBytes" @@ -172,13 +166,13 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { } override def list(request: ListRequest): Future[ListResponse] = { - val listLimit = request.props.get(DynamoDBKVStoreConstants.listLimit) match { + val listLimit = request.props.get(ListLimit) match { case Some(value: Int) => value case Some(value: String) => value.toInt case _ => 100 } - val maybeExclusiveStartKey = request.props.get(continuationKey) + val maybeExclusiveStartKey = request.props.get(ContinuationKey) val maybeExclusiveStartKeyAttribute = maybeExclusiveStartKey.map { k => 
AttributeValue.builder.b(SdkBytes.fromByteArray(k.asInstanceOf[Array[Byte]])).build } @@ -199,7 +193,7 @@ class DynamoDBKVStoreImpl(dynamoDbClient: DynamoDbClient) extends KVStore { case Success(scanResponse) if scanResponse.hasLastEvaluatedKey => val lastEvalKey = scanResponse.lastEvaluatedKey().toScala.get(partitionKeyColumn) lastEvalKey match { - case Some(av) => ListResponse(request, resultElements, Map(continuationKey -> av.b().asByteArray())) + case Some(av) => ListResponse(request, resultElements, Map(ContinuationKey -> av.b().asByteArray())) case _ => noPagesLeftResponse } case _ => noPagesLeftResponse diff --git a/cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala b/cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala index 51fba9acd6..f1ee940a07 100644 --- a/cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala +++ b/cloud_aws/src/test/scala/ai/chronon/integrations/aws/DynamoDBKVStoreTest.scala @@ -1,5 +1,6 @@ package ai.chronon.integrations.aws +import ai.chronon.api.Constants.{ContinuationKey, ListLimit} import ai.chronon.online.KVStore._ import com.amazonaws.services.dynamodbv2.local.main.ServerRunner import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer @@ -124,16 +125,16 @@ class DynamoDBKVStoreTest extends AnyFlatSpec with BeforeAndAfterAll { putResults.foreach(r => r shouldBe true) // call list - first call is only for 10 elements - val listReq1 = ListRequest(dataset, Map(listLimit -> 10)) + val listReq1 = ListRequest(dataset, Map(ListLimit -> 10)) val listResults1 = Await.result(kvStore.list(listReq1), 1.minute) - listResults1.resultProps.contains(continuationKey) shouldBe true + listResults1.resultProps.contains(ContinuationKey) shouldBe true validateExpectedListResponse(listResults1.values, 10) // call list - with continuation key val listReq2 = - ListRequest(dataset, Map(listLimit -> 100, continuationKey -> 
listResults1.resultProps(continuationKey))) + ListRequest(dataset, Map(ListLimit -> 100, ContinuationKey -> listResults1.resultProps(ContinuationKey))) val listResults2 = Await.result(kvStore.list(listReq2), 1.minute) - listResults2.resultProps.contains(continuationKey) shouldBe false + listResults2.resultProps.contains(ContinuationKey) shouldBe false validateExpectedListResponse(listResults2.values, 100) } diff --git a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala index 6d4512a1d1..fde1994c08 100644 --- a/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala +++ b/cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreImpl.scala @@ -1,5 +1,6 @@ package ai.chronon.integrations.cloud_gcp +import ai.chronon.api.Constants.{ContinuationKey, ListEntityType, ListLimit} import ai.chronon.api.Extensions.GroupByOps import ai.chronon.api.Extensions.StringOps import ai.chronon.api.Extensions.WindowOps @@ -211,13 +212,14 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, override def list(request: ListRequest): Future[ListResponse] = { logger.info(s"Performing list for ${request.dataset}") - val listLimit = request.props.get(BigTableKVStore.listLimit) match { + val listLimit = request.props.get(ListLimit) match { case Some(value: Int) => value case Some(value: String) => value.toInt case _ => defaultListLimit } - val maybeStartKey = request.props.get(continuationKey) + val maybeListEntityType = request.props.get(ListEntityType) + val maybeStartKey = request.props.get(ContinuationKey) val query = Query .create(mapDatasetToTable(request.dataset)) @@ -227,9 +229,15 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, .filter(Filters.FILTERS.limit().cellsPerRow(1)) .limit(listLimit) - // if we got a start row key, lets wire it up - maybeStartKey.foreach { startKey => - 
query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startKey.asInstanceOf[Array[Byte]]))) + (maybeStartKey, maybeListEntityType) match { + case (Some(startKey), _) => + // we have a start key, we use that to pick up from where we left off + query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startKey.asInstanceOf[Array[Byte]]))) + case (None, Some(listEntityType)) => + val startRowKey = buildRowKey(s"$listEntityType/".getBytes(Charset.forName("UTF-8")), request.dataset) + query.range(ByteStringRange.unbounded().startOpen(ByteString.copyFrom(startRowKey))) + case _ => + logger.info("No start key or list entity type provided. Starting from the beginning") } val startTs = System.currentTimeMillis() @@ -253,7 +261,7 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, if (listValues.size < listLimit) { Map.empty // last page, we're done } else - Map(continuationKey -> listValues.last.keyBytes) + Map(ContinuationKey -> listValues.last.keyBytes) ListResponse(request, Success(listValues), propsMap) @@ -410,12 +418,6 @@ class BigTableKVStoreImpl(dataClient: BigtableDataClient, object BigTableKVStore { - // continuation key to help with list pagination - val continuationKey: String = "continuationKey" - - // Limit of max number of entries to return in a list call - val listLimit: String = "limit" - // Default list limit val defaultListLimit: Int = 100 diff --git a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala index cee71b00ea..8319b625ef 100644 --- a/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala +++ b/cloud_gcp/src/test/scala/ai/chronon/integrations/cloud_gcp/BigTableKVStoreTest.scala @@ -1,5 +1,6 @@ package ai.chronon.integrations.cloud_gcp +import ai.chronon.api.Constants.{ContinuationKey, GroupByKeyword, JoinKeyword, ListEntityType, ListLimit} import 
ai.chronon.api.TilingUtils import ai.chronon.online.KVStore.GetRequest import ai.chronon.online.KVStore.GetResponse @@ -176,21 +177,21 @@ class BigTableKVStoreTest extends AnyFlatSpec with BeforeAndAfter { // let's try and read these val limit = 10 - val listReq1 = ListRequest(dataset, Map(listLimit -> limit)) + val listReq1 = ListRequest(dataset, Map(ListLimit -> limit)) val listResult1 = Await.result(kvStore.list(listReq1), 1.second) listResult1.values.isSuccess shouldBe true - listResult1.resultProps.contains(BigTableKVStore.continuationKey) shouldBe true + listResult1.resultProps.contains(ContinuationKey) shouldBe true val listValues1 = listResult1.values.get listValues1.size shouldBe limit // another call, bigger limit val limit2 = 1000 - val continuationKey = listResult1.resultProps(BigTableKVStore.continuationKey) - val listReq2 = ListRequest(dataset, Map(listLimit -> limit2, BigTableKVStore.continuationKey -> continuationKey)) + val continuationKey = listResult1.resultProps(ContinuationKey) + val listReq2 = ListRequest(dataset, Map(ListLimit -> limit2, ContinuationKey -> continuationKey)) val listResult2 = Await.result(kvStore.list(listReq2), 1.second) listResult2.values.isSuccess shouldBe true - listResult2.resultProps.contains(BigTableKVStore.continuationKey) shouldBe false + listResult2.resultProps.contains(ContinuationKey) shouldBe false val listValues2 = listResult2.values.get listValues2.size shouldBe (putReqs.size - limit) @@ -201,6 +202,53 @@ class BigTableKVStoreTest extends AnyFlatSpec with BeforeAndAfter { .toSet } + it should "list entity types with pagination" in { + val dataset = "metadata" + val kvStore = new BigTableKVStoreImpl(dataClient, adminClient) + kvStore.create(dataset) + + val putGrpByReqs = (0 until 50).map { i => + val key = s"$GroupByKeyword/gbkey-$i" + val value = s"""{"name": "name-$i", "age": $i}""" + PutRequest(key.getBytes, value.getBytes, dataset, None) + } + + val putJoinReqs = (0 until 50).map { i => + val key = 
s"$JoinKeyword/joinkey-$i" + val value = s"""{"name": "name-$i", "age": $i}""" + PutRequest(key.getBytes, value.getBytes, dataset, None) + } + + val putResults = Await.result(kvStore.multiPut(putGrpByReqs ++ putJoinReqs), 1.second) + putResults.foreach(r => r shouldBe true) + + // let's try and read just the joins + val limit = 10 + val listReq1 = ListRequest(dataset, Map(ListLimit -> limit, ListEntityType -> JoinKeyword)) + + val listResult1 = Await.result(kvStore.list(listReq1), 1.second) + listResult1.values.isSuccess shouldBe true + listResult1.resultProps.contains(ContinuationKey) shouldBe true + val listValues1 = listResult1.values.get + listValues1.size shouldBe limit + + // another call, bigger limit + val limit2 = 1000 + val continuationKey = listResult1.resultProps(ContinuationKey) + val listReq2 = ListRequest(dataset, Map(ListLimit -> limit2, ContinuationKey -> continuationKey)) + val listResult2 = Await.result(kvStore.list(listReq2), 1.second) + listResult2.values.isSuccess shouldBe true + listResult2.resultProps.contains(ContinuationKey) shouldBe false + val listValues2 = listResult2.values.get + listValues2.size shouldBe (putJoinReqs.size - limit) + + // lets collect all the keys and confirm we got everything + val allKeys = (listValues1 ++ listValues2).map(v => new String(v.keyBytes, StandardCharsets.UTF_8)) + allKeys.toSet shouldBe putJoinReqs + .map(r => new String(buildRowKey(r.keyBytes, r.dataset), StandardCharsets.UTF_8)) + .toSet + } + it should "multiput failures" in { val mockDataClient = mock[BigtableDataClient](withSettings().mockMaker("mock-maker-inline")) val mockAdminClient = mock[BigtableTableAdminClient] diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java index 0d841a7987..bdbc32926a 100644 --- a/online/src/main/java/ai/chronon/online/JavaFetcher.java +++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java @@ -16,6 +16,7 @@ package ai.chronon.online; 
+import ai.chronon.api.ScalaJavaConversions; import ai.chronon.online.fetcher.Fetcher; import ai.chronon.online.fetcher.FetcherResponseWithTs; import scala.collection.Iterator; @@ -170,6 +171,13 @@ public CompletableFuture> fetchJoin(List request return convertResponsesWithTs(scalaResponses, false, startTs); } + public CompletableFuture> listJoins(boolean isOnline) { + // Get responses from the fetcher + Future> scalaResponses = this.fetcher.listJoins(isOnline); + // convert to Java friendly types + return FutureConverters.toJava(scalaResponses).toCompletableFuture().thenApply(ScalaJavaConversions::toJava); + } + private void instrument(List requestNames, boolean isGroupBy, String metricName, Long startTs) { long endTs = System.currentTimeMillis(); for (String s : requestNames) { diff --git a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala index 2f30a90263..32d3a62e8d 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala @@ -173,6 +173,59 @@ class MetadataStore(fetchContext: FetchContext) { fetchContext.metadataDataset)) } + def listJoins(isOnline: Boolean = true): Future[Seq[String]] = { + def parseJoins(response: ListResponse): Seq[String] = { + val result = response.values + .map { + seqListValues => + + seqListValues + .map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8)) + .map(v => ThriftJsonCodec.fromJsonStr[api.Join](v, check = false, classOf[api.Join])) + .filter(_.join.metaData.online == isOnline) + .map(_.metaData.name) + + }.recover { + case e: Exception => + logger.error("Failed to list & parse joins from list response", e) + throw e + } + + val context = Metrics.Context(Metrics.Environment.MetaDataFetching) + // Throw exception after metrics as the inability to list joins is likely a critical failure. 
+ if (result.isFailure) { + context.withSuffix("join_list").increment(Metrics.Name.Exception) + throw result.failed.get + } + + result.get + } + + def doRetrieveAllListConfs(acc: mutable.ArrayBuffer[String], paginationKey: Option[Any] = None): Future[Seq[String]] = { + val propsMap = { + paginationKey match { + case Some(key) => Map(ListEntityType -> JoinKeyword, ContinuationKey -> key) + case None => Map(ListEntityType -> JoinKeyword) + } + } + + val listRequest = ListRequest(dataset, propsMap) + kvStore.list(listRequest).flatMap { + response => + + val joinSeq: Seq[String] = parseJoins(response) + val newAcc = acc ++ joinSeq + if (response.resultProps.contains(ContinuationKey)) { + doRetrieveAllListConfs(newAcc, response.resultProps.get(ContinuationKey)) + } else { + Future.successful(newAcc) + } + } + } + + doRetrieveAllListConfs(new mutable.ArrayBuffer[String]()) + } + // key and value schemas def buildJoinCodecCache(onCreateFunc: Option[Try[JoinCodec] => Unit]): TTLCache[String, Try[JoinCodec]] = { diff --git a/service/src/main/java/ai/chronon/service/FetcherVerticle.java b/service/src/main/java/ai/chronon/service/FetcherVerticle.java index 72d0627a9b..9ddf5567df 100644 --- a/service/src/main/java/ai/chronon/service/FetcherVerticle.java +++ b/service/src/main/java/ai/chronon/service/FetcherVerticle.java @@ -1,7 +1,9 @@ package ai.chronon.service; import ai.chronon.online.Api; +import ai.chronon.online.JavaFetcher; import ai.chronon.service.handlers.FetchRouter; +import ai.chronon.service.handlers.JoinListHandler; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.http.HttpServer; @@ -31,8 +33,13 @@ protected void startHttpServer(int port, String configJsonString, Api api, Promi // Define routes + JavaFetcher fetcher = api.buildJavaFetcher("feature-service", false); + // Set up sub-routes for the various feature retrieval apis - router.route("/v1/fetch/*").subRouter(FetchRouter.createFetchRoutes(vertx, api)); + 
router.route("/v1/fetch/*").subRouter(FetchRouter.createFetchRoutes(vertx, fetcher)); + + // Set up route for list of online joins + router.get("/v1/joins").handler(new JoinListHandler(fetcher)); // Health check route router.get("/ping").handler(ctx -> { diff --git a/service/src/main/java/ai/chronon/service/handlers/FetchRouter.java b/service/src/main/java/ai/chronon/service/handlers/FetchRouter.java index bb1d0657d7..0507e15099 100644 --- a/service/src/main/java/ai/chronon/service/handlers/FetchRouter.java +++ b/service/src/main/java/ai/chronon/service/handlers/FetchRouter.java @@ -27,10 +27,9 @@ public CompletableFuture> apply(JavaFetcher fetcher, List { + + private final JavaFetcher fetcher; + + public JoinListHandler(JavaFetcher fetcher) { + this.fetcher = fetcher; + } + + @Override + public void handle(RoutingContext ctx) { + CompletableFuture> resultsJavaFuture = fetcher.listJoins(true); + // wrap the Java future we get in a Vert.x Future to not block the worker thread + Future> maybeFeatureResponses = Future.fromCompletionStage(resultsJavaFuture); + + maybeFeatureResponses.onSuccess( + resultList -> { + ctx.response() + .setStatusCode(200) + .putHeader("content-type", "application/json") + .end(new JsonObject().put("joinNames", resultList).encode()); + }); + + maybeFeatureResponses.onFailure( + err -> { + + List failureMessages = Collections.singletonList(err.getMessage()); + + ctx.response() + .setStatusCode(500) + .putHeader("content-type", "application/json") + .end(new JsonObject().put("errors", failureMessages).encode()); + }); + } +} + diff --git a/service/src/test/java/ai/chronon/service/handlers/JoinListHandlerTest.java b/service/src/test/java/ai/chronon/service/handlers/JoinListHandlerTest.java new file mode 100644 index 0000000000..c82b1da97f --- /dev/null +++ b/service/src/test/java/ai/chronon/service/handlers/JoinListHandlerTest.java @@ -0,0 +1,127 @@ +package ai.chronon.service.handlers; + +import ai.chronon.online.JTry; +import 
ai.chronon.online.JavaFetcher; +import ai.chronon.online.JavaRequest; +import ai.chronon.online.JavaResponse; +import ai.chronon.service.model.GetFeaturesResponse; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.ext.web.RoutingContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(VertxUnitRunner.class) +public class JoinListHandlerTest { + @Mock + private JavaFetcher mockFetcher; + + @Mock + private RoutingContext routingContext; + + @Mock + private HttpServerResponse response; + + private JoinListHandler handler; + private Vertx vertx; + + @Before + public void setUp(TestContext context) { + MockitoAnnotations.openMocks(this); + vertx = Vertx.vertx(); + + handler = new JoinListHandler(mockFetcher); + + // Set up common routing context behavior + when(routingContext.response()).thenReturn(response); + when(response.putHeader(anyString(), anyString())).thenReturn(response); + when(response.setStatusCode(anyInt())).thenReturn(response); + } + + @Test + public void testSuccessfulRequest(TestContext context) { + Async async = context.async(); + + List joins = List.of("my_joins.join_a.v1", "my_joins.join_a.v2", "my_joins.join_b.v1"); + // Set up mocks + CompletableFuture> futureListResponse = + CompletableFuture.completedFuture(joins); + + when(mockFetcher.listJoins(anyBoolean())).thenReturn(futureListResponse); + + // Capture the response that will be sent + ArgumentCaptor 
responseCaptor = ArgumentCaptor.forClass(String.class); + + // Trigger call + handler.handle(routingContext); + + // Assert results + vertx.setTimer(1000, id -> { + verify(response).setStatusCode(200); + verify(response).putHeader("content-type", "application/json"); + verify(response).end(responseCaptor.capture()); + + // Verify response format + JsonObject actualResponse = new JsonObject(responseCaptor.getValue()); + JsonArray joinNames = actualResponse.getJsonArray("joinNames"); + context.assertEquals(joinNames.size(), joins.size()); + for (int i = 0; i < joinNames.size(); i++) { + context.assertEquals(joins.get(i), joinNames.getString(i)); + } + async.complete(); + }); + } + + @Test + public void testFailedFutureRequest(TestContext context) { + Async async = context.async(); + + List joins = List.of("my_joins.join_a.v1", "my_joins.join_a.v2", "my_joins.join_b.v1"); + // Set up mocks + CompletableFuture> futureResponse = new CompletableFuture<>(); + futureResponse.completeExceptionally(new RuntimeException("Error in KV store lookup")); + + when(mockFetcher.listJoins(anyBoolean())).thenReturn(futureResponse); + + // Capture the response that will be sent + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(String.class); + + // Trigger call + handler.handle(routingContext); + + // Assert results + vertx.setTimer(1000, id -> { + verify(response).setStatusCode(500); + verify(response).putHeader("content-type", "application/json"); + verify(response).end(responseCaptor.capture()); + + // Verify response format + validateFailureResponse(responseCaptor.getValue(), context); + async.complete(); + }); + } + + private void validateFailureResponse(String jsonResponse, TestContext context) { + JsonObject actualResponse = new JsonObject(jsonResponse); + context.assertTrue(actualResponse.containsKey("errors")); + + String failureString = actualResponse.getJsonArray("errors").getString(0); + context.assertNotNull(failureString); + } +} From 
8b7f1db2d910da5a358539fcca9628f0530bfe5d Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Mon, 24 Feb 2025 20:38:40 -0500 Subject: [PATCH 2/9] Some tweaks --- .../online/fetcher/MetadataStore.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala index 32d3a62e8d..d0d7d61909 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala @@ -174,6 +174,10 @@ class MetadataStore(fetchContext: FetchContext) { } def listJoins(isOnline: Boolean = true): Future[Seq[String]] = { + + val context = Metrics.Context(Metrics.Environment.MetaDataFetching) + val startTimeMs = System.currentTimeMillis() + def parseJoins(response: ListResponse): Seq[String] = { val result = response.values .map { @@ -187,17 +191,11 @@ class MetadataStore(fetchContext: FetchContext) { }.recover { case e: Exception => - logger.error("Failed to list & parse joins from list response", e) - throw e + logger.error("Failed to list & parse joins from list response", e) + context.withSuffix("join_list").increment(Metrics.Name.Exception) + throw e } - val context = Metrics.Context(Metrics.Environment.MetaDataFetching) - // Throw exception after metrics as the inability to list joins is likely a critical failure. 
- if (result.isFailure) { - context.withSuffix("join_list").increment(Metrics.Name.Exception) - throw result.failed.get - } - result.get } @@ -218,6 +216,9 @@ class MetadataStore(fetchContext: FetchContext) { if (response.resultProps.contains(ContinuationKey)) { doRetrieveAllListConfs(newAcc, response.resultProps.get(ContinuationKey)) } else { + context + .withSuffix("join_list") + .distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs) Future.successful(newAcc) } } From f59e5d6e5e57f7b233da3ab31f4a042d5aed358b Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Mon, 24 Feb 2025 22:40:30 -0500 Subject: [PATCH 3/9] Add join schema endpoint --- .../java/ai/chronon/online/JavaFetcher.java | 6 + .../online/JavaJoinSchemaResponse.java | 24 ++++ .../scala/ai/chronon/online/Metrics.scala | 1 + .../ai/chronon/online/fetcher/Fetcher.scala | 27 ++++ .../ai/chronon/service/FetcherVerticle.java | 4 + .../service/handlers/JoinSchemaHandler.java | 54 ++++++++ .../handlers/JoinSchemaHandlerTest.java | 123 ++++++++++++++++++ 7 files changed, 239 insertions(+) create mode 100644 online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java create mode 100644 service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java create mode 100644 service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java index bdbc32926a..bb3bd84c6a 100644 --- a/online/src/main/java/ai/chronon/online/JavaFetcher.java +++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java @@ -26,6 +26,7 @@ import scala.compat.java8.FutureConverters; import scala.concurrent.Future; import scala.concurrent.ExecutionContext; +import scala.util.Try; import java.util.ArrayList; import java.util.List; @@ -178,6 +179,11 @@ public CompletableFuture> listJoins(boolean isOnline) { return 
FutureConverters.toJava(scalaResponses).toCompletableFuture().thenApply(ScalaJavaConversions::toJava); } + public JTry fetchJoinSchema(String joinName) { + Try scalaResponse = this.fetcher.fetchJoinSchema(joinName); + return JTry.fromScala(scalaResponse).map(JavaJoinSchemaResponse::new); + } + private void instrument(List requestNames, boolean isGroupBy, String metricName, Long startTs) { long endTs = System.currentTimeMillis(); for (String s : requestNames) { diff --git a/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java b/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java new file mode 100644 index 0000000000..a1624135df --- /dev/null +++ b/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java @@ -0,0 +1,24 @@ +package ai.chronon.online; + +import ai.chronon.online.fetcher.Fetcher; + +public class JavaJoinSchemaResponse { + public String joinSchema; + public String schemaHash; + + public JavaJoinSchemaResponse(String joinSchema, String schemaHash) { + this.joinSchema = joinSchema; + this.schemaHash = schemaHash; + } + + public JavaJoinSchemaResponse(Fetcher.JoinSchemaResponse scalaResponse){ + this.joinSchema = scalaResponse.joinSchema(); + this.schemaHash = scalaResponse.schemaHash(); + } + + public Fetcher.JoinSchemaResponse toScala() { + return new Fetcher.JoinSchemaResponse( + joinSchema, + schemaHash); + } +} diff --git a/online/src/main/scala/ai/chronon/online/Metrics.scala b/online/src/main/scala/ai/chronon/online/Metrics.scala index d7b13b71bc..4e5f9c8048 100644 --- a/online/src/main/scala/ai/chronon/online/Metrics.scala +++ b/online/src/main/scala/ai/chronon/online/Metrics.scala @@ -28,6 +28,7 @@ object Metrics { type Environment = String val MetaDataFetching = "metadata.fetch" val JoinFetching = "join.fetch" + val JoinSchemaFetching = "join.schema.fetch" val GroupByFetching = "group_by.fetch" val GroupByUpload = "group_by.upload" val GroupByStreaming = "group_by.streaming" diff --git 
a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 88d35dfad4..697a52cd3e 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -71,6 +71,14 @@ object Fetcher { context.distribution(Metrics.Name.FetchExceptions, exceptions) context.distribution(Metrics.Name.FetchCount, responseMap.size) } + + /** + * Response for a join schema request + * @param joinSchema - Json response that consists of: + * {"join_name" -> joinName, "key_schema" -> "avro schema string", "value_schema" -> "avro schema string"} + * @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types) + */ + case class JoinSchemaResponse(joinSchema: String, schemaHash: String) } private[online] case class FetcherResponseWithTs(responses: Seq[Fetcher.Response], endTs: Long) @@ -416,6 +424,25 @@ class Fetcher(val kvStore: KVStore, } } + def fetchJoinSchema(joinName: String): Try[JoinSchemaResponse] = { + val startTime = System.currentTimeMillis() + val ctx = Metrics.Context(Environment.JoinSchemaFetching, join = joinName) + + val joinCodecTry = joinCodecCache(joinName) + + val joinSchemaResponse = joinCodecTry.map { joinCodec => + JoinSchemaResponse(joinCodec.loggingSchema, joinCodec.loggingSchemaHash) + }.recover { + case exception: Throwable => + logger.error(s"Failed to fetch join schema for $joinName", exception) + ctx.incrementException(exception) + throw exception + } + + joinSchemaResponse.foreach(_ => ctx.distribution("response.latency.millis", System.currentTimeMillis() - startTime)) + joinSchemaResponse + } + private def logControlEvent(encTry: Try[JoinCodec]): Unit = { if (encTry.isFailure) return diff --git a/service/src/main/java/ai/chronon/service/FetcherVerticle.java b/service/src/main/java/ai/chronon/service/FetcherVerticle.java index 9ddf5567df..0e8f8fdc79 100644 --- 
a/service/src/main/java/ai/chronon/service/FetcherVerticle.java
+++ b/service/src/main/java/ai/chronon/service/FetcherVerticle.java
@@ -4,6 +4,7 @@
 import ai.chronon.online.JavaFetcher;
 import ai.chronon.service.handlers.FetchRouter;
 import ai.chronon.service.handlers.JoinListHandler;
+import ai.chronon.service.handlers.JoinSchemaHandler;
 import io.vertx.core.AbstractVerticle;
 import io.vertx.core.Promise;
 import io.vertx.core.http.HttpServer;
@@ -41,6 +42,9 @@ protected void startHttpServer(int port, String configJsonString, Api api, Promi
         // Set up route for list of online joins
         router.get("/v1/joins").handler(new JoinListHandler(fetcher));
 
+        // Set up route for retrieval of Join schema
+        router.get("/v1/join/schema/:name").handler(new JoinSchemaHandler(fetcher));
+
         // Health check route
         router.get("/ping").handler(ctx -> {
             ctx.json("Pong!");
diff --git a/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java
new file mode 100644
index 0000000000..12a23aa465
--- /dev/null
+++ b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java
@@ -0,0 +1,65 @@
+package ai.chronon.service.handlers;
+
+import ai.chronon.online.JTry;
+import ai.chronon.online.JavaFetcher;
+import ai.chronon.online.JavaJoinSchemaResponse;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Vert.x handler that serves the schema of a single join.
+ * The join name is read from the "name" path parameter. A successful lookup is answered with
+ * HTTP 200 and a JSON body holding "joinSchema" and "schemaHash"; a failed lookup is answered
+ * with HTTP 500 and a JSON body holding an "errors" array.
+ */
+public class JoinSchemaHandler implements Handler<RoutingContext> {
+
+    private final JavaFetcher fetcher;
+    private static final Logger logger = LoggerFactory.getLogger(JoinSchemaHandler.class);
+
+    public JoinSchemaHandler(JavaFetcher fetcher) {
+        this.fetcher = fetcher;
+    }
+
+    @Override
+    public void handle(RoutingContext ctx) {
+        String entityName = ctx.pathParam("name");
+
+        logger.debug("Retrieving join schema for {}", entityName);
+
+        JTry<JavaJoinSchemaResponse> joinSchemaResponseTry = fetcher.fetchJoinSchema(entityName);
+        if (! joinSchemaResponseTry.isSuccess()) {
+
+            logger.error("Unable to retrieve join schema for: {}", entityName, joinSchemaResponseTry.getException());
+
+            // Exceptions created without a message would otherwise surface as a null "errors" entry
+            String failureMessage = joinSchemaResponseTry.getException().getMessage();
+            if (failureMessage == null) {
+                failureMessage = joinSchemaResponseTry.getException().toString();
+            }
+            List<String> errorMessages = Collections.singletonList(failureMessage);
+
+            ctx.response()
+                    .setStatusCode(500)
+                    .putHeader("content-type", "application/json")
+                    .end(new JsonObject().put("errors", errorMessages).encode());
+            return;
+        }
+
+        JavaJoinSchemaResponse joinSchemaResponse = joinSchemaResponseTry.getValue();
+        JsonObject response =
+                new JsonObject()
+                        .put("joinSchema", joinSchemaResponse.joinSchema)
+                        .put("schemaHash", joinSchemaResponse.schemaHash);
+        ctx.response()
+                .setStatusCode(200)
+                .putHeader("content-type", "application/json")
+                .end(response.encode());
+    }
+}
diff --git a/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java
new file mode 100644
index 0000000000..f5338668c6
--- /dev/null
+++ b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java
@@ -0,0 +1,127 @@
+package ai.chronon.service.handlers;
+
+import ai.chronon.online.JTry;
+import ai.chronon.online.JavaFetcher;
+import ai.chronon.online.JavaJoinSchemaResponse;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.ext.web.RoutingContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for JoinSchemaHandler using a mocked JavaFetcher and RoutingContext.
+ * Assertions run inside a Vert.x timer callback and complete the test's Async when done.
+ */
+@RunWith(VertxUnitRunner.class)
+public class JoinSchemaHandlerTest {
+    @Mock
+    private JavaFetcher mockFetcher;
+
+    @Mock
+    private RoutingContext routingContext;
+
+    @Mock
+    private HttpServerResponse response;
+
+    private JoinSchemaHandler handler;
+    private Vertx vertx;
+
+    @Before
+    public void setUp(TestContext context) {
+        MockitoAnnotations.openMocks(this);
+        vertx = Vertx.vertx();
+
+        handler = new JoinSchemaHandler(mockFetcher);
+
+        // Set up common routing context behavior
+        when(routingContext.response()).thenReturn(response);
+        when(response.putHeader(anyString(), anyString())).thenReturn(response);
+        when(response.setStatusCode(anyInt())).thenReturn(response);
+        when(routingContext.pathParam("name")).thenReturn("test_join");
+    }
+
+    @Test
+    public void testSuccessfulRequest(TestContext context) {
+        Async async = context.async();
+
+        JavaJoinSchemaResponse joinSchemaResponse = new JavaJoinSchemaResponse("fakejoinSchema", "fakeschemaHash");
+        JTry<JavaJoinSchemaResponse> joinSchemaResponseTry = JTry.success(joinSchemaResponse);
+
+        // Set up mocks
+        when(mockFetcher.fetchJoinSchema(anyString())).thenReturn(joinSchemaResponseTry);
+
+        // Capture the response that will be sent
+        ArgumentCaptor<String> responseCaptor = ArgumentCaptor.forClass(String.class);
+
+        // Trigger call
+        handler.handle(routingContext);
+
+        // Assert results
+        vertx.setTimer(1000, id -> {
+            verify(response).setStatusCode(200);
+            verify(response).putHeader("content-type", "application/json");
+            verify(response).end(responseCaptor.capture());
+
+            // Verify response format
+            JsonObject actualResponse = new JsonObject(responseCaptor.getValue());
+            String joinSchema = actualResponse.getString("joinSchema");
+            String schemaHash = actualResponse.getString("schemaHash");
+
+            context.assertEquals(joinSchema, "fakejoinSchema");
+            context.assertEquals(schemaHash, "fakeschemaHash");
+            async.complete();
+        });
+    }
+
+    @Test
+    public void testFailedRequest(TestContext context) {
+        Async async = context.async();
+
+        // Set up mocks
+        JTry<JavaJoinSchemaResponse> joinSchemaResponseTry = JTry.failure(new Exception("some fake failure"));
+
+        when(mockFetcher.fetchJoinSchema(anyString())).thenReturn(joinSchemaResponseTry);
+
+        // Capture the response that will be sent
+        ArgumentCaptor<String> responseCaptor = ArgumentCaptor.forClass(String.class);
+
+        // Trigger call
+        handler.handle(routingContext);
+
+        // Assert results
+        vertx.setTimer(1000, id -> {
+            verify(response).setStatusCode(500);
+            verify(response).putHeader("content-type", "application/json");
+            verify(response).end(responseCaptor.capture());
+
+            // Verify response format
+            validateFailureResponse(responseCaptor.getValue(), context);
+            async.complete();
+        });
+    }
+
+    private void validateFailureResponse(String jsonResponse, TestContext context) {
+        JsonObject actualResponse = new JsonObject(jsonResponse);
+        context.assertTrue(actualResponse.containsKey("errors"));
+
+        String failureString = actualResponse.getJsonArray("errors").getString(0);
+        context.assertNotNull(failureString);
+    }
+}
From b2adb512445a854ef2bcd0f562b6888f5d7aa8ae Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Mon, 24 Feb 2025 22:52:09 -0500
Subject: [PATCH 4/9] Rebase updates

---
 online/src/main/java/ai/chronon/online/JavaFetcher.java | 2 +-
 online/src/main/scala/ai/chronon/online/JoinCodec.scala | 6 +++---
 .../main/scala/ai/chronon/online/fetcher/Fetcher.scala | 2 +-
 .../scala/ai/chronon/online/fetcher/MetadataStore.scala | 9 +++++----
 4 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java
index bb3bd84c6a..0a1408eb1b 100644
--- a/online/src/main/java/ai/chronon/online/JavaFetcher.java
+++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java
@@ -174,7 +174,7 @@ public CompletableFuture> fetchJoin(List request
     public CompletableFuture> listJoins(boolean isOnline) {
         // Get
responses from the fetcher - Future> scalaResponses = this.fetcher.listJoins(isOnline); + Future> scalaResponses = this.fetcher.metadataStore().listJoins(isOnline); // convert to Java friendly types return FutureConverters.toJava(scalaResponses).toCompletableFuture().thenApply(ScalaJavaConversions::toJava); } diff --git a/online/src/main/scala/ai/chronon/online/JoinCodec.scala b/online/src/main/scala/ai/chronon/online/JoinCodec.scala index fe0aa3f213..91d558954d 100644 --- a/online/src/main/scala/ai/chronon/online/JoinCodec.scala +++ b/online/src/main/scala/ai/chronon/online/JoinCodec.scala @@ -34,8 +34,8 @@ import com.google.gson.Gson case class JoinCodec(conf: JoinOps, keySchema: StructType, baseValueSchema: StructType, - keyCodec: serde.AvroCodec, - baseValueCodec: serde.AvroCodec) + keyCodec: AvroCodec, + baseValueCodec: AvroCodec) extends Serializable { @transient lazy val valueSchema: StructType = { @@ -89,7 +89,7 @@ case class JoinCodec(conf: JoinOps, object JoinCodec { - def buildLoggingSchema(joinName: String, keyCodec: serde.AvroCodec, valueCodec: serde.AvroCodec): String = { + def buildLoggingSchema(joinName: String, keyCodec: AvroCodec, valueCodec: AvroCodec): String = { val schemaMap = Map( "join_name" -> joinName, "key_schema" -> keyCodec.schemaStr, diff --git a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 697a52cd3e..7ebff98091 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -23,7 +23,7 @@ import ai.chronon.api.Extensions.{ExternalPartOps, JoinOps, StringOps, Throwable import ai.chronon.api._ import ai.chronon.online.Metrics.Environment import ai.chronon.online.OnlineDerivationUtil.applyDeriveFunc -import ai.chronon.online.fetcher.Fetcher.{Request, Response, ResponseWithContext} +import ai.chronon.online.fetcher.Fetcher.{JoinSchemaResponse, Request, Response, 
ResponseWithContext} import ai.chronon.online.{serde, _} import com.google.gson.Gson import com.timgroup.statsd.Event diff --git a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala index d0d7d61909..cd68e7fbb4 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala @@ -21,13 +21,14 @@ import ai.chronon.api.Extensions._ import ai.chronon.api.ScalaJavaConversions.IteratorOps import ai.chronon.api._ import ai.chronon.api.thrift.TBase -import ai.chronon.online.KVStore.PutRequest +import ai.chronon.online.KVStore.{ListRequest, ListResponse, PutRequest} import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName import ai.chronon.online.OnlineDerivationUtil.buildDerivedFields import ai.chronon.online._ import ai.chronon.online.serde.AvroCodec import org.slf4j.{Logger, LoggerFactory} +import java.nio.charset.StandardCharsets import scala.collection.immutable.SortedMap import scala.collection.{Seq, mutable} import scala.concurrent.{ExecutionContext, Future} @@ -185,7 +186,7 @@ class MetadataStore(fetchContext: FetchContext) { seqListValues .map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8)) - .map(v => ThriftJsonCodec.fromJsonStr[api.Join](v, check = false, classOf[api.Join])) + .map(v => ThriftJsonCodec.fromJsonStr[Join](v, check = false, classOf[Join])) .filter(_.join.metaData.online == isOnline) .map(_.metaData.name) @@ -207,8 +208,8 @@ class MetadataStore(fetchContext: FetchContext) { } } - val listRequest = ListRequest(dataset, propsMap) - kvStore.list(listRequest).flatMap { + val listRequest = ListRequest(fetchContext.metadataDataset, propsMap) + fetchContext.kvStore.list(listRequest).flatMap { response => val joinSeq: Seq[String] = parseJoins(response) From 85d345a245c4ea111867079ea5c82223f445f601 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Tue, 25 Feb 2025 
10:43:11 -0500 Subject: [PATCH 5/9] Add list joins test --- online/BUILD.bazel | 2 + .../joins/user_transactions.txn_join_a | 248 ++++++++++++++++++ .../joins/user_transactions.txn_join_d | 248 ++++++++++++++++++ .../chronon/online/test/ListJoinsTest.scala | 104 ++++++++ 4 files changed, 602 insertions(+) create mode 100644 online/src/test/resources/joins/user_transactions.txn_join_a create mode 100644 online/src/test/resources/joins/user_transactions.txn_join_d create mode 100644 online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala diff --git a/online/BUILD.bazel b/online/BUILD.bazel index 1277133c4d..2b57c7bb5a 100644 --- a/online/BUILD.bazel +++ b/online/BUILD.bazel @@ -70,6 +70,7 @@ test_deps = [ scala_library( name = "test_lib", srcs = glob(["src/test/**/*.scala"]), + resources = glob(["src/test/resources/**/*"]), format = select({ "//tools/config:scala_2_13": False, # Disable for 2.13 "//conditions:default": True, # Enable for other versions @@ -81,6 +82,7 @@ scala_library( scala_test_suite( name = "tests", srcs = glob(["src/test/**/*.scala"]), + resources = glob(["src/test/resources/**/*"]), jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES, visibility = ["//visibility:public"], deps = test_deps + [":test_lib"], diff --git a/online/src/test/resources/joins/user_transactions.txn_join_a b/online/src/test/resources/joins/user_transactions.txn_join_a new file mode 100644 index 0000000000..ea27d3094f --- /dev/null +++ b/online/src/test/resources/joins/user_transactions.txn_join_a @@ -0,0 +1,248 @@ +{ + "metaData": { + "name": "risk.user_transactions.txn_join_a", + "online": 0, + "production": 0, + "customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": 
null}", + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "samplePercent": 100.0, + "offlineSchedule": "@daily" + }, + "left": { + "events": { + "table": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "ts": "ts" + }, + "timeColumn": "ts", + "setups": [] + } + } + }, + "joinParts": [ + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + 
"offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "merchant" + }, + { + "groupBy": { + "metaData": { + "name": "risk.user_data.user_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "account_age": "account_age", + "account_balance": "account_balance", + "credit_score": "credit_score", + "number_of_devices": "number_of_devices", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.merchant_data.merchant_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + 
"sources": [ + { + "entities": { + "snapshotTable": "data.merchants", + "query": { + "selects": { + "merchant_id": "merchant_id", + "account_age": "account_age", + "zipcode": "zipcode", + "is_big_merchant": "is_big_merchant", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ] + }, + "prefix": "merchant" + } + ] +} \ No newline at end of file diff --git a/online/src/test/resources/joins/user_transactions.txn_join_d b/online/src/test/resources/joins/user_transactions.txn_join_d new file mode 100644 index 0000000000..bcd4c4dca0 --- /dev/null +++ b/online/src/test/resources/joins/user_transactions.txn_join_d @@ -0,0 +1,248 @@ +{ + "metaData": { + "name": "risk.user_transactions.txn_join_d", + "online": 1, + "production": 0, + "customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}", + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "tableProperties": { + "source": "chronon" + }, + "outputNamespace": "default", + "team": "risk", + "samplePercent": 100.0, + "offlineSchedule": "@daily" + }, + "left": { + "events": { + "table": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "ts": "ts" + }, + "timeColumn": "ts", + "setups": [] + } + } + }, + "joinParts": [ + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_user", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": 
null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "user_id": "user_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.transaction_events.txn_group_by_merchant", + "online": 1, + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "events": { + "table": "data.txn_events", + "query": { + "selects": { + "merchant_id": "merchant_id", + "transaction_amount": "transaction_amount", + "transaction_type": "transaction_type" + }, + "timeColumn": "transaction_time", + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ], + "aggregations": [ + { + "inputColumn": "transaction_amount", + "operation": 6, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + }, + { + "length": 1, + "timeUnit": 1 + }, + { + "length": 30, + "timeUnit": 1 + }, + { + "length": 365, + "timeUnit": 1 + } + ] + }, + { + "inputColumn": "transaction_amount", + "operation": 7, + "argMap": {}, + "windows": [ + { + "length": 1, + "timeUnit": 0 + } + ] + } + ] + }, + "prefix": 
"merchant" + }, + { + "groupBy": { + "metaData": { + "name": "risk.user_data.user_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.users", + "query": { + "selects": { + "user_id": "user_id", + "account_age": "account_age", + "account_balance": "account_balance", + "credit_score": "credit_score", + "number_of_devices": "number_of_devices", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "user_id" + ] + }, + "prefix": "user" + }, + { + "groupBy": { + "metaData": { + "name": "risk.merchant_data.merchant_group_by", + "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", + "dependencies": [ + "{\"name\": \"wait_for_data.merchants_ds\", \"spec\": \"data.merchants/ds={{ ds }}\", \"start\": null, \"end\": null}" + ], + "team": "risk", + "offlineSchedule": "@daily" + }, + "sources": [ + { + "entities": { + "snapshotTable": "data.merchants", + "query": { + "selects": { + "merchant_id": "merchant_id", + "account_age": "account_age", + "zipcode": "zipcode", + "is_big_merchant": "is_big_merchant", + "country": "country", + "account_type": "account_type", + "preferred_language": "preferred_language" + }, + "setups": [] + } + } + } + ], + "keyColumns": [ + "merchant_id" + ] + }, + "prefix": "merchant" + } + ] +} \ No newline at end of file diff --git a/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala b/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala new file mode 100644 index 0000000000..97468d52b3 --- /dev/null +++ b/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala @@ -0,0 +1,104 @@ +package ai.chronon.online.test + 
+import ai.chronon.api.Constants.{ContinuationKey, MetadataDataset}
+import org.mockito.ArgumentMatchers.any
+import ai.chronon.online.KVStore.{ListRequest, ListResponse, ListValue}
+import ai.chronon.online.fetcher.{FetchContext, MetadataStore}
+import ai.chronon.online.{Api, KVStore}
+import org.mockito.Answers
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar
+
+import java.nio.charset.StandardCharsets
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.io.Source
+import scala.util.{Success, Try}
+
+/** Checks MetadataStore.listJoins against a mocked KVStore that serves join definitions loaded from test resources. */
+class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter with Matchers {
+  var api: Api = _
+  var kvStore: KVStore = _
+  var joinKVMap: Map[Array[Byte], Array[Byte]] = _
+
+  implicit val ec: ExecutionContext = ExecutionContext.global
+
+  before {
+    kvStore = mock[KVStore](Answers.RETURNS_DEEP_STUBS)
+    api = mock[Api]
+    // The KVStore execution context is implicitly used for
+    // Future compositions in the Fetcher so provision it in
+    // the mock to prevent hanging.
+    when(kvStore.executionContext).thenReturn(ExecutionContext.global)
+    when(api.genKvStore).thenReturn(kvStore)
+    joinKVMap = loadJoinKVMap()
+  }
+
+  it should "return only online joins" in {
+    val metadataStore = new MetadataStore(FetchContext(kvStore))
+    when(kvStore.list(any())).thenReturn(generateListResponse())
+    val resultFuture = metadataStore.listJoins()
+    val result = Await.result(resultFuture, 10.seconds)
+    assert(result.size == 1)
+    result.toSet shouldEqual Set("risk.user_transactions.txn_join_d")
+  }
+
+  it should "fail the call on internal issues" in {
+    val metadataStore = new MetadataStore(FetchContext(kvStore))
+    when(kvStore.list(any())).thenReturn(generateBrokenListResponse())
+    an [Exception] should be thrownBy Await.result(metadataStore.listJoins(), 10.seconds)
+  }
+
+  it should "paginate list calls" in {
+    val metadataStore = new MetadataStore(FetchContext(kvStore))
+
+    val responses: Seq[ListValue] = joinKVMap.map(kv => ListValue(kv._1, kv._2)).toSeq
+    // we want each of these ListValue responses to be returned in a separate ListResponse so we
+    // can test pagination. So we'll wrap each of these elements in a Try[Seq[..]]
+    val listResponseValues: Seq[Try[Seq[ListValue]]] = responses.map(v => Success(Seq(v)))
+
+    // first response will have a continuation key
+    val first = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.head, Map(ContinuationKey -> "1")))
+    // second response will not have a continuation key
+    val second = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.last, Map.empty))
+
+    when(kvStore.list(any())).thenReturn(first, second)
+    val resultFuture = metadataStore.listJoins()
+    val result = Await.result(resultFuture, 10.seconds)
+    assert(result.size == 1)
+    result.toSet shouldEqual Set("risk.user_transactions.txn_join_d")
+  }
+
+  /** Reads each join resource as UTF-8 into a (resource path bytes -> file content bytes) map; throws if one is missing. */
+  private def loadJoinKVMap(): Map[Array[Byte], Array[Byte]] = {
+    val paths = Seq(
+      // first is online = false
+      "joins/user_transactions.txn_join_a",
+      // this one is online = true
+      "joins/user_transactions.txn_join_d",
+    )
+
+    paths.map { path =>
+      val inputStream = Option(getClass.getClassLoader.getResourceAsStream(path))
+        .getOrElse(throw new IllegalArgumentException(s"Resource not found: $path"))
+      val src = Source.fromInputStream(inputStream, StandardCharsets.UTF_8.name())
+      val contents = try src.mkString finally src.close() // always release the resource stream
+      (path.getBytes(StandardCharsets.UTF_8), contents.getBytes(StandardCharsets.UTF_8))
+    }.toMap
+  }
+
+  /** Builds a single ListResponse holding every loaded join and no continuation key. */
+  private def generateListResponse(): Future[ListResponse] = {
+    val listResponseValues: Try[Seq[ListValue]] = Success(joinKVMap.map(kv => ListValue(kv._1, kv._2)).toSeq)
+    Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues, Map.empty))
+  }
+
+  private def generateBrokenListResponse(): Future[ListResponse] = {
+    // we expect things to fail as 'broken_value' is not a valid join
+    val listResponseValues: Try[Seq[ListValue]] = Success(Seq(ListValue("some_key".getBytes, "broken_value".getBytes)))
+    Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues, Map.empty))
+  }
+}
From
cdf539969ec4bd1045f5641240ed985c1696692f Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Tue, 25 Feb 2025 12:08:43 -0500 Subject: [PATCH 6/9] Fix join schema code + tests & rename publish script --- ...ker_images.sh => publish_docker_images.sh} | 7 +++++ .../online/JavaJoinSchemaResponse.java | 18 +++++++++---- .../ai/chronon/online/fetcher/Fetcher.scala | 9 ++++--- service/BUILD.bazel | 1 + .../service/handlers/JoinSchemaHandler.java | 7 ++++- .../handlers/JoinSchemaHandlerTest.java | 26 +++++++++++++++---- 6 files changed, 53 insertions(+), 15 deletions(-) rename distribution/{publish_gcp_docker_images.sh => publish_docker_images.sh} (89%) diff --git a/distribution/publish_gcp_docker_images.sh b/distribution/publish_docker_images.sh similarity index 89% rename from distribution/publish_gcp_docker_images.sh rename to distribution/publish_docker_images.sh index 6e66d97dfc..3c4446163b 100755 --- a/distribution/publish_gcp_docker_images.sh +++ b/distribution/publish_docker_images.sh @@ -30,9 +30,11 @@ cd $CHRONON_ROOT_DIR echo "Building jars" bazel build //cloud_gcp:cloud_gcp_lib_deploy.jar +bazel build //cloud_aws:cloud_aws_lib_deploy.jar bazel build //service:service_assembly_deploy.jar CLOUD_GCP_JAR="$CHRONON_ROOT_DIR/bazel-bin/cloud_gcp/cloud_gcp_lib_deploy.jar" +CLOUD_AWS_JAR="$CHRONON_ROOT_DIR/bazel-bin/cloud_aws/cloud_aws_lib_deploy.jar" SERVICE_JAR="$CHRONON_ROOT_DIR/bazel-bin/service/service_assembly_deploy.jar" if [ ! -f "$CLOUD_GCP_JAR" ]; then @@ -45,6 +47,11 @@ if [ ! -f "$SERVICE_JAR" ]; then exit 1 fi +if [ ! 
-f "$CLOUD_AWS_JAR" ]; then + echo "$CLOUD_AWS_JAR not found" + exit 1 +fi + # We copy to build output as the docker build can't access the bazel-bin (as its a symlink) echo "Copying jars to build_output" mkdir -p build_output diff --git a/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java b/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java index a1624135df..7488e7a6ec 100644 --- a/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java +++ b/online/src/main/java/ai/chronon/online/JavaJoinSchemaResponse.java @@ -3,22 +3,30 @@ import ai.chronon.online.fetcher.Fetcher; public class JavaJoinSchemaResponse { - public String joinSchema; + public String joinName; + public String keySchema; + public String valueSchema; public String schemaHash; - public JavaJoinSchemaResponse(String joinSchema, String schemaHash) { - this.joinSchema = joinSchema; + public JavaJoinSchemaResponse(String joinName, String keySchema, String valueSchema, String schemaHash) { + this.joinName = joinName; + this.keySchema = keySchema; + this.valueSchema = valueSchema; this.schemaHash = schemaHash; } public JavaJoinSchemaResponse(Fetcher.JoinSchemaResponse scalaResponse){ - this.joinSchema = scalaResponse.joinSchema(); + this.joinName = scalaResponse.joinName(); + this.keySchema = scalaResponse.keySchema(); + this.valueSchema = scalaResponse.valueSchema(); this.schemaHash = scalaResponse.schemaHash(); } public Fetcher.JoinSchemaResponse toScala() { return new Fetcher.JoinSchemaResponse( - joinSchema, + joinName, + keySchema, + valueSchema, schemaHash); } } diff --git a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 7ebff98091..3c0e004977 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -74,11 +74,12 @@ object Fetcher { /** * Response for a join schema request - * @param 
joinSchema - Json response that consists of: - * {"join_name" -> joinName, "key_schema" -> "avro schema string", "value_schema" -> "avro schema string"} + * @param joinName - Name of the join + * @param keySchema - Avro schema string for the key + * @param valueSchema - Avro schema string for the value * @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types) */ - case class JoinSchemaResponse(joinSchema: String, schemaHash: String) + case class JoinSchemaResponse(joinName: String, keySchema: String, valueSchema: String, schemaHash: String) } private[online] case class FetcherResponseWithTs(responses: Seq[Fetcher.Response], endTs: Long) @@ -431,7 +432,7 @@ class Fetcher(val kvStore: KVStore, val joinCodecTry = joinCodecCache(joinName) val joinSchemaResponse = joinCodecTry.map { joinCodec => - JoinSchemaResponse(joinCodec.loggingSchema, joinCodec.loggingSchemaHash) + JoinSchemaResponse(joinName, joinCodec.keyCodec.schemaStr, joinCodec.valueCodec.schemaStr, joinCodec.loggingSchemaHash) }.recover { case exception: Throwable => logger.error(s"Failed to fetch join schema for $joinName", exception) diff --git a/service/BUILD.bazel b/service/BUILD.bazel index 6fdb2ced94..ccd7fb99b4 100644 --- a/service/BUILD.bazel +++ b/service/BUILD.bazel @@ -29,6 +29,7 @@ test_deps = _VERTX_TEST_DEPS + [ maven_artifact("org.junit.platform:junit-platform-reporting"), maven_artifact("net.bytebuddy:byte-buddy"), maven_artifact("net.bytebuddy:byte-buddy-agent"), + maven_artifact("org.apache.avro:avro"), ] java_library( diff --git a/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java index 12a23aa465..814770afa2 100644 --- a/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java +++ b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java @@ -42,9 +42,14 @@ public void handle(RoutingContext ctx) { } 
JavaJoinSchemaResponse joinSchemaResponse = joinSchemaResponseTry.getValue(); + JsonObject joinSchemaObj = new JsonObject() + .put("joinName", joinSchemaResponse.joinName) + .put("keySchema", joinSchemaResponse.keySchema) + .put("valueSchema", joinSchemaResponse.valueSchema); + JsonObject response = new JsonObject() - .put("joinSchema", joinSchemaResponse.joinSchema) + .put("joinSchema", joinSchemaObj) .put("schemaHash", joinSchemaResponse.schemaHash); ctx.response() .setStatusCode(200) diff --git a/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java index f5338668c6..32c8f495b8 100644 --- a/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java +++ b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java @@ -11,6 +11,7 @@ import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.ext.web.RoutingContext; +import org.apache.avro.Schema; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,7 +58,9 @@ public void setUp(TestContext context) { public void testSuccessfulRequest(TestContext context) { Async async = context.async(); - JavaJoinSchemaResponse joinSchemaResponse = new JavaJoinSchemaResponse("fakejoinSchema", "fakeschemaHash"); + String avroSchemaString = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}"; + + JavaJoinSchemaResponse joinSchemaResponse = new JavaJoinSchemaResponse("user_join", avroSchemaString, avroSchemaString, "fakeschemaHash"); JTry joinSchemaResponseTry = JTry.success(joinSchemaResponse); // Set up mocks @@ -75,13 +78,26 @@ public void testSuccessfulRequest(TestContext context) { verify(response).putHeader("content-type", "application/json"); verify(response).end(responseCaptor.capture()); - // Verify response format + // Compare response strings 
JsonObject actualResponse = new JsonObject(responseCaptor.getValue()); - String joinSchema = actualResponse.getString("joinSchema"); - String schemaHash = actualResponse.getString("schemaHash"); - context.assertEquals(joinSchema, "fakejoinSchema"); + String schemaHash = actualResponse.getString("schemaHash"); context.assertEquals(schemaHash, "fakeschemaHash"); + + JsonObject returnedJoinSchema = actualResponse.getJsonObject("joinSchema"); + + String returnedJoinName = returnedJoinSchema.getString("joinName"); + context.assertEquals(returnedJoinName, "user_join"); + + String keySchema = returnedJoinSchema.getString("keySchema"); + context.assertEquals(keySchema, avroSchemaString); + + String valueSchema = returnedJoinSchema.getString("valueSchema"); + context.assertEquals(valueSchema, avroSchemaString); + + // confirm we can parse the avro schema fine + new Schema.Parser().parse(keySchema); + new Schema.Parser().parse(valueSchema); async.complete(); }); } From e55f8103d570f0ec26681b77866ad0c3c218f461 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Tue, 25 Feb 2025 12:10:13 -0500 Subject: [PATCH 7/9] style: Apply scalafix and scalafmt changes --- .../ai/chronon/online/fetcher/Fetcher.scala | 27 +++++---- .../online/fetcher/MetadataStore.scala | 55 +++++++++---------- .../chronon/online/test/ListJoinsTest.scala | 7 ++- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala index 3c0e004977..0c2fed7240 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/Fetcher.scala @@ -72,13 +72,12 @@ object Fetcher { context.distribution(Metrics.Name.FetchCount, responseMap.size) } - /** - * Response for a join schema request - * @param joinName - Name of the join - * @param keySchema - Avro schema string for the key - * @param valueSchema - Avro schema string for the 
value - * @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types) - */ + /** Response for a join schema request + * @param joinName - Name of the join + * @param keySchema - Avro schema string for the key + * @param valueSchema - Avro schema string for the value + * @param schemaHash - Hash of the join schema payload (used to track updates to key / value schema fields or types) + */ case class JoinSchemaResponse(joinName: String, keySchema: String, valueSchema: String, schemaHash: String) } @@ -431,14 +430,18 @@ class Fetcher(val kvStore: KVStore, val joinCodecTry = joinCodecCache(joinName) - val joinSchemaResponse = joinCodecTry.map { joinCodec => - JoinSchemaResponse(joinName, joinCodec.keyCodec.schemaStr, joinCodec.valueCodec.schemaStr, joinCodec.loggingSchemaHash) - }.recover { - case exception: Throwable => + val joinSchemaResponse = joinCodecTry + .map { joinCodec => + JoinSchemaResponse(joinName, + joinCodec.keyCodec.schemaStr, + joinCodec.valueCodec.schemaStr, + joinCodec.loggingSchemaHash) + } + .recover { case exception: Throwable => logger.error(s"Failed to fetch join schema for $joinName", exception) ctx.incrementException(exception) throw exception - } + } joinSchemaResponse.foreach(_ => ctx.distribution("response.latency.millis", System.currentTimeMillis() - startTime)) joinSchemaResponse diff --git a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala index cd68e7fbb4..8cadfd148c 100644 --- a/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala +++ b/online/src/main/scala/ai/chronon/online/fetcher/MetadataStore.scala @@ -181,47 +181,44 @@ class MetadataStore(fetchContext: FetchContext) { def parseJoins(response: ListResponse): Seq[String] = { val result = response.values - .map { - seqListValues => - - seqListValues - .map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8)) - .map(v => 
ThriftJsonCodec.fromJsonStr[Join](v, check = false, classOf[Join])) - .filter(_.join.metaData.online == isOnline) - .map(_.metaData.name) - - }.recover { - case e: Exception => - logger.error("Failed to list & parse joins from list response", e) - context.withSuffix("join_list").increment(Metrics.Name.Exception) - throw e + .map { seqListValues => + seqListValues + .map(kv => new String(kv.valueBytes, StandardCharsets.UTF_8)) + .map(v => ThriftJsonCodec.fromJsonStr[Join](v, check = false, classOf[Join])) + .filter(_.join.metaData.online == isOnline) + .map(_.metaData.name) + + } + .recover { case e: Exception => + logger.error("Failed to list & parse joins from list response", e) + context.withSuffix("join_list").increment(Metrics.Name.Exception) + throw e } result.get } - def doRetrieveAllListConfs(acc: mutable.ArrayBuffer[String], paginationKey: Option[Any] = None): Future[Seq[String]] = { + def doRetrieveAllListConfs(acc: mutable.ArrayBuffer[String], + paginationKey: Option[Any] = None): Future[Seq[String]] = { val propsMap = { paginationKey match { case Some(key) => Map(ListEntityType -> JoinKeyword, ContinuationKey -> key) - case None => Map(ListEntityType -> JoinKeyword) + case None => Map(ListEntityType -> JoinKeyword) } } val listRequest = ListRequest(fetchContext.metadataDataset, propsMap) - fetchContext.kvStore.list(listRequest).flatMap { - response => - - val joinSeq: Seq[String] = parseJoins(response) - val newAcc = acc ++ joinSeq - if (response.resultProps.contains(ContinuationKey)) { - doRetrieveAllListConfs(newAcc, response.resultProps.get(ContinuationKey)) - } else { - context - .withSuffix("join_list") - .distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs) - Future.successful(newAcc) - } + fetchContext.kvStore.list(listRequest).flatMap { response => + val joinSeq: Seq[String] = parseJoins(response) + val newAcc = acc ++ joinSeq + if (response.resultProps.contains(ContinuationKey)) { + doRetrieveAllListConfs(newAcc, 
response.resultProps.get(ContinuationKey)) + } else { + context + .withSuffix("join_list") + .distribution(Metrics.Name.LatencyMillis, System.currentTimeMillis() - startTimeMs) + Future.successful(newAcc) + } } } diff --git a/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala b/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala index 97468d52b3..cff44a09b0 100644 --- a/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/ListJoinsTest.scala @@ -49,7 +49,7 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi it should "fail the call on internal issues" in { val metadataStore = new MetadataStore(FetchContext(kvStore)) when(kvStore.list(any())).thenReturn(generateBrokenListResponse()) - an [Exception] should be thrownBy Await.result(metadataStore.listJoins(), 10.seconds) + an[Exception] should be thrownBy Await.result(metadataStore.listJoins(), 10.seconds) } it should "paginate list calls" in { @@ -61,7 +61,8 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi val listResponseValues: Seq[Try[Seq[ListValue]]] = responses.map(v => Success(Seq(v))) // first response will have a continuation key - val first = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.head, Map(ContinuationKey -> "1"))) + val first = Future( + ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.head, Map(ContinuationKey -> "1"))) // second response will not have a continuation key val second = Future(ListResponse(ListRequest(MetadataDataset, Map.empty), listResponseValues.last, Map.empty)) @@ -77,7 +78,7 @@ class ListJoinsTest extends AnyFlatSpec with MockitoSugar with BeforeAndAfter wi // first is online = false "joins/user_transactions.txn_join_a", // this one is online = true - "joins/user_transactions.txn_join_d", + "joins/user_transactions.txn_join_d" ) paths.map { path => From 
26856c2887003a498ea5d62b76b743020a340384 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Tue, 25 Feb 2025 12:53:24 -0500 Subject: [PATCH 8/9] Streamline some pojo stuff --- .../chronon/service/handlers/JoinSchemaHandler.java | 12 ++---------- .../service/handlers/JoinSchemaHandlerTest.java | 8 +++----- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java index 814770afa2..f259814a7b 100644 --- a/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java +++ b/service/src/main/java/ai/chronon/service/handlers/JoinSchemaHandler.java @@ -42,18 +42,10 @@ public void handle(RoutingContext ctx) { } JavaJoinSchemaResponse joinSchemaResponse = joinSchemaResponseTry.getValue(); - JsonObject joinSchemaObj = new JsonObject() - .put("joinName", joinSchemaResponse.joinName) - .put("keySchema", joinSchemaResponse.keySchema) - .put("valueSchema", joinSchemaResponse.valueSchema); - - JsonObject response = - new JsonObject() - .put("joinSchema", joinSchemaObj) - .put("schemaHash", joinSchemaResponse.schemaHash); + ctx.response() .setStatusCode(200) .putHeader("content-type", "application/json") - .end(response.encode()); + .end(JsonObject.mapFrom(joinSchemaResponse).encode()); } } diff --git a/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java index 32c8f495b8..075d611939 100644 --- a/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java +++ b/service/src/test/java/ai/chronon/service/handlers/JoinSchemaHandlerTest.java @@ -84,15 +84,13 @@ public void testSuccessfulRequest(TestContext context) { String schemaHash = actualResponse.getString("schemaHash"); context.assertEquals(schemaHash, "fakeschemaHash"); - JsonObject returnedJoinSchema = actualResponse.getJsonObject("joinSchema"); - - 
String returnedJoinName = returnedJoinSchema.getString("joinName"); + String returnedJoinName = actualResponse.getString("joinName"); context.assertEquals(returnedJoinName, "user_join"); - String keySchema = returnedJoinSchema.getString("keySchema"); + String keySchema = actualResponse.getString("keySchema"); context.assertEquals(keySchema, avroSchemaString); - String valueSchema = returnedJoinSchema.getString("valueSchema"); + String valueSchema = actualResponse.getString("valueSchema"); context.assertEquals(valueSchema, avroSchemaString); // confirm we can parse the avro schema fine From 58aa0473038c763100b218462498ec50690a51e5 Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Tue, 25 Feb 2025 15:34:09 -0500 Subject: [PATCH 9/9] PR feedback --- service/src/main/java/ai/chronon/service/FetcherVerticle.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/src/main/java/ai/chronon/service/FetcherVerticle.java b/service/src/main/java/ai/chronon/service/FetcherVerticle.java index 0e8f8fdc79..58b93d7909 100644 --- a/service/src/main/java/ai/chronon/service/FetcherVerticle.java +++ b/service/src/main/java/ai/chronon/service/FetcherVerticle.java @@ -43,7 +43,7 @@ protected void startHttpServer(int port, String configJsonString, Api api, Promi router.get("/v1/joins").handler(new JoinListHandler(fetcher)); // Set up route for retrieval of Join schema - router.get("/v1/join/schema/:name").handler(new JoinSchemaHandler(fetcher)); + router.get("/v1/join/:name/schema").handler(new JoinSchemaHandler(fetcher)); // Health check route router.get("/ping").handler(ctx -> {