diff --git a/spark/BUILD.bazel b/spark/BUILD.bazel
index b26851d0be..10092d3211 100644
--- a/spark/BUILD.bazel
+++ b/spark/BUILD.bazel
@@ -101,7 +101,7 @@ scala_library(
         "//conditions:default": True,  # Enable for other versions
     }),
     visibility = ["//visibility:public"],
-    deps = test_deps,
+    deps = test_deps + _RUNFILES_DEP,
 )
 
 scala_test_suite(
@@ -123,11 +123,16 @@ scala_test_suite(
 scala_test_suite(
     name = "fetcher_test",
     srcs = glob(["src/test/scala/ai/chronon/spark/test/fetcher/*.scala"]),
+    data = [
+        "//spark/src/test/resources:test-resources",
+    ],
     # defined in prelude_bazel file
     jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
     resources = ["//spark/src/test/resources:test-resources"],
     visibility = ["//visibility:public"],
-    deps = test_deps + [":test_lib"],
+    deps = test_deps + [
+        ":test_lib",
+    ] + _RUNFILES_DEP,
 )
 
 scala_test_suite(
diff --git a/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
index 3f6b9c49ab..fff72194c4 100644
--- a/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
+++ b/spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
@@ -22,16 +22,16 @@ import ai.chronon.api.Constants.MetadataDataset
 import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api._
-import ai.chronon.online.fetcher.Fetcher.Request
 import ai.chronon.online.KVStore.GetRequest
-import ai.chronon.online.fetcher.FetchContext
 import ai.chronon.online._
+import ai.chronon.online.fetcher.FetchContext
+import ai.chronon.online.fetcher.Fetcher.Request
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.stats.ConsistencyJob
 import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
 import ai.chronon.spark.utils.MockApi
 import ai.chronon.spark.{Join => _, _}
-import com.google.gson.GsonBuilder
+import com.google.devtools.build.runfiles.Runfiles
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.functions.{avg, col, lit}
 import org.apache.spark.sql.{Row, SparkSession}
@@ -58,16 +58,17 @@ class FetcherTest extends AnyFlatSpec {
   private val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
   private val yesterday = tableUtils.partitionSpec.before(today)
 
-  // todo(tchow):reenable
-  it should "test metadata store" ignore {
+  it should "test metadata store" in {
     implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
     implicit val tableUtils: TableUtils = TableUtils(spark)
     val joinPath = "joins/team/example_join.v1"
-    val confResource = getClass.getResource(s"/$joinPath")
+    val confResource = getClass.getClassLoader.getResource(s"$joinPath")
     val src = Source.fromResource(joinPath)
     println(s"conf resource path for dir walker: ${confResource.getPath}")
 
+    val runFilesResource = Runfiles.create().rlocation("chronon/spark/src/test/resources/")
+
     val expected = {
       try src.mkString
       finally src.close()
     }
@@ -79,14 +80,16 @@ class FetcherTest extends AnyFlatSpec {
     val singleFileMetadataStore = new fetcher.MetadataStore(FetchContext(inMemoryKvStore, singleFileDataSet))
     inMemoryKvStore.create(singleFileDataSet)
     // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
-    val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints)
+    val singleFileDirWalker = new MetadataDirWalker(runFilesResource, acceptedEndPoints)
     val singleFileKvMap = singleFileDirWalker.run
     val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (_, kvMap) =>
       singleFileMetadataStore.put(kvMap, singleFileDataSet)
     }
     singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
 
-    val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet))
+    val joinKeyName = "joins/team.example_join.v1"
+
+    val response = inMemoryKvStore.get(GetRequest(joinKeyName.getBytes(), singleFileDataSet))
     val res = Await.result(response, Duration.Inf)
     assertTrue(res.latest.isSuccess)
     val actual = new String(res.values.get.head.bytes)
@@ -94,21 +97,21 @@
 
     val teamMetadataResponse = inMemoryKvStore.getString("joins/relevance", singleFileDataSet, 10000)
     val teamMetadataRes = teamMetadataResponse.get
-    assert(teamMetadataRes.equals("joins/team/example_join.v1"))
+    assert(teamMetadataRes.equals(joinKeyName))
 
     val directoryDataSetDataSet = MetadataDataset + "_directory_test"
     val directoryMetadataStore = new fetcher.MetadataStore(FetchContext(inMemoryKvStore, directoryDataSetDataSet))
     inMemoryKvStore.create(directoryDataSetDataSet)
     val directoryDataDirWalker =
-      new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints)
+      new MetadataDirWalker(runFilesResource, acceptedEndPoints)
     val directoryDataKvMap = directoryDataDirWalker.run
     val directoryPut = directoryDataKvMap.toSeq.map { case (_, kvMap) =>
       directoryMetadataStore.put(kvMap, directoryDataSetDataSet)
     }
     directoryPut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
 
     val dirResponse =
-      inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet))
+      inMemoryKvStore.get(GetRequest(joinKeyName.getBytes, directoryDataSetDataSet))
     val dirRes = Await.result(dirResponse, Duration.Inf)
     assertTrue(dirRes.latest.isSuccess)
     val dirActual = new String(dirRes.values.get.head.bytes)
@@ -116,7 +119,7 @@
 
     val teamMetadataDirResponse = inMemoryKvStore.getString("group_bys/team", directoryDataSetDataSet, 10000)
     val teamMetadataDirRes = teamMetadataDirResponse.get
-    assert(teamMetadataDirRes.equals("group_bys/team/example_group_by.v1"))
+    assert(teamMetadataDirRes.equals("group_bys/team.example_group_by.v1"))
 
     val emptyResponse =
       inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName"))
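
Note for reviewers: the diff switches the test from classpath-relative resource paths to Bazel runfiles, which is why the BUILD target gains both a `data` entry and `_RUNFILES_DEP`. Below is a minimal standalone sketch of the lookup pattern the test now relies on, assuming `_RUNFILES_DEP` resolves to Bazel's Java runfiles library (`@bazel_tools//tools/java/runfiles`) and that `chronon` is the workspace name; the object name is hypothetical and not part of this change.

import com.google.devtools.build.runfiles.Runfiles

object RunfilesLookupSketch {
  def main(args: Array[String]): Unit = {
    // Runfiles.create() locates the runfiles tree via the RUNFILES_DIR or
    // RUNFILES_MANIFEST_FILE environment variables that `bazel test` sets.
    val runfiles = Runfiles.create()
    // rlocation maps a workspace-rooted path to an absolute filesystem path,
    // independent of the sandboxed working directory the test runs in. This is
    // what makes MetadataDirWalker work under Bazel, where getResource paths
    // into a jar are not walkable directories.
    val resourcesDir = runfiles.rlocation("chronon/spark/src/test/resources/")
    println(s"resolved test resources dir: $resourcesDir")
  }
}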