diff --git a/.bazeliskrc b/.bazeliskrc new file mode 100644 index 0000000000..dbb7b4bcc3 --- /dev/null +++ b/.bazeliskrc @@ -0,0 +1 @@ +USE_BAZEL_VERSION=6.4.0 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 301d6b8e5e..5508dd83e6 100644 --- a/.gitignore +++ b/.gitignore @@ -87,4 +87,7 @@ releases # Metals-generated sbt files /project/**/metals.sbt -/project/**/metals.sbt.lock \ No newline at end of file +/project/**/metals.sbt.lock + +# Bazel temporary output +/bazel-* \ No newline at end of file diff --git a/WORKSPACE b/WORKSPACE new file mode 100644 index 0000000000..c7b0fa56d5 --- /dev/null +++ b/WORKSPACE @@ -0,0 +1,77 @@ +workspace(name = "chronon") + +# Scala version used across the project +SCALA_VERSION = "2.12.18" + +load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive") + +# Contains useful bazel utility functions and rules +http_archive( + name = "bazel_skylib", + sha256 = "bc283cdfcd526a52c3201279cda4bc298652efa898b10b4db0837dc51652756f", + urls = [ + "https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/1.7.1/bazel-skylib-1.7.1.tar.gz", + "https://github.com/bazelbuild/bazel-skylib/releases/download/1.7.1/bazel-skylib-1.7.1.tar.gz", + ], +) + +# For Java support +http_archive( + name = "rules_java", + sha256 = "e81e9deaae0d9d99ef3dd5f6c1b32338447fe16d5564155531ea4eb7ef38854b", + urls = [ + "https://github.com/bazelbuild/rules_java/releases/download/7.0.6/rules_java-7.0.6.tar.gz", + ], +) +load("@rules_java//java:repositories.bzl", "rules_java_dependencies", "rules_java_toolchains") +rules_java_dependencies() +rules_java_toolchains() +load("@rules_java//java:repositories.bzl", "remote_jdk17_repos") +remote_jdk17_repos() + +# For JVM support +http_archive( + name = "rules_jvm_external", + strip_prefix = "rules_jvm_external-6.6", + sha256 = "3afe5195069bd379373528899c03a3072f568d33bd96fe037bd43b1f590535e7", + url = "https://github.com/bazel-contrib/rules_jvm_external/releases/download/6.6/rules_jvm_external-6.6.tar.gz" +) +load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps") +rules_jvm_external_deps() +load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup") +rules_jvm_external_setup() + +# For Scala support +http_archive( + name = "io_bazel_rules_scala", + sha256 = "e734eef95cf26c0171566bdc24d83bd82bdaf8ca7873bec6ce9b0d524bdaf05d", + strip_prefix = "rules_scala-6.6.0", + url = "https://github.com/bazelbuild/rules_scala/releases/download/v6.6.0/rules_scala-v6.6.0.tar.gz", +) +# Initialize Scala with specific version support +load("@io_bazel_rules_scala//:scala_config.bzl", "scala_config") +scala_config(scala_version = SCALA_VERSION) +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_repositories") +scala_repositories() +load("@io_bazel_rules_scala//scala:toolchains.bzl", "scala_register_toolchains") +scala_register_toolchains() +load("@io_bazel_rules_scala//testing:scalatest.bzl", "scalatest_repositories", "scalatest_toolchain") +scalatest_repositories() +scalatest_toolchain() + +# For Protobuf support +http_archive( + name = "rules_proto", + sha256 = "dc3fb206a2cb3441b485eb1e423165b231235a1ea9b031b4433cf7bc1fa460dd", + strip_prefix = "rules_proto-5.3.0-21.7", + urls = [ + "https://github.com/bazelbuild/rules_proto/archive/refs/tags/5.3.0-21.7.tar.gz", + ], +) +load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains") +rules_proto_dependencies() +rules_proto_toolchains() + +# To load all dependencies used across our modules 
+load("//tools/build_rules/dependencies:load_dependencies.bzl", "load_all_dependencies") +load_all_dependencies() \ No newline at end of file diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/row/MapColumnAggregator.scala b/aggregator/src/main/scala/ai/chronon/aggregator/row/MapColumnAggregator.scala index df66aab7be..0629c13c94 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/row/MapColumnAggregator.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/row/MapColumnAggregator.scala @@ -18,9 +18,9 @@ package ai.chronon.aggregator.row import ai.chronon.aggregator.base.SimpleAggregator import ai.chronon.api.Row +import ai.chronon.api.ScalaJavaConversions._ import java.util -import scala.util.ScalaJavaConversions.IteratorOps class MapColumnAggregator[Input, IR, Output](agg: SimpleAggregator[Input, IR, Output], columnIndices: ColumnIndices, diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/row/StatsGenerator.scala b/aggregator/src/main/scala/ai/chronon/aggregator/row/StatsGenerator.scala index 9d63ff8fa8..02c3add856 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/row/StatsGenerator.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/row/StatsGenerator.scala @@ -18,12 +18,12 @@ package ai.chronon.aggregator.row import ai.chronon.api import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import org.apache.datasketches.kll.KllFloatsSketch import org.apache.datasketches.memory.Memory import java.util import scala.collection.Seq -import scala.util.ScalaJavaConversions.JMapOps /** * Module managing FeatureStats Schema, Aggregations to be used by type and aggregator construction. diff --git a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala index c765aa34f4..710eca406c 100644 --- a/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala +++ b/aggregator/src/main/scala/ai/chronon/aggregator/windowing/Resolution.scala @@ -19,11 +19,10 @@ package ai.chronon.aggregator.windowing import ai.chronon.api.Extensions.WindowOps import ai.chronon.api.Extensions.WindowUtils import ai.chronon.api.GroupBy +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.TimeUnit import ai.chronon.api.Window -import scala.util.ScalaJavaConversions.ListOps - trait Resolution extends Serializable { // For a given window what is the resolution of the tail // The tail hops with the window size as represented by the return value diff --git a/api/BUILD.bazel b/api/BUILD.bazel new file mode 100644 index 0000000000..e44dada8e6 --- /dev/null +++ b/api/BUILD.bazel @@ -0,0 +1,59 @@ +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library", "scala_test_suite") +load("//tools/build_rules/thrift:thrift.bzl", "thrift_java_library") + +scala_library( + name = "api-lib", + srcs = glob(["src/main/**/*.scala", "src/main/**/*.java"]), + visibility = ["//visibility:public"], + deps = [ + ":api-models", + "//tools/build_rules/spark:spark-exec", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("com.google.code.gson:gson"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + scala_artifact("org.scala-lang.modules:scala-parser-combinators"), + ], +) + +scala_test_suite( + name = 
"api-test", + srcs = glob(["src/test/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":api-models", + ":api-lib", + "//tools/build_rules/spark:spark-exec", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + maven_artifact("org.mockito:mockito-core"), + scala_artifact("org.scala-lang.modules:scala-parser-combinators"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], +) + +thrift_java_library( + name = "api-models-java", + srcs = glob(["thrift/*.thrift"]), +) + +java_library( + name = "api-models", + srcs = [":api-models-java"] + glob(["src/main/java/ai/chronon/api/thrift/**/*.java"]), + visibility = ["//visibility:public"], + deps = [ + maven_artifact("javax.annotation:javax.annotation.api"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("com.google.code.gson:gson"), + ], +) \ No newline at end of file diff --git a/api/src/main/scala-2.11/scala/util/ScalaVersionSpecificCollectionsConverter.scala b/api/src/main/scala-2.11/scala/util/ScalaVersionSpecificCollectionsConverter.scala deleted file mode 100644 index ce90993974..0000000000 --- a/api/src/main/scala-2.11/scala/util/ScalaVersionSpecificCollectionsConverter.scala +++ /dev/null @@ -1,86 +0,0 @@ -package scala.util - -import scala.collection.JavaConverters._ -import scala.collection.parallel.ParSeq - -object ScalaVersionSpecificCollectionsConverter { - - def convertScalaMapToJava[S, T](map: Map[S, T]): java.util.Map[S, T] = { - map.asJava - } - - def convertJavaMapToScala[S, T](map: java.util.Map[S, T]): Map[S, T] = { - map.asScala.toMap - } - - def convertScalaListToJava[S](map: List[S]): java.util.List[S] = { - map.asJava - } - - def convertScalaSeqToJava[S](seq: Seq[S]): java.util.List[S] = { - seq.asJava - } - - def convertJavaListToScala[S](map: java.util.List[S]): List[S] = { - map.asScala.toList - } -} - -object ScalaJavaConversions { - - implicit class IteratorOps[T](iterator: java.util.Iterator[T]) { - def toScala: Iterator[T] = { - iterator.asScala - } - } - implicit class JIteratorOps[T](iterator: Iterator[T]) { - def toJava: java.util.Iterator[T] = { - iterator.asJava - } - } - implicit class ListOps[T](list: java.util.List[T]) { - def toScala: List[T] = { - if (list == null) { - null - } else { - list.iterator().asScala.toList - } - } - } - implicit class IterableOps[T](it: Iterable[T]) { - def parallel: ParSeq[T] = { - if (it == null) { - null - } else { - it.toSeq.par - } - } - } - implicit class JListOps[T](list: Seq[T]) { - def toJava: java.util.List[T] = { - if (list == null) { - null - } else { - list.asJava - } - } - } - implicit class MapOps[K, V](map: java.util.Map[K, V]) { - def toScala: Map[K, V] = { - if (map == null) { - null - } else { - map.asScala.toMap - } - } - } - implicit class JMapOps[K, V](map: Map[K, V]) { - def toJava: java.util.Map[K, V] = { - if (map == null) { - null - } else { - map.asJava - } - } - } -} diff --git a/api/src/main/scala-2.13/scala/util/ScalaVersionSpecificCollectionsConverter.scala b/api/src/main/scala-2.13/scala/util/ScalaVersionSpecificCollectionsConverter.scala deleted file mode 100644 index ecfd6ae30d..0000000000 --- 
a/api/src/main/scala-2.13/scala/util/ScalaVersionSpecificCollectionsConverter.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (C) 2023 The Chronon Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package scala.util - -import scala.collection.parallel.CollectionConverters.IterableIsParallelizable -import scala.collection.parallel.ParSeq -import scala.jdk.CollectionConverters._ - -object ScalaVersionSpecificCollectionsConverter { - - def convertScalaMapToJava[S, T](map: Map[S, T]): java.util.Map[S, T] = { - map.asJava - } - - def convertJavaMapToScala[S, T](map: java.util.Map[S, T]): Map[S, T] = { - map.asScala.toMap - } - - def convertScalaListToJava[S](map: List[S]): java.util.List[S] = { - map.asJava - } - - def convertScalaSeqToJava[S](seq: Seq[S]): java.util.List[S] = { - seq.asJava - } - - def convertJavaListToScala[S](map: java.util.List[S]): List[S] = { - map.asScala.toList - } -} - -object ScalaJavaConversions { - - implicit class IteratorOps[T](iterator: java.util.Iterator[T]) { - def toScala: Iterator[T] = { - iterator.asScala - } - } - implicit class JIteratorOps[T](iterator: Iterator[T]) { - def toJava: java.util.Iterator[T] = { - iterator.asJava - } - } - implicit class ListOps[T](list: java.util.List[T]) { - def toScala: List[T] = { - if (list == null) { - null - } else { - list.iterator().asScala.toList - } - } - } - implicit class JListOps[T](list: Seq[T]) { - def toJava: java.util.List[T] = { - if (list == null) { - null - } else { - list.asJava - } - } - } - implicit class JListSeqOps[T](list: scala.collection.Seq[T]) { - def toJava: java.util.List[T] = { - if (list == null) { - null - } else { - list.asJava - } - } - } - implicit class MapOps[K, V](map: java.util.Map[K, V]) { - def toScala: Map[K, V] = { - if (map == null) { - null - } else { - map.asScala.toMap - } - } - } - implicit class IterableOps[T](it: Iterable[T]) { - def parallel: ParSeq[T] = { - if (it == null) { - null - } else { - it.toSeq.par.toSeq - } - } - } - implicit class JMapOps[K, V](map: Map[K, V]) { - def toJava: java.util.Map[K, V] = { - if (map == null) { - null - } else { - map.asJava - } - } - } -} diff --git a/api/src/main/scala/ai/chronon/api/Builders.scala b/api/src/main/scala/ai/chronon/api/Builders.scala index 2acc4f0878..672c9be4fc 100644 --- a/api/src/main/scala/ai/chronon/api/Builders.scala +++ b/api/src/main/scala/ai/chronon/api/Builders.scala @@ -18,10 +18,10 @@ package ai.chronon.api import ai.chronon.api.DataType.toTDataType import ai.chronon.api.Extensions.WindowUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.observability.DriftSpec import scala.collection.Seq -import scala.util.ScalaJavaConversions._ // mostly used by tests to define confs easily object Builders { diff --git a/api/src/main/scala/ai/chronon/api/DataType.scala b/api/src/main/scala/ai/chronon/api/DataType.scala index 1e60c5b54a..471a8b8da9 100644 --- a/api/src/main/scala/ai/chronon/api/DataType.scala +++ b/api/src/main/scala/ai/chronon/api/DataType.scala @@ -16,9 
+16,9 @@ package ai.chronon.api +import ai.chronon.api.ScalaJavaConversions._ + import java.util -import scala.util.ScalaJavaConversions.JListOps -import scala.util.ScalaJavaConversions.ListOps sealed trait DataType extends Serializable diff --git a/api/src/main/scala/ai/chronon/api/Extensions.scala b/api/src/main/scala/ai/chronon/api/Extensions.scala index aa3631d025..b312803d7c 100644 --- a/api/src/main/scala/ai/chronon/api/Extensions.scala +++ b/api/src/main/scala/ai/chronon/api/Extensions.scala @@ -19,6 +19,7 @@ package ai.chronon.api import ai.chronon.api.DataModel._ import ai.chronon.api.Operation._ import ai.chronon.api.QueryUtils.buildSelects +import ai.chronon.api.ScalaJavaConversions._ import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import org.apache.spark.sql.Column @@ -33,10 +34,6 @@ import java.util.regex.Pattern import scala.collection.Seq import scala.collection.mutable import scala.util.Failure -import scala.util.ScalaJavaConversions.IteratorOps -import scala.util.ScalaJavaConversions.JMapOps -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success import scala.util.Try diff --git a/api/src/main/scala-2.12/scala/util/ScalaVersionSpecificCollectionsConverter.scala b/api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala similarity index 70% rename from api/src/main/scala-2.12/scala/util/ScalaVersionSpecificCollectionsConverter.scala rename to api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala index 4c8bd14c37..0a69fca163 100644 --- a/api/src/main/scala-2.12/scala/util/ScalaVersionSpecificCollectionsConverter.scala +++ b/api/src/main/scala/ai/chronon/api/ScalaJavaConversions.scala @@ -1,32 +1,41 @@ -package scala.util +package ai.chronon.api import scala.collection.parallel.ParSeq import scala.jdk.CollectionConverters._ -object ScalaVersionSpecificCollectionsConverter { - - def convertScalaMapToJava[S, T](map: Map[S, T]): java.util.Map[S, T] = { - map.asJava - } +object ScalaJavaConversions { - def convertJavaMapToScala[S, T](map: java.util.Map[S, T]): Map[S, T] = { - map.asScala.toMap + def toJava[T](list: Seq[T]): java.util.List[T] = { + if (list == null) { + null + } else { + list.asJava + } } - def convertScalaListToJava[S](map: List[S]): java.util.List[S] = { - map.asJava + def toScala[T](list: java.util.List[T]): Seq[T] = { + if (list == null) { + null + } else { + list.asScala + } } - def convertScalaSeqToJava[S](seq: Seq[S]): java.util.List[S] = { - seq.asJava + def toJava[K, V](map: Map[K, V]): java.util.Map[K, V] = { + if (map == null) { + null + } else { + map.asJava + } } - def convertJavaListToScala[S](jList: java.util.List[S]): List[S] = { - jList.asScala.toList + def toScala[K, V](map: java.util.Map[K, V]): Map[K, V] = { + if (map == null) { + null + } else { + map.asScala.toMap + } } -} - -object ScalaJavaConversions { implicit class IteratorOps[T](iterator: java.util.Iterator[T]) { def toScala: Iterator[T] = { diff --git a/api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala b/api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala index 208933099a..89a55844fc 100644 --- a/api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala +++ b/api/src/main/scala/ai/chronon/api/ThriftJsonCodec.scala @@ -17,6 +17,7 @@ package ai.chronon.api import ai.chronon.api.Extensions.StringsOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.thrift.TBase import ai.chronon.api.thrift.TDeserializer import 
ai.chronon.api.thrift.TSerializer @@ -35,7 +36,6 @@ import java.util.Base64 import scala.io.BufferedSource import scala.io.Source._ import scala.reflect.ClassTag -import scala.util.ScalaJavaConversions.ListOps object ThriftJsonCodec { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala b/api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala index b73017bcb7..f128de98b5 100644 --- a/api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala +++ b/api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala @@ -21,6 +21,7 @@ import ai.chronon.api.Builders import ai.chronon.api.Constants import ai.chronon.api.Extensions._ import ai.chronon.api.GroupBy +import ai.chronon.api.ScalaJavaConversions._ import org.junit.Assert.assertEquals import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue @@ -29,7 +30,6 @@ import org.mockito.Mockito.spy import org.mockito.Mockito.when import java.util.Arrays -import scala.util.ScalaJavaConversions.JListOps class ExtensionsTest { diff --git a/devnotes.md b/devnotes.md index 5e59655184..2ff03e27e1 100644 --- a/devnotes.md +++ b/devnotes.md @@ -15,13 +15,12 @@ export CHRONON_API=$CHRONON_OS/api/py alias materialize="PYTHONPATH=$CHRONON_API:$PYTHONPATH $CHRONON_API/ai/chronon/repo/compile.py" ``` -### Install specific version of thrift +### Install the latest version of thrift -Thrift is a dependency for compile. The latest version 0.14 is very new - feb 2021, and incompatible with hive metastore. So we force 0.13. +Thrift is a dependency for compilation. The latest version is 0.22 (January 2025). ```shell -brew tap cartman-kai/thrift -brew install thrift@0.13 +brew install thrift ``` ### Install Python dependency packages for API @@ -39,7 +38,7 @@ python3 -m pip install -U tox build * ```asdf install``` (see `.tool-versions` for required runtimes and versions) -> NOTE: Use scala `2.12.18` and java `corretto-8` for the OSS Chronon distribution. +> NOTE: Use scala `2.12.18` and java `corretto-17` for the Zipline distribution. The older java `corretto-8` is used for the OSS Chronon distribution. @@ -107,6 +106,8 @@ materialize --input_path= ``` ### Testing +We are currently migrating our build tool from sbt to bazel. +#### Using sbt All tests ```shell sbt test @@ -130,9 +131,22 @@ sbt dependencyBrowseGraph sbt dependencyBrowseTree ``` +#### Using bazel +All tests for a specific module +```shell +# Example: bazel test //api:api-test +bazel test //{module}:{test_target} +``` +Specific submodule tests +```shell +# Example: bazel test //api:api-test_test_suite_src_test_scala_ai_chronon_api_test_DataPointerTest.scala +bazel test //{module}:{test_target}_test_suite_{submodule_test_path} +``` + # Chronon Build Process * Inside the `$CHRONON_OS` directory. +## Using sbt ### To build all of the Chronon artifacts locally (builds all the JARs, and Python API) ```shell sbt package @@ -166,6 +180,21 @@ sbt assembly ``` ```shell sbt 'spark/assembly' ``` +## Using bazel + +### To clean the repository for a fresh build +```shell +# Removes build outputs and action cache. +bazel clean +# This leaves the workspace as if Bazel had never been run. +# Does additional cleanup compared to the command above and should also be generally faster +bazel clean --expunge +``` +### Build a fat jar for just one module +```shell +# Example: bazel build //api:api-lib +bazel build //{module}:{build_target} +``` # Chronon Artifacts Publish Process * Inside the `$CHRONON_OS` directory.
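The common thread in the Scala and Java changes above and below is the replacement of the per-Scala-version `scala.util.ScalaVersionSpecificCollectionsConverter` / `scala.util.ScalaJavaConversions` objects with the single `ai.chronon.api.ScalaJavaConversions` introduced by the rename earlier in this diff. Below is a minimal sketch of what call sites look like after the migration, assuming the implicit ops (`ListOps`, `JListOps`, `MapOps`, `JMapOps`) carry over unchanged from the renamed file; the object and value names here are illustrative only.

```scala
import ai.chronon.api.ScalaJavaConversions
import ai.chronon.api.ScalaJavaConversions._ // brings the implicit extension ops into scope

object ConversionSketch {
  // Scala call sites use the implicit extension methods via the wildcard import.
  val javaList: java.util.List[String] = Seq("a", "b").toJava      // JListOps
  val scalaList: List[String] = javaList.toScala                   // ListOps
  val javaMap: java.util.Map[String, Long] = Map("k" -> 1L).toJava // JMapOps
  val scalaMap: Map[String, Long] = javaMap.toScala                // MapOps

  // Java call sites (JavaRequest, JavaResponse, ApiProvider, ...) use the explicit
  // overloads instead: ScalaJavaConversions.toJava(...) / ScalaJavaConversions.toScala(...).
  val explicitJavaMap: java.util.Map[String, Long] = ScalaJavaConversions.toJava(scalaMap)
}
```

Because the object now lives in the shared `api` package rather than `scala.util`, the same helpers are reachable from the Java sources under `online/` and `service_commons/`, which is what allows the Scala-version-specific converter files to be deleted.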
diff --git a/hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala b/hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala index 9119b827c7..99a6d34d90 100644 --- a/hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala +++ b/hub/src/main/scala/ai/chronon/hub/handlers/TimeSeriesHandler.scala @@ -1,6 +1,7 @@ package ai.chronon.hub.handlers import ai.chronon.api.Extensions.WindowOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.TimeUnit import ai.chronon.api.Window import ai.chronon.hub.model.Aggregates @@ -36,8 +37,6 @@ import scala.concurrent.duration.DurationInt import scala.concurrent.{Future => ScalaFuture} import scala.jdk.CollectionConverters._ import scala.util.Failure -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success /** diff --git a/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java b/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java index 5610727ba6..e8cecd3125 100644 --- a/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java +++ b/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java @@ -19,7 +19,7 @@ import scala.collection.Seq; import scala.compat.java8.FutureConverters; import scala.concurrent.Future; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -38,8 +38,7 @@ public abstract class JavaExternalSourceHandler extends ExternalSourceHandler { @Override public Future> fetch(Seq requests) { // TODO: deprecate ScalaVersionSpecificCollectionsConverter in java - java.util.List javaRequests = ScalaVersionSpecificCollectionsConverter - .convertScalaListToJava(requests.toList()) + java.util.List javaRequests = ScalaJavaConversions.toJava(requests.toList()) .stream() .map(JavaRequest::fromScalaRequest) .collect(Collectors.toList()); @@ -50,7 +49,7 @@ public Future> fetch(Seq requests) { .stream() .map(JavaResponse::toScala) .collect(Collectors.toList()); - return ScalaVersionSpecificCollectionsConverter.convertJavaListToScala(jListSMap).toSeq(); + return ScalaJavaConversions.toScala(jListSMap); } ); return FutureConverters.toScala(mapJFuture); diff --git a/online/src/main/java/ai/chronon/online/JavaRequest.java b/online/src/main/java/ai/chronon/online/JavaRequest.java index 63f4fdccfb..9b230c85f9 100644 --- a/online/src/main/java/ai/chronon/online/JavaRequest.java +++ b/online/src/main/java/ai/chronon/online/JavaRequest.java @@ -17,7 +17,7 @@ package ai.chronon.online; import scala.Option; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.Map; @@ -38,7 +38,7 @@ public JavaRequest(String name, Map keys, Long atMillis) { public JavaRequest(Fetcher.Request scalaRequest) { this.name = scalaRequest.name(); - this.keys = ScalaVersionSpecificCollectionsConverter.convertScalaMapToJava(scalaRequest.keys()); + this.keys = ScalaJavaConversions.toJava(scalaRequest.keys()); Option millisOpt = scalaRequest.atMillis(); if (millisOpt.isDefined()) { this.atMillis = (Long) millisOpt.get(); @@ -52,7 +52,7 @@ public static JavaRequest fromScalaRequest(Fetcher.Request scalaRequest) { public Fetcher.Request toScalaRequest() { scala.collection.immutable.Map scalaKeys = null; if (keys != null) { - scalaKeys = ScalaVersionSpecificCollectionsConverter.convertJavaMapToScala(keys); + scalaKeys = 
ScalaJavaConversions.toScala(keys); } return new Fetcher.Request( diff --git a/online/src/main/java/ai/chronon/online/JavaResponse.java b/online/src/main/java/ai/chronon/online/JavaResponse.java index 0e672e9fe9..6a3f359519 100644 --- a/online/src/main/java/ai/chronon/online/JavaResponse.java +++ b/online/src/main/java/ai/chronon/online/JavaResponse.java @@ -16,7 +16,7 @@ package ai.chronon.online; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.Map; @@ -36,7 +36,7 @@ public JavaResponse(Fetcher.Response scalaResponse){ .fromScala(scalaResponse.values()) .map(v -> { if (v != null) - return ScalaVersionSpecificCollectionsConverter.convertScalaMapToJava(v); + return ScalaJavaConversions.toJava(v); else return null; }); @@ -45,6 +45,6 @@ public JavaResponse(Fetcher.Response scalaResponse){ public Fetcher.Response toScala() { return new Fetcher.Response( request.toScalaRequest(), - values.map(ScalaVersionSpecificCollectionsConverter::convertJavaMapToScala).toScala()); + values.map(ScalaJavaConversions::toScala).toScala()); } } diff --git a/online/src/main/java/ai/chronon/online/JavaSeriesStatsResponse.java b/online/src/main/java/ai/chronon/online/JavaSeriesStatsResponse.java index 6e5536027d..6d79556334 100644 --- a/online/src/main/java/ai/chronon/online/JavaSeriesStatsResponse.java +++ b/online/src/main/java/ai/chronon/online/JavaSeriesStatsResponse.java @@ -16,7 +16,7 @@ package ai.chronon.online; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.Map; @@ -33,12 +33,12 @@ public JavaSeriesStatsResponse(Fetcher.SeriesStatsResponse scalaResponse){ this.request = new JavaStatsRequest(scalaResponse.request()); this.values = JTry .fromScala(scalaResponse.values()) - .map(ScalaVersionSpecificCollectionsConverter::convertScalaMapToJava); + .map(ScalaJavaConversions::toJava); } public Fetcher.SeriesStatsResponse toScala() { return new Fetcher.SeriesStatsResponse( request.toScalaRequest(), - values.map(ScalaVersionSpecificCollectionsConverter::convertJavaMapToScala).toScala()); + values.map(ScalaJavaConversions::toScala).toScala()); } } diff --git a/online/src/main/java/ai/chronon/online/JavaStatsRequest.java b/online/src/main/java/ai/chronon/online/JavaStatsRequest.java index c94b01f0f3..96ffdd0684 100644 --- a/online/src/main/java/ai/chronon/online/JavaStatsRequest.java +++ b/online/src/main/java/ai/chronon/online/JavaStatsRequest.java @@ -17,7 +17,7 @@ package ai.chronon.online; import scala.Option; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.Map; diff --git a/online/src/main/java/ai/chronon/online/JavaStatsResponse.java b/online/src/main/java/ai/chronon/online/JavaStatsResponse.java index be650d06ab..a2da3ee708 100644 --- a/online/src/main/java/ai/chronon/online/JavaStatsResponse.java +++ b/online/src/main/java/ai/chronon/online/JavaStatsResponse.java @@ -16,7 +16,7 @@ package ai.chronon.online; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.util.Map; @@ -42,7 +42,7 @@ public JavaStatsResponse(Fetcher.StatsResponse scalaResponse){ this.request = new JavaStatsRequest(scalaResponse.request()); this.values = JTry .fromScala(scalaResponse.values()) - .map(ScalaVersionSpecificCollectionsConverter::convertScalaMapToJava); + .map(ScalaJavaConversions::toJava); this.millis = scalaResponse.millis(); } diff --git 
a/online/src/main/scala-2.11/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala b/online/src/main/scala-2.11/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala deleted file mode 100644 index 218607f66f..0000000000 --- a/online/src/main/scala-2.11/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (C) 2023 The Chronon Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ai.chronon.online - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InterpretedPredicate} - -object ScalaVersionSpecificCatalystHelper { - - def evalFilterExec(row: InternalRow, condition: Expression, attributes: Seq[Attribute]): Boolean = { - val predicate = InterpretedPredicate.create(condition, attributes) - predicate.initialize(0) - val r = predicate.eval(row) - r - } -} diff --git a/online/src/main/scala-2.13/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala b/online/src/main/scala-2.13/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala deleted file mode 100644 index a0547f63c4..0000000000 --- a/online/src/main/scala-2.13/ai/chronon/online/ScalaVersionSpecificCatalystHelper.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2023 The Chronon Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package ai.chronon.online - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Predicate} - -import scala.collection.Seq - -object ScalaVersionSpecificCatalystHelper { - - def evalFilterExec(row: InternalRow, condition: Expression, attributes: Seq[Attribute]): Boolean = { - val predicate = Predicate.create(condition, attributes.toSeq) - predicate.initialize(0) - val r = predicate.eval(row) - r - } -} diff --git a/online/src/main/scala/ai/chronon/online/AvroCodec.scala b/online/src/main/scala/ai/chronon/online/AvroCodec.scala index d268752ce1..107559317e 100644 --- a/online/src/main/scala/ai/chronon/online/AvroCodec.scala +++ b/online/src/main/scala/ai/chronon/online/AvroCodec.scala @@ -18,6 +18,7 @@ package ai.chronon.online import ai.chronon.api.DataType import ai.chronon.api.Row +import ai.chronon.api.ScalaJavaConversions._ import org.apache.avro.Schema import org.apache.avro.Schema.Field import org.apache.avro.file.SeekableByteArrayInput @@ -29,7 +30,6 @@ import org.apache.avro.io._ import java.io.ByteArrayOutputStream import scala.collection.mutable -import scala.util.ScalaJavaConversions.ListOps class AvroCodec(val schemaStr: String) extends Serializable { @transient private lazy val parser = new Schema.Parser() diff --git a/online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala b/online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala index f72a55601c..81e928e099 100644 --- a/online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala +++ b/online/src/main/scala/ai/chronon/online/DataStreamBuilder.scala @@ -20,14 +20,13 @@ import ai.chronon.api import ai.chronon.api.Constants import ai.chronon.api.DataModel import ai.chronon.api.DataModel.DataModel +import ai.chronon.api.ScalaJavaConversions._ import org.apache.spark.sql.DataFrame import org.slf4j.Logger import org.slf4j.LoggerFactory import scala.collection.Seq import scala.util.Failure -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success import scala.util.Try diff --git a/online/src/main/scala/ai/chronon/online/JoinCodec.scala b/online/src/main/scala/ai/chronon/online/JoinCodec.scala index 2048382d51..bd2570d75b 100644 --- a/online/src/main/scala/ai/chronon/online/JoinCodec.scala +++ b/online/src/main/scala/ai/chronon/online/JoinCodec.scala @@ -20,6 +20,7 @@ import ai.chronon.api.DataType import ai.chronon.api.Extensions.JoinOps import ai.chronon.api.Extensions.MetadataOps import ai.chronon.api.HashUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StructField import ai.chronon.api.StructType import ai.chronon.online.OnlineDerivationUtil.DerivationFunc @@ -28,8 +29,6 @@ import ai.chronon.online.OnlineDerivationUtil.buildDerivedFields import ai.chronon.online.OnlineDerivationUtil.buildRenameOnlyDerivationFunction import com.google.gson.Gson -import scala.util.ScalaJavaConversions.JMapOps - case class JoinCodec(conf: JoinOps, keySchema: StructType, baseValueSchema: StructType, diff --git a/online/src/main/scala/ai/chronon/online/Metrics.scala b/online/src/main/scala/ai/chronon/online/Metrics.scala index 00ef4d1cd5..d7b13b71bc 100644 --- a/online/src/main/scala/ai/chronon/online/Metrics.scala +++ b/online/src/main/scala/ai/chronon/online/Metrics.scala @@ -17,13 +17,12 @@ package ai.chronon.online import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import com.timgroup.statsd.Event import 
com.timgroup.statsd.NonBlockingStatsDClient import com.timgroup.statsd.NonBlockingStatsDClientBuilder -import scala.util.ScalaJavaConversions.ListOps - object Metrics { object Environment extends Enumeration { type Environment = String diff --git a/online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala b/online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala index cc0b1ca7d0..a06ab9d1e2 100644 --- a/online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala +++ b/online/src/main/scala/ai/chronon/online/SparkInternalRowConversions.scala @@ -16,6 +16,7 @@ package ai.chronon.online +import ai.chronon.api.ScalaJavaConversions._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util.ArrayBasedMapData @@ -27,7 +28,6 @@ import org.apache.spark.unsafe.types.UTF8String import java.util import scala.collection.mutable -import scala.util.ScalaJavaConversions.IteratorOps object SparkInternalRowConversions { // the identity function diff --git a/online/src/main/scala/ai/chronon/online/TileCodec.scala b/online/src/main/scala/ai/chronon/online/TileCodec.scala index cfa141ff0c..788047c623 100644 --- a/online/src/main/scala/ai/chronon/online/TileCodec.scala +++ b/online/src/main/scala/ai/chronon/online/TileCodec.scala @@ -23,11 +23,11 @@ import ai.chronon.api.Extensions.AggregationOps import ai.chronon.api.Extensions.MetadataOps import ai.chronon.api.Extensions.WindowUtils import ai.chronon.api.GroupBy +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StructType import org.apache.avro.generic.GenericData import scala.collection.JavaConverters._ -import scala.util.ScalaJavaConversions.ListOps object TileCodec { def buildRowAggregator(groupBy: GroupBy, inputSchema: Seq[(String, DataType)]): RowAggregator = { diff --git a/online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala b/online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala index 624099a81c..3f87c58b6b 100644 --- a/online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala +++ b/online/src/main/scala/ai/chronon/online/stats/TileDriftCalculator.scala @@ -1,6 +1,7 @@ package ai.chronon.online.stats import ai.chronon.api.Extensions.WindowOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.Window import ai.chronon.observability.DriftMetric import ai.chronon.observability.TileDrift @@ -8,8 +9,6 @@ import ai.chronon.observability.TileSummary import ai.chronon.online.stats.DriftMetrics.histogramDistance import ai.chronon.online.stats.DriftMetrics.percentileDistance -import scala.util.ScalaJavaConversions.IteratorOps - object TileDriftCalculator { @inline diff --git a/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala b/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala index cc01c96c82..c549ebac73 100644 --- a/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/DataStreamBuilderTest.scala @@ -19,6 +19,7 @@ package ai.chronon.online.test import ai.chronon.api.Builders import ai.chronon.api.DataModel import ai.chronon.api.LongType +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StringType import ai.chronon.api.StructField import ai.chronon.api.StructType @@ -34,8 +35,6 @@ import org.junit.Test import org.slf4j.Logger import org.slf4j.LoggerFactory -import 
scala.util.ScalaJavaConversions.JListOps - class DataStreamBuilderTest { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) lazy val spark: SparkSession = { diff --git a/online/src/test/scala/ai/chronon/online/test/stats/DriftMetricsTest.scala b/online/src/test/scala/ai/chronon/online/test/stats/DriftMetricsTest.scala index e4dee24b9a..bfe1a2a073 100644 --- a/online/src/test/scala/ai/chronon/online/test/stats/DriftMetricsTest.scala +++ b/online/src/test/scala/ai/chronon/online/test/stats/DriftMetricsTest.scala @@ -1,13 +1,12 @@ package ai.chronon.online.test.stats +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.observability.DriftMetric import ai.chronon.online.stats.DriftMetrics.histogramDistance import ai.chronon.online.stats.DriftMetrics.percentileDistance import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import scala.util.ScalaJavaConversions.JMapOps - class DriftMetricsTest extends AnyFunSuite with Matchers { def buildPercentiles(mean: Double, variance: Double, breaks: Int = 20): Array[Double] = { diff --git a/orchestration/src/main/scala/ai/chronon/orchestration/utils/RelevantLeftForJoinPart.scala b/orchestration/src/main/scala/ai/chronon/orchestration/utils/RelevantLeftForJoinPart.scala index 248c930f71..306bb404d9 100644 --- a/orchestration/src/main/scala/ai/chronon/orchestration/utils/RelevantLeftForJoinPart.scala +++ b/orchestration/src/main/scala/ai/chronon/orchestration/utils/RelevantLeftForJoinPart.scala @@ -4,12 +4,11 @@ import ai.chronon.api.Extensions.GroupByOps import ai.chronon.api.Extensions.JoinPartOps import ai.chronon.api.Extensions.SourceOps import ai.chronon.api.Extensions.StringOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.orchestration.utils.CollectionExtensions.JMapExtension import ai.chronon.orchestration.utils.ColumnExpression.getTimeExpression -import scala.util.ScalaJavaConversions.ListOps - // TODO(phase-2): This is not wired into the planner yet // computes subset of the left source that is relevant for a join part // we cache the join_part table across joins diff --git a/service_commons/src/main/java/ai/chronon/service/ApiProvider.java b/service_commons/src/main/java/ai/chronon/service/ApiProvider.java index 08e68edbb0..809e0fde16 100644 --- a/service_commons/src/main/java/ai/chronon/service/ApiProvider.java +++ b/service_commons/src/main/java/ai/chronon/service/ApiProvider.java @@ -3,7 +3,7 @@ import ai.chronon.online.Api; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.util.ScalaVersionSpecificCollectionsConverter; +import ai.chronon.api.ScalaJavaConversions; import java.io.File; import java.net.URL; @@ -52,7 +52,7 @@ public static Api buildApi(ConfigStore configStore) throws Exception { } Map propsMap = configStore.getOnlineApiProps(); - scala.collection.immutable.Map scalaPropsMap = ScalaVersionSpecificCollectionsConverter.convertJavaMapToScala(propsMap); + scala.collection.immutable.Map scalaPropsMap = ScalaJavaConversions.toScala(propsMap); return (Api) apiClass.getConstructors()[0].newInstance(scalaPropsMap); } diff --git a/spark/src/main/scala/ai/chronon/spark/Analyzer.scala b/spark/src/main/scala/ai/chronon/spark/Analyzer.scala index 24f3309f50..5a8ddbd99e 100644 --- a/spark/src/main/scala/ai/chronon/spark/Analyzer.scala +++ b/spark/src/main/scala/ai/chronon/spark/Analyzer.scala @@ -26,6 +26,7 @@ import ai.chronon.api.DataModel.Entities import ai.chronon.api.DataModel.Events import ai.chronon.api.DataType 
import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.TimeUnit import ai.chronon.api.Window import ai.chronon.online.PartitionRange @@ -50,7 +51,6 @@ import scala.collection.Seq import scala.collection.immutable import scala.collection.mutable import scala.collection.mutable.ListBuffer -import scala.util.ScalaJavaConversions.ListOps //@SerialVersionUID(3457890987L) //class ItemSketchSerializable(var mapSize: Int) extends ItemsSketch[String](mapSize) with Serializable {} diff --git a/spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala b/spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala index 20afe7c40e..a0d38a1b74 100644 --- a/spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala +++ b/spark/src/main/scala/ai/chronon/spark/BootstrapInfo.scala @@ -22,6 +22,7 @@ import ai.chronon.api.Extensions._ import ai.chronon.api.ExternalPart import ai.chronon.api.JoinPart import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StructField import ai.chronon.online.PartitionRange import ai.chronon.online.SparkConversions @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory import scala.collection.Seq import scala.collection.immutable import scala.collection.mutable -import scala.util.ScalaJavaConversions.ListOps import scala.util.Try case class JoinPartMetadata( diff --git a/spark/src/main/scala/ai/chronon/spark/Extensions.scala b/spark/src/main/scala/ai/chronon/spark/Extensions.scala index a0e650aef3..92feac202f 100644 --- a/spark/src/main/scala/ai/chronon/spark/Extensions.scala +++ b/spark/src/main/scala/ai/chronon/spark/Extensions.scala @@ -20,6 +20,7 @@ import ai.chronon.api import ai.chronon.api.Constants import ai.chronon.api.DataPointer import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.AvroConversions import ai.chronon.online.PartitionRange import ai.chronon.online.SparkConversions @@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory import java.util import scala.collection.Seq import scala.reflect.ClassTag -import scala.util.ScalaJavaConversions.IteratorOps object Extensions { diff --git a/spark/src/main/scala/ai/chronon/spark/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/GroupBy.scala index 995d9e31b5..25e74d8113 100644 --- a/spark/src/main/scala/ai/chronon/spark/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/GroupBy.scala @@ -29,6 +29,7 @@ import ai.chronon.api.DataModel.Events import ai.chronon.api.Extensions._ import ai.chronon.api.ParametricMacro import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.PartitionRange import ai.chronon.online.RowWrapper import ai.chronon.online.SparkConversions @@ -46,9 +47,6 @@ import org.slf4j.LoggerFactory import java.util import scala.collection.Seq import scala.collection.mutable -import scala.util.ScalaJavaConversions.JListOps -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps class GroupBy(val aggregations: Seq[api.Aggregation], val keyColumns: Seq[String], diff --git a/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala b/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala index 9ac6e91eca..946d04680f 100644 --- a/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala +++ b/spark/src/main/scala/ai/chronon/spark/GroupByUpload.scala @@ -30,6 +30,7 @@ import ai.chronon.api.Extensions.SourceOps import ai.chronon.api.GroupByServingInfo import ai.chronon.api.PartitionSpec 
import ai.chronon.api.QueryUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.ThriftJsonCodec import ai.chronon.online.Extensions.ChrononStructTypeOps import ai.chronon.online.GroupByServingInfoParsed @@ -50,8 +51,6 @@ import org.slf4j.LoggerFactory import scala.annotation.tailrec import scala.collection.Seq -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Try class GroupByUpload(endPartition: String, groupBy: GroupBy) extends Serializable { diff --git a/spark/src/main/scala/ai/chronon/spark/Join.scala b/spark/src/main/scala/ai/chronon/spark/Join.scala index 14a421ff2c..84beeaa607 100644 --- a/spark/src/main/scala/ai/chronon/spark/Join.scala +++ b/spark/src/main/scala/ai/chronon/spark/Join.scala @@ -19,6 +19,7 @@ package ai.chronon.spark import ai.chronon.api import ai.chronon.api.DataModel.Entities import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.PartitionRange import ai.chronon.online.SparkConversions @@ -38,8 +39,6 @@ import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.jdk.CollectionConverters._ import scala.util.Failure -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success import scala.util.Try diff --git a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala index 281098ffff..487878c233 100644 --- a/spark/src/main/scala/ai/chronon/spark/JoinBase.scala +++ b/spark/src/main/scala/ai/chronon/spark/JoinBase.scala @@ -24,6 +24,7 @@ import ai.chronon.api.DataModel.Events import ai.chronon.api.Extensions._ import ai.chronon.api.JoinPart import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.Metrics import ai.chronon.online.PartitionRange import ai.chronon.spark.Extensions._ @@ -42,7 +43,6 @@ import java.time.Instant import java.util import scala.collection.JavaConverters._ import scala.collection.Seq -import scala.util.ScalaJavaConversions.ListOps abstract class JoinBase(joinConf: api.Join, endPartition: String, diff --git a/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala b/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala index c3768ccdf7..208cc5b0f7 100644 --- a/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/JoinUtils.scala @@ -21,6 +21,7 @@ import ai.chronon.api.Constants import ai.chronon.api.DataModel.Events import ai.chronon.api.Extensions.JoinOps import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.PartitionRange import ai.chronon.spark.Extensions._ import com.google.gson.Gson @@ -35,7 +36,6 @@ import org.slf4j.LoggerFactory import java.util import scala.jdk.CollectionConverters._ -import scala.util.ScalaJavaConversions.MapOps object JoinUtils { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala b/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala index 3e3122af0a..c844b46a26 100644 --- a/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala +++ b/spark/src/main/scala/ai/chronon/spark/LogFlattenerJob.scala @@ -18,6 +18,7 @@ package ai.chronon.spark import ai.chronon.api import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import 
ai.chronon.online.OnlineDerivationUtil.timeFields import ai.chronon.online._ @@ -36,7 +37,6 @@ import java.util.Base64 import scala.collection.Seq import scala.collection.mutable import scala.util.Failure -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success import scala.util.Try diff --git a/spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala b/spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala index 34bd85b24a..be3b8508d1 100644 --- a/spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala +++ b/spark/src/main/scala/ai/chronon/spark/LoggingSchema.scala @@ -17,14 +17,13 @@ package ai.chronon.spark import ai.chronon.api.HashUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StructField import ai.chronon.api.StructType import ai.chronon.online.AvroCodec import ai.chronon.online.JoinCodec import com.google.gson.Gson -import scala.util.ScalaJavaConversions.MapOps - /* * Schema of a published log event. valueCodec includes both base and derived columns. */ diff --git a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala index de3d0158e2..3a4ff6dd33 100644 --- a/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala +++ b/spark/src/main/scala/ai/chronon/spark/StagingQuery.scala @@ -19,13 +19,13 @@ package ai.chronon.spark import ai.chronon.api import ai.chronon.api.Extensions._ import ai.chronon.api.ParametricMacro +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.PartitionRange import ai.chronon.spark.Extensions._ import org.slf4j.Logger import org.slf4j.LoggerFactory import scala.collection.mutable -import scala.util.ScalaJavaConversions._ class StagingQuery(stagingQueryConf: api.StagingQuery, endPartition: String, tableUtils: TableUtils) { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala index 6fae79595a..888b4e3814 100644 --- a/spark/src/main/scala/ai/chronon/spark/TableUtils.scala +++ b/spark/src/main/scala/ai/chronon/spark/TableUtils.scala @@ -24,6 +24,7 @@ import ai.chronon.api.Extensions._ import ai.chronon.api.PartitionSpec import ai.chronon.api.Query import ai.chronon.api.QueryUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.PartitionRange import ai.chronon.spark.Extensions.DataPointerOps import ai.chronon.spark.Extensions.DfStats @@ -54,8 +55,6 @@ import scala.collection.immutable import scala.collection.mutable import scala.reflect.runtime.universe._ import scala.util.Failure -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Try /** diff --git a/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala b/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala index 40da3a7a43..1880861ab0 100644 --- a/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala +++ b/spark/src/main/scala/ai/chronon/spark/scripts/ObservabilityDemo.scala @@ -6,6 +6,7 @@ import ai.chronon.api.Constants import ai.chronon.api.Extensions.MetadataOps import ai.chronon.api.Extensions.WindowOps import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.Window import ai.chronon.observability.DriftMetric import ai.chronon.observability.TileDriftSeries @@ -27,7 +28,6 @@ import org.slf4j.LoggerFactory import java.util.concurrent.TimeUnit import 
scala.concurrent.Await import scala.concurrent.duration.Duration -import scala.util.ScalaJavaConversions.IteratorOps object ObservabilityDemo { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala b/spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala index cfe422f5d2..b29f6b2ee0 100644 --- a/spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala +++ b/spark/src/main/scala/ai/chronon/spark/stats/CompareJob.scala @@ -21,6 +21,7 @@ import ai.chronon.api.Constants import ai.chronon.api.DataModel.Events import ai.chronon.api.Extensions._ import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.online.DataMetrics import ai.chronon.online.PartitionRange import ai.chronon.online.SparkConversions @@ -34,9 +35,6 @@ import org.apache.spark.sql.SaveMode import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps - /** * Compare Job for comparing data between joins, staging queries and raw queries. * Leverage the compare module for computation between sources. diff --git a/spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala b/spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala index 1531355b31..b98df8432a 100644 --- a/spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala +++ b/spark/src/main/scala/ai/chronon/spark/stats/CompareMetrics.scala @@ -19,6 +19,7 @@ package ai.chronon.spark.stats import ai.chronon.aggregator.row.RowAggregator import ai.chronon.api.Extensions.AggregationPartOps import ai.chronon.api.Extensions.WindowUtils +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.DataMetrics import ai.chronon.online.SparkConversions @@ -32,7 +33,6 @@ import org.apache.spark.sql.functions import org.apache.spark.sql.types import scala.collection.immutable.SortedMap -import scala.util.ScalaJavaConversions.JMapOps object CompareMetrics { val leftSuffix = "_left" diff --git a/spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala b/spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala index f5e6cfa89a..e6aed4b58f 100644 --- a/spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala +++ b/spark/src/main/scala/ai/chronon/spark/stats/ConsistencyJob.scala @@ -18,6 +18,7 @@ package ai.chronon.spark.stats import ai.chronon import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.OnlineDerivationUtil.timeFields import ai.chronon.online._ @@ -28,9 +29,6 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import java.util -import scala.util.ScalaJavaConversions.JListOps -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps class ConsistencyJob(session: SparkSession, joinConf: Join, endDate: String) extends Serializable { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/main/scala/ai/chronon/spark/stats/drift/Expressions.scala b/spark/src/main/scala/ai/chronon/spark/stats/drift/Expressions.scala index b7e4462add..b67b5e13cc 100644 --- a/spark/src/main/scala/ai/chronon/spark/stats/drift/Expressions.scala +++ b/spark/src/main/scala/ai/chronon/spark/stats/drift/Expressions.scala @@ -1,6 +1,7 @@ package ai.chronon.spark.stats.drift import ai.chronon.api.Constants +import ai.chronon.api.ScalaJavaConversions._ import 
ai.chronon.observability.Cardinality import ai.chronon.observability.TileKey import ai.chronon.observability.TileSummary @@ -12,7 +13,6 @@ import org.apache.spark.sql.types.StructType import java.lang import scala.collection.mutable import scala.jdk.CollectionConverters.mapAsJavaMapConverter -import scala.util.ScalaJavaConversions.JListOps object Expressions { diff --git a/spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala b/spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala index d819e7ff65..b2a7710177 100644 --- a/spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala +++ b/spark/src/main/scala/ai/chronon/spark/stats/drift/Summarizer.scala @@ -2,6 +2,7 @@ package ai.chronon.spark.stats.drift import ai.chronon.api.ColorPrinter.ColorString import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.observability.Cardinality import ai.chronon.observability.TileKey @@ -31,8 +32,6 @@ import java.io.Serializable import java.nio.charset.Charset import scala.concurrent.Await import scala.util.Failure -import scala.util.ScalaJavaConversions.JMapOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success import scala.util.Try diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala b/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala index 4597819ff5..10a4e8537c 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala @@ -19,6 +19,7 @@ package ai.chronon.spark.streaming import ai.chronon.api import ai.chronon.api.Extensions.GroupByOps import ai.chronon.api.Extensions.SourceOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.KVStore.PutRequest @@ -52,10 +53,6 @@ import java.util import java.util.Base64 import scala.concurrent.Await import scala.concurrent.duration.DurationInt -import scala.util.ScalaJavaConversions.IteratorOps -import scala.util.ScalaJavaConversions.JIteratorOps -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps // micro batching destroys and re-creates these objects repeatedly through ForeachBatchWriter and MapFunction // this allows for re-use diff --git a/spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala b/spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala index e5f9203645..ca5d4e182a 100644 --- a/spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala +++ b/spark/src/main/scala/ai/chronon/spark/utils/MockApi.scala @@ -19,6 +19,7 @@ package ai.chronon.spark.utils import ai.chronon.api.Constants import ai.chronon.api.Extensions.GroupByOps import ai.chronon.api.Extensions.SourceOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StructType import ai.chronon.online.Fetcher.Response import ai.chronon.online._ @@ -40,9 +41,6 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.Seq import scala.concurrent.Future -import scala.util.ScalaJavaConversions.IteratorOps -import scala.util.ScalaJavaConversions.JListOps -import scala.util.ScalaJavaConversions.JMapOps import scala.util.Success class MockDecoder(inputSchema: StructType) extends Serde { diff --git a/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala 
b/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala index 6af7150d1a..56cce9b4c3 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/ChainingFetcherTest.scala @@ -21,6 +21,7 @@ import ai.chronon.api import ai.chronon.api.Constants.MetadataDataset import ai.chronon.api.Extensions.JoinOps import ai.chronon.api.Extensions.MetadataOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.MetadataStore @@ -44,7 +45,6 @@ import java.util.TimeZone import java.util.concurrent.Executors import scala.collection.Seq import scala.concurrent.ExecutionContext -import scala.util.ScalaJavaConversions._ class ChainingFetcherTest extends TestCase { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala index 1e5c8b4eff..3d07ee5b5c 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/FetcherTest.scala @@ -22,6 +22,7 @@ import ai.chronon.api import ai.chronon.api.Constants.MetadataDataset import ai.chronon.api.Extensions.JoinOps import ai.chronon.api.Extensions.MetadataOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.Fetcher.Response @@ -63,7 +64,6 @@ import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.SECONDS import scala.io.Source -import scala.util.ScalaJavaConversions._ // Run as follows: sbt "spark/testOnly -- -n fetchertest" class FetcherTest extends AnyFunSuite with TaggedFilterSuite { diff --git a/spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala b/spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala index 9b812423b7..fd1b6d9422 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/GroupByTest.scala @@ -170,7 +170,8 @@ class GroupByTest { val computed = resultDf.select("user", "ts", "listing_view_last30", "listing_view_count") computed.show() - val expected = eventDf.sqlContext.sql(""" + val expected = eventDf.sqlContext.sql( + """ |SELECT | events_last_k.user as user, | queries_last_k.ts as ts, @@ -351,9 +352,10 @@ class GroupByTest { val columns = aggregationsMetadata.map(a => a.name -> a.columnType).toMap assertEquals(Map( - "time_spent_ms" -> LongType, - "price" -> DoubleType - ), columns) + "time_spent_ms" -> LongType, + "price" -> DoubleType + ), + columns) } // test that OrderByLimit and OrderByLimitTimed serialization works well with Spark's data type @@ -423,8 +425,8 @@ class GroupByTest { tableUtils.createDatabase(namespace) DataFrameGen.events(spark, sourceSchema, count = 1000, partitions = 200).save(sourceTable) val source = Builders.Source.events( - query = - Builders.Query(selects = Builders.Selects("ts", "item", "time_spent_ms", "price"), startPartition = startPartition), + query = Builders.Query(selects = Builders.Selects("ts", "item", "time_spent_ms", "price"), + startPartition = startPartition), table = sourceTable ) (source, endPartition) @@ -560,7 +562,8 @@ class GroupByTest { val joinSource = TestUtils.getParentJoin(spark, namespace, "parent_join_table", "parent_gb") val query = Builders.Query(startPartition = today) val chainingGroupBy = 
TestUtils.getTestGBWithJoinSource(joinSource, query, namespace, "chaining_gb") - val newGroupBy = GroupBy.replaceJoinSource(chainingGroupBy, PartitionRange(today, today), tableUtils, computeDependency = false) + val newGroupBy = + GroupBy.replaceJoinSource(chainingGroupBy, PartitionRange(today, today), tableUtils, computeDependency = false) assertEquals(joinSource.metaData.outputTable, newGroupBy.sources.get(0).table) assertEquals(joinSource.left.topic + Constants.TopicInvalidSuffix, newGroupBy.sources.get(0).topic) @@ -656,13 +659,13 @@ class GroupByTest { new Window(15, TimeUnit.DAYS), new Window(60, TimeUnit.DAYS) ) - ), + ) ) backfill(name = "unit_test_group_by_descriptive_stats", - source = source, - endPartition = endPartition, - namespace = namespace, - tableUtils = tableUtils, - additionalAgg = aggs) + source = source, + endPartition = endPartition, + namespace = namespace, + tableUtils = tableUtils, + additionalAgg = aggs) } } diff --git a/spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala b/spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala index 2adf67065e..9b1eafa843 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/GroupByUploadTest.scala @@ -19,6 +19,7 @@ package ai.chronon.spark.test import ai.chronon.aggregator.test.Column import ai.chronon.aggregator.windowing.TsUtils import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher import ai.chronon.spark.Extensions.DataframeOps @@ -35,8 +36,6 @@ import org.slf4j.LoggerFactory import scala.concurrent.Await import scala.concurrent.duration.DurationInt -import scala.util.ScalaJavaConversions.JMapOps -import scala.util.ScalaJavaConversions.ListOps class GroupByUploadTest { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala index e5ea2e593d..8b595f9136 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/JoinTest.scala @@ -24,6 +24,7 @@ import ai.chronon.api.Constants import ai.chronon.api.Extensions._ import ai.chronon.api.LongType import ai.chronon.api.Operation +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.StringType import ai.chronon.api.TimeUnit import ai.chronon.api.Window @@ -41,7 +42,6 @@ import org.junit.Assert._ import org.scalatest.funsuite.AnyFunSuite import scala.collection.JavaConverters._ -import scala.util.ScalaJavaConversions.ListOps case class TestRow(ds: String, value: String) {} diff --git a/spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala b/spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala index 07769d8f9c..45193b2181 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/LocalDataLoaderTest.scala @@ -32,10 +32,8 @@ object LocalDataLoaderTest { val tmpDir: File = Files.createTempDir() - val spark: SparkSession = SparkSessionBuilder.build( - "LocalDataLoaderTest", - local = true, - localWarehouseLocation = Some(tmpDir.getPath)) + val spark: SparkSession = + SparkSessionBuilder.build("LocalDataLoaderTest", local = true, localWarehouseLocation = Some(tmpDir.getPath)) @AfterClass def teardown(): Unit = { diff --git 
a/spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala b/spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala index 7ccfa92ce9..23149d6c7a 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/LocalTableExporterTest.scala @@ -41,7 +41,8 @@ import java.io.File object LocalTableExporterTest { val tmpDir: File = Files.createTempDir() - val spark: SparkSession = SparkSessionBuilder.build("LocalTableExporterTest", local = true, localWarehouseLocation = Some(tmpDir.getPath)) + val spark: SparkSession = + SparkSessionBuilder.build("LocalTableExporterTest", local = true, localWarehouseLocation = Some(tmpDir.getPath)) @AfterClass def teardown(): Unit = { diff --git a/spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala b/spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala index f22cfedd33..172980c922 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/MigrationCompareTest.scala @@ -84,11 +84,12 @@ class MigrationCompareTest { //--------------------------------Staging Query----------------------------- val stagingQueryConf = Builders.StagingQuery( - query = s"select * from ${joinConf.metaData.outputTable} WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'", + query = + s"select * from ${joinConf.metaData.outputTable} WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'", startPartition = ninetyDaysAgo, metaData = Builders.MetaData(name = "test.item_snapshot_features_sq_3", - namespace = namespace, - tableProperties = Map("key" -> "val")) + namespace = namespace, + tableProperties = Map("key" -> "val")) ) (joinConf, stagingQueryConf) @@ -113,8 +114,8 @@ class MigrationCompareTest { query = s"select item, ts, ds from ${joinConf.metaData.outputTable}", startPartition = ninetyDaysAgo, metaData = Builders.MetaData(name = "test.item_snapshot_features_sq_4", - namespace = namespace, - tableProperties = Map("key" -> "val")) + namespace = namespace, + tableProperties = Map("key" -> "val")) ) val (compareDf, metricsDf, metrics: DataMetrics) = @@ -141,8 +142,8 @@ class MigrationCompareTest { query = s"select * from ${joinConf.metaData.outputTable} where ds BETWEEN '${monthAgo}' AND '${today}'", startPartition = ninetyDaysAgo, metaData = Builders.MetaData(name = "test.item_snapshot_features_sq_5", - namespace = namespace, - tableProperties = Map("key" -> "val")) + namespace = namespace, + tableProperties = Map("key" -> "val")) ) val (compareDf, metricsDf, metrics: DataMetrics) = diff --git a/spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala b/spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala index a5365faceb..92a1ca4543 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/MockKVStore.scala @@ -6,29 +6,29 @@ import scala.collection.mutable import scala.concurrent.Future class MockKVStore() extends KVStore with Serializable { - val num_puts: mutable.Map[String,Int] = collection.mutable.Map[String, Int]() + val num_puts: mutable.Map[String, Int] = collection.mutable.Map[String, Int]() - def bulkPut(sourceOfflineTable: String,destinationOnlineDataSet: String,partition: String): Unit = - throw new UnsupportedOperationException("Not implemented in mock") - def create(dataset: String): Unit = - { - num_puts(dataset) = 0 - } - def multiGet(requests: 
Seq[ai.chronon.online.KVStore.GetRequest]): scala.concurrent.Future[Seq[ai.chronon.online.KVStore.GetResponse]] = + def bulkPut(sourceOfflineTable: String, destinationOnlineDataSet: String, partition: String): Unit = + throw new UnsupportedOperationException("Not implemented in mock") + def create(dataset: String): Unit = { + num_puts(dataset) = 0 + } + def multiGet(requests: Seq[ai.chronon.online.KVStore.GetRequest]) + : scala.concurrent.Future[Seq[ai.chronon.online.KVStore.GetResponse]] = throw new UnsupportedOperationException("Not implemented in mock") def multiPut(keyValueDatasets: Seq[ai.chronon.online.KVStore.PutRequest]): scala.concurrent.Future[Seq[Boolean]] = { logger.info(s"Triggering multiput for ${keyValueDatasets.size}: rows") for (req <- keyValueDatasets if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty)) num_puts(req.dataset) += 1 val futureResponses = keyValueDatasets.map { req => - if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty) Future{true} - else Future{false} + if (!req.keyBytes.isEmpty && !req.valueBytes.isEmpty) Future { true } + else Future { false } } Future.sequence(futureResponses) } def show(): Unit = { num_puts.foreach(x => logger.info(s"Ran ${x._2} non-empty put actions for dataset ${x._1}")) - + } -} \ No newline at end of file +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala b/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala index dd86f5f0eb..a5e98bfe0a 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/SchemaEvolutionTest.scala @@ -18,6 +18,7 @@ package ai.chronon.spark.test import ai.chronon.api.Constants.MetadataDataset import ai.chronon.api.Extensions.MetadataOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online._ @@ -46,8 +47,6 @@ import scala.collection.Seq import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.concurrent.duration.SECONDS -import scala.util.ScalaJavaConversions.JListOps -import scala.util.ScalaJavaConversions.ListOps case class GroupByTestSuite( name: String, diff --git a/spark/src/test/scala/ai/chronon/spark/test/StagingQueryTest.scala b/spark/src/test/scala/ai/chronon/spark/test/StagingQueryTest.scala index a77914b7d7..616f98f837 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/StagingQueryTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/StagingQueryTest.scala @@ -91,8 +91,11 @@ class StagingQueryTest { val expectedWithOverrideStartPartition = tableUtils.sql(s"select * from $viewName where ds = '$today' AND user IS NOT NULL") - val computedWithOverrideStartPartition = tableUtils.sql(s"select * from ${stagingQueryConf.metaData.outputTable} WHERE user IS NOT NULL") - val diffWithOverrideStartPartition = Comparison.sideBySide(expectedWithOverrideStartPartition, computedWithOverrideStartPartition, List("user", "ts", "ds")) + val computedWithOverrideStartPartition = + tableUtils.sql(s"select * from ${stagingQueryConf.metaData.outputTable} WHERE user IS NOT NULL") + val diffWithOverrideStartPartition = Comparison.sideBySide(expectedWithOverrideStartPartition, + computedWithOverrideStartPartition, + List("user", "ts", "ds")) if (diffWithOverrideStartPartition.count() > 0) { println(s"Actual count: ${expectedWithOverrideStartPartition.count()}") println(expectedWithOverrideStartPartition.show()) diff --git a/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala 
b/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala index 7632927cfc..47f57fc91a 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/TestUtils.scala @@ -18,6 +18,7 @@ package ai.chronon.spark.test import ai.chronon.aggregator.test.Column import ai.chronon.api +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.SparkConversions import ai.chronon.spark.Extensions._ @@ -27,8 +28,6 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.col -import scala.util.ScalaJavaConversions.JListOps - object TestUtils { def createViewsGroupBy(namespace: String, spark: SparkSession, diff --git a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala index 4e9e8a0ffa..3e6d2daf1f 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/DerivationTest.scala @@ -18,6 +18,7 @@ package ai.chronon.spark.test.bootstrap import ai.chronon.api.Builders.Derivation import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.MetadataStore @@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory import scala.concurrent.Await import scala.concurrent.duration.Duration -import scala.util.ScalaJavaConversions.JListOps class DerivationTest { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) @@ -52,8 +52,7 @@ class DerivationTest { tableUtils.createDatabase(namespace) val groupBy = BootstrapUtils.buildGroupBy(namespace, spark) - val derivation1 = Builders.Derivation(name = "user_amount_30d_avg", - expression = "amount_dollars_sum_30d / 30") + val derivation1 = Builders.Derivation(name = "user_amount_30d_avg", expression = "amount_dollars_sum_30d / 30") val derivation2 = Builders.Derivation( name = "*" ) @@ -268,7 +267,8 @@ class DerivationTest { outputDf("ts"), contextualBootstrapDf("user_txn_count_30d"), externalBootstrapDf("ext_payments_service_user_txn_count_15d").as("user_txn_count_15d"), - (concat(externalBootstrapDf("ext_payments_service_user_txn_count_15d"), lit(' '), outputDf("user"))).as("user_txn_count_15d_with_user_id"), + (concat(externalBootstrapDf("ext_payments_service_user_txn_count_15d"), lit(' '), outputDf("user"))) + .as("user_txn_count_15d_with_user_id"), outputDf("user_amount_30d"), outputDf("user_amount_15d"), coalesce(diffBootstrapDf("user_amount_30d_minus_15d"), outputDf("user_amount_30d_minus_15d")) @@ -560,7 +560,6 @@ class DerivationTest { assertFalse(schema1.contains("context_2")) assertTrue(schema1.contains("ext_contextual_context_2")) - /* * In order to keep the `key` format, use explicit rename derivation * Otherwise, in a * derivation, we keep only the values and discard the keys @@ -605,7 +604,6 @@ class DerivationTest { assertFalse(schema3.contains("context_2")) assertFalse(schema3.contains("ext_contextual_context_2")) - /* * If we want to keep both format, select both format explicitly */ @@ -636,21 +634,18 @@ class DerivationTest { tableUtils.createDatabase(namespace) val groupBy = BootstrapUtils.buildGroupBy(namespace, spark) groupBy.setBackfillStartDate(today) - groupBy.setDerivations(Seq( - Builders.Derivation( - name = "*"), - Builders.Derivation( - name = "amount_dollars_avg_15d", - expression = 
"amount_dollars_sum_15d / 15" - )).toJava) + groupBy.setDerivations( + Seq(Builders.Derivation(name = "*"), + Builders.Derivation( + name = "amount_dollars_avg_15d", + expression = "amount_dollars_sum_15d / 15" + )).toJava) ai.chronon.spark.GroupBy.computeBackfill(groupBy, today, tableUtils) - val actualDf = tableUtils.sql( - s""" + val actualDf = tableUtils.sql(s""" |select * from $namespace.${groupBy.metaData.cleanName} |""".stripMargin) - val expectedDf = tableUtils.sql( - s""" + val expectedDf = tableUtils.sql(s""" |select | user, | amount_dollars_sum_30d, diff --git a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala index 2bb7807b71..30ddde1de7 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/LogBootstrapTest.scala @@ -17,6 +17,7 @@ package ai.chronon.spark.test.bootstrap import ai.chronon.api.Extensions._ +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.MetadataStore @@ -37,7 +38,6 @@ import org.slf4j.LoggerFactory import scala.concurrent.Await import scala.concurrent.duration.Duration -import scala.util.ScalaJavaConversions._ class LogBootstrapTest { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala index 3d87715c12..ee729a35c5 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/bootstrap/TableBootstrapTest.scala @@ -17,6 +17,7 @@ package ai.chronon.spark.test.bootstrap import ai.chronon.api.Extensions.JoinOps +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api._ import ai.chronon.spark.Comparison import ai.chronon.spark.Extensions._ @@ -31,8 +32,6 @@ import org.junit.Test import org.slf4j.Logger import org.slf4j.LoggerFactory -import scala.util.ScalaJavaConversions.JListOps - class TableBootstrapTest { @transient lazy val logger: Logger = LoggerFactory.getLogger(getClass) diff --git a/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala b/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala index bf4463b85d..aa6764b4eb 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/stats/drift/DriftTest.scala @@ -6,6 +6,7 @@ import ai.chronon.api.Constants import ai.chronon.api.Extensions.MetadataOps import ai.chronon.api.Extensions.WindowOps import ai.chronon.api.PartitionSpec +import ai.chronon.api.ScalaJavaConversions._ import ai.chronon.api.Window import ai.chronon.observability.DriftMetric import ai.chronon.observability.TileSummarySeries @@ -26,9 +27,6 @@ import java.util.concurrent.TimeUnit import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration.Duration -import scala.util.ScalaJavaConversions.IteratorOps -import scala.util.ScalaJavaConversions.ListOps -import scala.util.ScalaJavaConversions.MapOps import scala.util.Success class DriftTest extends AnyFlatSpec with Matchers { @@ -82,7 +80,7 @@ class DriftTest extends AnyFlatSpec with Matchers { api.buildFetcher().putJoinConf(join) // upload summaries - val uploader = new 
SummaryUploader(tableUtils.loadTable(packedTable),api) + val uploader = new SummaryUploader(tableUtils.loadTable(packedTable), api) uploader.run() // test drift store methods @@ -116,8 +114,7 @@ class DriftTest extends AnyFlatSpec with Matchers { (nulls + currentNulls, total + currentCount) } - println( - s"""drift totals: $totals + println(s"""drift totals: $totals |drift nulls: $nulls |""".stripMargin.red) @@ -142,8 +139,7 @@ class DriftTest extends AnyFlatSpec with Matchers { (nulls + currentNulls, total + currentCount) } } - println( - s"""summary ptile totals: $summaryTotals + println(s"""summary ptile totals: $summaryTotals |summary ptile nulls: $summaryNulls |""".stripMargin) @@ -151,8 +147,8 @@ class DriftTest extends AnyFlatSpec with Matchers { summaryNulls.toDouble / summaryTotals.toDouble should be < 0.1 println("Summary series fetched successfully".green) - val startTs=1673308800000L - val endTs=1674172800000L + val startTs = 1673308800000L + val endTs = 1674172800000L val joinName = "risk.user_transactions.txn_join" val name = "dim_user_account_type" val window = new Window(10, ai.chronon.api.TimeUnit.HOURS) @@ -172,7 +168,7 @@ class DriftTest extends AnyFlatSpec with Matchers { val isCurrentNumeric = currentSummarySeries.headOption.forall(checkIfNumeric) val isBaselineNumeric = baselineSummarySeries.headOption.forall(checkIfNumeric) - val currentFeatureTs = { + val currentFeatureTs = { if (currentSummarySeries.isEmpty) Seq.empty else convertTileSummarySeriesToTimeSeries(currentSummarySeries.head, isCurrentNumeric, metric) } @@ -202,7 +198,6 @@ class DriftTest extends AnyFlatSpec with Matchers { /** Roll up over raw values */ case object ValuesMetric extends Metric - case class TimeSeriesPoint(value: Double, ts: Long, label: Option[String] = None, nullValue: Option[Int] = None) def checkIfNumeric(summarySeries: TileSummarySeries): Boolean = { @@ -210,8 +205,6 @@ class DriftTest extends AnyFlatSpec with Matchers { ptiles != null && ptiles.exists(_ != null) } - - private def convertTileSummarySeriesToTimeSeries(summarySeries: TileSummarySeries, isNumeric: Boolean, metric: Metric): Seq[TimeSeriesPoint] = { @@ -224,16 +217,18 @@ class DriftTest extends AnyFlatSpec with Matchers { val percentileSeriesPerBreak = summarySeries.percentiles.toScala val timeStamps = summarySeries.timestamps.toScala val breaks = DriftStore.breaks(20) - percentileSeriesPerBreak.zip(breaks).flatMap{ case (percentileSeries, break) => - percentileSeries.toScala.zip(timeStamps).map{case (value, ts) => TimeSeriesPoint(value, ts, Some(break))} + percentileSeriesPerBreak.zip(breaks).flatMap { + case (percentileSeries, break) => + percentileSeries.toScala.zip(timeStamps).map { case (value, ts) => TimeSeriesPoint(value, ts, Some(break)) } } } else { val histogramOfSeries = summarySeries.histogram.toScala val timeStamps = summarySeries.timestamps.toScala - histogramOfSeries.flatMap{ case (label, values) => - values.toScala.zip(timeStamps).map{case (value, ts) => TimeSeriesPoint(value.toDouble, ts, Some(label))} + histogramOfSeries.flatMap { + case (label, values) => + values.toScala.zip(timeStamps).map { case (value, ts) => TimeSeriesPoint(value.toDouble, ts, Some(label)) } }.toSeq } } } -} \ No newline at end of file +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/udafs/ApproxDistinctTest.scala b/spark/src/test/scala/ai/chronon/spark/test/udafs/ApproxDistinctTest.scala index 92fb878eeb..999bb0bec2 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/udafs/ApproxDistinctTest.scala +++ 
b/spark/src/test/scala/ai/chronon/spark/test/udafs/ApproxDistinctTest.scala @@ -27,22 +27,23 @@ class ApproxDistinctTest extends AnyFlatSpec with Matchers { val result = ApproxDistinct.columnCardinality(df) // Check if all columns are present in the result - result.keySet should contain allOf("int_col", "string_col", "double_col", "array_col", "map_col") + result.keySet should contain allOf ("int_col", "string_col", "double_col", "array_col", "map_col") // Check if the cardinality estimates are reasonable - result("int_col") should be(4L +- 1L) // Exact: 4 - result("string_col") should be(4L +- 1L) // Exact: 4 - result("double_col") should be(4L +- 1L) // Exact: 4 - result("array_col") should be(6L +- 1L) // Exact: 6 (unique elements in all arrays) - result("map_col") should be(4L +- 1L) // Exact: 4 (unique values in all maps) + result("int_col") should be(4L +- 1L) // Exact: 4 + result("string_col") should be(4L +- 1L) // Exact: 4 + result("double_col") should be(4L +- 1L) // Exact: 4 + result("array_col") should be(6L +- 1L) // Exact: 6 (unique elements in all arrays) + result("map_col") should be(4L +- 1L) // Exact: 4 (unique values in all maps) } it should "handle null values correctly" in { - val schema = types.StructType(Seq( - types.StructField("int_col", types.IntegerType, nullable = true), - types.StructField("string_col", types.StringType, nullable = true), - types.StructField("double_col", types.DoubleType, nullable = true) - )) + val schema = types.StructType( + Seq( + types.StructField("int_col", types.IntegerType, nullable = true), + types.StructField("string_col", types.StringType, nullable = true), + types.StructField("double_col", types.DoubleType, nullable = true) + )) val data = Seq( Row(1, "A", null), @@ -56,9 +57,9 @@ class ApproxDistinctTest extends AnyFlatSpec with Matchers { val df = spark.createDataFrame(rdd, schema) val result = ApproxDistinct.columnCardinality(df) - result("int_col") should be(3L +- 1L) // Exact: 3 (null is not counted) - result("string_col") should be(3L +- 1L) // Exact: 3 (null is not counted) - result("double_col") should be(3L +- 1L) // Exact: 3 (null is not counted) + result("int_col") should be(3L +- 1L) // Exact: 3 (null is not counted) + result("string_col") should be(3L +- 1L) // Exact: 3 (null is not counted) + result("double_col") should be(3L +- 1L) // Exact: 3 (null is not counted) } it should "handle empty DataFrame" in { @@ -69,4 +70,4 @@ class ApproxDistinctTest extends AnyFlatSpec with Matchers { result("int_col") should be(0L) result("string_col") should be(0L) } -} \ No newline at end of file +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/udafs/HistogramTest.scala b/spark/src/test/scala/ai/chronon/spark/test/udafs/HistogramTest.scala index ba153fca17..b65af4da9c 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/udafs/HistogramTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/udafs/HistogramTest.scala @@ -32,22 +32,29 @@ class HistogramTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll { Row("group2", null), Row("group3", null) ) - val mapSchema = StructType(Seq( - StructField("group", StringType, nullable = false), - StructField("data", MapType(StringType, LongType), nullable = true) - )) + val mapSchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", MapType(StringType, LongType), nullable = true) + )) val mapDF = spark.createDataFrame(spark.sparkContext.parallelize(mapData), mapSchema) mapDF.createOrReplaceTempView("map_data") val 
stringData = Seq( - Row("group1", "a"), Row("group1", "b"), Row("group1", "a"), - Row("group2", "b"), Row("group2", "c"), Row("group2", "c"), Row("group2", null), + Row("group1", "a"), + Row("group1", "b"), + Row("group1", "a"), + Row("group2", "b"), + Row("group2", "c"), + Row("group2", "c"), + Row("group2", null), Row("group3", null) ) - val stringSchema = StructType(Seq( - StructField("group", StringType, nullable = false), - StructField("data", StringType, nullable = true) - )) + val stringSchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", StringType, nullable = true) + )) val stringDF = spark.createDataFrame(spark.sparkContext.parallelize(stringData), stringSchema) stringDF.createOrReplaceTempView("string_data") @@ -58,10 +65,11 @@ class HistogramTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll { Row("group2", Seq("a", "c", "c", null)), Row("group3", null) ) - val arraySchema = StructType(Seq( - StructField("group", StringType, nullable = false), - StructField("data", ArrayType(StringType), nullable = true) - )) + val arraySchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", ArrayType(StringType), nullable = true) + )) val arrayDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema) arrayDF.createOrReplaceTempView("array_data") } diff --git a/spark/src/test/scala/ai/chronon/spark/test/udafs/NullnessCountersAggregatorTest.scala b/spark/src/test/scala/ai/chronon/spark/test/udafs/NullnessCountersAggregatorTest.scala index 9aa0b62bcf..ba54c737ef 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/udafs/NullnessCountersAggregatorTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/udafs/NullnessCountersAggregatorTest.scala @@ -16,10 +16,11 @@ class NullnessCountersAggregatorTest extends AnyFlatSpec with Matchers with Befo override def beforeAll(): Unit = { super.beforeAll() - val schema = StructType(Seq( - StructField("id", IntegerType, nullable = false), - StructField("string_array", ArrayType(StringType, containsNull = true), nullable = true) - )) + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("string_array", ArrayType(StringType, containsNull = true), nullable = true) + )) val data = Seq( Row(1, Array("a", null, "c", null)), @@ -50,7 +51,8 @@ class NullnessCountersAggregatorTest extends AnyFlatSpec with Matchers with Befo |""".stripMargin ) innerDf.show() - val resultDf = spark.sql(""" + val resultDf = + spark.sql(""" WITH array_counts AS ( SELECT id, @@ -71,7 +73,7 @@ class NullnessCountersAggregatorTest extends AnyFlatSpec with Matchers with Befo resultDf.printSchema() val result = resultDf.collect().head - result.getLong(0) shouldBe 4 // Total nulls + result.getLong(0) shouldBe 4 // Total nulls result.getLong(1) shouldBe 12 // Total size (including nulls) } -} \ No newline at end of file +} diff --git a/spark/src/test/scala/ai/chronon/spark/test/udafs/UDAFSQLUsageTest.scala b/spark/src/test/scala/ai/chronon/spark/test/udafs/UDAFSQLUsageTest.scala index 6976c0cda7..37621609c9 100644 --- a/spark/src/test/scala/ai/chronon/spark/test/udafs/UDAFSQLUsageTest.scala +++ b/spark/src/test/scala/ai/chronon/spark/test/udafs/UDAFSQLUsageTest.scala @@ -32,22 +32,29 @@ class UDAFSQLUsageTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll Row("group2", null), Row("group3", null) ) - val mapSchema = StructType(Seq( - StructField("group", StringType, nullable = false), - 
StructField("data", MapType(StringType, LongType), nullable = true) - )) + val mapSchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", MapType(StringType, LongType), nullable = true) + )) val mapDF = spark.createDataFrame(spark.sparkContext.parallelize(mapData), mapSchema) mapDF.createOrReplaceTempView("map_data") val stringData = Seq( - Row("group1", "a"), Row("group1", "b"), Row("group1", "a"), - Row("group2", "b"), Row("group2", "c"), Row("group2", "c"), Row("group2", null), + Row("group1", "a"), + Row("group1", "b"), + Row("group1", "a"), + Row("group2", "b"), + Row("group2", "c"), + Row("group2", "c"), + Row("group2", null), Row("group3", null) ) - val stringSchema = StructType(Seq( - StructField("group", StringType, nullable = false), - StructField("data", StringType, nullable = true) - )) + val stringSchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", StringType, nullable = true) + )) val stringDF = spark.createDataFrame(spark.sparkContext.parallelize(stringData), stringSchema) stringDF.createOrReplaceTempView("string_data") @@ -58,10 +65,11 @@ class UDAFSQLUsageTest extends AnyFlatSpec with Matchers with BeforeAndAfterAll Row("group2", Seq("a", "c", "c", null)), Row("group3", null) ) - val arraySchema = StructType(Seq( - StructField("group", StringType, nullable = false), - StructField("data", ArrayType(StringType), nullable = true) - )) + val arraySchema = StructType( + Seq( + StructField("group", StringType, nullable = false), + StructField("data", ArrayType(StringType), nullable = true) + )) val arrayDF = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema) arrayDF.createOrReplaceTempView("array_data") } diff --git a/tools/BUILD b/tools/BUILD new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/build_rules/BUILD b/tools/build_rules/BUILD new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/build_rules/common.bzl b/tools/build_rules/common.bzl new file mode 100644 index 0000000000..9638dc52ae --- /dev/null +++ b/tools/build_rules/common.bzl @@ -0,0 +1,18 @@ +load("@io_bazel_rules_scala_config//:config.bzl", "SCALA_MAJOR_VERSION") +load("@rules_jvm_external//:defs.bzl", "artifact") + +def jar(org, name, rev = None, classifier = None): + if rev: + fail("Passing rev is no longer supported in jar() and scala_jar()") + rev = "" + if classifier: + return "{}:{}:jar:{}:{}".format(org, name, classifier, rev) + else: + return "{}:{}:{}".format(org, name, rev) + +def scala_jar(org, name, rev = None, classifier = None): + name = "{}_{}".format(name, SCALA_MAJOR_VERSION) + return jar(org, name, rev, classifier) + +def exclude(org, name): + return "@maven//:" + org + ":" + name \ No newline at end of file diff --git a/tools/build_rules/dependencies/BUILD b/tools/build_rules/dependencies/BUILD new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/build_rules/dependencies/all_repositories.bzl b/tools/build_rules/dependencies/all_repositories.bzl new file mode 100644 index 0000000000..264bd3aea0 --- /dev/null +++ b/tools/build_rules/dependencies/all_repositories.bzl @@ -0,0 +1,18 @@ +load("@rules_jvm_external//:defs.bzl", "DEFAULT_REPOSITORY_NAME") +load("@rules_jvm_external//:specs.bzl", "json", "maven", "parse") + +# Repository artifacts are defined in external files +load(":maven_repository.bzl", "maven_repository") +load(":spark_repository.bzl", "spark_repository") + +all_repositories = [ + # The main repositories are 
defined in individual files, which are loaded above and referenced here + maven_repository, + spark_repository, +] + +def get_repository(repository_name): + for repository in all_repositories: + if repository.name == repository_name: + return repository + return None \ No newline at end of file diff --git a/tools/build_rules/dependencies/defs.bzl b/tools/build_rules/dependencies/defs.bzl new file mode 100644 index 0000000000..8057456220 --- /dev/null +++ b/tools/build_rules/dependencies/defs.bzl @@ -0,0 +1,34 @@ +load("@rules_jvm_external//:specs.bzl", "parse") +load("//tools/build_rules:utils.bzl", "flatten", "map") +load("@rules_jvm_external//:defs.bzl", "artifact") + +def _parse_versioned_artifact(artifact, version, exclusions): + result = parse.parse_maven_coordinate("{}:{}".format(artifact, version)) + if (exclusions != None): + result["exclusions"] = exclusions + return result + +def versioned_artifacts(version, artifacts, exclusions = None): + return map(lambda artifact: _parse_versioned_artifact(artifact, version, exclusions), artifacts) + +def repository(name, pinned = True, artifacts = [], overrides = {}, provided = False, vars = {}, excluded_artifacts = []): + final_artifacts = [] + flat_artifacts = flatten(artifacts) + for artifact in parse.parse_artifact_spec_list(flat_artifacts): + # Empty string in packaging seems to mess up Coursier, maybe a bug in RJE + if artifact.get("packaging") == "": + artifact.pop("packaging") + artifact["version"] = artifact["version"].format(**vars) + final_artifacts.append(artifact) + return struct( + name = name, + pinned = pinned, + artifacts = final_artifacts, + overrides = overrides, + provided = provided, + vars = vars, + excluded_artifacts = excluded_artifacts, + ) + +def get_jars_for_repository(repo_name, jars): + return [artifact(jar, repository_name = repo_name) for jar in jars] \ No newline at end of file diff --git a/tools/build_rules/dependencies/load_dependencies.bzl b/tools/build_rules/dependencies/load_dependencies.bzl new file mode 100644 index 0000000000..f27af2655e --- /dev/null +++ b/tools/build_rules/dependencies/load_dependencies.bzl @@ -0,0 +1,22 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("@bazel_skylib//lib:dicts.bzl", "dicts") +load("@rules_jvm_external//:defs.bzl", "maven_install") +load(":all_repositories.bzl", "all_repositories") + +_repository_urls = [ + "https://repo1.maven.org/maven2/", +] + +def load_all_dependencies(): + for repository in all_repositories: + maven_install( + name = repository.name, + artifacts = repository.artifacts, + repositories = _repository_urls, + fetch_sources = True, + duplicate_version_warning = "error", + fail_if_repin_required = True, + resolve_timeout = 5000, + maven_install_json = None, + excluded_artifacts = repository.excluded_artifacts, + ) \ No newline at end of file diff --git a/tools/build_rules/dependencies/maven_repository.bzl b/tools/build_rules/dependencies/maven_repository.bzl new file mode 100644 index 0000000000..60675f73b6 --- /dev/null +++ b/tools/build_rules/dependencies/maven_repository.bzl @@ -0,0 +1,35 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repository", "versioned_artifacts") + +maven_repository = repository( + name = "maven", + pinned = False, + artifacts = [ + "org.scala-lang.modules:scala-collection-compat_2.12:2.6.0", + "org.scala-lang.modules:scala-parser-combinators_2.12:2.3.0", + "org.scala-lang.modules:scala-java8-compat_2.12:1.0.0", + "org.apache.commons:commons-lang3:3.12.0", + 
"org.apache.commons:commons-math3:3.6.1", + + # JUnit + "junit:junit:4.13.2", + "com.novocode:junit-interface:0.11", + "org.scalatestplus:mockito-3-4_2.12:3.2.10.0", + "org.mockito:mockito-core:4.6.1", + "org.mockito:mockito-scala_2.12:1.17.0", + "org.scalatest:scalatest_2.12:3.2.15", + "org.scalatest:scalatest-shouldmatchers_2.12:3.2.15", + "org.scalatest:scalatest-matchers-core_2.12:3.2.15", + "org.scalactic:scalactic_2.12:3.2.15", + + # Add other dependencies + "org.slf4j:slf4j-api:1.7.30", + "org.slf4j:slf4j-log4j12:1.7.30", + "com.fasterxml.jackson.core:jackson-core:2.12.5", + "com.fasterxml.jackson.core:jackson-databind:2.12.5", + "com.google.code.gson:gson:2.8.6", + "javax.annotation:javax.annotation-api:1.3.2", + ], + overrides = { + }, +) \ No newline at end of file diff --git a/tools/build_rules/dependencies/spark_repository.bzl b/tools/build_rules/dependencies/spark_repository.bzl new file mode 100644 index 0000000000..719fda0d40 --- /dev/null +++ b/tools/build_rules/dependencies/spark_repository.bzl @@ -0,0 +1,37 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repository") + +spark_repository = repository( + name = "spark", + provided = True, + artifacts = [ + # Spark artifacts - for scala 2.12 + "org.apache.spark:spark-sql_2.12:3.5.1", + "org.apache.spark:spark-hive_2.12:3.5.1", + "org.apache.spark:spark-streaming_2.12:3.5.1", + + # Other dependencies + "org.apache.curator:apache-curator:2.12.0", + "com.esotericsoftware:kryo:5.1.1", + "com.yahoo.datasketches:sketches-core:0.13.4", + "com.yahoo.datasketches:memory:0.12.2", + "com.yahoo.datasketches:sketches-hive:0.13.0", + "org.apache.datasketches:datasketches-java:2.0.0", + "org.apache.datasketches:datasketches-memory:1.3.0", + + # Kafka dependencies - only Scala 2.12 + "org.apache.kafka:kafka_2.12:2.6.3", + + # Avro dependencies + "org.apache.avro:avro:1.8.2", + "org.apache.avro:avro-mapred:1.8.2", + "org.apache.hive:hive-metastore:2.3.9", + "org.apache.hive:hive-exec:3.1.2", + + # Monitoring + "io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0", + ], + excluded_artifacts = [ + "org.pentaho:pentaho-aggdesigner-algorithm", + ], +) \ No newline at end of file diff --git a/tools/build_rules/jar_library.bzl b/tools/build_rules/jar_library.bzl new file mode 100644 index 0000000000..98f6530f1d --- /dev/null +++ b/tools/build_rules/jar_library.bzl @@ -0,0 +1,18 @@ +load("@bazel_skylib//lib:dicts.bzl", "dicts") +load("@rules_jvm_external//:defs.bzl", "artifact") + +DEFAULT_PROVIDED_REPO = "maven" # For backwards compatability + +def jar_library(name, jars = [], overrides = {}, visibility = ["//visibility:public"], **kwargs): + def _get_jars(repo_name): + return [artifact(jar, repository_name = repo_name) for jar in jars] + + repo_name = DEFAULT_PROVIDED_REPO + configured_jars = _get_jars(repo_name) + + native.java_library( + name = name, + exports = configured_jars, + visibility = visibility, + **kwargs + ) \ No newline at end of file diff --git a/tools/build_rules/maven_artifact.bzl b/tools/build_rules/maven_artifact.bzl new file mode 100644 index 0000000000..fe77331af7 --- /dev/null +++ b/tools/build_rules/maven_artifact.bzl @@ -0,0 +1,41 @@ +load("@rules_jvm_external//:defs.bzl", _rje_artifact = "artifact") +load("@io_bazel_rules_scala_config//:config.bzl", "SCALA_MAJOR_VERSION") +load("//tools/build_rules:jar_library.bzl", "jar_library") + +def _safe_name(coord): + return coord.replace(":", "_").replace(".", "_").replace("-", "_") + +def maven_artifact(coord, repository_name = "maven"): + """ + Helper 
macro to translate Maven coordinates into Bazel deps. Example: + java_library( + name = "foo", + srcs = ["Foo.java"], + deps = [maven_artifact("com.google.guava:guava")], + ) + Arguments: + repository_name: If provided, always fetch from this Maven repo instead of determining + the repo automatically. Be careful when using this as Bazel will not prevent multiple + jars from providing the same class on the classpath, in which case the order of "deps" + will determine which one "wins". + """ + if repository_name: + return _rje_artifact(coord, repository_name = repository_name) + + safe_name = _safe_name(coord) + + if not native.existing_rule(safe_name): + jar_library( + name = safe_name, + jars = [coord], + visibility = ["//visibility:private"], + tags = ["manual"], + ) + return safe_name + +def scala_artifact(coord, repository_name = "maven"): + """ + Same as "maven_artifact" but appends the current Scala version to the Maven coordinate. + """ + full_coord = coord + "_" + SCALA_MAJOR_VERSION + return maven_artifact(full_coord, repository_name) \ No newline at end of file diff --git a/tools/build_rules/prelude_bazel b/tools/build_rules/prelude_bazel new file mode 100644 index 0000000000..e6180f6c67 --- /dev/null +++ b/tools/build_rules/prelude_bazel @@ -0,0 +1,8 @@ +# Contains default rules and functions available to all BUILD files + +load( + "//tools/build_rules:common.bzl", + "jar", + "scala_jar") + +load("//tools/build_rules:maven_artifact.bzl", "maven_artifact", "scala_artifact") \ No newline at end of file diff --git a/tools/build_rules/spark/BUILD b/tools/build_rules/spark/BUILD new file mode 100644 index 0000000000..f4cb1dccdf --- /dev/null +++ b/tools/build_rules/spark/BUILD @@ -0,0 +1,107 @@ +package(default_visibility = ["//visibility:public"]) + +load("//tools/build_rules/dependencies:defs.bzl", "get_jars_for_repository") + +SPARK_JARS = [ + scala_jar( + name = "spark-core", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-catalyst", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-sql", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-hive", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-sketch", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-streaming", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-tags", + org = "org.apache.spark", + ), + jar( + name = "scala-library", + org = "org.scala-lang", + ), + scala_jar( + name = "spark-unsafe", + org = "org.apache.spark", + ), + jar( + name = "avro", + org = "org.apache.avro", + ), + jar( + name = "hive-metastore", + org = "org.apache.hive", + ), + jar( + name = "hive-exec", + org = "org.apache.hive", + ), + jar( + name = "hadoop-common", + org = "org.apache.hadoop", + ), + jar( + name = "jackson-core", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "jackson-annotations", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "jackson-databind", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "kryo_shaded", + org = "com.esotericsoftware", + ), + scala_jar( + name = "json4s-jackson", + org = "org.json4s", + ), + jar( + name = "commons-lang3", + org = "org.apache.commons", + ), +] + +SPARK_3_5_JARS = SPARK_JARS + [ + scala_jar( + name = "spark-common-utils", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-sql-api", + org = "org.apache.spark", + ), +] + +java_library( + name = "spark-exec", + visibility = ["//visibility:public"], + exports = get_jars_for_repository("spark", SPARK_3_5_JARS), +) + +java_binary( + 
name = "spark", + main_class = "None", #hack + runtime_deps = [ + "//third_party/java/spark:spark-exec", + ], +) \ No newline at end of file diff --git a/tools/build_rules/thrift/BUILD b/tools/build_rules/thrift/BUILD new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tools/build_rules/thrift/thrift.bzl b/tools/build_rules/thrift/thrift.bzl new file mode 100644 index 0000000000..93d67eb03b --- /dev/null +++ b/tools/build_rules/thrift/thrift.bzl @@ -0,0 +1,106 @@ +# Generates java files from the input thrift files +# Thrift gen command only supports single input file so we are creating actions separately for each thrift file +def generate_java_files_using_thrift(ctx): + thrift_path = ctx.attr.thrift_binary + output_directories = [] + + for src_file in ctx.files.srcs: + # Generate unique output directory for each thrift file + # This is necessary to run all the below actions in parallel using bazel + output_directory = ctx.actions.declare_directory( + ctx.label.name + "_" + src_file.basename.replace(".thrift", "") + ) + output_directories.append(output_directory) + + # Create action for each thrift file separately + ctx.actions.run( + outputs = [output_directory], + inputs = [src_file], + executable = thrift_path, + arguments = [ + "-strict", + "--gen", "java:generated_annotations=undated", + "-out", output_directory.path, + src_file.path + ], + progress_message = "Generating Java code from %s file" % src_file.path, + ) + + return output_directories + +# This is necessary as we have custom thrift dependency in `ai.chronon.api.thrift` package +def replace_java_files_with_custom_thrift_package_prefix(ctx, input_directories): + output_directories = [] + for input_directory in input_directories: + # Declare the output directory with modified files + output_directory = ctx.actions.declare_directory( + input_directory.basename + "_modified" + ) + output_directories.append(output_directory) + + replace_command = """ + shopt -s globstar + for input_file in {input_path}/**/*.java + do + output_file={output_path}/$(basename $input_file) + sed 's/org.apache.thrift/ai.chronon.api.thrift/g' $input_file > $output_file + done + """.format(input_path=input_directory.path, output_path=output_directory.path) + + ctx.actions.run_shell( + inputs=[input_directory], + outputs=[output_directory], + command=replace_command, + progress_message="Replacing package names in input Java files for %s" % input_directory.short_path, + ) + + return output_directories + +# Creates jar file including all files from the given input directories +def create_jar_file(ctx, input_directories): + jar_file = ctx.actions.declare_file(ctx.label.name + ".srcjar") + + jar_cmds = ["jar cf " + jar_file.path] + for input_directory in input_directories: + jar_cmds.append("-C " + input_directory.path + " .") + jar_cmd = " ".join(jar_cmds) + + ctx.actions.run_shell( + outputs=[jar_file], + inputs=input_directories, + command=jar_cmd, + progress_message="Creating srcjar from all input files", + ) + + return jar_file + +def _thrift_java_library_impl(ctx): + thrift_output_directories = generate_java_files_using_thrift(ctx) + final_output_directories = replace_java_files_with_custom_thrift_package_prefix(ctx, thrift_output_directories) + jar_file = create_jar_file(ctx, final_output_directories) + + return [DefaultInfo(files = depset([jar_file]))] + +_thrift_java_library = rule( + implementation = _thrift_java_library_impl, + attrs = { + "srcs": attr.label_list( + allow_files = [".thrift"], + mandatory = True, + doc = "List of .thrift 
source files", + ), + "thrift_binary": attr.string(), + }, +) + +def thrift_java_library(name, srcs, **kwargs): + _thrift_java_library( + name = name, + srcs = srcs, + thrift_binary = select({ + "@platforms//os:linux": "/usr/local/bin/thrift", + "@platforms//os:macos": "/opt/homebrew/bin/thrift", + "//conditions:default": "/usr/local/bin/thrift", + }), + **kwargs + ) \ No newline at end of file diff --git a/tools/build_rules/utils.bzl b/tools/build_rules/utils.bzl new file mode 100644 index 0000000000..4dca5e3432 --- /dev/null +++ b/tools/build_rules/utils.bzl @@ -0,0 +1,29 @@ +def map(f, items): + return [f(x) for x in items] + +def _is_list(x): + return type(x) == "list" + +def flat_map(f, items): + result = [] + for x in items: + fx = f(x) + result.extend(fx) if _is_list(fx) else result.append(fx) + return result + +def identity(x): + return x + +def flatten(items, max_depth = 1): + """Flatten a list of items. + see utils_tests.bzl for examples + Args: + items: the list to flatten + max_depth: The maximum depth to flatten to + Returns: + a flattened list of items + """ + result = items + for i in range(max_depth): + result = flat_map(identity, result) + return result \ No newline at end of file