9 changes: 7 additions & 2 deletions spark/BUILD.bazel
@@ -101,7 +101,7 @@ scala_library(
"//conditions:default": True, # Enable for other versions
}),
visibility = ["//visibility:public"],
deps = test_deps,
deps = test_deps + _RUNFILES_DEP,
)

scala_test_suite(
@@ -123,11 +123,16 @@ scala_test_suite(
 scala_test_suite(
     name = "fetcher_test",
     srcs = glob(["src/test/scala/ai/chronon/spark/test/fetcher/*.scala"]),
+    data = [
+        "//spark/src/test/resources:test-resources",
+    ],
     # defined in prelude_bazel file
     jvm_flags = _JVM_FLAGS_FOR_ACCESSING_BASE_JAVA_CLASSES,
     resources = ["//spark/src/test/resources:test-resources"],
     visibility = ["//visibility:public"],
-    deps = test_deps + [":test_lib"],
+    deps = test_deps + [
+        ":test_lib",
+    ] + _RUNFILES_DEP,
 )
 
 scala_test_suite(
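Note on the BUILD change: the `resources` attribute packages the files into the test jar, so they stay reachable through the classpath, while the new `data` attribute exposes the same files as runfiles on disk, which is what a directory walker needs. A minimal sketch of resolving a `data` dependency at runtime, assuming the standard Bazel Java runfiles library (presumably what `_RUNFILES_DEP` points at) and the `chronon` workspace name seen in the test below:

import com.google.devtools.build.runfiles.Runfiles

object RunfilesSketch {
  def main(args: Array[String]): Unit = {
    // Runfiles.create() reads the RUNFILES_DIR / RUNFILES_MANIFEST_FILE
    // environment that `bazel test` sets up for the test process.
    val runfiles = Runfiles.create()
    // rlocation maps a workspace-rooted path to an absolute filesystem path.
    val resourceDir = runfiles.rlocation("chronon/spark/src/test/resources/")
    println(s"test resources live at: $resourceDir")
  }
}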
spark/src/test/scala/ai/chronon/spark/test/fetcher/FetcherTest.scala
@@ -22,23 +22,24 @@ import ai.chronon.api.Constants.MetadataDataset
 import ai.chronon.api.Extensions.{JoinOps, MetadataOps}
 import ai.chronon.api.ScalaJavaConversions._
 import ai.chronon.api._
-import ai.chronon.online.fetcher.Fetcher.Request
 import ai.chronon.online.KVStore.GetRequest
-import ai.chronon.online.fetcher.FetchContext
 import ai.chronon.online._
+import ai.chronon.online.fetcher.FetchContext
+import ai.chronon.online.fetcher.Fetcher.Request
 import ai.chronon.spark.Extensions._
 import ai.chronon.spark.stats.ConsistencyJob
 import ai.chronon.spark.test.{DataFrameGen, OnlineUtils, SchemaEvolutionUtils}
 import ai.chronon.spark.utils.MockApi
 import ai.chronon.spark.{Join => _, _}
 import com.google.gson.GsonBuilder
+import com.google.devtools.build.runfiles.Runfiles
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.functions.{avg, col, lit}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.nio.charset.StandardCharsets
 import java.util.TimeZone
 import java.util.concurrent.Executors
 import java.{lang, util}
@@ -47,6 +48,7 @@ import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.io.Source
 
+
 class FetcherTest extends AnyFlatSpec {
 
   @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass)
@@ -58,16 +60,17 @@ class FetcherTest extends AnyFlatSpec {
   private val today = tableUtils.partitionSpec.at(System.currentTimeMillis())
   private val yesterday = tableUtils.partitionSpec.before(today)
 
-  // todo(tchow):reenable
-  it should "test metadata store" ignore {
+  it should "test metadata store" in {
     implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
     implicit val tableUtils: TableUtils = TableUtils(spark)
 
     val joinPath = "joins/team/example_join.v1"
-    val confResource = getClass.getResource(s"/$joinPath")
+    val confResource = getClass.getClassLoader.getResource(s"$joinPath")
     val src = Source.fromResource(joinPath)
     println(s"conf resource path for dir walker: ${confResource.getPath}")
 
+    val runFilesResource = Runfiles.create().rlocation("chronon/spark/src/test/resources/")
+
     val expected = {
       try src.mkString
       finally src.close()
@@ -79,7 +82,7 @@ class FetcherTest extends AnyFlatSpec {
     val singleFileMetadataStore = new fetcher.MetadataStore(FetchContext(inMemoryKvStore, singleFileDataSet))
     inMemoryKvStore.create(singleFileDataSet)
     // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
-    val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints)
+    val singleFileDirWalker = new MetadataDirWalker(runFilesResource, acceptedEndPoints)
     val singleFileKvMap = singleFileDirWalker.run
     val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (_, kvMap) =>
       singleFileMetadataStore.put(kvMap, singleFileDataSet)
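For reference, the test now mixes two lookup styles that are rooted differently: `getClass.getResource` resolves relative to the class's package unless the path starts with "/", while `ClassLoader.getResource` is always rooted at the classpath, and the runfiles lookup returns a real on-disk directory that `MetadataDirWalker` can recurse into. A small sketch of the three side by side, reusing the resource path from the test above:

import com.google.devtools.build.runfiles.Runfiles

object ResourceLookupSketch {
  def main(args: Array[String]): Unit = {
    // Class-rooted lookup: the leading "/" anchors it at the classpath root.
    val viaClass = getClass.getResource("/joins/team/example_join.v1")
    // Classloader lookup: always anchored at the classpath root, no slash.
    val viaLoader = getClass.getClassLoader.getResource("joins/team/example_join.v1")
    // Runfiles lookup: a filesystem path, not a classpath URL.
    val onDisk = Runfiles.create().rlocation("chronon/spark/src/test/resources/")
    println(s"class-rooted: $viaClass, loader-rooted: $viaLoader, runfiles dir: $onDisk")
  }
}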