
Conversation

Contributor

@david-zlai david-zlai commented Jan 7, 2025

Summary

- Add transient to TableFormatProvider because it contains BigQueryImpl during reflection and that can't be serialized.

https://console.cloud.google.com/dataproc/jobs/1cf413cc-0430-48f3-9b6c-793df2e6331e/monitoring?region=us-central1&inv=1&invt=AbmOUg&project=canary-443022

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2733)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$1(PairRDDFunctions.scala:750)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:749)
	at ai.chronon.spark.GroupBy.hopsAggregate(GroupBy.scala:391)
	at ai.chronon.spark.GroupBy.snapshotEventsBase(GroupBy.scala:166)
	at ai.chronon.spark.GroupByUpload.snapshotEvents(GroupByUpload.scala:84)
	at ai.chronon.spark.GroupByUpload$.run(GroupByUpload.scala:234)
	at ai.chronon.spark.Driver$GroupByUploader$.run(Driver.scala:520)
	at ai.chronon.spark.Driver$.main(Driver.scala:974)
	at ai.chronon.spark.Driver.main(Driver.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1032)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1124)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1133)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.google.cloud.bigquery.BigQueryImpl
Serialization stack:
	- object not serializable (class: com.google.cloud.bigquery.BigQueryImpl, value: com.google.cloud.bigquery.BigQueryImpl@47354906)
	- field (class: ai.chronon.integrations.cloud_gcp.GcpFormatProvider, name: bigQueryClient, type: interface com.google.cloud.bigquery.BigQuery)
	- object (class ai.chronon.integrations.cloud_gcp.GcpFormatProvider, GcpFormatProvider(org.apache.spark.sql.SparkSession@1b58b5f9))
	- field (class: ai.chronon.spark.TableUtils, name: tableFormatProvider, type: interface ai.chronon.spark.FormatProvider)
	- object (class ai.chronon.spark.TableUtils, ai.chronon.spark.TableUtils@a2ee97c)
	- element of array (index: 4)
	- array (class [Ljava.lang.Object;, size 8)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class ai.chronon.spark.GroupBy$, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic ai/chronon/spark/GroupBy$.$anonfun$from$21:(Lai/chronon/spark/GroupBy$;Lai/chronon/api/GroupBy;Lscala/collection/immutable/List;Lai/chronon/online/PartitionRange;Lai/chronon/spark/TableUtils;[Ljava/lang/String;Lscala/Option;Z)Lorg/apache/spark/sql/Dataset;, instantiatedMethodType=()Lorg/apache/spark/sql/Dataset;, numCaptured=8])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class ai.chronon.spark.GroupBy$$$Lambda$3944/0x00007f641a860440, ai.chronon.spark.GroupBy$$$Lambda$3944/0x00007f641a860440@76968f03)
	- field (class: ai.chronon.spark.GroupBy, name: mutationDfFn, type: interface scala.Function0)
	- object (class ai.chronon.spark.GroupBy, ai.chronon.spark.GroupBy@1e5cded6)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class ai.chronon.spark.GroupBy, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic ai/chronon/spark/GroupBy.$anonfun$hopsAggregate$1:(Lai/chronon/spark/GroupBy;Lorg/apache/spark/sql/Row;)Lai/chronon/online/RowWrapper;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Lai/chronon/online/RowWrapper;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class ai.chronon.spark.GroupBy$$Lambda$4090/0x00007f641a44f040, ai.chronon.spark.GroupBy$$Lambda$4090/0x00007f641a44f040@d556143)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
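
The closure serialized above captures TableUtils, which pulls in its format provider and, for the GCP flavor, the BigQueryImpl client that cannot be serialized. As a minimal sketch of the @transient pattern this change applies (simplified stand-in classes, not the actual Chronon code):

```scala
import org.apache.spark.sql.SparkSession

// Stand-in for GcpFormatProvider: holds driver-side state (e.g. a BigQuery client)
// that is not java.io.Serializable.
class FormatProviderSketch(@transient val session: SparkSession)

class TableUtilsSketch(@transient val sparkSession: SparkSession) extends Serializable {
  // @transient excludes the field from Java serialization, so closures that capture
  // this object no longer try to serialize the provider; being lazy, the value is
  // simply rebuilt from the session the next time it is accessed on the driver.
  @transient lazy val tableFormatProvider: FormatProviderSketch =
    new FormatProviderSketch(sparkSession)
}
```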
- Do not create table when attempting to insert into a BigQuery table; we can rely on the BQ connector to create the table during indirect writes: https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#indirect-write

https://console.cloud.google.com/dataproc/jobs/job-a9239c5a/monitoring?region=us-central1&inv=1&invt=AbmOUg&project=canary-443022

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'BIGQUERY': extra input 'BIGQUERY'.(line 7, pos 2)

== SQL ==
CREATE TABLE data.quickstart_purchases_v1_upload (
    `key_bytes` binary,
    `value_bytes` binary,
    `key_json` string,
    `value_json` string,
    `ds` string
) BIGQUERY 
--^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(parsers.scala:257)
	at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:98)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:54)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(AbstractSqlParser.scala:68)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$5(SparkSession.scala:684)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:683)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
	at ai.chronon.spark.TableUtils.sql(TableUtils.scala:343)
	at ai.chronon.spark.TableUtils.insertUnPartitioned(TableUtils.scala:363)
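
The "rely on the BQ connector" part above refers to writes like the following. This is an illustrative sketch based on the spark-bigquery-connector's documented options, not code from this PR, and the staging bucket name is a placeholder:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// With the indirect write method, the connector stages the data in GCS and loads it
// into BigQuery, creating the destination table from the DataFrame schema if needed,
// so no explicit CREATE TABLE statement has to be issued by Chronon.
def writeToBigQuery(df: DataFrame, table: String, stagingBucket: String): Unit =
  df.write
    .format("bigquery")
    .option("writeMethod", "indirect")
    .option("temporaryGcsBucket", stagingBucket)
    .mode(SaveMode.Append)
    .save(table)
```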

Checklist

  • [x] Added Unit Tests
  • [ ] Covered by existing CI
  • [ ] Integration tested
  • [ ] Documentation update

Summary by CodeRabbit

  • New Features

    • Introduced a new method for creating tables, simplifying the process and adding checks for existing tables.
  • Improvements

    • Updated existing methods to utilize the new table creation logic, enhancing modularity and efficiency.
  • Tests

    • Added new test methods to verify table creation functionality and behavior with "BIGQUERY" format.

Contributor

coderabbitai bot commented Jan 7, 2025

Walkthrough

The pull request introduces changes to the TableUtils.scala file in the Spark module. A new method, createTable, is added to centralize the logic for table creation based on a DataFrame and various parameters. The existing methods insertPartitions and insertUnPartitioned have been refactored to utilize this new method, removing inline checks for table existence. Additionally, several new test methods have been added to TableUtilsTest to validate the functionality of the new table creation logic.
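
To make the shape of that refactor concrete, here is a rough sketch of what such a centralized helper can look like. It is not the actual TableUtils.createTable implementation — the format lookup and DDL builder are simplified stand-ins — but it mirrors the flow described above: check whether the table exists, skip the DDL entirely for the BigQuery format (the connector creates the table on write), and report whether a table was actually created.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

class TableCreationSketch(spark: SparkSession) {

  private def tableExists(name: String): Boolean = spark.catalog.tableExists(name)

  // Simplified stand-in for a CREATE TABLE DDL builder derived from the DataFrame schema.
  private def createTableSql(name: String, df: DataFrame): String = {
    val cols = df.schema.fields
      .map(f => s"`${f.name}` ${f.dataType.simpleString}")
      .mkString(", ")
    s"CREATE TABLE $name ($cols)"
  }

  /** Returns true if this call issued the CREATE TABLE, false if creation was skipped. */
  def createTable(df: DataFrame, tableName: String, createTableTypeString: String): Boolean = {
    val shouldCreate =
      !tableExists(tableName) && createTableTypeString.toUpperCase != "BIGQUERY"
    if (shouldCreate) spark.sql(createTableSql(tableName, df))
    shouldCreate
  }
}
```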

Changes

File Change Summary
spark/src/main/scala/ai/chronon/spark/TableUtils.scala - Added method createTable for centralized table creation logic.
- Refactored insertPartitions and insertUnPartitioned to use createTable.
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala - Added test methods testInsertPartitionsTableExistsAlready, testCreateTableAlreadyExists, and testCreateTableBigQuery to validate table creation logic.
- Updated import statement for assertFalse.

Possibly related PRs

Suggested reviewers

  • piyush-zlai
  • nikhil-zlai
  • varant-zlai

Poem

🌟 In the realm of Spark, changes unfold,
A method to create, with logic bold.
Refactored with care, tests now in place,
A streamlined approach, a cleaner space.
Spark shines brighter, in every trace! 🚀

Warning

Review ran into problems

🔥 Problems

GitHub Actions: Resource not accessible by integration - https://docs.github.com/rest/actions/workflow-runs#list-workflow-runs-for-a-repository.

Please grant the required permissions to the CodeRabbit GitHub App under the organization or repository settings.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 7958562 and 3fd9e08.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
🔇 Additional comments (9)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (5)

91-93: LGTM! Good use of @transient annotation.

The annotation prevents serialization issues with non-serializable format providers like BigQueryImpl.


277-281: LGTM! Good handling of BigQuery tables.

Skips explicit table creation and logs the decision clearly.


Line range hint 326-344: LGTM! Good integration with createTable.

The column selection logic correctly handles both new and existing tables.


397-402: LGTM! Consistent with insertPartitions.

The createTable integration matches the pattern used in insertPartitions.


296-300: Track the TODO for BigQuery table properties.

The TODO indicates that BigQuery tables need support for persisting table properties/tags.

Run this script to check if there are any related issues:

✅ Verification successful

TODO is valid and tracked in Asana

The TODO for BigQuery table properties persistence is unique and tracked at: https://app.asana.com/0/1208949807589885/1209111629687568/f

While BigQuery property reading is implemented in BigQueryFormat.scala, persistence is not yet supported.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there's already an issue for BigQuery table properties
gh issue list --search "bigquery table properties"

Length of output: 52


Script:

#!/bin/bash
# Search for BigQuery-related code and properties
rg -i "bigquery.*properties" -g '*.scala' -g '*.java'

# Look for similar TODOs about BigQuery tables
rg -i "todo.*bigquery.*table" -g '*.scala' -g '*.java'

Length of output: 622


Script:

#!/bin/bash
# Get context around the BigQuery property usage
rg -A 5 -B 5 "federation\.bigquery\.table\.type" cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryFormat.scala

Length of output: 725

spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (4)

435-473: LGTM! Good test coverage for existing table scenario.

Test verifies that insertPartitions works correctly with pre-existing tables.


475-505: LGTM! Good test for table existence handling.

Test properly verifies createTable behavior and includes cleanup.


507-532: Add data verification after table creation.

Add schema and data verification after table creation, as suggested in previous reviews.


534-557: LGTM! Good test for BigQuery behavior.

Test verifies that createTable skips table creation for BigQuery format.


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

284-287: Extract duplicated BigQuery check

The BigQuery format check is duplicated. Consider extracting it to a method:

+  private def shouldCreateTable(writeFormat: Format): Boolean = {
+    writeFormat.createTableTypeString.toUpperCase() != "BIGQUERY"
+  }
+
-    if (!tableExists(tableName) && writeFormat.createTableTypeString.toUpperCase() != "BIGQUERY") {
+    if (!tableExists(tableName) && shouldCreateTable(writeFormat)) {

Also applies to: 370-371

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between a6e18f3 and d9f0834.

📒 Files selected for processing (1)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: mutation_spark_tests
🔇 Additional comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

91-91: LGTM! Good addition of @transient

The annotation prevents serialization issues with BigQueryImpl during reflection.

Comment on lines 186 to 188
val result = format.primaryPartitions(tableName, partitionColumn, subPartitionsFilter)(sparkSession)
print("result partitions: " + result)
result
Contributor

🛠️ Refactor suggestion

Use logger instead of print

Replace print with proper logging:

-    print("result partitions: " + result)
+    logger.info(s"Retrieved partitions: ${result.mkString(", ")}")

Committable suggestion skipped: line range outside the PR's diff.


// for saving to bigquery, the creation sql doesn't work here. But we can rely on the bq connector to
// create the table for us
if (!tableExists(tableName) && writeFormat.createTableTypeString.toUpperCase() != "BIGQUERY") {
Collaborator

Could you wrap this around the alter table portion as well? Longer term I think we'll need to rethink the layering here but this is probably fine for now.

Collaborator

This might also be necessary for the if branch further below:

 if (autoExpand) {
      expandTable(tableName, dfRearranged.schema)
    }

Contributor Author

included both into createTable

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between aac350a and d770011.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: table_utils_delta_format_spark_tests
  • GitHub Check: other_spark_tests
  • GitHub Check: mutation_spark_tests
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
  • GitHub Check: scala_compile_fmt_fix
🔇 Additional comments (2)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2)

34-34: LGTM!

Clean consolidation of assertion imports.


457-479: 🛠️ Refactor suggestion

Verify BigQuery integration and add cleanup.

  1. Add table cleanup in a finally block
  2. Consider mocking BigQuery for proper integration testing

Let's verify if this is an integration test:

Comment on lines 433 to 455
@Test
def testCreateTable(): Unit = {
  val tableName = "db.test_create_table"
  spark.sql("CREATE DATABASE IF NOT EXISTS db")

  val columns = Array(
    StructField("long_field", LongType),
    StructField("int_field", IntType),
    StructField("string_field", StringType)
  )
  val df = makeDf(
    spark,
    StructType(
      tableName,
      columns
    ),
    List(
      Row(1L, 2, "3")
    )
  )
  tableUtils.createTable(df, tableName)
  assertTrue(spark.catalog.tableExists(tableName))
}
Contributor

🛠️ Refactor suggestion

Add table cleanup and data verification.

The test should:

  1. Clean up the table after the test
  2. Verify the schema and data, not just table existence
 @Test
 def testCreateTable(): Unit = {
   val tableName = "db.test_create_table"
   spark.sql("CREATE DATABASE IF NOT EXISTS db")
+  try {
     val columns = Array(
       StructField("long_field", LongType),
       StructField("int_field", IntType),
       StructField("string_field", StringType)
     )
     val df = makeDf(
       spark,
       StructType(
         tableName,
         columns
       ),
       List(
         Row(1L, 2, "3")
       )
     )
     tableUtils.createTable(df, tableName)
     assertTrue(spark.catalog.tableExists(tableName))
+    val createdTable = spark.table(tableName)
+    assertEquals(df.schema, createdTable.schema)
+    assertEquals(df.collect().toSeq, createdTable.collect().toSeq)
+  } finally {
+    spark.sql(s"DROP TABLE IF EXISTS $tableName")
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Current:

  @Test
  def testCreateTable(): Unit = {
    val tableName = "db.test_create_table"
    spark.sql("CREATE DATABASE IF NOT EXISTS db")
    val columns = Array(
      StructField("long_field", LongType),
      StructField("int_field", IntType),
      StructField("string_field", StringType)
    )
    val df = makeDf(
      spark,
      StructType(
        tableName,
        columns
      ),
      List(
        Row(1L, 2, "3")
      )
    )
    tableUtils.createTable(df, tableName)
    assertTrue(spark.catalog.tableExists(tableName))
  }

Suggested:

  @Test
  def testCreateTable(): Unit = {
    val tableName = "db.test_create_table"
    spark.sql("CREATE DATABASE IF NOT EXISTS db")
    try {
      val columns = Array(
        StructField("long_field", LongType),
        StructField("int_field", IntType),
        StructField("string_field", StringType)
      )
      val df = makeDf(
        spark,
        StructType(
          tableName,
          columns
        ),
        List(
          Row(1L, 2, "3")
        )
      )
      tableUtils.createTable(df, tableName)
      assertTrue(spark.catalog.tableExists(tableName))
      val createdTable = spark.table(tableName)
      assertEquals(df.schema, createdTable.schema)
      assertEquals(df.collect().toSeq, createdTable.collect().toSeq)
    } finally {
      spark.sql(s"DROP TABLE IF EXISTS $tableName")
    }
  }
@david-zlai david-zlai changed the title Add transient to TableFormatProvider because it contains BigQueryImpl during reflection and that can't be serialized. for bigquery table spark jobs, add transient to unblock serialization issues and do not create table when attempting to insert Jan 7, 2025
fileFormat,
autoExpand)

val finalizedDf = if (autoExpand) {
Collaborator

the chain of logic here doesn't match the original then - in this if branch we are trying to retrieve the schema from the table, but it's possible it doesn't yet exist, since autoExpand may be set true for a BigQuery table. getSchemaFromTable will probably throw an exception if the table doesn't exist. So we probably need to pull this df logic into createTable as well.

Contributor Author

great catch

Contributor Author

weird this piece of code is only in insertPartitions but not at all in insertUnPartitioned 🙃

Contributor Author

oh...insertUnpartitioned doesn't even have autoExpand in it

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (1)

Line range hint 280-289: Consider handling race condition in table creation.

The check-then-create pattern could lead to race conditions in concurrent scenarios.

-    if (!tableExists(tableName)) {
-      val creationSql = createTableSql(tableName, df.schema, partitionColumns, tableProperties, fileFormat)
-      try {
-        sql(creationSql)
-      } catch {
-        case _: TableAlreadyExistsException =>
-          logger.info(s"Table $tableName already exists, skipping creation")
-        case e: Exception =>
-          logger.error(s"Failed to create table $tableName", e)
-          throw e
-      }
+    val creationSql = createTableSql(tableName, df.schema, partitionColumns, tableProperties, fileFormat)
+    try {
+      sql(creationSql)
+    } catch {
+      case _: TableAlreadyExistsException =>
+        logger.info(s"Table $tableName already exists, skipping creation")
+      case e: Exception =>
+        logger.error(s"Failed to create table $tableName", e)
+        throw e
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 9a33536 and 945c635.

📒 Files selected for processing (2)
  • spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4 hunks)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: scala_compile_fmt_fix
  • GitHub Check: fetcher_spark_tests
  • GitHub Check: join_spark_tests
🔇 Additional comments (4)
spark/src/main/scala/ai/chronon/spark/TableUtils.scala (4)

91-93: LGTM! Good addition of transient keyword.

The transient keyword prevents serialization issues with BigQueryImpl during reflection.


273-276: LGTM! Good BigQuery handling.

Correctly prevents explicit table creation for BigQuery, relying on the connector instead.


301-326: LGTM! Good refactoring.

Clean integration with createTable method and proper handling of column rearrangement.


390-396: LGTM! Clean integration.

Properly integrated with createTable method.

@david-zlai david-zlai requested a review from tchow-zlai January 7, 2025 20:53
Collaborator

@tchow-zlai tchow-zlai left a comment

thanks for applying the changes!

}
}
if (tableProperties != null && tableProperties.nonEmpty) {
sql(alterTablePropertiesSql(tableName, tableProperties))
Collaborator

I think we actually want to persist some of these table properties though. The behavior here isn't quite equivalent for bigquery in that any custom properties we pass through here ultimately don't make it to the bigquery table. That might be problematic down the line.

Collaborator

@david-zlai mind just adding a TODO here so we can take care of it for BigQuery?

@tchow-zlai tchow-zlai requested a review from piyush-zlai January 8, 2025 02:56
sparkSession.conf.get("spark.chronon.table_write.cache.blocking", "false").toBoolean

private[spark] lazy val tableFormatProvider: FormatProvider = {
// Add transient here because it can contain BigQueryImpl during reflection with bq flavor
Contributor

maybe swap the 'BigqueryImpl' with the generic phrase - concrete format provider which might not be serializable? (As this isn't restricted to BQ)

Contributor Author

removing

fileFormat,
autoExpand)

val finalizedDf = if (autoExpand && isTableCreated) {
Collaborator

@tchow-zlai tchow-zlai Jan 8, 2025

@david-zlai will this be correct if the table already exists and therefore does not need to be created? I think looking at the code yes, but could you add a unit test to verify that?

Contributor Author

added a test

Collaborator

@tchow-zlai tchow-zlai left a comment

LGTM!

@david-zlai david-zlai merged commit 38fe624 into main Jan 8, 2025
9 checks passed
@david-zlai david-zlai deleted the davidhan/fix_transient branch January 8, 2025 21:14
tchow-zlai pushed a commit that referenced this pull request Jan 9, 2025
… issues and do not create table when attempting to insert (#174)

## Summary

- Add transient to TableFormatProvider because it contains BigQueryImpl
during reflection and that can't be serialized.

https://console.cloud.google.com/dataproc/jobs/1cf413cc-0430-48f3-9b6c-793df2e6331e/monitoring?region=us-central1&inv=1&invt=AbmOUg&project=canary-443022
```
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2733)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$mapValues$1(PairRDDFunctions.scala:750)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:749)
	at ai.chronon.spark.GroupBy.hopsAggregate(GroupBy.scala:391)
	at ai.chronon.spark.GroupBy.snapshotEventsBase(GroupBy.scala:166)
	at ai.chronon.spark.GroupByUpload.snapshotEvents(GroupByUpload.scala:84)
	at ai.chronon.spark.GroupByUpload$.run(GroupByUpload.scala:234)
	at ai.chronon.spark.Driver$GroupByUploader$.run(Driver.scala:520)
	at ai.chronon.spark.Driver$.main(Driver.scala:974)
	at ai.chronon.spark.Driver.main(Driver.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1032)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1124)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1133)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.google.cloud.bigquery.BigQueryImpl
Serialization stack:
	- object not serializable (class: com.google.cloud.bigquery.BigQueryImpl, value: com.google.cloud.bigquery.BigQueryImpl@47354906)
	- field (class: ai.chronon.integrations.cloud_gcp.GcpFormatProvider, name: bigQueryClient, type: interface com.google.cloud.bigquery.BigQuery)
	- object (class ai.chronon.integrations.cloud_gcp.GcpFormatProvider, GcpFormatProvider(org.apache.spark.sql.SparkSession@1b58b5f9))
	- field (class: ai.chronon.spark.TableUtils, name: tableFormatProvider, type: interface ai.chronon.spark.FormatProvider)
	- object (class ai.chronon.spark.TableUtils, ai.chronon.spark.TableUtils@a2ee97c)
	- element of array (index: 4)
	- array (class [Ljava.lang.Object;, size 8)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class ai.chronon.spark.GroupBy$, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic ai/chronon/spark/GroupBy$.$anonfun$from$21:(Lai/chronon/spark/GroupBy$;Lai/chronon/api/GroupBy;Lscala/collection/immutable/List;Lai/chronon/online/PartitionRange;Lai/chronon/spark/TableUtils;[Ljava/lang/String;Lscala/Option;Z)Lorg/apache/spark/sql/Dataset;, instantiatedMethodType=()Lorg/apache/spark/sql/Dataset;, numCaptured=8])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class ai.chronon.spark.GroupBy$$$Lambda$3944/0x00007f641a860440, ai.chronon.spark.GroupBy$$$Lambda$3944/0x00007f641a860440@76968f03)
	- field (class: ai.chronon.spark.GroupBy, name: mutationDfFn, type: interface scala.Function0)
	- object (class ai.chronon.spark.GroupBy, ai.chronon.spark.GroupBy@1e5cded6)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class ai.chronon.spark.GroupBy, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic ai/chronon/spark/GroupBy.$anonfun$hopsAggregate$1:(Lai/chronon/spark/GroupBy;Lorg/apache/spark/sql/Row;)Lai/chronon/online/RowWrapper;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Lai/chronon/online/RowWrapper;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class ai.chronon.spark.GroupBy$$Lambda$4090/0x00007f641a44f040, ai.chronon.spark.GroupBy$$Lambda$4090/0x00007f641a44f040@d556143)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
```
- Do not create table when attempting to insert to a bigquery table. we
can rely on bq connector to create the table during indirect writes:
https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#indirect-write

https://console.cloud.google.com/dataproc/jobs/job-a9239c5a/monitoring?region=us-central1&inv=1&invt=AbmOUg&project=canary-443022
```
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'BIGQUERY': extra input 'BIGQUERY'.(line 7, pos 2)

== SQL ==
CREATE TABLE data.quickstart_purchases_v1_upload (
    `key_bytes` binary,
    `value_bytes` binary,
    `key_json` string,
    `value_json` string,
    `ds` string
) BIGQUERY 
--^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(parsers.scala:257)
	at org.apache.spark.sql.catalyst.parser.AbstractParser.parse(parsers.scala:98)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:54)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(AbstractSqlParser.scala:68)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$5(SparkSession.scala:684)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:683)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
	at ai.chronon.spark.TableUtils.sql(TableUtils.scala:343)
	at ai.chronon.spark.TableUtils.insertUnPartitioned(TableUtils.scala:363)
```

## Checklist
- [x] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new method for creating tables, simplifying the process
and adding checks for existing tables.

- **Improvements**
- Updated existing methods to utilize the new table creation logic,
enhancing modularity and efficiency.

- **Tests**
- Added new test methods to verify table creation functionality and
behavior with "BIGQUERY" format.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1209110958473696
Co-authored-by: Thomas Chow <[email protected]>
kumar-zlai pushed a commit that referenced this pull request Apr 25, 2025
… issues and do not create table when attempting to insert (#174)

kumar-zlai pushed a commit that referenced this pull request Apr 29, 2025
… issues and do not create table when attempting to insert (#174)

chewy-zlai pushed a commit that referenced this pull request May 15, 2025
… issues and do not create table when attempting to insert (#174)

chewy-zlai pushed a commit that referenced this pull request May 16, 2025
…alization issues and do not create table when attempting to insert (#174)
