Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions spark/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ scala_library(
"//conditions:default": True, # Enable for other versions
}),
visibility = ["//visibility:public"],
deps = test_deps,
deps = test_deps + _RUNFILES_DEP,
)

scala_test_suite(
Expand All @@ -123,11 +123,16 @@ scala_test_suite(
scala_test_suite(
name = "fetcher_test",
srcs = glob(["src/test/scala/ai/chronon/spark/test/fetcher/*.scala"]),
data = [
"//spark/src/test/resources:test-resources",
],
# defined in prelude_bazel file
jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
resources = ["//spark/src/test/resources:test-resources"],
visibility = ["//visibility:public"],
deps = test_deps + [":test_lib"],
deps = test_deps + [
":test_lib",
] + _RUNFILES_DEP,
)

scala_test_suite(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import ai.chronon.api.Constants.MetadataDataset
import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
import ai.chronon.api.ScalaJavaConversions._
import ai.chronon.api._
import ai.chronon.online.fetcher.Fetcher.Request
import ai.chronon.online.KVStore.GetRequest
import ai.chronon.online.fetcher.FetchContext
import ai.chronon.online._
import ai.chronon.online.fetcher.FetchContext
import ai.chronon.online.fetcher.Fetcher.Request
import ai.chronon.spark.Extensions._
import ai.chronon.spark.stats.ConsistencyJob
import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
import ai.chronon.spark.utils.MockApi
import ai.chronon.spark.{Join => _, _}
import com.google.gson.GsonBuilder
import com.google.devtools.build.runfiles.Runfiles
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.functions.{avg, col, lit}
import org.apache.spark.sql.{Row, SparkSession}
Expand All @@ -58,16 +58,17 @@ class FetcherTest extends AnyFlatSpec {
private val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
private val yesterday = tableUtils.partitionSpec.before(today)

// todo(tchow):reenable
it should "test metadata store" ignore {
it should "test metadata store" in {
implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
implicit val tableUtils: TableUtils = TableUtils(spark)

val joinPath = "joins/team/example_join.v1"
val confResource = getClass.getResource(s"/$joinPath")
val confResource = getClass.getClassLoader.getResource(s"$joinPath")
val src = Source.fromResource(joinPath)
println(s"conf resource path for dir walker: ${confResource.getPath}")

val runFilesResource = Runfiles.create().rlocation("chronon/spark/src/test/resources/")

val expected = {
try src.mkString
finally src.close()
Expand All @@ -79,44 +80,46 @@ class FetcherTest extends AnyFlatSpec {
val singleFileMetadataStore = new fetcher.MetadataStore(FetchContext(inMemoryKvStore, singleFileDataSet))
inMemoryKvStore.create(singleFileDataSet)
// If this test fails when run from IntelliJ, set the run configuration's working directory to /chronon instead of $MODULE_DIR
val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints)
val singleFileDirWalker = new MetadataDirWalker(runFilesResource, acceptedEndPoints)
val singleFileKvMap = singleFileDirWalker.run
val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (_, kvMap) =>
singleFileMetadataStore.put(kvMap, singleFileDataSet)
}
singleFilePut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))

val response = inMemoryKvStore.get(GetRequest(joinPath.getBytes(), singleFileDataSet))
val joinKeyName = "joins/team.example_join.v1"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey guys, this is a change I'm not sure is exactly correct, but it does get the tests to pass. I had to change the lookup key from `joins/team/example_join.v1` to `joins/team.example_join.v1` (replacing the second `/` with a `.`), since the original key didn't match what was actually written to the KV store. I'm not sure whether this behavior changed at some point. cc @piyush-zlai @nikhil-zlai

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct. The join and group-by names in the metadata include the team.

Example: `quickstart.purchases.v1`, where `quickstart` is the team.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder how this ever worked?


val response = inMemoryKvStore.get(GetRequest(joinKeyName.getBytes(), singleFileDataSet))
val res = Await.result(response, Duration.Inf)
assertTrue(res.latest.isSuccess)
val actual = new String(res.values.get.head.bytes)
assertEquals(expected, actual.replaceAll("\\s+", ""))

val teamMetadataResponse = inMemoryKvStore.getString("joins/relevance", singleFileDataSet, 10000)
val teamMetadataRes = teamMetadataResponse.get
assert(teamMetadataRes.equals("joins/team/example_join.v1"))
assert(teamMetadataRes.equals(joinKeyName))

val directoryDataSetDataSet = MetadataDataset + "_directory_test"
val directoryMetadataStore =
new fetcher.MetadataStore(FetchContext(inMemoryKvStore, directoryDataSetDataSet))
inMemoryKvStore.create(directoryDataSetDataSet)
val directoryDataDirWalker =
new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints)
new MetadataDirWalker(runFilesResource, acceptedEndPoints)
val directoryDataKvMap = directoryDataDirWalker.run
val directoryPut = directoryDataKvMap.toSeq.map { case (_, kvMap) =>
directoryMetadataStore.put(kvMap, directoryDataSetDataSet)
}
directoryPut.flatMap(putRequests => Await.result(putRequests, Duration.Inf))
val dirResponse =
inMemoryKvStore.get(GetRequest(joinPath.getBytes(), directoryDataSetDataSet))
inMemoryKvStore.get(GetRequest(joinKeyName.getBytes, directoryDataSetDataSet))
val dirRes = Await.result(dirResponse, Duration.Inf)
assertTrue(dirRes.latest.isSuccess)
val dirActual = new String(dirRes.values.get.head.bytes)
assertEquals(expected, dirActual.replaceAll("\\s+", ""))

val teamMetadataDirResponse = inMemoryKvStore.getString("group_bys/team", directoryDataSetDataSet, 10000)
val teamMetadataDirRes = teamMetadataDirResponse.get
assert(teamMetadataDirRes.equals("group_bys/team/example_group_by.v1"))
assert(teamMetadataDirRes.equals("group_bys/team.example_group_by.v1"))

val emptyResponse =
inMemoryKvStore.get(GetRequest("NoneExistKey".getBytes(), "NonExistDataSetName"))
Expand Down
Loading