diff --git a/.bazelversion b/.bazelversion new file mode 100644 index 0000000000..19b860c187 --- /dev/null +++ b/.bazelversion @@ -0,0 +1 @@ +6.4.0 diff --git a/.ijwb/.bazelproject b/.ijwb/.bazelproject index d87cdc2369..0145258a47 100644 --- a/.ijwb/.bazelproject +++ b/.ijwb/.bazelproject @@ -1,16 +1 @@ -directories: - # Add the directories you want added as source here - # By default, we've added your entire workspace ('.') - . - -# Automatically includes all relevant targets under the 'directories' above -derive_targets_from_directories: true - -targets: - # If source code isn't resolving, add additional targets that compile it here - -additional_languages: - # Uncomment any additional languages you want supported - python - scala - java +import tools/ide_support/intellij/default_view.bazelproject diff --git a/WORKSPACE b/WORKSPACE index b0d62a697a..be4e7b70b4 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -123,6 +123,12 @@ load("@io_bazel_rules_scala//scala:toolchains.bzl", "scala_register_toolchains") scala_register_toolchains() +load("@io_bazel_rules_scala//testing:junit.bzl", "junit_repositories", "junit_toolchain") + +junit_repositories() + +junit_toolchain() + load("@io_bazel_rules_scala//testing:scalatest.bzl", "scalatest_repositories", "scalatest_toolchain") scalatest_repositories() diff --git a/aggregator/BUILD.bazel b/aggregator/BUILD.bazel index b7458a1f53..87d12b232e 100644 --- a/aggregator/BUILD.bazel +++ b/aggregator/BUILD.bazel @@ -57,13 +57,13 @@ scala_library( ]), ) -scala_test_suite( +scala_junit_test_suite( name = "test", srcs = glob(["src/test/scala/ai/chronon/aggregator/test/*.scala"]), + strip_prefix = "src/test/scala/ai/chronon/aggregator/test", visibility = ["//visibility:public"], deps = [ ":aggregator", - ":test-lib", "//api:api-lib", "//api:api-models", maven_artifact("junit:junit"), diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxDistinctTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxDistinctTest.scala index 2416a894f5..fb127c6c2c 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxDistinctTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxDistinctTest.scala @@ -19,8 +19,11 @@ package ai.chronon.aggregator.test import ai.chronon.aggregator.base.ApproxDistinctCount import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test class ApproxDistinctTest extends TestCase { + + @Test def testErrorBound(uniques: Int, errorBound: Int, lgK: Int): Unit = { val uniqueElems = 1 to uniques val duplicates = uniqueElems ++ uniqueElems ++ uniqueElems @@ -32,6 +35,7 @@ class ApproxDistinctTest extends TestCase { assertTrue(Math.abs(estimated - uniques) < errorBound) } + @Test def testMergingErrorBound(uniques: Int, errorBound: Int, lgK: Int, merges: Int): Unit = { val chunkSize = uniques / merges assert(chunkSize > 0) @@ -50,12 +54,14 @@ class ApproxDistinctTest extends TestCase { assertTrue(Math.abs(estimated - uniques) < errorBound) } + @Test def testErrorBounds(): Unit = { testErrorBound(uniques = 100, errorBound = 1, lgK = 10) testErrorBound(uniques = 1000, errorBound = 20, lgK = 10) testErrorBound(uniques = 10000, errorBound = 300, lgK = 10) } + @Test def testMergingErrorBounds(): Unit = { testMergingErrorBound(uniques = 100, errorBound = 1, lgK = 10, merges = 10) testMergingErrorBound(uniques = 1000, errorBound = 20, lgK = 10, merges = 4) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxHistogramTest.scala
b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxHistogramTest.scala index eb1512cb5f..177c04d373 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxHistogramTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxHistogramTest.scala @@ -3,11 +3,14 @@ package ai.chronon.aggregator.test import ai.chronon.aggregator.base.{ApproxHistogram, ApproxHistogramIr} import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import java.util import scala.jdk.CollectionConverters._ class ApproxHistogramTest extends TestCase { + + @Test def testHistogram(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts = (1L to 3).map(i => i.toString -> i).toMap @@ -18,6 +21,7 @@ class ApproxHistogramTest extends TestCase { assertEquals(toHashMap(counts), approxHistogram.finalize(ir)) } + @Test def testSketch(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts = (1L to 4).map(i => i.toString -> i).toMap @@ -29,6 +33,7 @@ class ApproxHistogramTest extends TestCase { assertEquals(toHashMap(expected), approxHistogram.finalize(ir)) } + @Test def testMergeSketches(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts1: Map[String, Long] = Map("5" -> 5L, "4" -> 4, "2" -> 2, "1" -> 1) @@ -51,6 +56,7 @@ class ApproxHistogramTest extends TestCase { assertTrue(ir.histogram.isEmpty) } + @Test def testMergeHistograms(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts1: Map[String, Long] = Map("4" -> 4L, "2" -> 2) @@ -73,6 +79,7 @@ class ApproxHistogramTest extends TestCase { assertTrue(ir.sketch.isEmpty) } + @Test def testMergeHistogramsToSketch(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts1: Map[String, Long] = Map("4" -> 4L, "3" -> 3) @@ -96,6 +103,7 @@ class ApproxHistogramTest extends TestCase { assertTrue(ir.histogram.isEmpty) } + @Test def testMergeSketchAndHistogram(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts1: Map[String, Long] = Map("5" -> 5L, "3" -> 3, "2" -> 2, "1" -> 1) @@ -118,6 +126,7 @@ class ApproxHistogramTest extends TestCase { assert(ir.histogram.isEmpty) } + @Test def testNormalizeHistogram(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts = (1L to 3).map(i => i.toString -> i).toMap @@ -128,6 +137,7 @@ class ApproxHistogramTest extends TestCase { assertEquals(ir, normalized) } + @Test def testNormalizeSketch(): Unit = { val approxHistogram = new ApproxHistogram[String](3) val counts = (1L to 4).map(i => i.toString -> i).toMap diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxPercentilesTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxPercentilesTest.scala index 3eb8ff5647..2e4b5c4fa4 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxPercentilesTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/ApproxPercentilesTest.scala @@ -22,11 +22,14 @@ import ai.chronon.aggregator.row.StatsGenerator import com.yahoo.sketches.kll.KllFloatsSketch import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import scala.util.Random class ApproxPercentilesTest extends TestCase { @transient lazy val logger = LoggerFactory.getLogger(getClass) + + @Test def testBasicImpl(nums: Int, slide: Int, k: Int, percentiles: Array[Double], errorPercent: Float): Unit = { val sorted = (0 to nums).map(_.toFloat) val elems = Random.shuffle(sorted.toList).toArray @@ -54,6 +57,7 @@ class 
ApproxPercentilesTest extends TestCase { diffs.foreach(diff => assertTrue(diff < errorMargin)) } + @Test def testBasicPercentiles: Unit = { val percentiles_tested: Int = 31 val percentiles: Array[Double] = (0 to percentiles_tested).toArray.map(i => i * 1.0 / percentiles_tested) @@ -72,6 +76,7 @@ class ApproxPercentilesTest extends TestCase { drift } + @Test def testPSIDrifts(): Unit = { assertTrue( getPSIDrift( diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/FrequentItemsTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/FrequentItemsTest.scala index fac5349ae5..6274c3175d 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/FrequentItemsTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/FrequentItemsTest.scala @@ -3,11 +3,14 @@ package ai.chronon.aggregator.test import ai.chronon.aggregator.base.{FrequentItemType, FrequentItems, FrequentItemsFriendly, ItemsSketchIR} import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import java.util import scala.jdk.CollectionConverters._ class FrequentItemsTest extends TestCase { + + @Test def testNonPowerOfTwoAndTruncate(): Unit = { val size = 3 val items = new FrequentItems[String](size) @@ -31,6 +34,7 @@ class FrequentItemsTest extends TestCase { result) } + @Test def testLessItemsThanSize(): Unit = { val size = 10 val items = new FrequentItems[java.lang.Long](size) @@ -53,6 +57,7 @@ class FrequentItemsTest extends TestCase { result) } + @Test def testZeroSize(): Unit = { val size = 0 val items = new FrequentItems[java.lang.Double](size) @@ -69,6 +74,7 @@ class FrequentItemsTest extends TestCase { assertEquals(new util.HashMap[String, Double](), result) } + @Test def testSketchSizes(): Unit = { val expectedSketchSizes = Map( @@ -88,6 +94,7 @@ class FrequentItemsTest extends TestCase { assertEquals(expectedSketchSizes, actualSketchSizes) } + @Test def testNormalization(): Unit = { val testValues = (1 to 4) .map(i => i -> i) @@ -119,6 +126,7 @@ class FrequentItemsTest extends TestCase { assertEquals(expectedStringValues, actualStringValues) } + @Test def testBulkMerge(): Unit = { val sketch = new FrequentItems[String](3) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/MinHeapTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/MinHeapTest.scala index 5cf5dda1a5..14e7a21c07 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/MinHeapTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/MinHeapTest.scala @@ -19,11 +19,14 @@ package ai.chronon.aggregator.test import ai.chronon.aggregator.base.MinHeap import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import java.util import scala.collection.JavaConverters._ class MinHeapTest extends TestCase { + + @Test def testInserts(): Unit = { val mh = new MinHeap[Int](maxSize = 4, Ordering.Int) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/MomentTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/MomentTest.scala index 4c45eafa35..3997e39284 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/MomentTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/MomentTest.scala @@ -4,6 +4,7 @@ import ai.chronon.aggregator.base._ import junit.framework.TestCase import org.apache.commons.math3.stat.descriptive.moment.{Kurtosis => ApacheKurtosis, Skewness => ApacheSkew} import org.junit.Assert._ +import org.junit.Test class MomentTest extends TestCase { def makeAgg(aggregator: 
MomentAggregator, values: Seq[Double]): (MomentAggregator, MomentsIR) = { @@ -35,24 +36,28 @@ class MomentTest extends TestCase { assertEquals(expected(v1 ++ v2), agg.finalize(ir), 0.1) } + @Test def testUpdate(): Unit = { val values = Seq(1.1, 2.2, 3.3, 4.4, 5.5) assertUpdate(new Skew(), values, expectedSkew) assertUpdate(new Kurtosis(), values, expectedKurtosis) } + @Test def testInsufficientSizes(): Unit = { val values = Seq(1.1, 2.2, 3.3, 4.4) assertUpdate(new Skew(), values.take(2), _ => Double.NaN) assertUpdate(new Kurtosis(), values.take(3), _ => Double.NaN) } + @Test def testNoVariance(): Unit = { val values = Seq(1.0, 1.0, 1.0, 1.0) assertUpdate(new Skew(), values, _ => Double.NaN) assertUpdate(new Kurtosis(), values, _ => Double.NaN) } + @Test def testMerge(): Unit = { val values1 = Seq(1.1, 2.2, 3.3) val values2 = Seq(4.4, 5.5) @@ -60,6 +65,7 @@ class MomentTest extends TestCase { assertMerge(new Skew(), values1, values2, expectedSkew) } + @Test def testNormalize(): Unit = { val values = Seq(1.0, 2.0, 3.0, 4.0, 5.0) val (agg, ir) = makeAgg(new Kurtosis, values) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/RowAggregatorTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/RowAggregatorTest.scala index 58c25ce6a9..4e2b2a3751 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/RowAggregatorTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/RowAggregatorTest.scala @@ -20,6 +20,7 @@ import ai.chronon.aggregator.row.RowAggregator import ai.chronon.api._ import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import java.util import scala.collection.JavaConverters._ @@ -49,6 +50,7 @@ object TestRow { } class RowAggregatorTest extends TestCase { + @Test def testUpdate(): Unit = { val rows = List( TestRow(1L, 4, 5.0f, "A", Seq(5, 3, 4), Seq("D", "A", "B", "A"), Map("A" -> 1, "B" -> 2)), diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothAggregatorTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothAggregatorTest.scala index 60bb5fc2c3..cef363a711 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothAggregatorTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothAggregatorTest.scala @@ -25,6 +25,7 @@ import ai.chronon.api._ import com.google.gson.Gson import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import java.util import scala.collection.mutable @@ -47,6 +48,7 @@ class Timer { class SawtoothAggregatorTest extends TestCase { + @Test def testTailAccuracy(): Unit = { val timer = new Timer val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 10000, 5 * 60 * 1000) @@ -118,6 +120,7 @@ class SawtoothAggregatorTest extends TestCase { } } + @Test def testRealTimeAccuracy(): Unit = { val timer = new Timer val queries = CStream.genTimestamps(new Window(1, TimeUnit.DAYS), 1000) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala index 95ac9d37c6..30d2f0aa2d 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/SawtoothOnlineAggregatorTest.scala @@ -23,6 +23,7 @@ import ai.chronon.api._ import com.google.gson.Gson import junit.framework.TestCase import org.junit.Assert.assertEquals +import org.junit.Test import java.time.{Instant, 
ZoneOffset} import java.time.format.DateTimeFormatter @@ -30,6 +31,7 @@ import java.util.Locale class SawtoothOnlineAggregatorTest extends TestCase { + @Test def testConsistency(): Unit = { val queryEndTs = TsUtils.round(System.currentTimeMillis(), WindowUtils.Day.millis) val batchEndTs = queryEndTs - WindowUtils.Day.millis diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/TwoStackLiteAggregatorTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/TwoStackLiteAggregatorTest.scala index 4225842e1b..45895b7c88 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/TwoStackLiteAggregatorTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/TwoStackLiteAggregatorTest.scala @@ -22,12 +22,14 @@ import ai.chronon.aggregator.windowing.{TwoStackLiteAggregator, TwoStackLiteAggr import ai.chronon.api.{Aggregation, Builders, IntType, LongType, Operation, StructField, StructType, TimeUnit, Window} import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test import ai.chronon.api.Extensions.AggregationOps import com.google.gson.Gson import scala.collection.Seq class TwoStackLiteAggregatorTest extends TestCase{ + @Test def testBufferWithTopK(): Unit = { val topK = new TopK[Integer](IntType, 2) val bankersBuffer = new TwoStackLiteAggregationBuffer(topK, 5) @@ -53,6 +55,7 @@ class TwoStackLiteAggregatorTest extends TestCase{ assertBufferEquals(Seq(10), bankersBuffer.query) } + @Test def testAgainstSawtooth(): Unit = { val timer = new Timer val queries = CStream.genTimestamps(new Window(30, TimeUnit.DAYS), 100000, 5 * 60 * 1000) diff --git a/aggregator/src/test/scala/ai/chronon/aggregator/test/VarianceTest.scala b/aggregator/src/test/scala/ai/chronon/aggregator/test/VarianceTest.scala index 21f7b8a553..9c207b8a4a 100644 --- a/aggregator/src/test/scala/ai/chronon/aggregator/test/VarianceTest.scala +++ b/aggregator/src/test/scala/ai/chronon/aggregator/test/VarianceTest.scala @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory import ai.chronon.aggregator.base.Variance import junit.framework.TestCase import org.junit.Assert._ +import org.junit.Test class VarianceTest extends TestCase { @transient lazy val logger = LoggerFactory.getLogger(getClass) @@ -59,6 +60,7 @@ class VarianceTest extends TestCase { assertTrue((naiveResult - welfordResult) / naiveResult < 0.0000001) } + @Test def testVariance: Unit = { compare(1000000) compare(1000000, min = 100000, max = 100001) diff --git a/api/BUILD.bazel b/api/BUILD.bazel index 3a5baf4f03..ef55c23483 100644 --- a/api/BUILD.bazel +++ b/api/BUILD.bazel @@ -34,13 +34,14 @@ scala_library( ), ) -scala_test_suite( +scala_junit_test_suite( name = "test", srcs = glob(["src/test/scala/ai/chronon/api/test/*.scala"]), - visibility = ["//visibility:public"], + strip_prefix = "src/test/scala/ai/chronon/api/test", + visibility = ["//visibility:private"], deps = [ - ":api-models", ":api-lib", + ":api-models", "//third_party/java/spark:spark-libs", maven_artifact("com.fasterxml.jackson.core:jackson-core"), maven_artifact("com.fasterxml.jackson.core:jackson-databind"), diff --git a/flink/BUILD.bazel b/flink/BUILD.bazel index 333fbbc70a..bca5b96131 100644 --- a/flink/BUILD.bazel +++ b/flink/BUILD.bazel @@ -41,10 +41,11 @@ scala_library( ], ) -scala_test_suite( +scala_junit_test_suite( name = "test", srcs = glob(["src/test/scala/ai/chronon/flink/**/*.scala"]), - visibility = ["//visibility:public"], + strip_prefix = "src/test/scala/ai/chronon/flink", + visibility = ["//visibility:private"], deps = [ ":flink", 
":test-lib", diff --git a/jvm/spark_repos.bzl b/jvm/spark_repos.bzl index 53da4ae798..d0484cf35a 100644 --- a/jvm/spark_repos.bzl +++ b/jvm/spark_repos.bzl @@ -6,6 +6,7 @@ spark_2_4_repo = repo(name = "spark_2_4", artifacts = [ "org.apache.curator:apache-curator:2.11.0", "org.apache.datasketches:datasketches-java:2.0.0", "org.apache.datasketches:datasketches-memory:1.3.0", + "org.apache.derby:derby:10.12.1.1", "org.apache.hive:hive-exec:1.2.1", versioned_artifacts("2.4.0", [ "org.apache.spark:spark-streaming_2.11", @@ -28,7 +29,6 @@ spark_3_1_repo = repo(name = "spark_3_1", artifacts = [ "org.apache.curator:apache-curator:2.12.0", "org.apache.datasketches:datasketches-java:2.0.0", "org.apache.datasketches:datasketches-memory:1.3.0", - "org.apache.hive:hive-exec:3.1.2", "org.apache.kafka:kafka_2.12:2.6.3", versioned_artifacts("3.1.1", [ "org.apache.spark:spark-streaming_2.12", @@ -43,9 +43,11 @@ spark_3_1_repo = repo(name = "spark_3_1", artifacts = [ "org.json4s:json4s-core_2.12", "org.json4s:json4s-jackson_2.12", ]), + "org.apache.derby:derby:10.12.1.1", "org.apache.hive:hive-metastore:2.3.9", + "org.apache.hive:hive-exec:2.3.9", "io.delta:delta-core_2.12:2.0.2", -], excluded_artifacts = ["org.slf4j:slf4j-log4j12"]) +], excluded_artifacts = ["org.slf4j:slf4j-log4j12", "org.pentaho:pentaho-aggdesigner-algorithm"]) spark_3_2_repo = repo( name = "spark_3_2", @@ -80,11 +82,14 @@ spark_3_2_repo = repo( "org.apache.avro:avro:1.8.2", "org.apache.avro:avro-mapred:1.8.2", "org.apache.hive:hive-metastore:2.3.9", - "org.apache.hive:hive-exec:3.1.2", + "org.apache.hive:hive-exec:2.3.9", # Monitoring "io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0", "io.delta:delta-core_2.12:2.0.2", + + # Test + "org.apache.derby:derby:10.14.2.0", ], excluded_artifacts = [ "org.pentaho:pentaho-aggdesigner-algorithm", @@ -124,12 +129,15 @@ spark_3_5_repo = repo( "org.apache.avro:avro:1.8.2", "org.apache.avro:avro-mapred:1.8.2", "org.apache.hive:hive-metastore:2.3.9", - "org.apache.hive:hive-exec:3.1.2", + "org.apache.hive:hive-exec:2.3.9", # Monitoring "io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0", "io.delta:delta-core_2.12:2.0.2", "io.delta:delta-core_2.13:2.0.2", + + # Test + "org.apache.derby:derby:10.14.2.0", ], excluded_artifacts = [ "org.pentaho:pentaho-aggdesigner-algorithm", diff --git a/online/BUILD.bazel b/online/BUILD.bazel index 19c606e1e7..9fe8addfd4 100644 --- a/online/BUILD.bazel +++ b/online/BUILD.bazel @@ -24,10 +24,10 @@ scala_library( ]), visibility = ["//visibility:public"], deps = [ - "//api:api-models", + "//aggregator", "//api:api-lib", + "//api:api-models", "//third_party/java/spark:spark-libs", - "//aggregator", maven_artifact("com.esotericsoftware:kryo"), scala_artifact("org.json4s:json4s-core"), scala_artifact("org.json4s:json4s-jackson"), @@ -61,10 +61,10 @@ scala_library( srcs = glob(["src/test/scala/ai/chronon/online/**/*.scala"]), visibility = ["//visibility:public"], deps = [ - "//api:api-models", - "//api:api-lib", ":online", "//aggregator", + "//api:api-lib", + "//api:api-models", "//third_party/java/spark:spark-libs", maven_artifact("com.esotericsoftware:kryo"), scala_artifact("org.json4s:json4s-core"), @@ -102,10 +102,11 @@ scala_library( ), ) -scala_test_suite( +scala_junit_test_suite( name = "test", srcs = glob(["src/test/scala/ai/chronon/online/**/*.scala"]), - visibility = ["//visibility:public"], + strip_prefix = "src/test/scala/ai/chronon/online", + visibility = ["//visibility:private"], deps = [ ":online", ":test-lib", @@ -113,6 +114,13 @@ scala_test_suite( 
"//api:api-lib", "//api:api-models", "//third_party/java/spark:spark-libs", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + scala_artifact("org.scalactic:scalactic"), + scala_artifact("org.scalatest:scalatest-matchers-core"), + scala_artifact("org.scalatest:scalatest-core"), + maven_artifact("org.scalatest:scalatest-compatible"), + scala_artifact("org.scalatest:scalatest-shouldmatchers"), scala_artifact("org.scalatestplus:mockito-3-4"), maven_artifact("org.mockito:mockito-core"), maven_artifact("org.apache.thrift:libthrift"), @@ -123,7 +131,9 @@ scala_test_suite( maven_artifact("com.github.ben-manes.caffeine:caffeine"), maven_artifact("junit:junit"), maven_artifact("com.novocode:junit-interface"), - ], + ] + select_for_scala_version(before_2_13 = [ + maven_artifact("com.fasterxml.jackson.module:jackson-module-scala_2.12"), + ]), ) genrule( diff --git a/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala b/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala index 74ec3a2113..4fd9638be0 100644 --- a/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala @@ -29,7 +29,12 @@ import scala.util.ScalaJavaConversions.JListOps class DataStreamBuilderTest { @transient lazy val logger = LoggerFactory.getLogger(getClass) lazy val spark: SparkSession = { - System.setSecurityManager(null) + try { + System.setSecurityManager(null) + } catch { + case (t: java.lang.SecurityException) if t.getMessage.contains("GoogleTestSecurityManager") => + // Running on Bazel, allow it. + } val spark = SparkSession .builder() .appName("DataStreamBuilderTest") diff --git a/service/BUILD.bazel b/service/BUILD.bazel index a41b4ba3b1..af9fcffca5 100644 --- a/service/BUILD.bazel +++ b/service/BUILD.bazel @@ -24,10 +24,13 @@ java_library( ], ) -scala_test_suite( +scala_junit_test_suite( name = "test", srcs = glob(["src/test/java/ai/chronon/service/handlers/*.java"]), - visibility = ["//visibility:public"], + strip_prefix = "src/test/java/ai/chronon/service/handlers", + # prefixes = ["ai.chronon.service.handlers"], + # suffixes = ["Test"], + visibility = ["//visibility:private"], deps = [ ":service", "//online", @@ -38,5 +41,6 @@ scala_test_suite( maven_artifact("io.vertx:vertx-unit"), maven_artifact("io.vertx:vertx-core"), maven_artifact("io.vertx:vertx-web"), + maven_artifact("io.vertx:vertx-codegen"), ], ) diff --git a/spark/BUILD.bazel b/spark/BUILD.bazel index b5edfb36b6..a81a050910 100644 --- a/spark/BUILD.bazel +++ b/spark/BUILD.bazel @@ -57,6 +57,7 @@ scala_library( "//api:api-models", "//online", "//third_party/java/spark:spark-libs", + "@bazel_tools//tools/java/runfiles", maven_artifact("com.google.code.gson:gson"), maven_artifact("org.apache.thrift:libthrift"), maven_artifact("com.google.guava:guava"), @@ -79,19 +80,30 @@ scala_library( ], ) -scala_test_suite( +scala_junit_test_suite( name = "test", + timeout = "eternal", srcs = glob(["src/test/scala/ai/chronon/spark/test/**/*.scala"]), - visibility = ["//visibility:public"], + data = [ + "//spark/src/test/resources", + "//tools/policies:derby.policy", + ], + jvm_flags = [ + "-Djava.security.policy=$(location //tools/policies:derby.policy)", + ], + resources = ["//spark/src/test/resources"], + strip_prefix = "src/test/scala/ai/chronon/spark/test", + visibility = ["//visibility:private"], deps = [ ":spark", - ":test-lib", "//aggregator", 
"//aggregator:test-lib", "//api:api-lib", "//api:api-models", "//online", "//third_party/java/spark:spark-libs", + "//third_party/java/spark:spark-test-libs", + "@bazel_tools//tools/java/runfiles", scala_artifact("org.scala-lang.modules:scala-java8-compat"), maven_artifact("junit:junit"), maven_artifact("com.novocode:junit-interface"), @@ -100,6 +112,11 @@ scala_test_suite( scala_artifact("com.fasterxml.jackson.module:jackson-module-scala"), maven_artifact("com.google.code.gson:gson"), scala_artifact("org.rogach:scallop"), + scala_artifact("org.scalactic:scalactic"), + scala_artifact("org.scalatest:scalatest-matchers-core"), + scala_artifact("org.scalatest:scalatest-core"), + maven_artifact("org.scalatest:scalatest-compatible"), + scala_artifact("org.scalatest:scalatest-shouldmatchers"), scala_artifact("org.scalatestplus:mockito-3-4"), maven_artifact("org.mockito:mockito-core"), maven_artifact("org.slf4j:slf4j-api"), diff --git a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala index ebe6f6b1ad..285ee43b38 100644 --- a/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala +++ b/spark/src/main/scala/ai/chronon/spark/SparkSessionBuilder.scala @@ -58,7 +58,12 @@ object SparkSessionBuilder { if (local) { //required to run spark locally with hive support enabled - for sbt test - System.setSecurityManager(null) + try { + System.setSecurityManager(null) + } catch { + case (t: java.lang.SecurityException) if t.getMessage.contains("GoogleTestSecurityManager") => + // Running on Bazel, allow it. + } } val userName = Properties.userName val warehouseDir = localWarehouseLocation.map(expandUser).getOrElse(DefaultWarehouseDir.getAbsolutePath) @@ -102,6 +107,7 @@ object SparkSessionBuilder { .config("spark.sql.warehouse.dir", s"$warehouseDir/data") .config("spark.hadoop.javax.jdo.option.ConnectionURL", metastoreDb) .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.ui.enabled", "false") } else { // hive jars need to be available on classpath - no needed for local testing baseBuilder @@ -131,6 +137,7 @@ object SparkSessionBuilder { .master("local[*]") .config("spark.local.dir", s"/tmp/$userName/chronon-spark-streaming") .config("spark.kryo.registrationRequired", "true") + .config("spark.ui.enabled", "false") } else { baseBuilder } diff --git a/spark/src/test/resources/BUILD.bazel b/spark/src/test/resources/BUILD.bazel new file mode 100644 index 0000000000..1f185ecf19 --- /dev/null +++ b/spark/src/test/resources/BUILD.bazel @@ -0,0 +1,5 @@ +filegroup( + name = "resources", + srcs = glob(["**"]), + visibility = ["//visibility:public"], +) diff --git a/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala index 61575dfe7f..f30db29a10 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala @@ -22,7 +22,7 @@ import ai.chronon.api import ai.chronon.api.Constants.ChrononMetadataKey import ai.chronon.api.Extensions.{JoinOps, MetadataOps} import ai.chronon.api._ -import ai.chronon.online.Fetcher.{Request} +import ai.chronon.online.Fetcher.Request import ai.chronon.online.{MetadataStore, SparkConversions} import ai.chronon.spark.Extensions._ import ai.chronon.spark.{Join => _, _} @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.functions.lit import 
org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Test import java.lang import java.util.TimeZone @@ -311,6 +312,7 @@ class ChainingFetcherTest extends TestCase { assertEquals(0, diff.count()) } + @Test def testFetchParentJoin(): Unit = { val namespace = "parent_join_fetch" val joinConf = generateMutationData(namespace, Accuracy.TEMPORAL) @@ -318,6 +320,7 @@ class ChainingFetcherTest extends TestCase { compareTemporalFetch(joinConf, "2021-04-15", expected, fetcherResponse, "user") } + @Test def testFetchChainingDeterministic(): Unit = { val namespace = "chaining_fetch" val chainingJoinConf = generateChainingJoinData(namespace, Accuracy.TEMPORAL) diff --git a/spark/src/test/scala/ai/chronon/spark/test/ExampleDataUtils.scala b/spark/src/test/scala/ai/chronon/spark/test/ExampleDataUtils.scala new file mode 100644 index 0000000000..237da41895 --- /dev/null +++ b/spark/src/test/scala/ai/chronon/spark/test/ExampleDataUtils.scala @@ -0,0 +1,17 @@ +package ai.chronon.spark.test + +import com.google.devtools.build.runfiles.Runfiles +import java.io.File + +object ExampleDataUtils { + lazy val runfiles = Runfiles.create() + + def getExampleDataDirectory(): String = { + val confResource = getClass.getResource("/") + if (confResource != null) confResource.getPath + else runfiles.rlocation("chronon/spark/src/test/resources") + } + + def getExampleData(path: String): String = + new File(getExampleDataDirectory(), path).getPath +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetchStatsTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetchStatsTest.scala index c6b3c1b258..f10fd848aa 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetchStatsTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetchStatsTest.scala @@ -32,6 +32,7 @@ import ai.chronon.spark.{Analyzer, Join, SparkSessionBuilder, TableUtils} import com.google.gson.GsonBuilder import junit.framework.TestCase import org.apache.spark.sql.SparkSession +import org.junit.Test import java.util.TimeZone import java.util.concurrent.Executors @@ -56,6 +57,7 @@ class FetchStatsTest extends TestCase { private val today = tableUtils.partitionSpec.at(System.currentTimeMillis()) private val yesterday = tableUtils.partitionSpec.before(today) + @Test def testFetchStats(): Unit = { // Part 1: Build the assets. Join definition, compute and serve stats. 
tableUtils.createDatabase(namespace) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 051ec1be73..25214a8fa6 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.functions.{avg, col, lit} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.Test import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.Mockito.{reset, spy, when} import org.slf4j.LoggerFactory @@ -655,12 +656,14 @@ class FetcherTest extends TestCase { assertEquals(0, diff.count()) } + @Test def testTemporalFetchJoinDeterministic(): Unit = { val namespace = "deterministic_fetch" val joinConf = generateMutationData(namespace) compareTemporalFetch(joinConf, "2021-04-10", namespace, consistencyCheck = false, dropDsOnWrite = true) } + @Test def testTemporalFetchJoinDerivation(): Unit = { val namespace = "derivation_fetch" val joinConf = generateMutationData(namespace) @@ -675,6 +678,7 @@ class FetcherTest extends TestCase { compareTemporalFetch(joinConf, "2021-04-10", namespace, consistencyCheck = false, dropDsOnWrite = true) } + @Test def testTemporalFetchJoinDerivationRenameOnly(): Unit = { val namespace = "derivation_fetch_rename_only" val joinConf = generateMutationData(namespace) @@ -685,6 +689,7 @@ class FetcherTest extends TestCase { compareTemporalFetch(joinConf, "2021-04-10", namespace, consistencyCheck = false, dropDsOnWrite = true) } + @Test def testTemporalFetchJoinGenerated(): Unit = { val namespace = "generated_fetch" val joinConf = generateRandomData(namespace) @@ -695,6 +700,7 @@ class FetcherTest extends TestCase { dropDsOnWrite = false) } + @Test def testTemporalTiledFetchJoinDeterministic(): Unit = { val namespace = "deterministic_tiled_fetch" val joinConf = generateEventOnlyData(namespace, groupByCustomJson = Some("{\"enable_tiling\": true}")) @@ -702,6 +708,7 @@ class FetcherTest extends TestCase { } // test soft-fail on missing keys + @Test def testEmptyRequest(): Unit = { val spark: SparkSession = createSparkSession() val namespace = "empty_request" @@ -727,6 +734,7 @@ class FetcherTest extends TestCase { assertTrue(responseMap.keys.forall(_.endsWith("_exception"))) } + @Test def testTemporalFetchGroupByNonExistKey(): Unit = { val namespace = "non_exist_key_group_by_fetch" val spark: SparkSession = createSparkSession() @@ -752,6 +760,7 @@ class FetcherTest extends TestCase { assertEquals(expected, result.head.values.get) } + @Test def testKVStorePartialFailure(): Unit = { val spark: SparkSession = createSparkSession() @@ -781,6 +790,7 @@ class FetcherTest extends TestCase { exceptionKeys.foreach(k => assertTrue(responseMap.contains(k))) } + @Test def testGroupByServingInfoTtlCacheRefresh(): Unit = { val namespace = "test_group_by_serving_info_ttl_cache_refresh" val spark: SparkSession = createSparkSession() @@ -817,6 +827,7 @@ class FetcherTest extends TestCase { assertTrue(response3.values.isSuccess) } + @Test def testJoinConfTtlCacheRefresh(): Unit = { val namespace = "test_join_conf_ttl_cache_refresh" val spark: SparkSession = createSparkSession() @@ -850,104 +861,4 @@ class FetcherTest extends TestCase { val response2 = fetch() assertTrue(response2.isSuccess) } -} - -object FetcherTestUtil { - @transient lazy val logger = 
LoggerFactory.getLogger(getClass) - def joinResponses(spark: SparkSession, - requests: Array[Request], - mockApi: MockApi, - useJavaFetcher: Boolean = false, - runCount: Int = 1, - samplePercent: Double = -1, - logToHive: Boolean = false, - debug: Boolean = false)(implicit ec: ExecutionContext): (List[Response], DataFrame) = { - val chunkSize = 100 - @transient lazy val fetcher = mockApi.buildFetcher(debug) - @transient lazy val javaFetcher = mockApi.buildJavaFetcher() - - def fetchOnce = { - var latencySum: Long = 0 - var latencyCount = 0 - val blockStart = System.currentTimeMillis() - val result = requests.iterator - .grouped(chunkSize) - .map { oldReqs => - // deliberately mis-type a few keys - val r = oldReqs - .map(r => - r.copy(keys = r.keys.mapValues { v => - if (v.isInstanceOf[java.lang.Long]) v.toString else v - }.toMap)) - val responses = if (useJavaFetcher) { - // Converting to java request and using the toScalaRequest functionality to test conversion - val convertedJavaRequests = r.map(new JavaRequest(_)).toJava - val javaResponse = javaFetcher.fetchJoin(convertedJavaRequests) - FutureConverters - .toScala(javaResponse) - .map( - _.toScala.map(jres => - Response( - Request(jres.request.name, jres.request.keys.toScala.toMap, Option(jres.request.atMillis)), - jres.values.toScala.map(_.toScala) - ))) - } else { - fetcher.fetchJoin(r) - } - - // fix mis-typed keys in the request - val fixedResponses = - responses.map(resps => resps.zip(oldReqs).map { case (resp, req) => resp.copy(request = req) }) - System.currentTimeMillis() -> fixedResponses - } - .flatMap { - case (start, future) => - val result = Await.result(future, Duration(10000, SECONDS)) // todo: change back to millis - val latency = System.currentTimeMillis() - start - latencySum += latency - latencyCount += 1 - result - } - .toList - val latencyMillis = latencySum.toFloat / latencyCount.toFloat - val qps = (requests.length * 1000.0) / (System.currentTimeMillis() - blockStart).toFloat - (latencyMillis, qps, result) - } - - // to overwhelm the profiler with fetching code path - // so as to make it prominent in the flamegraph & collect enough stats - - var latencySum = 0.0 - var qpsSum = 0.0 - var loggedValues: Seq[LoggableResponseBase64] = null - var result: List[Response] = null - (0 until runCount).foreach { _ => - val (latency, qps, resultVal) = fetchOnce - result = resultVal - loggedValues = mockApi.flushLoggedValues - latencySum += latency - qpsSum += qps - } - val fetcherNameString = if (useJavaFetcher) "Java" else "Scala" - - logger.info(s""" - |Averaging fetching stats for $fetcherNameString Fetcher over ${requests.length} requests $runCount times - |with batch size: $chunkSize - |average qps: ${qpsSum / runCount} - |average latency: ${latencySum / runCount} - |""".stripMargin) - val loggedDf = mockApi.loggedValuesToDf(loggedValues, spark) - if (logToHive) { - TableUtils(spark).insertPartitions( - loggedDf, - mockApi.logTable, - partitionColumns = Seq("ds", "name") - ) - } - if (samplePercent > 0) { - logger.info(s"logged count: ${loggedDf.count()}") - loggedDf.show() - } - result -> loggedDf - } -} +} \ No newline at end of file diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTestUtil.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTestUtil.scala new file mode 100644 index 0000000000..92ca07b9e3 --- /dev/null +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTestUtil.scala @@ -0,0 +1,112 @@ +package ai.chronon.spark.test + +import ai.chronon.online.Fetcher.{Request, Response} +import 
ai.chronon.online.{JavaRequest, LoggableResponseBase64} +import ai.chronon.spark.TableUtils +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.slf4j.LoggerFactory + +import scala.compat.java8.FutureConverters +import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.duration.{Duration, SECONDS} +import scala.util.ScalaJavaConversions._ + +object FetcherTestUtil { + @transient lazy val logger = LoggerFactory.getLogger(getClass) + def joinResponses(spark: SparkSession, + requests: Array[Request], + mockApi: MockApi, + useJavaFetcher: Boolean = false, + runCount: Int = 1, + samplePercent: Double = -1, + logToHive: Boolean = false, + debug: Boolean = false)(implicit ec: ExecutionContext): (List[Response], DataFrame) = { + val chunkSize = 100 + @transient lazy val fetcher = mockApi.buildFetcher(debug) + @transient lazy val javaFetcher = mockApi.buildJavaFetcher() + + def fetchOnce = { + var latencySum: Long = 0 + var latencyCount = 0 + val blockStart = System.currentTimeMillis() + val result = requests.iterator + .grouped(chunkSize) + .map { oldReqs => + // deliberately mis-type a few keys + val r = oldReqs + .map(r => + r.copy(keys = r.keys.mapValues { v => + if (v.isInstanceOf[java.lang.Long]) v.toString else v + }.toMap)) + val responses = if (useJavaFetcher) { + // Converting to java request and using the toScalaRequest functionality to test conversion + val convertedJavaRequests = r.map(new JavaRequest(_)).toJava + val javaResponse = javaFetcher.fetchJoin(convertedJavaRequests) + FutureConverters + .toScala(javaResponse) + .map( + _.toScala.map(jres => + Response( + Request(jres.request.name, jres.request.keys.toScala.toMap, Option(jres.request.atMillis)), + jres.values.toScala.map(_.toScala) + ))) + } else { + fetcher.fetchJoin(r) + } + + // fix mis-typed keys in the request + val fixedResponses = + responses.map(resps => resps.zip(oldReqs).map { case (resp, req) => resp.copy(request = req) }) + System.currentTimeMillis() -> fixedResponses + } + .flatMap { + case (start, future) => + val result = Await.result(future, Duration(10000, SECONDS)) // todo: change back to millis + val latency = System.currentTimeMillis() - start + latencySum += latency + latencyCount += 1 + result + } + .toList + val latencyMillis = latencySum.toFloat / latencyCount.toFloat + val qps = (requests.length * 1000.0) / (System.currentTimeMillis() - blockStart).toFloat + (latencyMillis, qps, result) + } + + // to overwhelm the profiler with fetching code path + // so as to make it prominent in the flamegraph & collect enough stats + + var latencySum = 0.0 + var qpsSum = 0.0 + var loggedValues: Seq[LoggableResponseBase64] = null + var result: List[Response] = null + (0 until runCount).foreach { _ => + val (latency, qps, resultVal) = fetchOnce + result = resultVal + loggedValues = mockApi.flushLoggedValues + latencySum += latency + qpsSum += qps + } + val fetcherNameString = if (useJavaFetcher) "Java" else "Scala" + + logger.info(s""" + |Averaging fetching stats for $fetcherNameString Fetcher over ${requests.length} requests $runCount times + |with batch size: $chunkSize + |average qps: ${qpsSum / runCount} + |average latency: ${latencySum / runCount} + |""".stripMargin) + val loggedDf = mockApi.loggedValuesToDf(loggedValues, spark) + if (logToHive) { + TableUtils(spark).insertPartitions( + loggedDf, + mockApi.logTable, + partitionColumns = Seq("ds", "name") + ) + } + if (samplePercent > 0) { + logger.info(s"logged count: ${loggedDf.count()}") + loggedDf.show() + } + result -> loggedDf 
+ } +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/MetadataExporterTest.scala b/spark/src/test/scala/ai/chronon/spark/test/MetadataExporterTest.scala index 9fbfde24c3..5fe00489e3 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/MetadataExporterTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/MetadataExporterTest.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.SparkSession import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.junit.Assert.assertEquals +import org.junit.Test import scala.io.Source import java.io.File @@ -62,6 +63,7 @@ class MetadataExporterTest extends TestCase { } } + @Test def testMetadataExport(): Unit = { // Create the tables. val namespace = "example_namespace" @@ -78,10 +80,10 @@ class MetadataExporterTest extends TestCase { val sampleDf = DataFrameGen .events(spark, sampleData, 10000, partitions = 30) sampleDf.save(sampleTable) - val confResource = getClass.getResource("/") + val confResourcePath = ExampleDataUtils.getExampleDataDirectory() val tmpDir: File = Files.createTempDir() - MetadataExporter.run(confResource.getPath, tmpDir.getAbsolutePath) - printFilesInDirectory(s"${confResource.getPath}/joins/team") + MetadataExporter.run(confResourcePath, tmpDir.getAbsolutePath) + printFilesInDirectory(s"${confResourcePath}/joins/team") printFilesInDirectory(s"${tmpDir.getAbsolutePath}/joins") // Read the files. val file = Source.fromFile(s"${tmpDir.getAbsolutePath}/joins/example_join.v1") diff --git a/spark/src/test/scala/ai/chronon/spark/test/MetadataStoreTest.scala b/spark/src/test/scala/ai/chronon/spark/test/MetadataStoreTest.scala index 8cfd0b5526..5d5378be5a 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/MetadataStoreTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/MetadataStoreTest.scala @@ -6,6 +6,7 @@ import ai.chronon.online.MetadataEndPoint.NameByTeamEndPointName import ai.chronon.online.{MetadataDirWalker, MetadataEndPoint, MetadataStore} import junit.framework.TestCase import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import org.junit.Test import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} @@ -13,8 +14,8 @@ import scala.io.Source class MetadataStoreTest extends TestCase { val joinPath = "joins/team/example_join.v1" - val confResource = getClass.getResource(s"/$joinPath") - val src = Source.fromFile(confResource.getPath) + val confResourcePath = ExampleDataUtils.getExampleData(joinPath) + val src = Source.fromFile(confResourcePath) val expected = { try src.mkString @@ -23,6 +24,7 @@ class MetadataStoreTest extends TestCase { val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName, MetadataEndPoint.NameByTeamEndPointName) + @Test def testMetadataStoreSingleFile(): Unit = { val inMemoryKvStore = OnlineUtils.buildInMemoryKVStore("FetcherTest") val singleFileDataSet = ChrononMetadataKey @@ -31,7 +33,7 @@ class MetadataStoreTest extends TestCase { inMemoryKvStore.create(singleFileDataSet) inMemoryKvStore.create(NameByTeamEndPointName) // set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing - val singleFileDirWalker = new MetadataDirWalker(confResource.getPath, acceptedEndPoints) + val singleFileDirWalker = new MetadataDirWalker(confResourcePath, acceptedEndPoints) val singleFileKvMap = singleFileDirWalker.run val singleFilePut: Seq[Future[scala.collection.Seq[Boolean]]] = singleFileKvMap.toSeq.map { case (endPoint, kvMap) => 
singleFileMetadataStore.put(kvMap, endPoint) @@ -56,6 +58,7 @@ class MetadataStoreTest extends TestCase { assertFalse(emptyRes.latest.isSuccess) } + @Test def testMetadataStoreDirectory(): Unit = { val inMemoryKvStore = OnlineUtils.buildInMemoryKVStore("FetcherTest") val directoryDataSetDataSet = ChrononMetadataKey @@ -63,7 +66,7 @@ class MetadataStoreTest extends TestCase { val directoryMetadataStore = new MetadataStore(inMemoryKvStore, directoryDataSetDataSet, timeoutMillis = 10000) inMemoryKvStore.create(directoryDataSetDataSet) inMemoryKvStore.create(directoryMetadataDataSet) - val directoryDataDirWalker = new MetadataDirWalker(confResource.getPath.replace(s"/$joinPath", ""), acceptedEndPoints) + val directoryDataDirWalker = new MetadataDirWalker(ExampleDataUtils.getExampleDataDirectory(), acceptedEndPoints) val directoryDataKvMap = directoryDataDirWalker.run val directoryPut = directoryDataKvMap.toSeq.map { case (endPoint, kvMap) => directoryMetadataStore.put(kvMap, endPoint) diff --git a/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala b/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala index 8bb7a132dc..1b8ab86120 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala @@ -27,6 +27,7 @@ import junit.framework.TestCase import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertTrue} +import org.junit.Test import java.nio.charset.StandardCharsets import java.util.{Base64, TimeZone} @@ -35,31 +36,6 @@ import scala.concurrent.Await import scala.concurrent.duration.{Duration, SECONDS} import scala.util.ScalaJavaConversions.{JListOps, ListOps} -case class GroupByTestSuite( - name: String, - groupByConf: GroupBy, - groupByData: DataFrame -) - -case class JoinTestSuite( - joinConf: Join, - groupBys: Seq[GroupByTestSuite], - fetchExpectations: (Map[String, AnyRef], Map[String, AnyRef]) -) - -object JoinTestSuite { - - def apply(joinConf: Join, groupBys: Seq[GroupByTestSuite]): JoinTestSuite = { - val suite = JoinTestSuite(joinConf, groupBys) - assert( - groupBys.map(_.groupByConf.metaData.name) == - joinConf.joinParts.toScala - .map(_.groupBy.metaData.name) - ) - suite - } -} - class SchemaEvolutionTest extends TestCase { val spark: SparkSession = SparkSessionBuilder.build("SchemaEvolutionTest", local = true) @@ -301,6 +277,7 @@ class SchemaEvolutionTest extends TestCase { flattenedDf } + @Test private def testSchemaEvolution(namespace: String, joinSuiteV1: JoinTestSuite, joinSuiteV2: JoinTestSuite): Unit = { assert(joinSuiteV1.joinConf.metaData.name == joinSuiteV2.joinConf.metaData.name, message = "Schema evolution can only be tested on changes of the SAME join") @@ -428,11 +405,13 @@ class SchemaEvolutionTest extends TestCase { assertTrue(removedFeatures.forall(flattenedDf34.schema.fieldNames.contains(_))) } + @Test def testAddFeatures(): Unit = { val namespace = "add_features" testSchemaEvolution(namespace, createV1Join(namespace), createV2Join(namespace)) } + @Test def testRemoveFeatures(): Unit = { val namespace = "remove_features" testSchemaEvolution(namespace, createV2Join(namespace), createV1Join(namespace)) diff --git a/spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala b/spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala index a8a48c1b33..56cd4d0774 100644 --- 
a/spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/StreamingTest.scala @@ -22,14 +22,14 @@ import ai.chronon.api.{Accuracy, Builders, Constants, Operation, TimeUnit, Windo import ai.chronon.api.Constants.ChrononMetadataKey import ai.chronon.api.Extensions._ import ai.chronon.spark.test.StreamingTest.buildInMemoryKvStore -import ai.chronon.online.{MetadataStore} +import ai.chronon.online.MetadataStore import ai.chronon.spark.Extensions._ import ai.chronon.spark.{Join => _, _} import junit.framework.TestCase -import org.apache.spark.sql.{SparkSession} +import org.apache.spark.sql.SparkSession +import org.junit.Test import java.util.TimeZone - import scala.collection.JavaConverters.{asScalaBufferConverter, _} object StreamingTest { @@ -49,6 +49,7 @@ class StreamingTest extends TestCase { private val yesterday = tableUtils.partitionSpec.before(today) private val yearAgo = tableUtils.partitionSpec.minus(today, new Window(365, TimeUnit.DAYS)) + @Test def testStructInStreaming(): Unit = { tableUtils.createDatabase(namespace) val topicName = "fake_topic" diff --git a/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala b/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala index 8fe03d6563..a2fe851d72 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala @@ -25,7 +25,32 @@ import ai.chronon.spark.TableUtils import org.apache.spark.sql.functions.col import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import scala.util.ScalaJavaConversions.JListOps +import scala.util.ScalaJavaConversions.{JListOps, ListOps} + +case class GroupByTestSuite( + name: String, + groupByConf: GroupBy, + groupByData: DataFrame + ) + +case class JoinTestSuite( + joinConf: Join, + groupBys: Seq[GroupByTestSuite], + fetchExpectations: (Map[String, AnyRef], Map[String, AnyRef]) + ) + +object JoinTestSuite { + + def apply(joinConf: Join, groupBys: Seq[GroupByTestSuite]): JoinTestSuite = { + val suite = JoinTestSuite(joinConf, groupBys) + assert( + groupBys.map(_.groupByConf.metaData.name) == + joinConf.joinParts.toScala + .map(_.groupBy.metaData.name) + ) + suite + } +} object TestUtils { def createViewsGroupBy(namespace: String, diff --git a/third_party/java/spark/BUILD.bazel b/third_party/java/spark/BUILD.bazel index 692b462f59..c2f653cb02 100644 --- a/third_party/java/spark/BUILD.bazel +++ b/third_party/java/spark/BUILD.bazel @@ -96,6 +96,13 @@ SPARK_3_5_LIBS_PROVIDED = SPARK_LIBS_PROVIDED + [ ), ] +SPARK_LIBS_TEST = [ + jar( + name = "derby", + org = "org.apache.derby", + ), +] + java_library( name = "spark-libs", visibility = ["//visibility:public"], @@ -107,6 +114,17 @@ java_library( }), ) +java_library( + name = "spark-test-libs", + visibility = ["//visibility:public"], + exports = select({ + "//conditions:default": get_jars_for_repo("spark_3_2", SPARK_LIBS_TEST), + "//tools/flags/spark:spark_3_1": get_jars_for_repo("spark_3_1", SPARK_LIBS_TEST), + "//tools/flags/spark:spark_3_2": get_jars_for_repo("spark_3_2", SPARK_LIBS_TEST), + "//tools/flags/spark:spark_3_5": get_jars_for_repo("spark_3_5", SPARK_LIBS_TEST), + }), +) + # Usually spark provided jars are subset of all jars java_library( name = "spark-provided-libs", diff --git a/tools/build_rules/prelude_bazel b/tools/build_rules/prelude_bazel index eb693d0d15..10481f56f0 100644 --- a/tools/build_rules/prelude_bazel +++ b/tools/build_rules/prelude_bazel @@ -8,6 +8,7 @@ load( "scala_jar") 
load("@rules_java//java:defs.bzl", "java_library","java_binary") -load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library", "scala_binary","scala_test_suite") +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library", "scala_binary", "scala_junit_test", "scala_test_suite") load("//tools/build_rules:maven_artifact.bzl", "maven_artifact", "scala_artifact") load("//tools/build_rules:jvm_binary.bzl", "jvm_binary") +load("//tools/build_rules:testing.bzl", "scala_junit_test_suite") diff --git a/tools/build_rules/testing.bzl b/tools/build_rules/testing.bzl new file mode 100644 index 0000000000..61007332f8 --- /dev/null +++ b/tools/build_rules/testing.bzl @@ -0,0 +1,49 @@ +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_junit_test", "scala_library") + +def is_junit_test(path): + return path.endswith("Test.scala") or path.endswith("Test.java") + +def make_short_name(path, strip_prefix): + if path.startswith(strip_prefix): + short_name = path[len(strip_prefix):] + else: + short_name = path + if short_name.startswith("/"): + short_name = short_name[1:] + return short_name.replace("/", "_").replace(".scala", "").replace(".java", "") + +def scala_junit_test_suite(name, srcs, strip_prefix, **kwargs): + test_deps = kwargs.pop("deps", []) + jvm_flags = kwargs.pop("jvm_flags", []) + timeout = kwargs.pop("timeout", "moderate") + + util_srcs = [src for src in srcs if not is_junit_test(src)] + if len(util_srcs) > 0: + test_utils = "{}_utils".format(name) + scala_library( + name = test_utils, + srcs = util_srcs, + deps = test_deps, + **kwargs + ) + test_deps.append(":{}".format(test_utils)) + + tests = [] + for src in srcs: + if is_junit_test(src): + test_name = "{}_{}".format(name, make_short_name(src, strip_prefix)) + tests.append(test_name) + scala_junit_test( + name = test_name, + srcs = [src], + suffixes = ["Test"], + timeout = timeout, + deps = test_deps, + jvm_flags = jvm_flags, + **kwargs + ) + + native.test_suite( + name = name, + tests = tests, + ) diff --git a/tools/ide_support/intellij/default_view.bazelproject b/tools/ide_support/intellij/default_view.bazelproject new file mode 100644 index 0000000000..d87cdc2369 --- /dev/null +++ b/tools/ide_support/intellij/default_view.bazelproject @@ -0,0 +1,16 @@ +directories: + # Add the directories you want added as source here + # By default, we've added your entire workspace ('.') + . + +# Automatically includes all relevant targets under the 'directories' above +derive_targets_from_directories: true + +targets: + # If source code isn't resolving, add additional targets that compile it here + +additional_languages: + # Uncomment any additional languages you want supported + python + scala + java diff --git a/tools/policies/BUILD.bazel b/tools/policies/BUILD.bazel new file mode 100644 index 0000000000..b8ede830b0 --- /dev/null +++ b/tools/policies/BUILD.bazel @@ -0,0 +1 @@ +exports_files(["derby.policy"]) diff --git a/tools/policies/derby.policy b/tools/policies/derby.policy new file mode 100644 index 0000000000..7d7fddbc7f --- /dev/null +++ b/tools/policies/derby.policy @@ -0,0 +1,9 @@ +// This file ensures that tests running under Bazel will have the correct permissions +// to use Derby as a backend for the Hive metastore. +// +// See: https://db.apache.org/derby/docs/10.13/security/csecembeddedperms.html +grant { + permission java.lang.RuntimePermission "createClassLoader"; + permission org.apache.derby.security.SystemPermission "engine", "usederbyinternals"; + permission java.util.PropertyPermission "derby.*", "read"; +};