Merged

Commits (41)
e06fe47
Local changes for testing out bazel setup
kumar-zlai Jan 23, 2025
6fb913b
Initial working version for flume job
kumar-zlai Jan 24, 2025
ef85a25
Merged with latest main branch
kumar-zlai Jan 24, 2025
619ba85
Both flink and spark jobs are working fine with Dataproc submit
kumar-zlai Jan 24, 2025
e541be0
reverting temporary local changes
kumar-zlai Jan 24, 2025
2d7f800
Minor change to add new line at end of file
kumar-zlai Jan 24, 2025
af6c6ad
To add logic for removing runtime dependencies needed for Flume jobs
kumar-zlai Jan 24, 2025
f6f4dbd
Minor changes to address PR comments
kumar-zlai Jan 27, 2025
be5d4a0
Partial commit fixing some tests. still few tests are failing
kumar-zlai Jan 27, 2025
ece3475
Merged with latest main
kumar-zlai Jan 27, 2025
112c4d8
Potential fix for tests but not ideal
kumar-zlai Jan 28, 2025
d9daa7c
Testing out resources for scala tests
kumar-zlai Jan 28, 2025
56eb85b
Partial working version
kumar-zlai Jan 29, 2025
cef6479
[*WIP*] Controller - Agent interface
nikhil-zlai Jan 30, 2025
d52edae
flink test errors
nikhil-zlai Jan 30, 2025
a68ce87
add jackson back to spark module
nikhil-zlai Jan 30, 2025
354e187
Partial commit to debug further
kumar-zlai Jan 30, 2025
fe2b6b0
minor fix to comment unwanted line
kumar-zlai Jan 30, 2025
da04f6d
Tests working partially only 1 failure
kumar-zlai Jan 30, 2025
3b36159
hub tests fix still not working
kumar-zlai Jan 31, 2025
52cb1d7
Merge branch 'main' into bazel-quick-start
kumar-zlai Jan 31, 2025
712590f
[*WIP*] Controller - Agent interface
nikhil-zlai Jan 30, 2025
0c9347a
flink test errors
nikhil-zlai Jan 30, 2025
27176c1
add jackson back to spark module
nikhil-zlai Jan 30, 2025
252e429
Resolved merge conflicts with nikhils branch
kumar-zlai Jan 31, 2025
fd7eb94
Rebasing latest changes
kumar-zlai Jan 30, 2025
f2885f1
Merged latest changes from the branch
kumar-zlai Jan 31, 2025
7746ece
Build working with test failures
kumar-zlai Jan 31, 2025
37ac482
changing list to seq to satisfy bazel gods
nikhil-zlai Jan 31, 2025
d47e270
test fixes
nikhil-zlai Jan 31, 2025
93b3ced
bazel fixes
nikhil-zlai Jan 31, 2025
8ad4c73
all tests passing
nikhil-zlai Jan 31, 2025
69dfce0
revert hive exec, or spark tests fail
nikhil-zlai Jan 31, 2025
22c11b0
Initial CI changes just for flink tests
kumar-zlai Jan 31, 2025
0dc715d
updated bazel download command for github docker image
kumar-zlai Jan 31, 2025
22408c7
changed spark join test to bazel
kumar-zlai Jan 31, 2025
58882eb
Did minor cleanup no semantic changes
kumar-zlai Feb 1, 2025
bf85d1d
Removed agent related code as we don't need them for this change
kumar-zlai Feb 1, 2025
389dc1a
Merged with latest main
kumar-zlai Feb 1, 2025
d8c8181
Addressed PR comments and ran scala fmt
kumar-zlai Feb 1, 2025
21d9a5a
Minor changes to address build failures in service_commons tests
kumar-zlai Feb 1, 2025
4 changes: 4 additions & 0 deletions .github/image/Dockerfile
@@ -19,6 +19,10 @@ ENV PATH=$PATH:$JAVA_HOME/bin
RUN curl -L "https://github.com/sbt/sbt/releases/download/v1.8.2/sbt-1.8.2.tgz" | tar -xz -C /usr/local
ENV PATH="/usr/local/sbt/bin:${PATH}"

# bazel
RUN curl -fsSL "https://github.com/bazelbuild/bazelisk/releases/download/v1.18.0/bazelisk-linux-amd64" -o /usr/local/bin/bazel
ENV PATH="/usr/local/bin/bazel:${PATH}"

# thrift
ARG THRIFT_VERSION=0.21.0
RUN apk add --no-cache \
5 changes: 3 additions & 2 deletions .github/workflows/test_scala_no_spark.yaml
@@ -48,8 +48,9 @@ jobs:

- name: Run Flink tests
run: |
export SBT_OPTS="-Xmx24G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
sbt "++ 2.12.18 flink/test"
# export SBT_OPTS="-Xmx24G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
# sbt "++ 2.12.18 flink/test"
bazel test //flink:test
Contributor

⚠️ Potential issue

Fix YAML syntax error.

Remove the empty line between the commented lines and the bazel command to maintain valid YAML syntax.

-#          export SBT_OPTS="-Xmx24G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
-#          sbt "++ 2.12.18 flink/test"
-
-          bazel test //flink:test
+#          export SBT_OPTS="-Xmx24G -Xms4G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
+#          sbt "++ 2.12.18 flink/test"
+          bazel test //flink:test
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 53-53: syntax error (syntax)


- name: Run Async KV Store writer test
run: |
5 changes: 3 additions & 2 deletions .github/workflows/test_scala_spark.yaml
@@ -56,8 +56,9 @@ jobs:

- name: Run spark join tests
run: |
export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
sbt "spark/testOnly -- -n jointest"
# export SBT_OPTS="-Xmx8G -Xms2G --add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
# sbt "spark/testOnly -- -n jointest"
bazel test //spark:test_test_suite_src_test_scala_ai_chronon_spark_test_JoinUtilsTest.scala
Contributor

🛠️ Refactor suggestion

Simplify the Bazel target path.

The Bazel target path is unnecessarily verbose. Consider using a more maintainable format.

-          bazel test //spark:test_test_suite_src_test_scala_ai_chronon_spark_test_JoinUtilsTest.scala
+          bazel test //spark/test/scala/ai/chronon/spark/test:JoinUtilsTest
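For a label like the suggested one to resolve, there would need to be either a BUILD file in that test directory or an explicitly named target in spark/BUILD.bazel; neither is shown in this PR. The following is a minimal, hypothetical sketch (target name, source path, and deps are illustrative assumptions, not taken from this change) of a named scala_test target that would back a readable label such as //spark:join_utils_test:

# Hypothetical sketch for spark/BUILD.bazel; names and deps are placeholders,
# not part of this PR. It would let CI run: bazel test //spark:join_utils_test
load("@io_bazel_rules_scala//scala:scala.bzl", "scala_test")

scala_test(
    name = "join_utils_test",
    srcs = ["src/test/scala/ai/chronon/spark/test/JoinUtilsTest.scala"],
    deps = [
        ":lib",       # library under test (placeholder)
        ":test-lib",  # shared test helpers (placeholder)
    ],
)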
🧰 Tools
🪛 YAMLlint (1.35.1)

[error] 61-61: syntax error (syntax)


table_utils_delta_format_spark_tests:
runs-on: ubuntu-latest
22 changes: 22 additions & 0 deletions .ijwb/.bazelproject
@@ -0,0 +1,22 @@
directories:
Collaborator @tchow-zlai (Feb 1, 2025)

(discussed on call) do we pull this up one level to avoid being overwritten by intellij?

# Add the directories you want added as source here
# By default, we've added your entire workspace ('.')
.

# Automatically includes all relevant targets under the 'directories' above
derive_targets_from_directories: true

targets:
# If source code isn't resolving, add additional targets that compile it here

additional_languages:
# Uncomment any additional languages you want supported
# android
# dart
# go
# javascript
# kotlin
python
scala
typescript
java
1 change: 1 addition & 0 deletions BUILD.bazel
@@ -0,0 +1 @@
package(default_visibility = ["//visibility:public"])
47 changes: 42 additions & 5 deletions WORKSPACE
@@ -23,22 +23,29 @@ http_archive(
"https://github.com/bazelbuild/rules_java/releases/download/7.0.6/rules_java-7.0.6.tar.gz",
],
)
load("@rules_java//java:repositories.bzl", "rules_java_dependencies", "rules_java_toolchains")

load("@rules_java//java:repositories.bzl", "remote_jdk17_repos", "rules_java_dependencies", "rules_java_toolchains")

rules_java_dependencies()

rules_java_toolchains()
load("@rules_java//java:repositories.bzl", "remote_jdk17_repos")

remote_jdk17_repos()

# For JVM support
http_archive(
name = "rules_jvm_external",
strip_prefix = "rules_jvm_external-6.6",
sha256 = "3afe5195069bd379373528899c03a3072f568d33bd96fe037bd43b1f590535e7",
url = "https://github.com/bazel-contrib/rules_jvm_external/releases/download/6.6/rules_jvm_external-6.6.tar.gz"
strip_prefix = "rules_jvm_external-6.6",
url = "https://github.com/bazel-contrib/rules_jvm_external/releases/download/6.6/rules_jvm_external-6.6.tar.gz",
)

load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps")

rules_jvm_external_deps()

load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup")

rules_jvm_external_setup()

# For additional rulesets like java_test_suite
@@ -48,9 +55,13 @@ http_archive(
strip_prefix = "rules_jvm-0.24.0",
urls = ["https://github.com/bazel-contrib/rules_jvm/releases/download/v0.24.0/rules_jvm-v0.24.0.tar.gz"],
)

load("@contrib_rules_jvm//:repositories.bzl", "contrib_rules_jvm_deps")

contrib_rules_jvm_deps()

load("@contrib_rules_jvm//:setup.bzl", "contrib_rules_jvm_setup")

contrib_rules_jvm_setup()

# For Scala support
@@ -60,15 +71,37 @@ http_archive(
strip_prefix = "rules_scala-6.6.0",
url = "https://github.com/bazelbuild/rules_scala/releases/download/v6.6.0/rules_scala-v6.6.0.tar.gz",
)

# Initialize Scala with specific version support
load("@io_bazel_rules_scala//:scala_config.bzl", "scala_config")

scala_config(scala_version = SCALA_VERSION)

load("@io_bazel_rules_scala//scala:scala_maven_import_external.bzl", "scala_maven_import_external")

scala_maven_import_external(
name = "scala_compiler_source_2_12_18",
artifact = "org.scala-lang:scala-compiler:%s:sources" % SCALA_VERSION,
artifact_sha256 = "f79ee80f140218253f2a38c9d73f8a9b552d06afce7a5f61cf08079a388e21df",
licenses = ["notice"],
server_urls = [
"https://repo1.maven.org/maven2",
"https://mirror.bazel.build/repo1.maven.org/maven2",
],
)

load("@io_bazel_rules_scala//scala:scala.bzl", "scala_repositories")

scala_repositories()

load("@io_bazel_rules_scala//scala:toolchains.bzl", "scala_register_toolchains")

scala_register_toolchains()

load("@io_bazel_rules_scala//testing:scalatest.bzl", "scalatest_repositories", "scalatest_toolchain")

scalatest_repositories()

scalatest_toolchain()

# For Protobuf support
@@ -80,10 +113,14 @@ http_archive(
"https://github.com/bazelbuild/rules_proto/archive/refs/tags/5.3.0-21.7.tar.gz",
],
)

load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains")

rules_proto_dependencies()

rules_proto_toolchains()

# To load all dependencies used across our modules
load("//tools/build_rules/dependencies:load_dependencies.bzl", "load_all_dependencies")
load_all_dependencies()

load_all_dependencies()
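The load_all_dependencies macro referenced above lives in tools/build_rules/dependencies/load_dependencies.bzl, which is not part of this diff. As a rough, assumed sketch of what such a macro usually does (the repository name and artifact coordinates below are illustrative only), it wraps rules_jvm_external's maven_install so BUILD files can reference pinned Maven artifacts:

# Assumed sketch only; the real load_dependencies.bzl is not shown in this PR
# and the artifact list below is illustrative.
load("@rules_jvm_external//:defs.bzl", "maven_install")

def load_all_dependencies():
    maven_install(
        name = "maven",
        artifacts = [
            "org.slf4j:slf4j-api:1.7.36",
            "junit:junit:4.13.2",
        ],
        repositories = ["https://repo1.maven.org/maven2"],
    )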
62 changes: 31 additions & 31 deletions aggregator/BUILD.bazel
@@ -17,45 +17,45 @@ scala_library(
],
)

test_deps = [
":lib",
"//api:lib",
"//api:thrift",
# Libraries
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
maven_artifact("com.google.code.gson:gson"),
maven_artifact("org.apache.datasketches:datasketches-memory"),
maven_artifact("org.apache.datasketches:datasketches-java"),
maven_artifact("org.apache.commons:commons-lang3"),
maven_artifact("org.apache.commons:commons-math3"),
maven_artifact("org.scala-lang.modules:scala-collection-compat_2.12"),
# Testing
scala_artifact("org.scalatest:scalatest-matchers-core"),
scala_artifact("org.scalatest:scalatest-core"),
scala_artifact("org.scalatest:scalatest"),
scala_artifact("org.scalatest:scalatest-flatspec"),
scala_artifact("org.scalatest:scalatest-funsuite"),
scala_artifact("org.scalatest:scalatest-shouldmatchers"),
scala_artifact("org.scalactic:scalactic"),
scala_artifact("org.scalatestplus:mockito-3-4"),
scala_artifact("org.mockito:mockito-scala"),
maven_artifact("org.mockito:mockito-core"),
maven_artifact("org.scalatest:scalatest-compatible"),
maven_artifact("junit:junit"),
maven_artifact("com.novocode:junit-interface"),
]

scala_library(
name = "test-lib",
srcs = glob(["src/test/**/*.scala"]),
visibility = ["//visibility:public"],
deps = [
":lib",
"//api:lib",
"//api:thrift",
maven_artifact("junit:junit"),
maven_artifact("com.novocode:junit-interface"),
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
maven_artifact("com.google.code.gson:gson"),
maven_artifact("org.apache.datasketches:datasketches-memory"),
maven_artifact("org.apache.datasketches:datasketches-java"),
maven_artifact("org.apache.commons:commons-lang3"),
maven_artifact("org.apache.commons:commons-math3"),
maven_artifact("org.scala-lang.modules:scala-collection-compat_2.12"),
],
deps = test_deps,
)

scala_test_suite(
name = "test",
srcs = glob(["src/test/**/*.scala"]),
visibility = ["//visibility:public"],
deps = [
":lib",
":test-lib",
"//api:lib",
"//api:thrift",
maven_artifact("junit:junit"),
maven_artifact("com.novocode:junit-interface"),
maven_artifact("org.slf4j:slf4j-api"),
maven_artifact("org.apache.logging.log4j:log4j-slf4j-impl"),
maven_artifact("com.google.code.gson:gson"),
maven_artifact("org.apache.datasketches:datasketches-memory"),
maven_artifact("org.apache.datasketches:datasketches-java"),
maven_artifact("org.apache.commons:commons-lang3"),
maven_artifact("org.apache.commons:commons-math3"),
maven_artifact("org.scala-lang.modules:scala-collection-compat_2.12"),
],
deps = test_deps + [":test-lib"],
)
4 changes: 2 additions & 2 deletions api/src/test/scala/ai/chronon/api/test/ExtensionsTest.scala
@@ -113,7 +113,7 @@ class ExtensionsTest extends AnyFlatSpec {
}

it should "group by keys should contain partition column" in {
val groupBy = spy(new GroupBy())
val groupBy = spy[GroupBy](new GroupBy())
val baseKeys = List("a", "b")
val partitionColumn = "ds"
groupBy.accuracy = Accuracy.SNAPSHOT
@@ -127,7 +127,7 @@ }
}

it should "group by keys should contain time column for temporal accuracy" in {
val groupBy = spy(new GroupBy())
val groupBy = spy[GroupBy](new GroupBy())
val baseKeys = List("a", "b")
val partitionColumn = "ds"
groupBy.accuracy = Accuracy.TEMPORAL
136 changes: 136 additions & 0 deletions api/thrift/agent.thrift
@@ -0,0 +1,136 @@
namespace java ai.chronon.api


struct YarnAutoScalingSpec {
1: optional i32 minInstances
2: optional i32 maxInstances
3: optional i32 scaleUpFactor // 1.5x, 2x etc
4: optional i32 scaleDownFactor
5: optional string cooldownPeriod
}

// our clusters are created transiently prior to running the job
struct YarnClusterSpec {
1: optional string clusterName
2: optional string hostType
3: optional i32 hostCount

// dataproc = x.y.z, emr = x.y.z, etc
10: optional string yarnOfferingVersion

// to access the right data and write back to kvstore
20: optional string networkPolicy
30: optional YarnAutoScalingSpec autoScalingSpec
}

enum YarnJobType {
SPARK = 0,
FLINK = 1,
}

struct YarnJob {
// create transient cluster with this name and runs an app with the same yarn name
1: optional string appName
2: optional YarnJobType jobType

10: optional list<string> args
11: optional map<string, string> env
12: optional map<string, string> conf
// creates local file with this name and contents - relative to cwd
// contains the groupBy, join, queries etc
13: optional map<string, string> fileWithContents

20: optional string chrononVersion
21: optional YarnClusterSpec clusterSpec
}

struct KvWrite {
1: optional string key
2: optional string value
3: optional string timestamp
}

// currently used for writing join metadata to kvstore needed prior to fetching joins
struct KvWriteJob {
1: optional string scope // projectId in gcp, account name in aws
2: optional string dataset
3: optional string table
4: optional list<KvWrite> writes
}

struct PartitionListingJob {
1: optional string scope // projectId in gcp, account name in aws
2: optional string dataset
3: optional string table
4: optional string partitionColumn
5: optional list<string> extraPartitionFilters
}

// agent accepts jobs and runs them
union JobBase {
1: YarnJob yarnJob
2: KvWriteJob kvWriteJob
3: PartitionListingJob partitionListingJob
}

struct Job {
1: optional string jobId
2: optional JobBase jobUnion
3: optional i32 statusReportInterval
4: optional i32 maxRetries
}

struct JobListGetRequest {
// this is only sent on the first request after a start
1: optional list<string> existingJobsIds
}

struct JobListResponse {
// controller responds with jobs data plane agent is not aware of
1: optional list<Job> jobsToStart
2: optional list<string> jobsToStop
}

enum JobStatusType {
PENDING = 0,
RUNNING = 1,
SUCCEEDED = 2,
FAILED = 3,
STOPPED = 4
}

struct ResourceUsage {
1: optional i64 vcoreSeconds
2: optional i64 megaByteSeconds
3: optional i64 cumulativeDiskWriteBytes
4: optional i64 cumulativeDiskReadBytes
}

struct YarnIncrementalJobStatus {
// batch / streaming job
1: optional map<JobStatusType, i64> statusChangeTimes
2: optional ResourceUsage resourceUsage
// driver logs - probably only errors and exceptions
3: optional list<string> logsSinceLastPush
}

struct JobInfo {
1: optional string jobId
2: optional JobStatusType currentStatus

10: optional YarnIncrementalJobStatus yarnIncrementalStatus
}

struct DatePartitionRange {
1: optional string start
2: optional string end
}

struct PartitionListingPutRequest {
1: optional map<PartitionListingJob, list<DatePartitionRange>> partitions
2: optional map<PartitionListingJob, string> errors
}

struct JobInfoPutRequest {
1: optional list<JobInfo> jobStatuses
}