22 changes: 11 additions & 11 deletions README.md
@@ -350,17 +350,15 @@ must also set a distribution key with the <tt>distkey</tt> option.
</td>
</tr>
<tr>
<td><tt>usestagingtable</tt></td>
<td><del><tt>usestagingtable</tt></del> (Deprecated)</td>
<td>No</td>
<td><tt>true</tt></td>
<td>
<p>When performing an overwrite of existing data, this setting can be used to stage the new data in a temporary
table, such that we make sure the <tt>COPY</tt> finishes successfully before making any changes to the existing table.
This means that we minimize the amount of time that the target table will be unavailable and restore the old
data should the <tt>COPY</tt> fail.</p>
<p>
Setting this deprecated option to <tt>false</tt> will cause an overwrite operation's destination table to be dropped immediately at the beginning of the write, making the overwrite operation non-atomic and reducing the availability of the destination table. This may reduce the temporary disk space requirements for overwrites.
</p>

<p>You may wish to disable this by setting the parameter to <tt>false</tt> if you can't spare the disk space in your
Redshift cluster and/or don't have requirements to keep the table availability high.</p>
<p>Since setting <tt>usestagingtable=false</tt> risks data loss / unavailability, it has been deprecated in favor of requiring users to manually drop the destination table themselves.</p>
</td>
</tr>
<tr>
@@ -470,15 +468,17 @@ When reading from / writing to Redshift, this library reads and writes data in S

### Guarantees of the Redshift data source for Spark

**Creating a new table**: Creating a new table is a two-step process, consisting of a `CREATE TABLE` command followed by a [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command to append the initial set of rows. Currently, these two steps are performed in separate transactions, so their effects may become visible at different times to readers. The `COPY` itself is atomic, so the table will never be visible in a state where it contains a non-empty subset of the saved rows. In a future release, this will be changed so that the `CREATE TABLE` and `COPY` statements are issued as part of the same transaction.

**Appending to an existing table**: In the [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command, this library uses [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, appends to existing tables have the same atomic and transactional properties as regular Redshift `COPY` commands.

**Overwriting an existing table**: By default, this library uses transactions to perform overwrites. Outside of a transaction, it will create an empty temporary table and append the new rows using a `COPY` statement. If the `COPY` succeeds, it will use a transaction to atomically delete the overwritten table and rename the temporary table to the destination table.
**Appending to an existing table**: When inserting rows into Redshift, this library uses the [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command and specifies [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, appends to existing tables performed by `spark-redshift` have the same atomic and transactional properties as regular Redshift `COPY` commands.

In a future release, this will be changed so that the temporary table is created in the same transaction as the `COPY`.

This use of a staging table can be disabled by setting `usestagingtable` to `false`, in which case the destination table will be deleted before the `COPY`, sacrificing the atomicity of the overwrite operation.
**Creating a new table (`SaveMode.CreateIfNotExists`)**: Creating a new table is a two-step process, consisting of a `CREATE TABLE` command followed by a [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command to append the initial set of rows. Both of these operations are performed in a single transaction.

**Overwriting an existing table**: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it.

If the deprecated `usestagingtable` setting is set to `false`, then this library will commit the `DROP TABLE` command before appending rows to the new table, sacrificing the atomicity of the overwrite operation but reducing the amount of staging space that Redshift needs during the overwrite.
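
As a rough illustration (not part of this change), an overwrite that opts into the deprecated behavior could be configured as in the sketch below; the JDBC URL, table name, and `tempdir` path are placeholder values, and `df` is assumed to be an existing `DataFrame`:

```scala
import org.apache.spark.sql.SaveMode

// Placeholder values: substitute your own cluster endpoint, credentials, table, and S3 bucket.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("tempdir", "s3n://path/for/temp/data")
  .option("usestagingtable", "false") // deprecated: commits the DROP TABLE early, so the overwrite is not atomic
  .mode(SaveMode.Overwrite)
  .save()
```

With the option left at its default of `true`, the drop, table creation, and `COPY` commit together, so a failed load leaves the original table intact.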

**Querying Redshift tables**: Queries use Redshift's [`UNLOAD`](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) command to execute a query and save its results to S3, and they use [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, queries issued through the Redshift data source for Spark should have the same consistency properties as regular Redshift queries.
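
For reference, a read through this data source (which drives the `UNLOAD` path described above) looks roughly like the following sketch; the URL, query, and `tempdir` values are placeholders:

```scala
// Placeholder values; a plain "dbtable" name can be supplied instead of "query".
val salesAggregates = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("query", "SELECT category, count(*) FROM sales GROUP BY category")
  .option("tempdir", "s3n://path/for/temp/data")
  .load()
```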

2 changes: 2 additions & 0 deletions src/main/scala/com/databricks/spark/redshift/Parameters.scala
@@ -191,6 +191,8 @@ private[redshift] object Parameters {
def sortKeySpec: Option[String] = parameters.get("sortkeyspec")

/**
* DEPRECATED: see PR #157.
*
* When true, data is always loaded into a new temporary table when performing an overwrite.
* This is to ensure that the whole load process succeeds before dropping any data from
* Redshift, which can be useful if, in the event of failures, stale data is better than no data
102 changes: 39 additions & 63 deletions src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -26,7 +26,6 @@ import org.apache.spark.TaskContext
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.util.Random
import scala.util.control.NonFatal

import com.databricks.spark.redshift.Parameters.MergedParameters
@@ -48,18 +47,15 @@ import org.apache.spark.sql.types._
* non-empty. After the write operation completes, we use this to construct a list of non-empty
* Avro partition files.
*
* - Use JDBC to issue any CREATE TABLE commands, if required.
*
* - If there is data to be written (i.e. not all partitions were empty), then use the list of
* non-empty Avro files to construct a JSON manifest file to tell Redshift to load those files.
* This manifest is written to S3 alongside the Avro files themselves. We need to use an
* explicit manifest, as opposed to simply passing the name of the directory containing the
* Avro files, in order to work around a bug related to parsing of empty Avro files (see #96).
*
* - Use JDBC to issue a COPY command in order to instruct Redshift to load the Avro data into
* the appropriate table. If the Overwrite SaveMode is being used, then by default the data
* will be loaded into a temporary staging table, which later will atomically replace the
* original table via a transaction.
* - Start a new JDBC transaction and disable auto-commit. Depending on the SaveMode, issue
* DROP TABLE or CREATE TABLE commands, then use the COPY command to instruct Redshift to load
* the Avro data into the appropriate table.
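*
*   As an illustrative sketch (not the literal SQL built by this class), an overwrite with the
*   default settings roughly issues the following statements on a single connection with
*   auto-commit disabled:
*
*     DROP TABLE IF EXISTS target_table;
*     CREATE TABLE IF NOT EXISTS target_table (...);
*     COPY target_table FROM 's3://bucket/path/manifest' CREDENTIALS '...'
*       FORMAT AS AVRO 'auto' manifest;
*     COMMIT;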
*/
private[redshift] class RedshiftWriter(
jdbcWrapper: JDBCWrapper,
@@ -101,43 +97,6 @@ private[redshift] class RedshiftWriter(
s"AVRO 'auto' manifest ${params.extraCopyOptions}"
}

/**
* Sets up a staging table then runs the given action, passing the temporary table name
* as a parameter.
*/
private def withStagingTable(
conn: Connection,
table: TableName,
action: (String) => Unit) {
val randomSuffix = Math.abs(Random.nextInt()).toString
val tempTable =
table.copy(unescapedTableName = s"${table.unescapedTableName}_staging_$randomSuffix")
val backupTable =
table.copy(unescapedTableName = s"${table.unescapedTableName}_backup_$randomSuffix")
log.info("Loading new Redshift data to: " + tempTable)
log.info("Existing data will be backed up in: " + backupTable)

try {
action(tempTable.toString)

if (jdbcWrapper.tableExists(conn, table.toString)) {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(
s"""
| BEGIN;
| ALTER TABLE $table RENAME TO ${backupTable.escapedTableName};
| ALTER TABLE $tempTable RENAME TO ${table.escapedTableName};
| DROP TABLE $backupTable;
| END;
""".stripMargin.trim))
} else {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(
s"ALTER TABLE $tempTable RENAME TO ${table.escapedTableName}"))
}
} finally {
jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $tempTable"))
}
}

/**
* Generate COMMENT SQL statements for the table and columns.
*/
@@ -151,23 +110,15 @@ private[redshift] class RedshiftWriter(
}

/**
* Perform the Redshift load, including deletion of existing data in the case of an overwrite,
* and creating the table if it doesn't already exist.
* Perform the Redshift load by issuing a COPY statement.
*/
private def doRedshiftLoad(
conn: Connection,
data: DataFrame,
saveMode: SaveMode,
params: MergedParameters,
creds: AWSCredentials,
manifestUrl: Option[String]): Unit = {

// Overwrites must drop the table, in case there has been a schema update
if (saveMode == SaveMode.Overwrite) {
jdbcWrapper.executeInterruptibly(
conn.prepareStatement(s"DROP TABLE IF EXISTS ${params.table.get}"))
}

// If the table doesn't exist, we need to create it first, using JDBC to infer column types
val createStatement = createTableSql(data, params)
log.info(createStatement)
@@ -190,8 +141,11 @@ private[redshift] class RedshiftWriter(
jdbcWrapper.executeInterruptibly(conn.prepareStatement(copyStatement))
} catch {
case e: SQLException =>
log.error("SQLException thrown while running COPY query; will attempt to retrieve " +
"more information by querying the STL_LOAD_ERRORS table", e)
// Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed.
// See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details.
conn.rollback()
val errorLookupQuery =
"""
| SELECT *
@@ -374,6 +328,12 @@ private[redshift] class RedshiftWriter(
"For save operations you must specify a Redshift table name with the 'dbtable' parameter")
}

if (!params.useStagingTable) {
log.warn("Setting useStagingTable=false is deprecated; instead, we recommend that you " +
"drop the target table yourself. For more details on this deprecation, see" +
"https://github.com/databricks/spark-redshift/pull/157")
}

val creds: AWSCredentials =
AWSCredentialsUtils.load(params, sqlContext.sparkContext.hadoopConfiguration)

@@ -382,19 +342,35 @@ private[redshift] class RedshiftWriter(

Utils.checkThatBucketHasObjectLifecycleConfiguration(params.rootTempDir, s3ClientFactory(creds))

// Save the table's rows to S3:
val manifestUrl = unloadData(sqlContext, data, params.createPerQueryTempDir())
val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials)

conn.setAutoCommit(false)
try {
val tempDir = params.createPerQueryTempDir()
val manifestUrl = unloadData(sqlContext, data, tempDir)
if (saveMode == SaveMode.Overwrite && params.useStagingTable) {
withStagingTable(conn, params.table.get, stagingTable => {
val updatedParams = MergedParameters(params.parameters.updated("dbtable", stagingTable))
doRedshiftLoad(conn, data, saveMode, updatedParams, creds, manifestUrl)
})
} else {
doRedshiftLoad(conn, data, saveMode, params, creds, manifestUrl)
val table: TableName = params.table.get
if (saveMode == SaveMode.Overwrite) {
// Overwrites must drop the table in case there has been a schema update
jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $table;"))
if (!params.useStagingTable) {
// If we're not using a staging table, commit now so that Redshift doesn't have to
// maintain a snapshot of the old table during the COPY; this sacrifices atomicity for
// performance.
conn.commit()
}
}
log.info(s"Loading new Redshift data to: $table")
doRedshiftLoad(conn, data, params, creds, manifestUrl)
conn.commit()
Review comment (Contributor Author): Does this need to handle InterruptedException? i.e. can this block for a really long time?

Reply (Contributor): It has been my experience that virtually any Redshift command, COMMIT included, can block for several minutes under certain circumstances. I think the most likely cause is that the cluster has WLM parameters configured to put the connected client on a limited pool of some type, such that all commands will be queued when all slots are taken by other queries.

I have a feeling that means though, if you plan to send a ROLLBACK command in response to the interruption, that will also block for many minutes...

Reply (Contributor Author): I'm not sure that it's safe for us to wrap this in order to catch InterruptedException since I don't think that it's safe to call .rollback() while the other thread is in the middle of executing commit(). Therefore, I'm going to leave this as-is for now and will revisit later if this turns out to be a problem in practice.

} catch {
case NonFatal(e) =>
try {
log.error("Exception thrown during Redshift load; will roll back transaction", e)
conn.rollback()
Review comment (Contributor): Might a log.info or log.warn be good here, to inform anyone testing their application code that the load failed and what they're now waiting for is a rollback to finish?

Reply (Contributor Author): Done.

} catch {
case NonFatal(e2) =>
log.error("Exception while rolling back transaction", e2)
}
throw e
} finally {
conn.close()
}
12 changes: 12 additions & 0 deletions src/test/scala/com/databricks/spark/redshift/MockRedshift.scala
@@ -89,6 +89,18 @@ class MockRedshift(
}
}

def verifyThatRollbackWasCalled(): Unit = {
jdbcConnections.foreach { conn =>
verify(conn, atLeastOnce()).rollback()
}
}

def verifyThatCommitWasNotCalled(): Unit = {
jdbcConnections.foreach { conn =>
verify(conn, never()).commit()
}
}

def verifyThatExpectedQueriesWereIssued(expectedQueries: Seq[Regex]): Unit = {
expectedQueries.zip(queriesIssued).foreach { case (expected, actual) =>
if (expected.findFirstMatchIn(actual).isEmpty) {
@@ -297,20 +297,12 @@ class RedshiftSourceSuite
"usestagingtable" -> "true")

val expectedCommands = Seq(
"DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r,
"CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r,
"DELETE FROM \"PUBLIC\".\"test_table_staging_.*\" WHERE id < 100".r,
"DELETE FROM \"PUBLIC\".\"test_table_staging_.*\" WHERE id > 100".r,
"DELETE FROM \"PUBLIC\".\"test_table_staging_.*\" WHERE id = -1".r,
"COPY \"PUBLIC\".\"test_table_staging_.*\"".r,
"""
| BEGIN;
| ALTER TABLE "PUBLIC"\."test_table" RENAME TO "test_table_backup_.*";
| ALTER TABLE "PUBLIC"\."test_table_staging_.*" RENAME TO "test_table";
| DROP TABLE "PUBLIC"\."test_table_backup_.*";
| END;
""".stripMargin.trim.r,
"DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r)
"DROP TABLE IF EXISTS \"PUBLIC\".\"test_table.*\"".r,
"CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table.*\"".r,
"DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id < 100".r,
"DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id > 100".r,
"DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id = -1".r,
"COPY \"PUBLIC\".\"test_table.*\"".r)

source.createRelation(testSqlContext, SaveMode.Overwrite, params, expectedDataDF)
mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands)
@@ -324,19 +316,11 @@ class RedshiftSourceSuite
"distkey" -> "testint")

val expectedCommands = Seq(
"DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r,
("CREATE TABLE IF NOT EXISTS \"PUBLIC\"\\.\"test_table_staging.*" +
"DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table.*\"".r,
("CREATE TABLE IF NOT EXISTS \"PUBLIC\"\\.\"test_table.*" +
" DISTSTYLE KEY DISTKEY \\(testint\\).*").r,
"COPY \"PUBLIC\"\\.\"test_table_staging_.*\"".r,
"GRANT SELECT ON \"PUBLIC\"\\.\"test_table_staging.+\" TO jeremy".r,
"""
| BEGIN;
| ALTER TABLE "PUBLIC"\."test_table" RENAME TO "test_table_backup_.*";
| ALTER TABLE "PUBLIC"\."test_table_staging_.*" RENAME TO "test_table";
| DROP TABLE "PUBLIC"\."test_table_backup_.*";
| END;
""".stripMargin.trim.r,
"DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r)
"COPY \"PUBLIC\"\\.\"test_table.*\"".r,
"GRANT SELECT ON \"PUBLIC\"\\.\"test_table\" TO jeremy".r)

val mockRedshift = new MockRedshift(
defaultParams("url"),
@@ -374,6 +358,8 @@
testSqlContext, df, SaveMode.Append, Parameters.mergeParameters(defaultParams))
}
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatCommitWasNotCalled()
mockRedshift.verifyThatRollbackWasCalled()
mockRedshift.verifyThatExpectedQueriesWereIssued(Seq.empty)
}

@@ -383,23 +369,22 @@
val mockRedshift = new MockRedshift(
defaultParams("url"),
Map(TableName.parseFromEscaped("test_table").toString -> TestUtils.testSchema),
jdbcQueriesThatShouldFail = Seq("COPY \"PUBLIC\".\"test_table_staging_.*\"".r))
jdbcQueriesThatShouldFail = Seq("COPY \"PUBLIC\".\"test_table.*\"".r))

val expectedCommands = Seq(
"DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r,
"CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r,
"COPY \"PUBLIC\".\"test_table_staging_.*\"".r,
".*FROM stl_load_errors.*".r,
"DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r
"DROP TABLE IF EXISTS \"PUBLIC\".\"test_table.*\"".r,
"CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table.*\"".r,
"COPY \"PUBLIC\".\"test_table.*\"".r,
".*FROM stl_load_errors.*".r
)

val source = new DefaultSource(mockRedshift.jdbcWrapper, _ => mockS3Client)
intercept[Exception] {
source.createRelation(testSqlContext, SaveMode.Overwrite, params, expectedDataDF)
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands)
}
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatCommitWasNotCalled()
Review comment (Contributor): Should this also verify that rollback was called?

Reply (Contributor Author): Done.

mockRedshift.verifyThatRollbackWasCalled()
mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands)
}
