Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,13 @@ the following case-insensitive options:
This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. This option applies only to writing.
</td>
</tr>

<tr>
<td><code>cascadeTruncate</code></td>
<td>
This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this options allows execution of a <code>TRUNCATE TABLE t CASCADE</code> (in the case of PostgreSQL a <code>TRUNCATE TABLE ONLY t CASCADE</code> is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in the <code>isCascadeTruncate</code> in each JDBCDialect.
</td>
</tr>

<tr>
<td><code>createTableOptions</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class JDBCOptions(
// ------------------------------------------------------------
// if to truncate the table from the JDBC database
val isTruncate = parameters.getOrElse(JDBC_TRUNCATE, "false").toBoolean

val isCascadeTruncate: Option[Boolean] = parameters.get(JDBC_CASCADE_TRUNCATE).map(_.toBoolean)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. I'm not sure that I lost the previous context, but what about getOrElse like val isTruncate on line 121?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I didn't do that is because of the existence of the isCascadingTruncateTable function for each dialect. According to the docs, that indicates whether or not a TRUNCATE TABLE command results in cascading behaviour by default for a given dialect. I thought it would be nice to then use that value as the default value for isCascadeTruncate. In that way, if there ever is a dialect that cascades truncations by default, we don't 'hardcode' a default value.

// the create table option , which can be table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8"
// TODO: to reuse the existing partition parameters for those partition specific options
Expand Down Expand Up @@ -225,6 +227,7 @@ object JDBCOptions {
val JDBC_QUERY_TIMEOUT = newOption("queryTimeout")
val JDBC_BATCH_FETCH_SIZE = newOption("fetchsize")
val JDBC_TRUNCATE = newOption("truncate")
val JDBC_CASCADE_TRUNCATE = newOption("cascadeTruncate")
val JDBC_CREATE_TABLE_OPTIONS = newOption("createTableOptions")
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
val JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES = newOption("customSchema")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ object JdbcUtils extends Logging {
val statement = conn.createStatement
try {
statement.setQueryTimeout(options.queryTimeout)
statement.executeUpdate(dialect.getTruncateQuery(options.table))
val truncateQuery = if (options.isCascadeTruncate.isDefined) {
dialect.getTruncateQuery(options.table, options.isCascadeTruncate)
} else {
dialect.getTruncateQuery(options.table)
}
statement.executeUpdate(truncateQuery)
} finally {
statement.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,16 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect
}
}

override def getTruncateQuery(table: String): String = {
dialects.head.getTruncateQuery(table)
/**
* The SQL query used to truncate a table.
* @param table The table to truncate.
* @param cascade Whether or not to cascade the truncation. Default value is the
* value of isCascadingTruncateTable()
* @return The SQL query to use for truncating a table
*/
override def getTruncateQuery(
table: String,
cascade: Option[Boolean] = isCascadingTruncateTable): String = {
dialects.head.getTruncateQuery(table, cascade)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ private object DerbyDialect extends JdbcDialect {
Option(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
case _ => None
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp}
import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, InterfaceStability, Since}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -120,12 +121,27 @@ abstract class JdbcDialect extends Serializable {
* The SQL query that should be used to truncate a table. Dialects can override this method to
* return a query that is suitable for a particular database. For PostgreSQL, for instance,
* a different query is used to prevent "TRUNCATE" affecting other tables.
* @param table The name of the table.
* @param table The table to truncate
* @return The SQL query to use for truncating a table
*/
@Since("2.3.0")
def getTruncateQuery(table: String): String = {
s"TRUNCATE TABLE $table"
getTruncateQuery(table, isCascadingTruncateTable)
}

/**
* The SQL query that should be used to truncate a table. Dialects can override this method to
* return a query that is suitable for a particular database. For PostgreSQL, for instance,
* a different query is used to prevent "TRUNCATE" affecting other tables.
* @param table The table to truncate
* @param cascade Whether or not to cascade the truncation
* @return The SQL query to use for truncating a table
*/
@Since("2.4.0")
def getTruncateQuery(
table: String,
cascade: Option[Boolean] = isCascadingTruncateTable): String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: indent.

s"TRUNCATE TABLE $table"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,20 @@ private case object OracleDialect extends JdbcDialect {
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

/**
* The SQL query used to truncate a table.
* @param table The table to truncate
* @param cascade Whether or not to cascade the truncation. Default value is the
* value of isCascadingTruncateTable()
* @return The SQL query to use for truncating a table
*/
override def getTruncateQuery(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move this function after isCascadingTruncateTable like the other files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

table: String,
cascade: Option[Boolean] = isCascadingTruncateTable): String = {
cascade match {
case Some(true) => s"TRUNCATE TABLE $table CASCADE"
case _ => s"TRUNCATE TABLE $table"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,27 @@ private object PostgresDialect extends JdbcDialect {
s"SELECT 1 FROM $table LIMIT 1"
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

/**
* The SQL query used to truncate a table. For Postgres, the default behaviour is to
* also truncate any descendant tables. As this is a (possibly unwanted) side-effect,
* the Postgres dialect adds 'ONLY' to truncate only the table in question
* @param table The name of the table.
* @return The SQL query to use for truncating a table
*/
override def getTruncateQuery(table: String): String = {
s"TRUNCATE TABLE ONLY $table"
* The SQL query used to truncate a table. For Postgres, the default behaviour is to
* also truncate any descendant tables. As this is a (possibly unwanted) side-effect,
* the Postgres dialect adds 'ONLY' to truncate only the table in question
* @param table The table to truncate
* @param cascade Whether or not to cascade the truncation. Default value is the value of
* isCascadingTruncateTable(). Cascading a truncation will truncate tables
* with a foreign key relationship to the target table. However, it will not
* truncate tables with an inheritance relationship to the target table, as
* the truncate query always includes "ONLY" to prevent this behaviour.
* @return The SQL query to use for truncating a table
*/
override def getTruncateQuery(
table: String,
cascade: Option[Boolean] = isCascadingTruncateTable): String = {
cascade match {
case Some(true) => s"TRUNCATE TABLE ONLY $table CASCADE"
Copy link
Member

@dongjoon-hyun dongjoon-hyun Feb 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you already test this in PostgreSQL.
But, to make it sure for us about using these ONLY and CASCADE at the same time, could you share us the result of a manual test of this with PostgreSQL tables, descendant, and foreign-key referenced tables here?

Copy link
Contributor Author

@danielvdende danielvdende Feb 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I made a quick example, as you can see using TRUNCATE TABLE ONLY $table CASCADE will truncate the foreign key-ed table, but leave the inheritance relationship intact: (this is what you meant, right?)

CREATE SCHEMA
postgres=# CREATE TABLE parent(a INT);
CREATE TABLE
postgres=# ALTER TABLE parent ADD CONSTRAINT some_constraint PRIMARY KEY(a);
ALTER TABLE
postgres=# CREATE TABLE child(b INT) INHERITS (parent);
CREATE TABLE
postgres=# CREATE TABLE forkey(c INT);
CREATE TABLE
postgres=# ALTER TABLE forkey ADD FOREIGN KEY(c) REFERENCES parent(a);
ALTER TABLE

postgres=# INSERT INTO parent VALUES(1);
INSERT 0 1
postgres=# select * from parent;
 a
---
 1
(1 row)

postgres=# select * from child;
 a | b
---+---
(0 rows)

postgres=# INSERT INTO child VALUES(2);
INSERT 0 1
postgres=# select * from child;
 a | b
---+---
 2 |
(1 row)

postgres=# select * from parent;
 a
---
 1
 2
(2 rows)

postgres=# INSERT INTO forkey VALUES(1);
INSERT 0 1
postgres=# select * from forkey;
 c
---
 1
(1 row)

postgres=# TRUNCATE TABLE ONLY parent CASCADE;
NOTICE:  truncate cascades to table "forkey"
TRUNCATE TABLE
postgres=# select * from parent;
 a
---
 2
(1 row)

postgres=# select * from child;
 a | b
---+---
 2 |
(1 row)

postgres=# select * from forkey;
 c
---
(0 rows)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. Thank you for testing. Could you add more description about this into the parameter description of cascade?

   * @param cascade Whether or not to cascade the truncation. Default value is the value of
   *                isCascadingTruncateTable()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :-)

case _ => s"TRUNCATE TABLE ONLY $table"
}
}

override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
Expand All @@ -110,5 +122,4 @@ private object PostgresDialect extends JdbcDialect {
}
}

override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,22 @@ private case object TeradataDialect extends JdbcDialect {
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
case _ => None
}

// Teradata does not support cascading a truncation
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

/**
* The SQL query used to truncate a table. Teradata does not support the 'TRUNCATE' syntax that
* other dialects use. Instead, we need to use a 'DELETE FROM' statement.
* @param table The table to truncate.
* @param cascade Whether or not to cascade the truncation. Default value is the
* value of isCascadingTruncateTable(). Teradata does not support cascading a
* 'DELETE FROM' statement (and as mentioned, does not support 'TRUNCATE' syntax)
* @return The SQL query to use for truncating a table
*/
override def getTruncateQuery(
table: String,
cascade: Option[Boolean] = isCascadingTruncateTable): String = {
s"DELETE FROM $table ALL"
}
}
46 changes: 39 additions & 7 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -861,19 +861,51 @@ class JDBCSuite extends QueryTest
}

test("truncate table query by jdbc dialect") {
val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val mysql = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url)
val derby = JdbcDialects.get("jdbc:derby:db")
val oracle = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
val teradata = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")

val table = "weblogs"
val defaultQuery = s"TRUNCATE TABLE $table"
val postgresQuery = s"TRUNCATE TABLE ONLY $table"
assert(MySQL.getTruncateQuery(table) == defaultQuery)
assert(Postgres.getTruncateQuery(table) == postgresQuery)
assert(db2.getTruncateQuery(table) == defaultQuery)
assert(h2.getTruncateQuery(table) == defaultQuery)
assert(derby.getTruncateQuery(table) == defaultQuery)
val teradataQuery = s"DELETE FROM $table ALL"

Seq(mysql, db2, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}

assert(postgres.getTruncateQuery(table) == postgresQuery)
assert(oracle.getTruncateQuery(table) == defaultQuery)
assert(teradata.getTruncateQuery(table) == teradataQuery)
}

test("SPARK-22880: Truncate table with CASCADE by jdbc dialect") {
// cascade in a truncate should only be applied for databases that support this,
// even if the parameter is passed.
val mysql = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
val postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
val db2 = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
val h2 = JdbcDialects.get(url)
val derby = JdbcDialects.get("jdbc:derby:db")
val oracle = JdbcDialects.get("jdbc:oracle://127.0.0.1/db")
val teradata = JdbcDialects.get("jdbc:teradata://127.0.0.1/db")

val table = "weblogs"
val defaultQuery = s"TRUNCATE TABLE $table"
val postgresQuery = s"TRUNCATE TABLE ONLY $table CASCADE"
val oracleQuery = s"TRUNCATE TABLE $table CASCADE"
val teradataQuery = s"DELETE FROM $table ALL"

Seq(mysql, db2, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}
assert(postgres.getTruncateQuery(table, Some(true)) == postgresQuery)
assert(oracle.getTruncateQuery(table, Some(true)) == oracleQuery)
assert(teradata.getTruncateQuery(table, Some(true)) == teradataQuery)
}

test("Test DataFrame.where for Date and Timestamp") {
Expand Down