Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
// '2022-01-01' is Saturday and is in ISO year 2021.
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('tom', '2022-01-01', '2022-01-01 00:00:00')").executeUpdate()
}

override def testUpdateColumnType(tbl: String): Unit = {
Expand Down Expand Up @@ -269,27 +272,30 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
assert(rows2(0).getString(0) === "amy")
assert(rows2(1).getString(0) === "alex")

val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5")
val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0")
checkFilterPushed(df3)
val rows3 = df3.collect()
assert(rows3.length === 2)
assert(rows3.length === 3)
assert(rows3(0).getString(0) === "amy")
assert(rows3(1).getString(0) === "alex")
assert(rows3(2).getString(0) === "tom")

val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
checkFilterPushed(df4)
val rows4 = df4.collect()
assert(rows4.length === 2)
assert(rows4.length === 3)
assert(rows4(0).getString(0) === "amy")
assert(rows4(1).getString(0) === "alex")
assert(rows4(2).getString(0) === "tom")

val df5 = sql(s"SELECT name FROM $tbl WHERE " +
"extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
"extract(WEEK from date1) > 10 AND extract(YEAR from date1) = 2022")
checkFilterPushed(df5)
val rows5 = df5.collect()
assert(rows5.length === 2)
assert(rows5.length === 3)
assert(rows5(0).getString(0) === "amy")
assert(rows5(1).getString(0) === "alex")
assert(rows5(2).getString(0) === "tom")

val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
"AND datediff(date1, '2022-05-10') > 0")
Expand All @@ -304,11 +310,25 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
assert(rows7.length === 1)
assert(rows7(0).getString(0) === "alex")

val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
checkFilterPushed(df8)
val rows8 = df8.collect()
assert(rows8.length === 1)
assert(rows8(0).getString(0) === "alex")
withClue("dayofweek") {
val dow = sql(s"SELECT dayofweek(date1) FROM $tbl WHERE name = 'alex'")
.collect().head.getInt(0)
val df = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = $dow")
Copy link
Contributor Author

@cloud-fan cloud-fan Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better idea is probably having a test infra that automatically checks the result between JDBC pushdown on and off. I'll leave it to followups.

checkFilterPushed(df)
val rows = df.collect()
assert(rows.length === 1)
assert(rows(0).getString(0) === "alex")
}

withClue("yearofweek") {
val yow = sql(s"SELECT extract(YEAROFWEEK from date1) FROM $tbl WHERE name = 'tom'")
.collect().head.getInt(0)
val df = sql(s"SELECT name FROM $tbl WHERE extract(YEAROFWEEK from date1) = $yow")
checkFilterPushed(df)
val rows = df.collect()
assert(rows.length === 1)
assert(rows(0).getString(0) === "tom")
}

val df9 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
Expand All @@ -318,12 +338,21 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
assert(rows9(0).getString(0) === "alex")

// Postgres does not support
val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
checkFilterPushed(df10, false)
val rows10 = df10.collect()
assert(rows10.length === 2)
assert(rows10(0).getString(0) === "amy")
assert(rows10(1).getString(0) === "alex")
withClue("unsupported") {
val df1 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
checkFilterPushed(df1, false)
val rows1 = df1.collect()
assert(rows1.length === 2)
assert(rows1(0).getString(0) === "amy")
assert(rows1(1).getString(0) === "alex")

val df2 = sql(s"SELECT name FROM $tbl WHERE month(date1) = 5")
checkFilterPushed(df2, false)
val rows2 = df2.collect()
assert(rows2.length === 2)
assert(rows2(0).getString(0) === "amy")
assert(rows2(1).getString(0) === "alex")
}
}

test("Test reading 2d array from table created via CTAS command - positive test") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,27 @@ private case class PostgresDialect()

class PostgresSQLBuilder extends JDBCSQLBuilder {
override def visitExtract(field: String, source: String): String = {
field match {
case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)"
case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)"
case _ => super.visitExtract(field, source)
// SECOND, MINUTE, HOUR, QUARTER, YEAR, DAY are identical on postgres and spark
// MONTH is different, postgres returns 0-11, spark returns 1-12.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the doc you provided, but it says 1-based for datetimes, 0-based for intervals on the pg side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right, sorry I misread the doc.

Now looking into it again, Spark DS v2 pushdown does not support extracting fields from interval type yet (expressions like ExtractANSIIntervalMonths are not included in V2ExpressionBuilder). But even if we support it later, its semantic is the same as pgsql.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I believe they shall be identical as they all came from the work within the Postgres feature parity umbrella ticket.

I appreciate the confirmation.

// Postgres also returns 1-12 but just for interval columns, so without source
// data type, we cannot know how to push down it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

屏幕快照 2025-02-28 上午10 37 07

// DAY_OF_WEEK is DOW, day of week is full compatible with postgres,
// but in V2ExpressionBuilder they converted DAY_OF_WEEK to DAY_OF_WEEK_ISO,
// so we need to push down ISODOW
// (ISO and standard day of weeks differs in starting day,
// Sunday is 0 on standard DOW extraction, while in ISO it's 7)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thank you for the explanation.

// DAY_OF_YEAR have same semantic, but different name (On postgres, it is DOY)
// WEEK is a little bit specific function, but both spark and postgres uses ISO week
// YEAR_OF_WEEK is ISO year actually, (first few days of calendar year are actually past year
// by ISO standard of week counting, 1st january is actually 52nd week of year
val postgresField = field match {
case "MONTH" => throw new UnsupportedOperationException("Month is not currently supported")
case "DAY_OF_WEEK" => "ISODOW"
case "DAY_OF_YEAR" => "DOY"
case "YEAR_OF_WEEK" => "ISOYEAR"
case _ => field
}
super.visitExtract(postgresField, source)
}

override def visitBinaryArithmetic(name: String, l: String, r: String): String = {
Expand Down