
Investigate dynamic JDBC driver provisioning #391

Open
razvan opened this issue Apr 19, 2024 · 9 comments

razvan commented Apr 19, 2024

@razvan I'm using Stackable Spark 3.5.1 and trying to use the JDBC catalog for Iceberg. I need the PostgreSQL driver for that.

What I did first:

spec:
  deps:
    packages:
    - org.apache.logging.log4j:log4j-slf4j-impl:2.22.0
    - org.apache.hadoop:hadoop-aws:3.3.4
    - org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0
    - org.apache.iceberg:iceberg-aws-bundle:1.5.0
    - org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.79.0
    - org.postgresql:postgresql:42.6.0

After that, in the Environment tab of the Spark UI, I can see the PostgreSQL driver:
spark://pyspark-pi-testtesttest-10f9ee8ef5283865-driver-svc.spark.svc:7078/files/org.postgresql_postgresql-42.6.0.jar Added By User

But I got this exception:

File "/tmp/spark-56bc1d7c-393f-45f1-ab6e-32b50f313935/jdbc_1_get_metadata_json_path.py", line 25, in <module>
   spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_to_migrate}.{ds}").show()
   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 File "/stackable/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1631, in sql
 File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
 File "/stackable/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 179, in deco
 File "/stackable/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o96.sql.
: org.apache.iceberg.jdbc.UncheckedSQLException: Failed to connect: postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog
   at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:57)
   at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:30)
   at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
   at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:155)
   at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:141)
   at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:255)
   at org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:309)
   at org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:154)
   at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:751)
   at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
   at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
   at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
   at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
   at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndNamespace$.unapply(LookupCatalog.scala:86)
   at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:51)
   at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:30)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
   at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
   at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
   at org.apache.spark.sql.catalyst.plans.logical.CreateNamespace.mapChildren(v2Commands.scala:548)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:32)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:32)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:32)
   at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:30)
   at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:27)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
   at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
   at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
   at scala.collection.immutable.List.foldLeft(List.scala:91)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
   at scala.collection.immutable.List.foreach(List.scala:431)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
   at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
   at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
   at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
   at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
   at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
   at py4j.Gateway.invoke(Gateway.java:282)
   at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at py4j.commands.CallCommand.execute(CallCommand.java:79)
   at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
   at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
   at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.SQLException: No suitable driver found for postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog
   at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
   at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
   at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:55)
   ... 89 more

Also, some of my settings:

spark.driver.userClassPathFirst: 'false'
spark.executor.userClassPathFirst: 'false'

Without these, other packages don't work and trigger strange errors.
Providing this jar via a PVC doesn't help either: I can see it on the classpath in the Spark UI, but I still get the same error.

Originally posted by @supsupsap in #245 (comment)
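For context, a minimal sketch of where the settings quoted above and the Iceberg JDBC catalog configuration would typically sit in a SparkApplication spec; the catalog name, host and credentials below are illustrative placeholders, not values taken from the report:

spec:
  sparkConf:
    # classpath ordering, as reported above
    spark.driver.userClassPathFirst: "false"
    spark.executor.userClassPathFirst: "false"
    # Iceberg JDBC catalog settings (catalog name "my_catalog", URI and credentials are placeholders)
    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    spark.sql.catalog.my_catalog: org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.my_catalog.catalog-impl: org.apache.iceberg.jdbc.JdbcCatalog
    spark.sql.catalog.my_catalog.uri: jdbc:postgresql://<host>:5432/<database>
    spark.sql.catalog.my_catalog.jdbc.user: <user>
    spark.sql.catalog.my_catalog.jdbc.password: <password>
    spark.sql.catalog.my_catalog.warehouse: s3a://<bucket>/warehouse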


supsupsap commented Apr 19, 2024

@razvan A custom image with the PostgreSQL jar added didn't help either...

FROM docker.stackable.tech/stackable/spark-k8s:3.5.1-stackable24.3.0

RUN curl --fail -o /stackable/spark/jars/postgresql-42.6.0.jar "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar"

And I can see it in the Spark UI:
/stackable/spark/jars/postgresql-42.6.0.jar System Classpath

But I'm still getting:

Caused by: java.sql.SQLException: No suitable driver found for postgresql://rds-stage-jdbc-catalog.bbb.me:5432/catalog
	at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
	at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
	at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:55)
	... 89 more

@supsupsap

@razvan Maybe it's because my curl runs in a layer on top of the entrypoint layer? I mean, maybe it should be at the same level as here: https://github.com/stackabletech/docker-images/blob/main/spark-k8s/Dockerfile#L67?


razvan commented Apr 19, 2024

Is the SparkApplication spec updated to point to the custom image, like this?

spec:
  sparkImage:
    custom: <your image name>
    productVersion: 3.5.1
    pullPolicy: IfNotPresent

@supsupsap

Of course.

@supsupsap

sparkImage:
    productVersion: 3.5.1
    custom: registry.bbb.me/bbb/stackable_spark_postgres_image:v0.0.3 
    pullPolicy: IfNotPresent


razvan commented Apr 19, 2024

Update: I had some leftover deps in the spec. Removed them and it still works:

  deps:
    packages:
      - org.postgresql:postgresql:42.6.0

This works for me with an image I built from your Dockerfile:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: write-to-postgresql
data:
  write-to-postgresql.py: |
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *

    spark = SparkSession.builder.appName("write-to-postgresql").getOrCreate()

    df = spark.createDataFrame([1,2,3], IntegerType())
    
    # Specifying create table column data types on write
    df.write \
    .option("createTableColumnTypes", "value INTEGER") \
    .jdbc("jdbc:postgresql://spark-postgresql/spark", "sparktest",
          properties={"user": "spark", "password": "spark"})
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-postgresql
spec:
  version: "1.0"
  sparkImage:
    imagePullPolicy: IfNotPresent
    custom: "docker.stackable.tech/sandbox/spark-k8s:3.5.1-stackable24.3.0-postgresql"
    productVersion: "3.5.1"
  mode: cluster
  mainApplicationFile: "local:///stackable/spark/jobs/write-to-postgresql.py"
  job:
    logging:
      enableVectorAgent: False
    config:
      volumeMounts:
        - name: script
          mountPath: /stackable/spark/jobs
  driver:
    logging:
      enableVectorAgent: False
    config:
      volumeMounts:
        - name: script
          mountPath: /stackable/spark/jobs
  executor:
    logging:
      enableVectorAgent: False
    config:
      volumeMounts:
        - name: script
          mountPath: /stackable/spark/jobs
  volumes:
    - name: script
      configMap:
        name: write-to-postgresql

@supsupsap

Yeah, it was my fault: my connection string started with postgresql:// instead of jdbc:postgresql://. It's working now, thank you!
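For anyone hitting the same error: java.sql.DriverManager reports "No suitable driver found" both when the driver jar is missing and when the URL simply lacks the jdbc: prefix, because no registered driver accepts such a URL. A minimal sketch of the corrected catalog URI, with catalog name and host as placeholders rather than values from this thread:

spec:
  sparkConf:
    # wrong: no registered driver accepts a URL without the jdbc: prefix
    # spark.sql.catalog.my_catalog.uri: postgresql://<host>:5432/catalog
    # correct: JDBC URLs must start with jdbc:<subprotocol>
    spark.sql.catalog.my_catalog.uri: jdbc:postgresql://<host>:5432/catalog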

@supsupsap

@maltesander I think this is still not resolved: providing org.postgresql:postgresql:42.6.0 in deps.packages still doesn't work, even with the corrected jdbc: scheme.
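One possible workaround sketch, an assumption and not something verified in this thread: jars pulled in via deps.packages are added as user files at runtime, so java.sql.DriverManager may never see the driver class, while a jar that is already on the system classpath (baked into the image or mounted from a volume) is picked up. The jar path below is a placeholder:

spec:
  sparkConf:
    # assumed workaround: point Spark at an already-present driver jar explicitly
    spark.driver.extraClassPath: <mount-path>/postgresql-42.6.0.jar
    spark.executor.extraClassPath: <mount-path>/postgresql-42.6.0.jar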

@maltesander

@maltesander I think this is still not resolved: providing org.postgresql:postgresql:42.6.0 in deps.packages still doesn't work, even with the corrected jdbc: scheme.

Sorry, reopening then :)

maltesander reopened this Apr 19, 2024