From f10c0ff3b2ed292cf229d8181472a582e8e152a6 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Fri, 12 Feb 2016 10:20:54 +0000 Subject: [PATCH 1/9] Allow setting comments of table and columns from metadata --- .../com/databricks/spark/redshift/Parameters.scala | 5 +++++ .../databricks/spark/redshift/RedshiftWriter.scala | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala index d8d2a64f..8a98a65b 100644 --- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala +++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala @@ -205,6 +205,11 @@ private[redshift] object Parameters { */ def extraCopyOptions: String = parameters.get("extracopyoptions").getOrElse("") + /** + * Description of the table, set using the SQL COMMENT command. + */ + def description: Option[String] = parameters.get("description") + /** * List of semi-colon separated SQL statements to run before write operations. * This can be useful for running DELETE operations to clean up data diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala index dd5aed23..3c61ea09 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala @@ -138,6 +138,16 @@ private[redshift] class RedshiftWriter( } } + /** + * Generate COMMENT SQL statements for the table and columns. + */ + private def commentActions(tableComment: Option[String], schema: StructType): List[String] = { + tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '$desc'") ++ + schema.fields + .withFilter(f => f.metadata.contains("description")) + .map(f => s"""COMMENT ON COLUMN %s.${f.name} IS '${f.metadata.getString("description")}'""") + } + /** * Perform the Redshift load, including deletion of existing data in the case of an overwrite, * and creating the table if it doesn't already exist. 
@@ -161,8 +171,10 @@ private[redshift] class RedshiftWriter( log.info(createStatement) jdbcWrapper.executeInterruptibly(conn.prepareStatement(createStatement)) + val preActions = commentActions(params.description, data.schema) ++ params.preActions + // Execute preActions - params.preActions.foreach { action => + preActions.foreach { action => val actionSql = if (action.contains("%s")) action.format(params.table.get) else action log.info("Executing preAction: " + actionSql) jdbcWrapper.executeInterruptibly(conn.prepareStatement(actionSql)) From 9ea7896567071bd0c16f1f4f9680966487985d60 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Fri, 12 Feb 2016 10:32:14 +0000 Subject: [PATCH 2/9] Allow setting column encoding from metadata --- .../databricks/spark/redshift/RedshiftJDBCWrapper.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index cd01c807..7d9bf6b9 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -262,7 +262,12 @@ private[redshift] class JDBCWrapper { case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC") } val nullable = if (field.nullable) "" else "NOT NULL" - sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable""".trim) + val encoding = if (field.metadata.contains("encoding")) { + s"ENCODE ${field.metadata.getString("encoding")}" + } else { + "" + } + sb.append(s""", "${name.replace("\"", "\\\"")}" $typ $nullable $encoding""".trim) }} if (sb.length < 2) "" else sb.substring(2) } From 05078f60fbe42c5c0076cd6045ac81647e5d19bf Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Fri, 12 Feb 2016 11:03:01 +0000 Subject: [PATCH 3/9] Update README --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index e3e6e0ce..e5dfc9ab 100644 --- a/README.md +++ b/README.md @@ -335,6 +335,14 @@ data should the COPY fail.

Redshift cluster and/or don't have requirements to keep the table availability high.

+
+ description
+ No
+ No default
+

A description for the table. Will be set using the SQL COMMENT command, and should show up in most query tools. +See also the description metadata to set descriptions on individual columns. + preactions No @@ -414,6 +422,14 @@ df.write .save() ``` +### Configuring column encoding + +When creating a table, `spark-redshift` can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings). + +### Setting descriptions on columns + +Redshift allows columns to have descriptions attached that should show up in most query tools (using the `COMMENT` command). You can set the `description` column metadata field to specify a description for individual columns. + ## Transactional Guarantees This section describes `spark-redshift`'s transactional guarantees. From 0066e7e16e4569bba8590f450c40690b15abc090 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Fri, 12 Feb 2016 11:37:03 +0000 Subject: [PATCH 4/9] Tests --- .../spark/redshift/RedshiftWriter.scala | 3 +- .../spark/redshift/RedshiftSourceSuite.scala | 35 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala index 3c61ea09..ef1c6769 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala @@ -141,7 +141,8 @@ private[redshift] class RedshiftWriter( /** * Generate COMMENT SQL statements for the table and columns. */ - private def commentActions(tableComment: Option[String], schema: StructType): List[String] = { + private[redshift] def commentActions(tableComment: Option[String], schema: StructType): + List[String] = { tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '$desc'") ++ schema.fields .withFilter(f => f.metadata.contains("description")) diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala index 9329b68f..54aed918 100644 --- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala @@ -445,6 +445,41 @@ class RedshiftSourceSuite assert(createTableCommand === expectedCreateTableCommand) } + test("configuring encoding on columns") { + val lzoMetadata = new MetadataBuilder().putString("encoding", "LZO").build() + val runlengthMetadata = new MetadataBuilder().putString("encoding", "RUNLENGTH").build() + val schema = StructType( + StructField("lzo_str", StringType, metadata = lzoMetadata) :: + StructField("runlength_str", StringType, metadata = runlengthMetadata) :: + StructField("default_str", StringType) :: + Nil) + val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema) + val createTableCommand = + DefaultRedshiftWriter.createTableSql(df, MergedParameters.apply(defaultParams)).trim + val expectedCreateTableCommand = + """CREATE TABLE IF NOT EXISTS "PUBLIC"."test_table" ("lzo_str" TEXT ENCODE LZO,""" + + """ "runlength_str" TEXT ENCODE RUNLENGTH, "default_str" TEXT)""" + assert(createTableCommand === expectedCreateTableCommand) + } + + test("configuring descriptions on columns") { + val descriptionMetadata1 = new MetadataBuilder().putString("description", 
"Test1").build() + val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test2").build() + val schema = StructType( + StructField("first_str", StringType, metadata = descriptionMetadata1) :: + StructField("second_str", StringType, metadata = descriptionMetadata2) :: + StructField("default_str", StringType) :: + Nil) + val df = testSqlContext.createDataFrame(sc.emptyRDD[Row], schema) + val commentCommands = + DefaultRedshiftWriter.commentActions(Some("Test"), schema) + val expectedCommentCommands = List( + "COMMENT ON TABLE %s IS 'Test'", + "COMMENT ON COLUMN %s.first_str IS 'Test1'", + "COMMENT ON COLUMN %s.second_str IS 'Test2'") + assert(commentCommands === expectedCommentCommands) + } + test("Respect SaveMode.ErrorIfExists when table exists") { val mockRedshift = new MockRedshift( defaultParams("url"), From d1b3bae820ca3dddc2722e03d253344ff38d9861 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Fri, 12 Feb 2016 16:59:50 +0000 Subject: [PATCH 5/9] Escape quotes in columns and comments --- .../com/databricks/spark/redshift/RedshiftWriter.scala | 5 +++-- .../com/databricks/spark/redshift/RedshiftSourceSuite.scala | 6 +++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala index ef1c6769..458638ae 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala @@ -143,10 +143,11 @@ private[redshift] class RedshiftWriter( */ private[redshift] def commentActions(tableComment: Option[String], schema: StructType): List[String] = { - tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '$desc'") ++ + tableComment.toList.map(desc => s"COMMENT ON TABLE %s IS '${desc.replace("'", "''")}'") ++ schema.fields .withFilter(f => f.metadata.contains("description")) - .map(f => s"""COMMENT ON COLUMN %s.${f.name} IS '${f.metadata.getString("description")}'""") + .map(f => s"""COMMENT ON COLUMN %s."${f.name.replace("\"", "\\\"")}"""" + + s" IS '${f.metadata.getString("description").replace("'", "''")}'") } /** diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala index 54aed918..47a305d5 100644 --- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala @@ -464,7 +464,7 @@ class RedshiftSourceSuite test("configuring descriptions on columns") { val descriptionMetadata1 = new MetadataBuilder().putString("description", "Test1").build() - val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test2").build() + val descriptionMetadata2 = new MetadataBuilder().putString("description", "Test'2").build() val schema = StructType( StructField("first_str", StringType, metadata = descriptionMetadata1) :: StructField("second_str", StringType, metadata = descriptionMetadata2) :: @@ -475,8 +475,8 @@ class RedshiftSourceSuite DefaultRedshiftWriter.commentActions(Some("Test"), schema) val expectedCommentCommands = List( "COMMENT ON TABLE %s IS 'Test'", - "COMMENT ON COLUMN %s.first_str IS 'Test1'", - "COMMENT ON COLUMN %s.second_str IS 'Test2'") + "COMMENT ON COLUMN %s.\"first_str\" IS 'Test1'", + "COMMENT ON COLUMN %s.\"second_str\" IS 'Test''2'") assert(commentCommands === expectedCommentCommands) } From 8b3809094c96b1bcee8847cf609a3c2e292885e8 Mon Sep 
17 00:00:00 2001 From: Emlyn Corrin Date: Mon, 15 Feb 2016 10:08:27 +0000 Subject: [PATCH 6/9] Add integration tests --- .../redshift/RedshiftIntegrationSuite.scala | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala index 9ddfc291..3c3616b8 100644 --- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala +++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala @@ -386,6 +386,93 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { } } + test("configuring compression on columns") { + val tableName = s"configuring_compression_on_columns_$randomSuffix" + try { + val metadata = new MetadataBuilder().putString("encoding", "LZO").build() + val schema = StructType( + StructField("x", StringType, metadata = metadata) :: Nil) + sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .mode(SaveMode.ErrorIfExists) + .save() + assert(DefaultJDBCWrapper.tableExists(conn, tableName)) + val loadedDf = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .load() + checkAnswer(loadedDf, Seq(Row("a" * 512))) + val encodingDF = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", + s"(SELECT column, encoding FROM pg_table_def WHERE tablename='$tableName')") + .option("tempdir", tempDir) + .load() + checkAnswer(encodingDF, Seq(Row("x", "LZO"))) + } finally { + conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() + conn.commit() + } + } + + test("configuring comments on columns") { + val tableName = s"configuring_comments_on_columns_$randomSuffix" + try { + val metadata = new MetadataBuilder().putString("description", "Hello Column").build() + val schema = StructType( + StructField("x", StringType, metadata = metadata) :: Nil) + sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .mode(SaveMode.ErrorIfExists) + .save() + assert(DefaultJDBCWrapper.tableExists(conn, tableName)) + val loadedDf = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", tableName) + .option("tempdir", tempDir) + .option("description", "Hello Table") + .load() + checkAnswer(loadedDf, Seq(Row("a" * 512))) + val tableDF = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", s"(SELECT obj_description('$tableName'::regclass))") + .option("tempdir", tempDir) + .load() + checkAnswer(tableDF, Seq(Row("Hello Table"))) + val commentQuery = + s""" + |(SELECT c.column_name, pgd.description + |FROM pg_catalog.pg_statio_all_tables st + |INNER JOIN pg_catalog.pg_description pgd + | ON (pgd.objoid=st.relid) + |INNER JOIN information_schema.columns c + | ON (pgd.objsubid=c.ordinal_position AND c.table_name=st.relname) + |WHERE c.table_name='$tableName') + """.stripMargin; + val columnDF = sqlContext.read + .format("com.databricks.spark.redshift") + .option("url", jdbcUrl) + .option("dbtable", commentQuery) + .option("tempdir", tempDir) + .load() + 
checkAnswer(columnDF, Seq(Row("x", "Hello Column"))) + } finally { + conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() + conn.commit() + } + } + test("informative error message when saving a table with string that is longer than max length") { val tableName = s"error_message_when_string_too_long_$randomSuffix" try { From 2e47c2cccaa3ac2e09c561aa4c996f373ed659d9 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Mon, 15 Feb 2016 10:11:06 +0000 Subject: [PATCH 7/9] Remove reference to spark-redshift in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e5dfc9ab..e9b2dbbe 100644 --- a/README.md +++ b/README.md @@ -424,7 +424,7 @@ df.write ### Configuring column encoding -When creating a table, `spark-redshift` can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings). +When creating a table, this library can be configured to use a specific compression encoding on individual columns. You can use the `encoding` column metadata field to specify a compression encoding for each column (see [Amazon docs](http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html) for available encodings). ### Setting descriptions on columns From 5e429233dabe01987ab68224ad321d16e3c91013 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Wed, 6 Jul 2016 21:37:19 +0100 Subject: [PATCH 8/9] Address review comments --- .../spark/redshift/RedshiftIntegrationSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala index 3c3616b8..90cb9ee9 100644 --- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala +++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala @@ -392,7 +392,7 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { val metadata = new MetadataBuilder().putString("encoding", "LZO").build() val schema = StructType( StructField("x", StringType, metadata = metadata) :: Nil) - sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write + sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write .format("com.databricks.spark.redshift") .option("url", jdbcUrl) .option("dbtable", tableName) @@ -406,12 +406,12 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { .option("dbtable", tableName) .option("tempdir", tempDir) .load() - checkAnswer(loadedDf, Seq(Row("a" * 512))) + checkAnswer(loadedDf, Seq(Row("a" * 128))) val encodingDF = sqlContext.read .format("com.databricks.spark.redshift") .option("url", jdbcUrl) .option("dbtable", - s"(SELECT column, encoding FROM pg_table_def WHERE tablename='$tableName')") + s"""(SELECT "column", encoding FROM pg_table_def WHERE tablename='$tableName')""") .option("tempdir", tempDir) .load() checkAnswer(encodingDF, Seq(Row("x", "LZO"))) @@ -427,7 +427,7 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { val metadata = new MetadataBuilder().putString("description", "Hello Column").build() val schema = StructType( StructField("x", StringType, metadata = metadata) :: Nil) - sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 512))), schema).write + 
sqlContext.createDataFrame(sc.parallelize(Seq(Row("a" * 128))), schema).write .format("com.databricks.spark.redshift") .option("url", jdbcUrl) .option("dbtable", tableName) @@ -442,11 +442,11 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { .option("tempdir", tempDir) .option("description", "Hello Table") .load() - checkAnswer(loadedDf, Seq(Row("a" * 512))) + checkAnswer(loadedDf, Seq(Row("a" * 128))) val tableDF = sqlContext.read .format("com.databricks.spark.redshift") .option("url", jdbcUrl) - .option("dbtable", s"(SELECT obj_description('$tableName'::regclass))") + .option("dbtable", s"(SELECT pg_catalog.obj_description('$tableName'::regclass))") .option("tempdir", tempDir) .load() checkAnswer(tableDF, Seq(Row("Hello Table"))) From 88ba25acb80e679b0b6b047238e9bd3fea9fa252 Mon Sep 17 00:00:00 2001 From: Emlyn Corrin Date: Wed, 6 Jul 2016 23:01:03 +0100 Subject: [PATCH 9/9] Fix integration tests. --- .../redshift/RedshiftIntegrationSuite.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala index 90cb9ee9..6555a9d2 100644 --- a/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala +++ b/src/it/scala/com/databricks/spark/redshift/RedshiftIntegrationSuite.scala @@ -408,13 +408,12 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { .load() checkAnswer(loadedDf, Seq(Row("a" * 128))) val encodingDF = sqlContext.read - .format("com.databricks.spark.redshift") + .format("jdbc") .option("url", jdbcUrl) .option("dbtable", - s"""(SELECT "column", encoding FROM pg_table_def WHERE tablename='$tableName')""") - .option("tempdir", tempDir) + s"""(SELECT "column", lower(encoding) FROM pg_table_def WHERE tablename='$tableName')""") .load() - checkAnswer(encodingDF, Seq(Row("x", "LZO"))) + checkAnswer(encodingDF, Seq(Row("x", "lzo"))) } finally { conn.prepareStatement(s"drop table if exists $tableName").executeUpdate() conn.commit() @@ -431,6 +430,7 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { .format("com.databricks.spark.redshift") .option("url", jdbcUrl) .option("dbtable", tableName) + .option("description", "Hello Table") .option("tempdir", tempDir) .mode(SaveMode.ErrorIfExists) .save() @@ -440,14 +440,12 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { .option("url", jdbcUrl) .option("dbtable", tableName) .option("tempdir", tempDir) - .option("description", "Hello Table") .load() checkAnswer(loadedDf, Seq(Row("a" * 128))) val tableDF = sqlContext.read - .format("com.databricks.spark.redshift") + .format("jdbc") .option("url", jdbcUrl) .option("dbtable", s"(SELECT pg_catalog.obj_description('$tableName'::regclass))") - .option("tempdir", tempDir) .load() checkAnswer(tableDF, Seq(Row("Hello Table"))) val commentQuery = @@ -459,12 +457,11 @@ class RedshiftIntegrationSuite extends IntegrationSuiteBase { |INNER JOIN information_schema.columns c | ON (pgd.objsubid=c.ordinal_position AND c.table_name=st.relname) |WHERE c.table_name='$tableName') - """.stripMargin; + """.stripMargin val columnDF = sqlContext.read - .format("com.databricks.spark.redshift") + .format("jdbc") .option("url", jdbcUrl) .option("dbtable", commentQuery) - .option("tempdir", tempDir) .load() checkAnswer(columnDF, Seq(Row("x", "Hello Column"))) } finally {
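
Taken together, these patches let callers drive table comments, column comments, and column compression encodings from DataFrame column metadata plus a `description` write option. The following is a minimal usage sketch adapted from the integration tests above, showing how a user might exercise the feature once the series is applied; the JDBC URL, S3 temp directory, table name, and column names are placeholder values, and `sc`/`sqlContext` are assumed to be in scope as in the test suites.

```scala
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types._

// Placeholder connection details -- substitute real values for your cluster.
val jdbcUrl = "jdbc:redshift://example-cluster:5439/db?user=USER&password=PASS"
val tempDir = "s3n://example-bucket/tmp/"

// Column metadata drives the generated DDL: "encoding" becomes an ENCODE clause
// in the CREATE TABLE statement, and "description" becomes a COMMENT ON COLUMN.
val idMeta   = new MetadataBuilder().putString("encoding", "LZO").build()
val nameMeta = new MetadataBuilder().putString("description", "Customer name").build()

val schema = StructType(
  StructField("id", StringType, metadata = idMeta) ::
  StructField("name", StringType, metadata = nameMeta) ::
  Nil)

val df = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row("42", "Alice"))), schema)

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("dbtable", "customers")
  .option("tempdir", tempDir)
  .option("description", "Customer dimension table") // table-level COMMENT
  .mode(SaveMode.ErrorIfExists)
  .save()
```

Per patches 1 and 2, the writer emits the `COMMENT ON TABLE`/`COMMENT ON COLUMN` statements as pre-actions after creating the table, and the `ENCODE` clause is appended to each column definition in the generated `CREATE TABLE`.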