diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index eaf2a07ed459..af3c17dc98ae 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -185,6 +185,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
       "('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
     connection.prepareStatement("INSERT INTO datetime VALUES " +
       "('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
+    // '2022-01-01' is a Saturday and falls in ISO year 2021.
+    connection.prepareStatement("INSERT INTO datetime VALUES " +
+      "('tom', '2022-01-01', '2022-01-01 00:00:00')").executeUpdate()
   }
 
   override def testUpdateColumnType(tbl: String): Unit = {
@@ -279,17 +282,19 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
     checkFilterPushed(df4)
     val rows4 = df4.collect()
-    assert(rows4.length === 2)
+    assert(rows4.length === 3)
     assert(rows4(0).getString(0) === "amy")
     assert(rows4(1).getString(0) === "alex")
+    assert(rows4(2).getString(0) === "tom")
 
     val df5 = sql(s"SELECT name FROM $tbl WHERE " +
-      "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
+      "extract(WEEK from date1) > 10 AND extract(YEAR from date1) = 2022")
     checkFilterPushed(df5)
     val rows5 = df5.collect()
-    assert(rows5.length === 2)
+    assert(rows5.length === 3)
     assert(rows5(0).getString(0) === "amy")
     assert(rows5(1).getString(0) === "alex")
+    assert(rows5(2).getString(0) === "tom")
 
     val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
       "AND datediff(date1, '2022-05-10') > 0")
@@ -304,11 +309,25 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     assert(rows7.length === 1)
     assert(rows7(0).getString(0) === "alex")
 
-    val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
-    checkFilterPushed(df8)
-    val rows8 = df8.collect()
-    assert(rows8.length === 1)
-    assert(rows8(0).getString(0) === "alex")
+    withClue("dayofweek") {
+      val dow = sql(s"SELECT dayofweek(date1) FROM $tbl WHERE name = 'alex'")
+        .collect().head.getInt(0)
+      val df = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = $dow")
+      checkFilterPushed(df)
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "alex")
+    }
+
+    withClue("yearofweek") {
+      val yow = sql(s"SELECT extract(YEAROFWEEK from date1) FROM $tbl WHERE name = 'tom'")
+        .collect().head.getInt(0)
+      val df = sql(s"SELECT name FROM $tbl WHERE extract(YEAROFWEEK from date1) = $yow")
+      checkFilterPushed(df)
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "tom")
+    }
 
     val df9 = sql(s"SELECT name FROM $tbl WHERE " +
       "dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index d7fb12fcba83..4a9fbe8e8a41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -303,12 +303,24 @@ private case class PostgresDialect()
 
   class PostgresSQLBuilder extends JDBCSQLBuilder {
     override def visitExtract(field: String, source: String): String = {
-      field match {
-        case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)"
-        case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
-        case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)"
-        case _ => super.visitExtract(field, source)
+      // SECOND, MINUTE, HOUR, DAY, MONTH, QUARTER and YEAR are identical on Postgres and
+      // Spark for both datetime and interval types.
+      // DAY_OF_WEEK maps to DOW and its numbering is fully compatible with Postgres, but
+      // V2ExpressionBuilder converts DAY_OF_WEEK to DAY_OF_WEEK_ISO, so we need to push
+      // down ISODOW instead.
+      // (ISO and standard day-of-week numbering differ in the starting day:
+      // Sunday is 0 in standard DOW extraction, while in ISO it is 7.)
+      // DAY_OF_YEAR has the same semantics but a different name (on Postgres it is DOY).
+      // WEEK is a slightly special function, but both Spark and Postgres use the ISO week.
+      // YEAR_OF_WEEK is actually the ISO year: the first few days of a calendar year can
+      // belong to the previous year under the ISO week-numbering standard.
+      val postgresField = field match {
+        case "DAY_OF_WEEK" => "ISODOW"
+        case "DAY_OF_YEAR" => "DOY"
+        case "YEAR_OF_WEEK" => "ISOYEAR"
+        case _ => field
       }
+      super.visitExtract(postgresField, source)
     }
 
     override def visitBinaryArithmetic(name: String, l: String, r: String): String = {
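
For reference, a minimal standalone sketch (not part of the patch) of the ISO semantics the new dialect comment relies on, using plain java.time; the IsoExtractDemo object name is made up for illustration:

    import java.time.LocalDate
    import java.time.temporal.IsoFields

    // Why '2022-01-01' exercises the ISODOW/ISOYEAR mapping.
    object IsoExtractDemo {
      def main(args: Array[String]): Unit = {
        val d = LocalDate.of(2022, 1, 1)

        // ISO day of week: Monday = 1 .. Sunday = 7, matching Postgres EXTRACT(ISODOW ...).
        println(d.getDayOfWeek.getValue)                  // 6 (Saturday)

        // Standard Postgres DOW numbering: Sunday = 0 .. Saturday = 6.
        println(d.getDayOfWeek.getValue % 7)              // 6

        // ISO week-based year, matching Postgres EXTRACT(ISOYEAR ...) and Spark's
        // extract(YEAROFWEEK ...): 2022-01-01 falls in week 52 of ISO year 2021.
        println(d.get(IsoFields.WEEK_BASED_YEAR))         // 2021
        println(d.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR)) // 52
      }
    }

This is why the test inserts 'tom' at '2022-01-01': it is the only row whose YEAROFWEEK differs from its calendar YEAR, so it distinguishes a correct ISOYEAR pushdown from the old (incorrect) YEAR mapping.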