Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
|)
""".stripMargin
).executeUpdate()
connection.prepareStatement(
"CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
.executeUpdate()
}

override def dataPreparation(connection: Connection): Unit = {
  super.dataPreparation(connection)
  // Seed the datetime table with two rows; ordering matters for the
  // assertions in testDatetime (amy first, then alex).
  Seq(
    "('amy', '2022-05-19', '2022-05-19 00:00:00')",
    "('alex', '2022-05-18', '2022-05-18 00:00:00')"
  ).foreach { row =>
    connection.prepareStatement(s"INSERT INTO datetime VALUES $row").executeUpdate()
  }
}

override def testUpdateColumnType(tbl: String): Unit = {
Expand Down Expand Up @@ -123,4 +134,77 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
)
}
}

// Exercises pushdown of datetime functions/extract fields to Postgres.
// Each query filters on a datetime expression; checkFilterPushed verifies
// whether the predicate was compiled and pushed to the JDBC source, and the
// collected rows verify the results are still correct either way.
override def testDatetime(tbl: String): Unit = {
  // dayofyear / dayofmonth push down via EXTRACT(DOY/DAY ...).
  val df1 = sql(s"SELECT name FROM $tbl WHERE " +
    "dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
  checkFilterPushed(df1)
  val rows1 = df1.collect()
  assert(rows1.length === 2)
  assert(rows1(0).getString(0) === "amy")
  assert(rows1(1).getString(0) === "alex")

  val df2 = sql(s"SELECT name FROM $tbl WHERE year(date1) = 2022 AND quarter(date1) = 2")
  checkFilterPushed(df2)
  val rows2 = df2.collect()
  assert(rows2.length === 2)
  assert(rows2(0).getString(0) === "amy")
  assert(rows2(1).getString(0) === "alex")

  val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5")
  checkFilterPushed(df3)
  val rows3 = df3.collect()
  assert(rows3.length === 2)
  assert(rows3(0).getString(0) === "amy")
  assert(rows3(1).getString(0) === "alex")

  val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
  checkFilterPushed(df4)
  val rows4 = df4.collect()
  assert(rows4.length === 2)
  assert(rows4(0).getString(0) === "amy")
  assert(rows4(1).getString(0) === "alex")

  // WEEK / YEAROFWEEK are remapped by PostgresSQLBuilder.visitExtract.
  val df5 = sql(s"SELECT name FROM $tbl WHERE " +
    "extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
  checkFilterPushed(df5)
  val rows5 = df5.collect()
  assert(rows5.length === 2)
  assert(rows5(0).getString(0) === "amy")
  assert(rows5(1).getString(0) === "alex")

  // date_add/datediff are not compiled for Postgres, so this filter is
  // expected NOT to be pushed down; Spark evaluates it post-scan.
  val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
    "AND datediff(date1, '2022-05-10') > 0")
  checkFilterPushed(df6, false)
  val rows6 = df6.collect()
  assert(rows6.length === 1)
  assert(rows6(0).getString(0) === "amy")

  val df7 = sql(s"SELECT name FROM $tbl WHERE weekday(date1) = 2")
  checkFilterPushed(df7)
  val rows7 = df7.collect()
  assert(rows7.length === 1)
  assert(rows7(0).getString(0) === "alex")

  val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
  checkFilterPushed(df8)
  val rows8 = df8.collect()
  assert(rows8.length === 1)
  assert(rows8(0).getString(0) === "alex")

  // Pushed filter combined with ORDER BY + LIMIT on the same expression.
  val df9 = sql(s"SELECT name FROM $tbl WHERE " +
    "dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
  checkFilterPushed(df9)
  val rows9 = df9.collect()
  assert(rows9.length === 1)
  assert(rows9(0).getString(0) === "alex")

  // Postgres does not support trunc(date, 'week'), so the filter cannot be
  // pushed down; results must still be correct via post-scan evaluation.
  val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
  checkFilterPushed(df10, false)
  val rows10 = df10.collect()
  assert(rows10.length === 2)
  assert(rows10(0).getString(0) === "amy")
  assert(rows10(1).getString(0) === "alex")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import java.util
import java.util.Locale

import scala.util.Using
import scala.util.control.NonFatal

import org.apache.spark.internal.LogKeys.COLUMN_NAME
import org.apache.spark.internal.MDC
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.{Expression, NamedReference}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
Expand Down Expand Up @@ -298,6 +299,28 @@ private case class PostgresDialect()
}
}

// SQL builder that rewrites Spark's EXTRACT field names into the names
// Postgres understands; all other fields fall through to the generic builder.
class PostgresSQLBuilder extends JDBCSQLBuilder {
  // Spark field name -> Postgres EXTRACT field name.
  private val postgresExtractFields = Map(
    "DAY_OF_YEAR" -> "DOY",
    "YEAR_OF_WEEK" -> "YEAR",
    "DAY_OF_WEEK" -> "DOW")

  override def visitExtract(field: String, source: String): String = {
    postgresExtractFields.get(field) match {
      case Some(pgField) => s"EXTRACT($pgField FROM $source)"
      case None => super.visitExtract(field, source)
    }
  }
}

/**
 * Compiles a V2 [[Expression]] into a Postgres SQL string using
 * [[PostgresSQLBuilder]].
 *
 * Returns None (i.e. skips pushdown) instead of failing the query when the
 * expression cannot be translated; the unsupported expression is logged.
 */
override def compileExpression(expr: Expression): Option[String] = {
  val postgresSQLBuilder = new PostgresSQLBuilder()
  try {
    Some(postgresSQLBuilder.build(expr))
  } catch {
    // NonFatal only: let fatal errors (OOM, interrupts) propagate.
    case NonFatal(e) =>
      logWarning("Error occurs while compiling V2 expression", e)
      None
  }
}

override def supportsLimit: Boolean = true

override def supportsOffset: Boolean = true
Expand Down