From de21cd2ea3c7219bc575a4e506367b403a8586e6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 27 Feb 2025 17:01:19 +0800 Subject: [PATCH 1/2] Use correct pgsql datetime fields when pushing down EXTRACT --- .../jdbc/v2/PostgresIntegrationSuite.scala | 61 ++++++++++++++----- .../spark/sql/jdbc/PostgresDialect.scala | 25 ++++++-- 2 files changed, 65 insertions(+), 21 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index eaf2a07ed459..315f3c5f703c 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -185,6 +185,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate() connection.prepareStatement("INSERT INTO datetime VALUES " + "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate() + // '2022-01-01' is Saturday and is in ISO year 2021. + connection.prepareStatement("INSERT INTO datetime VALUES " + + "('tom', '2022-01-01', '2022-01-01 00:00:00')").executeUpdate() } override def testUpdateColumnType(tbl: String): Unit = { @@ -269,27 +272,30 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows2(0).getString(0) === "amy") assert(rows2(1).getString(0) === "alex") - val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5") + val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0") checkFilterPushed(df3) val rows3 = df3.collect() - assert(rows3.length === 2) + assert(rows3.length === 3) assert(rows3(0).getString(0) === "amy") assert(rows3(1).getString(0) === "alex") + assert(rows3(2).getString(0) === "tom") val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0") checkFilterPushed(df4) val rows4 = df4.collect() - assert(rows4.length === 2) + assert(rows4.length === 3) assert(rows4(0).getString(0) === "amy") assert(rows4(1).getString(0) === "alex") + assert(rows4(2).getString(0) === "tom") val df5 = sql(s"SELECT name FROM $tbl WHERE " + - "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022") + "extract(WEEK from date1) > 10 AND extract(YEAR from date1) = 2022") checkFilterPushed(df5) val rows5 = df5.collect() - assert(rows5.length === 2) + assert(rows5.length === 3) assert(rows5(0).getString(0) === "amy") assert(rows5(1).getString(0) === "alex") + assert(rows5(2).getString(0) === "tom") val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " + "AND datediff(date1, '2022-05-10') > 0") @@ -304,11 +310,25 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows7.length === 1) assert(rows7(0).getString(0) === "alex") - val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4") - checkFilterPushed(df8) - val rows8 = df8.collect() - assert(rows8.length === 1) - assert(rows8(0).getString(0) === "alex") + withClue("dayofweek") { + val dow = sql(s"SELECT dayofweek(date1) FROM $tbl WHERE name = 'alex'") + .collect().head.getInt(0) + val df = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = $dow") + checkFilterPushed(df) + val rows = df.collect() + assert(rows.length === 1) + assert(rows(0).getString(0) === "alex") + } + + withClue("yearofweek") { + val yow = sql(s"SELECT extract(YEAROFWEEK from date1) FROM $tbl WHERE name = 'tom'") + .collect().head.getInt(0) + val df = sql(s"SELECT name FROM $tbl WHERE extract(YEAROFWEEK from date1) = $yow") + checkFilterPushed(df) + val rows = df.collect() + assert(rows.length === 1) + assert(rows(0).getString(0) === "tom") + } val df9 = sql(s"SELECT name FROM $tbl WHERE " + "dayofyear(date1) > 100 order by dayofyear(date1) limit 1") @@ -318,12 +338,21 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows9(0).getString(0) === "alex") // Postgres does not support - val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'") - checkFilterPushed(df10, false) - val rows10 = df10.collect() - assert(rows10.length === 2) - assert(rows10(0).getString(0) === "amy") - assert(rows10(1).getString(0) === "alex") + withClue("unsupported") { + val df1 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'") + checkFilterPushed(df1, false) + val rows1 = df1.collect() + assert(rows1.length === 2) + assert(rows1(0).getString(0) === "amy") + assert(rows1(1).getString(0) === "alex") + + val df2 = sql(s"SELECT name FROM $tbl WHERE month(date1) = 5") + checkFilterPushed(df2, false) + val rows2 = df2.collect() + assert(rows2.length === 2) + assert(rows2(0).getString(0) === "amy") + assert(rows2(1).getString(0) === "alex") + } } test("Test reading 2d array from table created via CTAS command - positive test") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index d7fb12fcba83..f7b40329db46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -303,12 +303,27 @@ private case class PostgresDialect() class PostgresSQLBuilder extends JDBCSQLBuilder { override def visitExtract(field: String, source: String): String = { - field match { - case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)" - case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)" - case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)" - case _ => super.visitExtract(field, source) + // SECOND, MINUTE, HOUR, QUARTER, YEAR, DAY are identical on postgres and spark + // MONTH is different, postgres returns 0-11, spark returns 1-12. + // Postgres also returns 1-12 but just for interval columns, so without source + // data type, we cannot know how to push down it + // DAY_OF_WEEK is DOW, day of week is full compatible with postgres, + // but in V2ExpressionBuilder they converted DAY_OF_WEEK to DAY_OF_WEEK_ISO, + // so we need to push down ISODOW + // (ISO and standard day of weeks differs in starting day, + // Sunday is 0 on standard DOW extraction, while in ISO it's 7) + // DAY_OF_YEAR have same semantic, but different name (On postgres, it is DOY) + // WEEK is a little bit specific function, but both spark and postgres uses ISO week + // YEAR_OF_WEEK is ISO year actually, (first few days of calendar year are actually past year + // by ISO standard of week counting, 1st january is actually 52nd week of year + val postgresField = field match { + case "MONTH" => throw new UnsupportedOperationException("Month is not currently supported") + case "DAY_OF_WEEK" => "ISODOW" + case "DAY_OF_YEAR" => "DOY" + case "YEAR_OF_WEEK" => "ISOYEAR" + case _ => field } + super.visitExtract(postgresField, source) } override def visitBinaryArithmetic(name: String, l: String, r: String): String = { From 7f11a26bfe7defb476003114c5eb17b225c41c36 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 28 Feb 2025 11:35:37 +0800 Subject: [PATCH 2/2] address comments --- .../jdbc/v2/PostgresIntegrationSuite.scala | 26 ++++++------------- .../spark/sql/jdbc/PostgresDialect.scala | 11 +++----- 2 files changed, 12 insertions(+), 25 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 315f3c5f703c..af3c17dc98ae 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -272,13 +272,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows2(0).getString(0) === "amy") assert(rows2(1).getString(0) === "alex") - val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0") + val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5") checkFilterPushed(df3) val rows3 = df3.collect() - assert(rows3.length === 3) + assert(rows3.length === 2) assert(rows3(0).getString(0) === "amy") assert(rows3(1).getString(0) === "alex") - assert(rows3(2).getString(0) === "tom") val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0") checkFilterPushed(df4) @@ -338,21 +337,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT assert(rows9(0).getString(0) === "alex") // Postgres does not support - withClue("unsupported") { - val df1 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'") - checkFilterPushed(df1, false) - val rows1 = df1.collect() - assert(rows1.length === 2) - assert(rows1(0).getString(0) === "amy") - assert(rows1(1).getString(0) === "alex") - - val df2 = sql(s"SELECT name FROM $tbl WHERE month(date1) = 5") - checkFilterPushed(df2, false) - val rows2 = df2.collect() - assert(rows2.length === 2) - assert(rows2(0).getString(0) === "amy") - assert(rows2(1).getString(0) === "alex") - } + val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'") + checkFilterPushed(df10, false) + val rows10 = df10.collect() + assert(rows10.length === 2) + assert(rows10(0).getString(0) === "amy") + assert(rows10(1).getString(0) === "alex") } test("Test reading 2d array from table created via CTAS command - positive test") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index f7b40329db46..4a9fbe8e8a41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -303,10 +303,8 @@ private case class PostgresDialect() class PostgresSQLBuilder extends JDBCSQLBuilder { override def visitExtract(field: String, source: String): String = { - // SECOND, MINUTE, HOUR, QUARTER, YEAR, DAY are identical on postgres and spark - // MONTH is different, postgres returns 0-11, spark returns 1-12. - // Postgres also returns 1-12 but just for interval columns, so without source - // data type, we cannot know how to push down it + // SECOND, MINUTE, HOUR, DAY, MONTH, QUARTER, YEAR are identical on postgres and spark for + // both datetime and interval types. // DAY_OF_WEEK is DOW, day of week is full compatible with postgres, // but in V2ExpressionBuilder they converted DAY_OF_WEEK to DAY_OF_WEEK_ISO, // so we need to push down ISODOW @@ -314,10 +312,9 @@ private case class PostgresDialect() // Sunday is 0 on standard DOW extraction, while in ISO it's 7) // DAY_OF_YEAR have same semantic, but different name (On postgres, it is DOY) // WEEK is a little bit specific function, but both spark and postgres uses ISO week - // YEAR_OF_WEEK is ISO year actually, (first few days of calendar year are actually past year - // by ISO standard of week counting, 1st january is actually 52nd week of year + // YEAR_OF_WEEK is ISO year actually. First few days of a calendar year can belong to the + // past year by ISO standard of week counting. val postgresField = field match { - case "MONTH" => throw new UnsupportedOperationException("Month is not currently supported") case "DAY_OF_WEEK" => "ISODOW" case "DAY_OF_YEAR" => "DOY" case "YEAR_OF_WEEK" => "ISOYEAR"