Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ case class FetchContext(kvStore: KVStore,
.exists(_.asInstanceOf[Boolean])
}

/** Returns true when batch IR caching is enabled for the given groupBy.
  *
  * Consults the flag store (which may be absent, i.e. null) for the
  * "enable_fetcher_batch_ir_cache" flag, keyed by the groupBy's streaming
  * dataset name. A missing flag store means caching is disabled.
  *
  * @param groupByName name of the groupBy whose caching flag is checked
  */
def isCachingEnabled(groupByName: String): Boolean =
  Option(flagStore).fold(false) { store =>
    store.isSet("enable_fetcher_batch_ir_cache", Map("group_by_streaming_dataset" -> groupByName).toJava)
  }

def shouldStreamingDecodeThrow(groupByName: String): Boolean = {
Option(flagStore)
.exists(
Expand Down
15 changes: 11 additions & 4 deletions online/src/main/scala/ai/chronon/online/fetcher/FetcherCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ trait FetcherCache {
@transient private lazy val logger: Logger = LoggerFactory.getLogger(getClass)

val batchIrCacheName = "batch_cache"
val maybeBatchIrCache: Option[BatchIrCache] =
val defaultBatchIrCacheSize = "10000"

val configuredBatchIrCacheSize: Option[Int] =
Option(System.getProperty("ai.chronon.fetcher.batch_ir_cache_size_elements"))
.map(size => new BatchIrCache(batchIrCacheName, size.toInt))
.orElse(None)
.orElse(Some(defaultBatchIrCacheSize))
.map(_.toInt)
.filter(_ > 0)

val maybeBatchIrCache: Option[BatchIrCache] =
configuredBatchIrCacheSize
.map(size => new BatchIrCache(batchIrCacheName, size))

// Caching needs to be configured globally
// Caching needs to be configured globally with a cache size > 0
def isCacheSizeConfigured: Boolean = maybeBatchIrCache.isDefined

// Caching needs to be enabled for the specific groupBy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ class GroupByFetcher(fetchContext: FetchContext, metadataStore: MetadataStore)
@transient private implicit lazy val logger: Logger = LoggerFactory.getLogger(getClass)

override def isCachingEnabled(groupBy: GroupBy): Boolean = {
if (!isCacheSizeConfigured || groupBy.getMetaData == null || groupBy.getMetaData.getName == null) return false

val gbName = groupBy.getMetaData.getName

val isCachingFlagEnabled = fetchContext.isCachingEnabled(gbName)

if (fetchContext.debug)
logger.info(s"Online IR caching is ${if (isCachingFlagEnabled) "enabled" else "disabled"} for $gbName")
configuredBatchIrCacheSize match {
case Some(cacheSize) =>
logger.info(s"Online IR caching is enabled with cache size = $cacheSize")
case None =>
logger.info("Online IR caching is disabled")
}

isCachingFlagEnabled
isCacheSizeConfigured
}

/** Convert a groupBy request into a batch kv request and optionally a streaming kv request
Expand Down
29 changes: 0 additions & 29 deletions online/src/test/scala/ai/chronon/online/test/FetcherBaseTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,35 +203,6 @@ class FetcherBaseTest extends AnyFlatSpec with MockitoSugar with Matchers with M
verify(ttlCache, never()).apply(any())
}

// Verifies FetchContext.isCachingEnabled honors the per-groupBy flag-store flag:
// the flag store answers false for "test_groupby_2" and true for "test_groupby_3".
it should "determine if caching is enabled correctly" in {
// Stub FlagStore: only "enable_fetcher_batch_ir_cache" is inspected; the decision
// is keyed off the "group_by_streaming_dataset" attribute. Any other dataset name
// fails the test outright so unexpected lookups are caught.
val flagStore: FlagStore = (flagName: String, attributes: java.util.Map[String, String]) => {
flagName match {
case "enable_fetcher_batch_ir_cache" =>
attributes.get("group_by_streaming_dataset") match {
case "test_groupby_2" => false
case "test_groupby_3" => true
case other @ _ =>
fail(s"Unexpected group_by_streaming_dataset: $other")
false
}
case _ => false
}
}

// Deep-stubbed KVStore so FetchContext construction succeeds without a real store.
kvStore = mock[KVStore](Answers.RETURNS_DEEP_STUBS)
when(kvStore.executionContext).thenReturn(ExecutionContext.global)

val fetchContext = FetchContext(kvStore, flagStore = flagStore)

// NOTE(review): this spy and its isCacheSizeConfigured stub appear unused — the
// assertions below call fetchContext directly. Looks like leftover setup; confirm
// before removing.
val fetcherBaseWithFlagStore =
spy[fetcher.JoinPartFetcher](new fetcher.JoinPartFetcher(fetchContext, new MetadataStore(fetchContext)))
when(fetcherBaseWithFlagStore.isCacheSizeConfigured).thenReturn(true)

// Flag store returns false for test_groupby_2 and true for test_groupby_3.
assertFalse(fetchContext.isCachingEnabled("test_groupby_2"))
assertTrue(fetchContext.isCachingEnabled("test_groupby_3"))
}

it should "fetch in the happy case" in {
val fetchContext = mock[FetchContext]
val baseFetcher = new fetcher.JoinPartFetcher(fetchContext, mock[MetadataStore])
Expand Down
19 changes: 15 additions & 4 deletions service/src/main/java/ai/chronon/service/FetcherVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@ public class FetcherVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
ConfigStore cfgStore = new ConfigStore(vertx);
startHttpServer(cfgStore.getServerPort(), cfgStore.encodeConfig(), ApiProvider.buildApi(cfgStore), startPromise);

Api api = ApiProvider.buildApi(cfgStore);

// Execute the blocking Bigtable initialization in a separate worker thread
vertx.executeBlocking(() -> api.buildJavaFetcher("feature-service", false))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we encapsulate this async connection inside startHttpServer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, from what I understand, this is a prerequisite for what we run in startHttpServer, and the code in the startHttpServer method partly runs inside the worker threads. Pulling this out beforehand ensures that we initialize the client and connections first, and once those are done we spin up the HTTP server.

.onSuccess(fetcher -> {
try {
// This code runs back on the event loop when the blocking operation completes
startHttpServer(cfgStore.getServerPort(), cfgStore.encodeConfig(), fetcher, startPromise);
} catch (Exception e) {
startPromise.fail(e);
}
})
.onFailure(startPromise::fail);
}

protected void startHttpServer(int port, String configJsonString, Api api, Promise<Void> startPromise) throws Exception {
protected void startHttpServer(int port, String configJsonString, JavaFetcher fetcher, Promise<Void> startPromise) throws Exception {
Router router = Router.router(vertx);

// Define routes

JavaFetcher fetcher = api.buildJavaFetcher("feature-service", false);

// Set up sub-routes for the various feature retrieval apis
router.route("/v1/fetch/*").subRouter(FetchRouter.createFetchRoutes(vertx, fetcher));

Expand Down