From ea266a1cbbf1bd9c3beabd2680d85b312e9cf2ef Mon Sep 17 00:00:00 2001
From: Hisoka
Date: Wed, 26 Apr 2023 10:49:47 +0800
Subject: [PATCH 1/2] [SPARK-43267][JDBC] Handle postgres unknown user-defined column array as string

---
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b53a0e66ba752..e98917891d4d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -93,13 +93,15 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
     case "numeric" | "decimal" if precision > 0 => Some(DecimalType.bounded(precision, scale))
     case "numeric" | "decimal" =>
       // SPARK-26538: handle numeric without explicit precision and scale.
-      Some(DecimalType. SYSTEM_DEFAULT)
+      Some(DecimalType.SYSTEM_DEFAULT)
     case "money" =>
       // money[] type seems to be broken and difficult to handle.
       // So this method returns None for now.
       // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/1405
       None
-    case _ => None
+    case _ =>
+      // SPARK-43267: handle unknown types in an array as string, since they may be user-defined types
+      Some(StringType)
   }
 
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {

From 07030c41dd4afb56606dbdc3499efef622d6dc90 Mon Sep 17 00:00:00 2001
From: Jia Fan
Date: Thu, 1 Jun 2023 09:56:13 +0800
Subject: [PATCH 2/2] add test

---
 .../sql/jdbc/PostgresIntegrationSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index f840876fc5d00..c539452bb9ae0 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -147,6 +147,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       |('2013-04-05 12:01:02'),
       |('2013-04-05 18:01:02.123'),
       |('2013-04-05 18:01:02.123456')""".stripMargin).executeUpdate()
+
+    conn.prepareStatement("CREATE DOMAIN not_null_text AS TEXT DEFAULT ''").executeUpdate()
+    conn.prepareStatement("CREATE TABLE custom_type (type_array not_null_text[], " +
+      "type not_null_text)").executeUpdate()
+    conn.prepareStatement("INSERT INTO custom_type (type_array, type) VALUES " +
+      "('{1,fds,fdsa}', 'fdasfasdf')").executeUpdate()
+
   }
 
   test("Type mapping for various types") {
@@ -416,4 +423,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     val df_actual = sqlContext.read.jdbc(jdbcUrl, "timestamp_ntz_roundtrip", prop)
     assert(df_actual.collect()(0) == df_expected.collect()(0))
   }
+
+  test("SPARK-43267: user-defined column in array test") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "custom_type", new Properties)
+    val rows = df.collect()
+    assert(rows.length === 1)
+    assert(rows(0).length === 2)
+    assert(rows(0).getSeq[String](0) === Seq("1", "fds", "fdsa"))
+    assert(rows(0).getString(1) === "fdasfasdf")
+  }
 }
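Reviewer note (not part of the patches): the dialect ends up in the `case _` fallback here because, on my understanding of PgJDBC, an array over a domain is reported as `java.sql.Types.ARRAY` with a backend type name such as `"_not_null_text"` (the leading underscore marks an array type), and after the underscore is stripped the element name matches none of the known cases in `toCatalystType`. A minimal standalone probe to confirm what the driver actually reports; the connection URL and credentials are assumptions, the `custom_type` table from `dataPreparation` must already exist, and the postgresql driver must be on the classpath:

```scala
import java.sql.DriverManager

object PgArrayTypeNameProbe {
  def main(args: Array[String]): Unit = {
    // Assumed connection details; point these at a Postgres instance seeded
    // with the CREATE DOMAIN / CREATE TABLE / INSERT statements from the test.
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")
    try {
      val rs = conn.prepareStatement("SELECT * FROM custom_type").executeQuery()
      val md = rs.getMetaData
      for (i <- 1 to md.getColumnCount) {
        // Expected (hedged): type_array comes back as java.sql.Types.ARRAY with
        // type name "_not_null_text", which PostgresDialect has no mapping for --
        // hence the previous None result and the new StringType fallback.
        println(s"${md.getColumnName(i)}: sqlType=${md.getColumnType(i)}, " +
          s"typeName=${md.getColumnTypeName(i)}")
      }
    } finally {
      conn.close()
    }
  }
}
```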
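And a minimal sketch of the end-to-end effect on the Spark read side, again not part of the patches: the local `SparkSession` setup, JDBC URL, and credentials are assumptions, and the table is presumed seeded as above.

```scala
import java.util.Properties

import org.apache.spark.sql.SparkSession

object Spark43267Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-43267 demo")
      .master("local[*]")
      .getOrCreate()

    // Assumed connection details, as in the probe above.
    val props = new Properties()
    props.setProperty("user", "postgres")
    props.setProperty("password", "postgres")
    val df = spark.read.jdbc(
      "jdbc:postgresql://localhost:5432/postgres", "custom_type", props)

    // Before the dialect change this read failed, because toCatalystType
    // returned None for the unknown element type "not_null_text"; with the
    // fallback the schema should come back roughly as
    //   type_array: array<string>, type: string
    df.printSchema()
    df.show(truncate = false)

    spark.stop()
  }
}
```

Collected against the seeded table, this should yield the single row the new test asserts: `Seq("1", "fds", "fdsa")` in the array column and `"fdasfasdf"` in the scalar one.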