[SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 #24798
Conversation
Test build #106170 has finished for PR 24798 at commit
        ident, query.schema, partitioning.toArray, properties.asJava)
      writeToStagedTable(stagedTable, writeOptions, ident)
    case _ =>
      // Note that this operation is potentially unsafe, but these are the strict semantics of
I think we talked about this, and we concluded that this is the appropriate behavior - but I'm still not sure it is wise to support an inherently unsafe and potentially inconsistent operation. It's worth considering if we should throw UnsupportedOperationException here.
Yeah, I'm still on the fence about this, too.
public interface StagedTable extends Table {

  void commitStagedChanges();
It's not immediately obvious if this API belongs in StagedTable, or if it should be tied to the BatchWrite's commit() operation. The idea I had with tying it to StagedTable is:
- Make the atomic swap part more explicit from the perspective of the physical plan execution, and
- Allow both StagedTable and Table to share the same WriteBuilder and BatchWrite implementations that persist the rows, and decouple the atomic swap in this module only.
If we wanted to move the swap implementation behind the BatchWrite#commit and BatchWrite#abort APIs, then it's worth asking if we need the StagedTable interface at all - so TransactionalTableCatalog would return plain Table objects.
I like this. So the write's commit stashes changes in the staged table, which can finish or roll back.
This also solves the problem of where to document how to complete the changes staged in a StagedTable. Can you add docs that describe what these methods should do, and for the StagedTable interface?
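For illustration, a minimal Scala sketch of the contract being asked for, with the commit documented and an abort counterpart; the abort method and the exact wording are assumptions rather than this PR's final javadoc:

```scala
import org.apache.spark.sql.sources.v2.Table // package location as of this PR

// Sketch only: a Scala rendering of the Java interface with the docs filled in.
trait StagedTable extends Table {

  /**
   * Publish the staged table: make its metadata and all rows written through its
   * WriteBuilder visible in the catalog, atomically replacing any existing table
   * when the stage was opened for a REPLACE operation.
   */
  def commitStagedChanges(): Unit

  /**
   * Discard the staged table and any data written to it, leaving the catalog's
   * visible state exactly as it was before staging began.
   */
  def abortStagedChanges(): Unit
}
```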
import org.apache.spark.sql.sources.v2.StagedTable;
import org.apache.spark.sql.types.StructType;

public interface TransactionalTableCatalog {
TransactionalTableCatalog is proposed in the SPIP, but we don't really encode any formal notion of transactions in these APIs. Transactionality has a particular connotation in the DBMS nomenclature, e.g. START TRANSACTION statements. Perhaps we can rename this to AtomicTableCatalog or SupportsAtomicOperations?
I think renaming this is a good idea. How about StagingTableCatalog? The main capability it introduces is staging a table so that it can be used for a write, but doesn't yet exist.
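If the rename lands, the mix-in might look roughly like the Scala sketch below; the Identifier and Transform import paths are assumptions, and whether the final API keeps exactly these two methods is up to the PR:

```scala
import java.util

import org.apache.spark.sql.catalog.v2.Identifier            // assumed package location
import org.apache.spark.sql.catalog.v2.expressions.Transform  // assumed package location
import org.apache.spark.sql.sources.v2.StagedTable
import org.apache.spark.sql.types.StructType

// A catalog that can stage a table so it can receive a write before it becomes
// visible, avoiding the "transactional" naming.
trait StagingTableCatalog {

  // Stage creation of a new table; it becomes visible only on commitStagedChanges().
  def stageCreate(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable

  // Stage replacement of an existing table; the swap happens atomically on commit.
  def stageReplace(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): StagedTable
}
```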
    (AS? query)? #createHiveTable
    | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
        LIKE source=tableIdentifier locationSpec? #createTableLike
    | replaceTableHeader ('(' colTypeList ')')? tableProvider
Are there other flavors of REPLACE TABLE that we need to support?
I'm not sure that we should support all of what's already here, at least not to begin with.
I think that the main use of REPLACE TABLE as an atomic operation is REPLACE TABLE ... AS SELECT. That's because the replacement should only happen if the write succeeds and the write could easily fail for a lot of reasons. Without a write, this is just syntactic sugar for a combined drop and create.
I think the initial PR should focus on just the RTAS case. That simplifies this because it no longer needs the type list. What do you think?
I think this should support the USING clause that is used to pass the provider name.
Because of the tableProvider field at the end, I think USING is still supported, right? As mentioned elsewhere, this is copied from CTAS.
Is there a test for it?
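A hedged sketch of what such a parser test could look like, modeled on the neighboring create/replace tests; parsePlan and the ReplaceTableAsSelectStatement field names are assumptions, not necessarily this PR's exact API:

```scala
test("replace table as select - USING provider is parsed") {
  val sql = "REPLACE TABLE my_tab USING parquet AS SELECT * FROM src"
  parsePlan(sql) match {
    case replace: ReplaceTableAsSelectStatement =>
      // The provider from the USING clause should be carried on the parsed statement.
      assert(replace.provider == "parquet")
    case other =>
      fail(s"Expected ReplaceTableAsSelectStatement, got ${other.getClass.getName}: $sql")
  }
}
```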
Test build #106171 has finished for PR 24798 at commit
Test build #106178 has finished for PR 24798 at commit
@rdblue @gatorsmile @HyukjinKwon this should be ready to go now, modulo the questions I've posted inline.
Test build #106213 has finished for PR 24798 at commit
Test build #106216 has finished for PR 24798 at commit
    | replaceTableHeader ('(' colTypeList ')')? tableProvider
        ((OPTIONS options=tablePropertyList) |
        (PARTITIONED BY partitioning=transformList) |
        bucketSpec |
Should bucketing be added using BUCKET BY? Or should we rely on bucket as a transform in the PARTITIONED BY clause?
Same as #24798 (comment) - this is copied from the create table spec.
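For reference, the transform-based spelling that PARTITIONED BY would carry looks roughly like this; the catalog, provider, and table names are placeholders, and this assumes a SparkSession named spark with a v2 catalog configured as in the test suite's before block:

```scala
// Bucketing expressed as a transform inside PARTITIONED BY rather than a
// dedicated bucketing clause ("testcat" and "foo" are placeholders).
spark.sql(
  """CREATE TABLE testcat.db.events (id BIGINT, ts TIMESTAMP, data STRING)
    |USING foo
    |PARTITIONED BY (bucket(16, id), days(ts))
    |""".stripMargin)
```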
    | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
        LIKE source=tableIdentifier locationSpec? #createTableLike
    | replaceTableHeader ('(' colTypeList ')')? tableProvider
        ((OPTIONS options=tablePropertyList) |
Should OPTIONS be supported in v2? Right now, we copy options into table properties because v2 has no separate options. I also think it is confusing to users that there are table properties and options.
In general I copied this entirely from the equivalent create table statement. How does the syntax for REPLACE TABLE differ from that of the existing CREATE TABLE? My understanding is REPLACE TABLE is exactly equivalent to CREATE TABLE with the exception of not having an IF NOT EXISTS option.
True, it should be the same as CREATE TABLE. That's a good reason to carry this forward.
    ;

replaceTableHeader
    : REPLACE TEMPORARY? TABLE multipartIdentifier
I'd probably remove TEMPORARY to begin with. What is the behavior of a temporary table? I think it used to be a view.
Actually, it looks fine since this is not allowed in the AST builder.
Let's get rid of TEMPORARY TABLE. It was a mistake, and we've removed almost everything about TEMPORARY TABLE in Spark; only a few parser rules are left for backward compatibility reasons.
To clarify, there is no TEMPORARY TABLE in Spark and there never has been. Spark only has TABLE, VIEW and TEMP VIEW.
    location: Option[String],
    comment: Option[String]) extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty
ParsedStatement now defaults these methods, so you can remove them.
| s"got ${other.getClass.getName}: $sql") | ||
| test("create/replace table using - schema") { | ||
| val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" | ||
| val replaceSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet" |
REPLACE?
      throw new TableAlreadyExistsException(ident)
    }
    catalog match {
      case txnCatalog: TransactionalTableCatalog =>
Because so little is shared between the two implementations, I think I would probably separate them into different exec nodes. An added benefit of that is that the physical plan would tell users whether Spark is going to use an atomic operation. That way I could check EXPLAIN and run if it is atomic or do more testing if it is not.
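A toy sketch of that split; all names below are placeholders rather than the PR's actual classes, but it shows how planning on the catalog's capability makes the atomic path visible in EXPLAIN output:

```scala
// Placeholder types standing in for the catalog API and physical plan nodes.
trait TableCatalogLike { def name: String }
trait StagingCapability extends TableCatalogLike

sealed trait ReplaceExec
// Non-atomic path: drop the table, create it, then write.
case class ReplaceTableAsSelectExec(catalogName: String) extends ReplaceExec
// Atomic path: stage the replacement, write, then commit the swap.
case class AtomicReplaceTableAsSelectExec(catalogName: String) extends ReplaceExec

// Choosing a distinct exec node per capability means the physical plan itself
// tells the user whether the REPLACE will be atomic.
def planReplace(catalog: TableCatalogLike): ReplaceExec = catalog match {
  case staging: StagingCapability => AtomicReplaceTableAsSelectExec(staging.name)
  case other                      => ReplaceTableAsSelectExec(other.name)
}
```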
        stagedTable.commitStagedChanges()
        writtenRows
      case _ =>
        // table does not support writes
Could you also note that the catch block will abort the staged changes?
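For context, a simplified sketch of the flow under discussion: the write runs against the staged table, the commit only happens after the write succeeds, and the catch path aborts. The abortStagedChanges name and the helper's exact shape are assumptions layered on the commitStagedChanges call shown above:

```scala
// Placeholder trait mirroring the staged-table calls discussed in this thread.
trait StagedTableLike {
  def commitStagedChanges(): Unit
  def abortStagedChanges(): Unit
}

def writeToStagedTable(staged: StagedTableLike)(doWrite: => Long): Long = {
  try {
    val writtenRows = doWrite      // run the actual batch write
    staged.commitStagedChanges()   // publish the table only after the write succeeds
    writtenRows
  } catch {
    case t: Throwable =>
      staged.abortStagedChanges()  // roll back staged metadata and data on failure
      throw t
  }
}
```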
  before {
    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
    spark.conf.set(
      "spark.sql.catalog.testcatatomic", classOf[TestTransactionalInMemoryCatalog].getName)
Nit: consider adding an underscore to make the catalog name more readable.
    checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
  }

  test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
Nit: these test cases are dense because they have no new lines. Blank lines between tasks, like creating the original table, replacing it, and assertions, would help readability.
    checkAnswer(
      spark.internalCreateDataFrame(rdd, replacedTable.schema),
      spark.table("source").select("id"))
  }
All of the success cases should be applied to both atomic and non-atomic catalogs because we expect a difference in behavior only in failure cases.
I modified some of the success cases; I don't know if there are more that need to be adjusted. I don't think we have to be completely exhaustive here.
| s" AS SELECT id FROM source") | ||
| } | ||
| val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name")) | ||
| assert(replacedTable != table, "Table should have been replaced.") |
I think a better test assertion is that the schema matches the new table. This test could be true for the same underlying metadata if two separate instances of a table are loaded from a catalog.
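Concretely, that suggestion amounts to something like the following inside the same test, assuming the id column selected from source is a LongType as elsewhere in the suite:

```scala
import org.apache.spark.sql.types.{LongType, StructType}

// Assert on the replaced schema itself rather than on object inequality between
// two Table instances loaded from the catalog.
val expectedSchema = new StructType().add("id", LongType)
assert(replacedTable.schema == expectedSchema,
  "Replaced table should carry the schema of the RTAS query (a single id column)")
```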
  }

  test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but leaves the" +
    " table empty if the write fails.") {
Why isn't the table dropped in this case? I would expect this to have the behavior of non-atomic CTAS after the initial delete.
I think the behavior here is ambiguous. Suppose then another user went and started writing to the table concurrently - should this job drop the table that the other job is writing to?
I think the intent of RTAS is to run a combined DROP TABLE and CREATE TABLE ... AS SELECT .... CTAS doesn't worry about concurrent writes because the table doesn't "exist" until the write completes. That's why we delete after a CTAS if the write fails, even though it also has a non-atomic case where the table exists and could technically be written to concurrently.
Hm so I took a closer look at this. It turns out that using Utils.tryWithSafeFinallyAndFailureCallbacks is risky in all of these code paths as it is currently implemented.
That method, when it tries to run the catch block, will first try to set the failure reason on the task context via TaskContext.get().markTaskFailed. But since we're running this try...finally block on the driver, there is no such task context to get via TaskContext.get.
What happens in this case then is that this test passes when it should fail, because indeed, the table should be dropped. But the catch block that drops the table never gets run, because TaskContext.get().markTaskFailed NPEs before the catch block can be run.
I think there are a few ways forward:
1. Don't use the Utils method to do try-catch-finally.
2. Patch the Utils method to check for null on the current task context before trying to mark the task failure reason on it.
I'm going with 2) for now, but 1) is very reasonable as well.
Either way, yeah the table should end up being dropped at the end, so this test also has to be patched.
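A minimal sketch of option 2, assuming only the null guard changes; the helper below paraphrases Utils.tryWithSafeFinallyAndFailureCallbacks rather than reproducing it, and note that markTaskFailed is package-private to Spark, so this shape only compiles inside Spark's own packages:

```scala
import org.apache.spark.TaskContext

// On failure, mark the task failed only when we are actually inside a task
// (TaskContext.get() returns null on the driver), then run the caller's catch
// block, e.g. dropping the partially created table, before rethrowing.
def tryWithFailureCallback[T](block: => T)(catchBlock: => Unit): T = {
  try {
    block
  } catch {
    case cause: Throwable =>
      val ctx = TaskContext.get()
      if (ctx != null) {
        ctx.markTaskFailed(cause) // package-private to org.apache.spark
      }
      catchBlock
      throw cause
  }
}
```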
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala
Approach and interface LGTM! +9000 on "keep[ing] commits smaller and more focused" in the future. Would really help speed up the development cycle.
Test build #107756 has finished for PR 24798 at commit
Test build #107764 has finished for PR 24798 at commit
 * @param partitions transforms to use for partitioning data in the table
 * @param properties a string map of table properties
 * @return metadata for the new table
 * @throws TableAlreadyExistsException If a table or view already exists for the identifier
I'm a little confused here: REPLACE TABLE does require that the table exists, right?
 *
 * A new table will be created using the schema of the query, and rows from the query are appended.
 * If the table exists, its contents and schema should be replaced with the schema and the contents
 * of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
According to https://github.com/apache/spark/pull/24798/files#r302746896, this is a broken implementation. RTAS should be able to query any existing tables, including the one that is being replaced. If we do want to have a non-atomic version, how about the following (sketched in code below):
- create a table with a random but unique name (like UUID), insert data to it
- drop the target table
- rename the table created in step 1 to the target table.
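A hedged sketch of those three steps; the SimpleCatalog operations are placeholders for whatever create/drop/rename support the catalog API ends up exposing:

```scala
import java.util.UUID

// Placeholder catalog operations for the sake of the sketch.
trait SimpleCatalog {
  def createTableAs(name: String, query: String): Unit
  def dropTable(name: String): Unit
  def renameTable(from: String, to: String): Unit
}

def nonAtomicReplaceTableAsSelect(catalog: SimpleCatalog, target: String, query: String): Unit = {
  // 1. Create a table with a random but unique name and insert the query result.
  val staging = s"${target}_staging_${UUID.randomUUID().toString.replace("-", "")}"
  catalog.createTableAs(staging, query)
  // 2. Drop the target table (the query above may have read from it).
  catalog.dropTable(target)
  // 3. Rename the staged table to the target name.
  catalog.renameTable(staging, target)
}
```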
IMO the non-atomic version is allowed to have undefined behavior when a failure happens midway. But it should work the same as the atomic version if no failure happens.
That comment applies only to RTAS queries that read the table that will be replaced. We can fix that in a follow-up.
  def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
    if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
        && tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
          .equalsIgnoreCase("true")) {
nit: we can just write "true".equalsIgnoreCase(tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))
if the key doesn't exist, "true".equalsIgnoreCase(null) returns false.
  override def commitStagedChanges(): Unit = {
    if (replaceIfExists) {
      tables.put(ident, delegateTable)
nit: when committing REPLACE TABLE, we should fail if the table has already been dropped by others.
Agree with @brkyvz that it's too late to split, as this PR has already received many reviews. Please try to keep the PR smaller and more focused next time. Generally looks good; only a few comments.
    extends StagedTable with SupportsWrite with SupportsRead {

  override def commitStagedChanges(): Unit = {
    if (droppedTables.contains(ident)) {
It's weird to record all the dropped tables in the history. I think a simpler version is:
if (replaceIfExists) {
  if (!tables.containsKey(ident)) {
    throw new RuntimeException("table already dropped")
  }
  tables.put(ident, delegateTable)
}
That doesn't work because the implementation of stageCreate doesn't actually put the table in the tables map at all. So you can't necessarily say the table was dropped just because the table is not in the tables map.
It's unclear to me if this is the correct behavior - if something dropped the table from underneath this, the subsequent commit of the replace or atomic-create operation should have the final say, right?
Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must be reachable by one certain execution order.
If REPLACE TABLE executes first, then there should be no table at the end as it's dropped.
If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end.
BTW I think my proposal works for REPLACE TABLE, right? stageCreate is for CTAS, and I think your current code (without tracking dropped tables) already works.
I thought about this a bit more and chatted with @rdblue and I realized the confusion is that the staging catalog API doesn't even support passing through the orCreate flag to the catalog. I think we need to pass this information along to the catalog, otherwise the catalog won't know that the user wanted CREATE OR REPLACE semantics.
I'm more inclined to add an extra method to StagingTableCatalog called stageCreateOrReplace, in addition to the other methods we have here already. Then the behavior of commitStagedChanges depends on whether or not the table was instantiated via stageCreateOrReplace vs. stageReplace vs. stageCreate.
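A sketch of how the commit could branch on the staged intent, using a toy stand-in for the test catalog's tables map; the enum and class names are illustrations, not the PR's final shape:

```scala
import java.util.concurrent.ConcurrentHashMap

// Which staging entry point produced this staged table.
object StagedOperation extends Enumeration {
  val Create, Replace, CreateOrReplace = Value
}

class StagedCommit[T](
    tables: ConcurrentHashMap[String, T],
    ident: String,
    delegateTable: T,
    operation: StagedOperation.Value) {

  def commitStagedChanges(): Unit = operation match {
    case StagedOperation.Create =>
      // CTAS: fail if someone created the table while we were staging.
      if (tables.putIfAbsent(ident, delegateTable) != null) {
        throw new IllegalStateException(s"Table $ident already exists")
      }
    case StagedOperation.Replace =>
      // RTAS: fail if the table was dropped out from under us.
      if (tables.replace(ident, delegateTable) == null) {
        throw new IllegalStateException(s"Table $ident was concurrently dropped")
      }
    case StagedOperation.CreateOrReplace =>
      // CREATE OR REPLACE: always publish, whether or not the table still exists.
      tables.put(ident, delegateTable)
  }
}
```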
Test build #107806 has finished for PR 24798 at commit
Latest patch adds
Test build #107871 has finished for PR 24798 at commit
 *
 * Expected format:
 * {{{
 *   REPLACE TABLE [IF NOT EXISTS] [db_name.]table_name
This doesn't match the actual syntax now.
 * @param properties a string map of table properties
 * @return metadata for the new table
 * @throws UnsupportedOperationException If a requested partition transform is not supported
 * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
I think the implementation should throw TableNotFoundException if the table to replace doesn't exist.
    orCreate: Boolean) extends AtomicTableWriteExec {

  override protected def doExecute(): RDD[InternalRow] = {
    val stagedTable = if (catalog.tableExists(ident)) {
I think we can simplify this to:
val stagedTable = if (orCreate) {
  catalog.stageCreateOrReplace(
    ident, query.schema, partitioning.toArray, properties.asJava)
} else {
  catalog.stageReplace(
    ident, query.schema, partitioning.toArray, properties.asJava)
}
stageReplace should throw an exception itself if the table doesn't exist. The implementation already needs to do it before committing, so it doesn't hurt to also do it at the beginning.
I disagree that there is no need to check whether the table exists. We had a similar discussion on CREATE TABLE. Spark should check existence to ensure that the error is consistently thrown. If the table does not exist and orCreate is false, then Spark should throw an exception and not rely on the source to do it.
That said, I think it would be simpler to update the logic a little:
if (orCreate) {
  catalog.stageCreateOrReplace(
    ident, query.schema, partitioning.toArray, properties.asJava)
} else if (catalog.tableExists(ident)) {
  catalog.stageReplace(
    ident, query.schema, partitioning.toArray, properties.asJava)
} else {
  throw new CannotReplaceMissingTableException(ident)
}
It's minor so I don't want to block this PR on it, but Spark is unable to make sure the error is consistently thrown because anything can happen after you check the table existence and before you do the actual operation.
That said, this is just a best-effort, which is not that useful as it's not a guarantee.
Updated some docs and cleaned up implementations based on comments.
    try {
      catalog.stageReplace(
        identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
    } catch {
The try...catch here is more for flavor and consistency, since @cloud-fan suggested that StagingTableCatalog#stageReplace should be able to throw NoSuchTableException, which could theoretically happen if the table is dropped between the above tableExists call and the catalog.stageReplace call. This ensures that the same type of exception is thrown from the code path for the same kind of illegal state.
Test build #107924 has finished for PR 24798 at commit
thanks, merging to master!
Closes apache#24798 from mccheah/spark-27724. Authored-by: mcheah <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Implements the `REPLACE TABLE` and `REPLACE TABLE AS SELECT` logical plans. `REPLACE TABLE` is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs.
This also introduces an atomic mix-in that table catalogs can choose to implement. Table catalogs can now implement `TransactionalTableCatalog`. The semantics of this API are that table creation and replacement can be "staged" and then "committed".
On the execution of `REPLACE TABLE AS SELECT`, `REPLACE TABLE`, and `CREATE TABLE AS SELECT`, if the catalog implements transactional operations, the physical plan will use said functionality. Otherwise, these operations fall back on non-atomic variants. For `REPLACE TABLE` in particular, the usage of non-atomic operations can unfortunately lead to inconsistent state.
How was this patch tested?
Unit tests - multiple additions to `DataSourceV2SQLSuite`.
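For readers who want to try this out end to end, a hedged example; the catalog name, its implementation class, and the foo provider are placeholders for any V2 catalog and source on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// "testcat" and com.example.MyV2Catalog are illustrative; point
// spark.sql.catalog.<name> at any TableCatalog implementation you have.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("replace-table-example")
  .config("spark.sql.catalog.testcat", "com.example.MyV2Catalog")
  .getOrCreate()

spark.sql("CREATE TABLE testcat.db.target USING foo AS SELECT * FROM range(10)")

// Replaces the table's schema and contents; atomic if the catalog supports staging,
// otherwise it falls back to the non-atomic drop-then-create path.
spark.sql("REPLACE TABLE testcat.db.target USING foo AS SELECT id * 2 AS id FROM range(5)")
```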