[SPARK-33577][SS] Add support for V1Table in stream writer table API and create table if not exist by default #30521
Conversation
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
I'm very skeptical about this. It seems like we want to break the semantics of CreateTableStatement and do something special when it's issued by DataStreamWriter. We shouldn't encourage that.
What do you plan to do with this special createBy option?
I planned to leave a way to distinguish whether a table was created by streaming or batch. But yes, we should not do anything special on the table side. Removed in 82430ea.
In a streaming query we only do append - there are no other options for creating the table if we handle it. I don't think it's a difficult requirement for end users to create the table in advance, hence I'm in favor of dealing with existing tables only. That's also why I'm actually in favor of

Furthermore, I see we're still putting lots of effort into the V1 table (most likely the file (streaming) sink), instead of finding the reason we can't migrate the file (streaming) sink to V2 and resolving it. (Probably #29066 would help unblock it?) I roughly remember we said external data sources leveraging the streaming V1 sink are not in the supported range, and the V1 sink lacks the functionality tied to the output mode - you are not even able to truncate even when the mode is "complete". I'm not sure this is a good direction.
I think leveraging the old (probably DSv1) options is not sufficient - this doesn't have full coverage of DSv2 tables: no Transform on partitioning, no properties, no options.
Using source (via format(...)) as USE <provider> is also not intuitive - it is only effective when table creation is taking place, and it occurs implicitly.
Please compare the usage with creating a table via DataFrameWriterV2. I still think it is worth having a V2 writer for streaming.
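For comparison, here is a minimal sketch (not code from this PR; table names, paths, and the eventual `toTable` method name are assumptions) contrasting the explicit table creation of DataFrameWriterV2 with the implicit provider handling of the streaming writer discussed here:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_date

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Batch: DataFrameWriterV2 makes table creation an explicit, fully specified step
// (provider, Transform-based partitioning, table properties).
val batchDf = spark.range(10).withColumn("date", current_date())
batchDf.writeTo("events_batch")
  .using("parquet")
  .partitionedBy($"date")
  .tableProperty("comment", "created explicitly")
  .create()

// Streaming: format() acts as the provider only implicitly, and only when the
// target table does not exist yet.
val streamDf = spark.readStream.format("rate").load()
streamDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt/events_stream")  // illustrative path
  .toTable("events_stream")
```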
Using source (via format(...)) as USE <provider> is also not intuitive - it is only effective when table creation is taking place, and it occurs implicitly.
Yes, this is indeed a reasonable concern. We should check the source and provider, especially when they are different. Done in b6393ba, and a UT has been added.
That's not necessarily true. You can perform complete-mode writes, which overwrite the entire data every time (see the sketch after this comment block). I'd argue that the semantics for
Users are LAAAAZZY. As a developer, I would also prefer that people explicitly create their tables first, but plenty of users complain about that workflow. Sometimes they want to do some quick debugging and get started immediately. They don't want to perform 2-3 steps to create the table first.
Can't we parse the string partitions as expressions? If they're making a function call such as
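Illustrating the complete-mode point above, a minimal sketch (source, sink, and checkpoint path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// The rate source produces (timestamp, value); a running aggregation makes the
// query eligible for complete mode.
val rate = spark.readStream.format("rate").load()
val counts = rate.groupBy(rate("value") % 10).count()

counts.writeStream
  .outputMode("complete")   // the full result is rewritten on every trigger
  .format("console")
  .option("checkpointLocation", "/tmp/ckpt/complete-demo")
  .start()
```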
Sorry, I probably wasn't clear. This isn't true for the DSv1 Sink interface unless the data source does the hack of requiring the output mode to be provided directly as a data source option. You have no idea of the output mode in DSv1, and that's what I have been concerned about. Output mode is effectively a no-op, at least for behavior on a DSv1 sink. For backward compatibility we allow update/complete to be treated as append, but that's just to avoid breaking old data sources, and we shouldn't continue doing this. I already raised a related discussion on the dev mailing list months ago, but got no response. I wish we wouldn't ignore the discussion threads on the dev mailing list.
I agree about this, but users don't always want to create a table if it doesn't exist. That's the reason there's
My bad, probably you're talking about DSv2. Even in DataFrameWriter we don't do that (please correct me if I'm mistaken) - please refer
Let's throw AnalysisException like other assertions in this class.
nit: I'd rather add a require to assert table.provider.isDefined instead (even if it's only possible for a view according to the code comment of CatalogTable), just for defensive programming. No strong opinion.
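A minimal sketch of how these two suggestions could look together; the method name and messages are hypothetical, not the actual PR code (note that AnalysisException's constructor is package-private to org.apache.spark.sql, where DataStreamWriter lives):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}

def validateV1Table(table: CatalogTable): Unit = {
  // Defensive check: per CatalogTable's code comment, only views may omit a provider.
  require(table.provider.isDefined, "Writing to a table requires a defined provider.")
  if (table.tableType == CatalogTableType.VIEW) {
    // Fail the same way as the other assertions in this class.
    throw new AnalysisException(s"Streaming into a view ${table.identifier} is not supported.")
  }
}
```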
Thanks, agree with both points. Done in b6393ba.
Writing to a V1 table shouldn't rely on normalizedParCols, but then you'll be stuck on how to map the Array[Transform] (provided by CatalogTable) onto a Seq[String].
I see ResolveSessionCatalog.buildCatalogTable() leverages partitioning.asPartitionColumns - that could be used here to set partitionBy. It's not pretty to deal with setting the provider and partitioning manually here instead of letting the analyzer do it, but as there's no logical node for streaming write, I guess there's no other option.
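A hypothetical helper sketching the conversion in question (the name is made up; the real `asPartitionColumns` lives in Spark's internal CatalogV2Implicits): only identity transforms map onto V1-style partition column names, which is exactly why non-identity transforms cannot be carried through a `Seq[String]`.

```scala
import org.apache.spark.sql.connector.expressions.Transform

def toPartitionColumnNames(partitioning: Seq[Transform]): Seq[String] =
  partitioning.map {
    // identity(col) can be flattened into a plain column name
    case t if t.name == "identity" && t.references.length == 1 =>
      t.references.head.fieldNames.mkString(".")
    // bucket(n, col), days(col), etc. have no Seq[String] representation
    case other =>
      throw new UnsupportedOperationException(
        s"Cannot convert transform $other into a partition column name")
  }
```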
I think we shouldn't set partitionBy here since that will ignore the original setting of partitioningColumns.
So are we completely ignoring the table's partition information? I don't think this is the same as the DSv2 path.
We don't ignore it. IMO, the partition information should be handled at the DataSource level, no matter V1 or V2. Different data sources should have their own ability and strategy to handle the partition overwrite/append issue (part of the schema-evolution discussion). So at the API level, we need to pass both pieces of information down to the runtime.
Please find the usage of normalizedParCols in DataStreamWriter. This config is only effective in DSv1.
Got the point. So your point focuses on how DSv2 addresses the partition column conflict. My point is that we shouldn't overwrite the user's input; we need to pass both (the user input and the table catalog partition info) into the data source. The data source needs to know both. I already added this and the further discussion to SPARK-33638's comments, since it's part of full support for the V2 table.
Sorry I missed this. I'm not sure we provide both to the data source, but the data source is probably able to know about the partitioning (as the table partitioning is given by the data source), so consider this minor and make a follow-up if necessary. In any case you'll want to check this to address my review comments on documentation.
I'm not sure we provide the both into data source
My bad, I didn't make it clear. We don't provide both; we only pass the user-provided one (in V1). The partitioning in the catalog can be known by the data source, as you said. I need to change "we need to pass both" to "the data source needs to know both (we need to pass the user-provided partitioning)". Let me rephrase the last comment to make it clear.
Yes, of course, a follow-up is needed.
Summarizing my review comments: I have lots of concerns about creating the table, but only a few concerns about writing to a V1 table. I can make a compromise for writing to the V1 table, but not for creating the table. That requires a proper design which fully covers both DSv1 and DSv2 (or DSv2 only, if we give up and take just one), and IMO users should have a way to avoid unintentionally creating a table even when the table doesn't exist.
Ideally we need to handle these cases:
The current PR can't cover case 2, but I don't know how common it is for streaming users. Adding a
My comments remain the same. If we can address them (full support for v2 create table, and not making "create table if not exists" the only option) in DataStreamWriter without making it complicated, I'm OK with it. (Though the complication looks worth splitting out.) Both must be addressed - I don't think case 2 is so rare that it can be ignored.
For supporting case 2:
We pass the output mode to the sink here:
Could you clarify this? We have created the output path if it doesn't exist since the beginning of streaming, and I have not heard complaints about this. Why not make the behavior of creating the table consistent with creating an output path?
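A minimal sketch of the consistency argument (paths, table name, and the `toTable` method name are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val stream = spark.readStream.format("rate").load()

// Path-based streaming write: the output directory is created if it is missing.
stream.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt/by-path")
  .start("/tmp/output/by-path")

// Table-based streaming write under discussion: should the table likewise be
// created when it does not exist?
stream.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt/by-table")
  .toTable("rate_events")
```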
My bad, please disregard the sentence
We strongly recommend (known to be a best practice, if I'm not mistaken) creating a Kafka topic before running the streaming query, as in many cases creating the topic with the default configuration tends to be insufficient (most probably the number of partitions), and likewise I haven't heard complaints about this. Fair enough?
I don't think both need to be consistent. It needs to be consistent with the batch path, and there's explicitly "append mode". It should simply be possible to not create the table even if the table doesn't exist. I have commented multiple times to explain why we should not create the table by default and how DataStreamWriter cannot cover all cases, so I'll just quote my comments instead.
Overall, I don't see any deep consideration of the v2 table here, whereas my initial rationale for adding the API was to enable support for the v2 table. Can we please stop thinking only about the v1 table and ensure we also cover the v2 table?
I'd be OK with doing this if we can change it a bit:
The
I don't remember this recommendation. Did I miss any document? Kafka's
I was talking about the streaming case. IMO, a table can be viewed as an alias of a path. I think making the streaming write consistent for both table and path makes more sense.
I doubt we can support the v2 table perfectly in the existing DataStreamWriter. It's likely we would need to add a DataStreamWriterV2 similar to DataFrameWriterV2. I prefer to focus on making the v1 table work, since the file format in streaming doesn't support DSv2, and the file format is the most common case for tables.
truncate is defined as overwrite with a WHERE condition that is literally true. We are talking about the same thing, and my point is that the availability is checked by Spark. If that's not a big deal, OK.
That is limited to file-based tables - with DSv2 you should be able to match anything feasible with a table. I see some efforts have been made to support a JDBC-specific catalog even in the community, and there was a talk about applying DSv2 with Cassandra. We could even create a Kafka-specific catalog, which I considered a bit but got stuck on the schema, as we wouldn't want to continue providing just key and value in binary form even for a Kafka table. For me,
That has been the main concern. The @cloud-fan Can we please consider this again?
IMO, supporting DSv2 for file formats is what we need to spend effort fixing ASAP. If DSv2 lacks something so it cannot be done, we should see what is missing and fix that too. The DSv1 streaming API isn't officially supported - it's behind the private package. That said, we are not dogfooding DSv2, which is the only official way to implement a data source from the ecosystem. With documentation that DataStreamWriter doesn't fully support DSv2 and a "promise" of DataStreamWriterV2 (TODO comment in the codebase, JIRA issue, etc.), I'm OK with tolerating it as of now. As I said, DataFrameWriter already is. It's just that there's an alternative for the batch query, whereas there's no alternative for now for the streaming query.
I'll clarify the initial rationale of the API. Even though we have added DSv2, which supports streaming write, there has been no way to do a streaming write by table name. That is more concerning for a data source that supports a catalog - even when we have tables in the catalog, we couldn't do the streaming write and had to rely on the old way (format & path), which is nonsense, so I had to add the API. That was initially proposed to That said, the API was proposed to cover the DSv2 case, specifically for data sources that support a catalog. Focusing on the v1 table doesn't match the initial rationale. If it's going to support creating tables, it should cover DSv2 as first class. Further additions as well.
### What changes were proposed in this pull request?
As discussed in #30521 (comment), rename the API to `toTable`.

### Why are the changes needed?
Rename the API for further extension and accuracy.

### Does this PR introduce _any_ user-facing change?
Yes, it's an API change, but the new API is not released yet.

### How was this patch tested?
Existing UT.

Closes #30571 from xuanyuanking/SPARK-32896-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
Sorry for the misunderstanding. I thought you could agree with the current approach after adding comments/TODOs in the code per #30521 (comment). Summarizing the current status:
Personally, I prefer not to add a default param to the API if possible, since it should be natural on the streaming side to follow the existing streaming sink behavior; e.g., Kafka creates a topic by default per #30521 (comment) and FileStreamSink creates a path per #30521 (comment).
I think I have already answered with lots of opinions, especially providing objections to the base assumption

Revisiting the @cloud-fan comment #30521 (comment): I can take the suggestion, but with that I don't believe case 2 in @cloud-fan's comment is rare. Either we should have two different methods like @cloud-fan proposed (

We shouldn't restrict behavior that users will be affected by.
To make the meaning of SPARK-33638 clear: the existence of SPARK-33638 means you must provide a way to avoid creating a DSv2 table even if it doesn't exist. SPARK-33638 says the current interface is insufficient for the DSv2 table, and it's nonsense to create a DSv2 table automatically with insufficient information.
Em... I just reviewed my comment in #30521 (comment). I didn't mention the assumption. Actually, I almost finished the code for the default flag, but it might be better not to add more APIs or an extra flag, based on the current sink behavior. Sorry, maybe we have a misunderstanding about the meaning of SPARK-33638. I fully agree with you on the support for the V2 table, so I created the JIRA and kept adding the discussion in this PR to the JIRA comments. I created SPARK-33638 to track the issues we need to solve for V2 support, but this PR is for V1. I don't think that if there's something that can't be supported for the V2 table now, we need to block V1 support. Do you agree? (Here I give an issue that needs further discussion for V2 support. If you don't care about the details, please ignore. The discussion was also added as a comment in SPARK-33638.) So we can write down all the issues we already found and fix them one by one in SPARK-33638.
@HeartSaVioR could you please make it clear what we should do to get consensus? If I understand correctly, we were discussing whether creating the table if it doesn't exist is a good behavior for users or not. For this, I think Kafka (this was raised by you, actually) and FileStreamSink are two supporting data points that creating the table if it doesn't exist is good behavior. At least, it shows more users would like Spark to create tables for them. Of course, it's obvious that it's impossible to make a default behavior that all users would be happy with. We need to make a tradeoff between:
IMO, the first one is better, as more users can benefit from this behavior. Do you agree that this is a better default behavior?
Regarding this, this is not an assumption. Sorry, I was not clear. I meant
Apologies, I misunderstood you. That was not an assumption, got it. My request has been clear: respect the v2 table. v2 tables are disregarded in this discussion even though I made clear multiple times that this API was intended to support writing to a v2 catalog table with a streaming query, which "wasn't possible". From the view of a v2 table, creating the table by default is unwelcome for sure. Even when the table has partitioning information, end users should make an assumption of "what if the table doesn't exist?" and either add a create table all the time or add partitioning information to the write path all the time. This is no longer even possible if they use a non-identity transform. Same for table properties, but for the table properties we don't even give a chance to provide them. So their only workaround is ensuring create table is run all the time, which is not a thing I can agree with. A similar problem happens on the batch side when you use DataFrameWriter without SaveMode.Append, but I don't claim it's wrong because 1) there's still SaveMode.Append, which doesn't enforce creating the table, and 2) there's a reserved API for providing full support of the v2 table. Eventually the streaming path must provide the same, but 2) could be deferred via SPARK-33638. 1) should be made available now. That defines the lowest bar:
Additionally, filing SPARK-33638 doesn't mean I am OK with not addressing the lack of support for the v2 table. That's just deferred because time is running out for 3.1.0. For symmetric support across batch and streaming, SPARK-33638 should probably be a blocker for Spark 3.2.0.
I don't know which is the best way to handle this. It seems neither is good. The configuration of partition columns was added without table support. Without table support, we don't know whether the data source has such partition information or not, so we're forced to "always" provide the information, even when it's unnecessary. With an existing table, the table should already have the partition information, hence the configuration is useless unless we mean to create the table. In DataFrameWriterV2, once you provide partition information or a table property, you are no longer able to do an append. You are forced to create or replace, which should always respect the input or simply fail. There's no confusion on that part. The more I revisit DataFrameWriterV2, the more I realize how much DataStreamWriter is lacking in table support. That was OK (shouldn't be blamed), because there was no support for table writes, but that's no longer an excuse once we are adding it. A simple table write was also OK, as we should just follow the table information, but we're now creating the table as well. Anyway, it would be safer to follow what we do with SaveMode.Append in DataFrameWriter.
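For reference, a sketch of the DataFrameWriterV2 split described above (assuming a configured v2 catalog; table, column, and property names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, days}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(10).withColumn("ts", current_timestamp())

// Append path: no partitioning or table properties can be supplied here; the
// existing table's metadata is the single source of truth.
df.writeTo("cat.db.events").append()

// Create/replace path: partitioning and properties are accepted only while the
// table itself is being (re)created, so the input is always respected or the
// call simply fails.
df.writeTo("cat.db.events")
  .partitionedBy(days($"ts"))
  .tableProperty("owner", "team-a")
  .createOrReplace()
```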
For such users, they still need to ensure the table is created all the time if we don't create it for them, right? And we would ask other users to create the table all the time as well. Does creating the table by default block any future improvement we can make? It sounds to me that you agree that DataStreamWriter is lacking in table support and that we need to add a new DataStreamWriterV2 similar to DataFrameWriterV2. I don't see how this behavior would make any work we may do in the future harder.
There are three types of users:
IMO, since creating the table automatically makes the first type of users write less code and doesn't change the code for the second and third types, why not do this? In addition, as I already pointed out, this is consistent with the existing Kafka sink and file stream sink. Regarding the implementation part, I totally understand the limitations you pointed out. But as I said above, I don't see how this behavior will make any difference for the future work we will do. IMO, these limitations are not good reasons to block this.
I'm not sure why you're still considering these users "second class". We're affecting ecosystem data sources which are happily adopting the shiny new DSv2 API through their own efforts. (Despite things being like a beta test.) Saying it again: the DSv1 streaming writer interface is "behind" the private package, and the ecosystem is "not" encouraged to leverage it. (That's like saying we don't officially support it.) The root problem is that the file table is still v1, not something else. I tend to be concerned about the "surprise" moment (if we can imagine it in advance). I'm more concerned about the possibility of a table being created mistakenly without proper options. That's not a trade-off for usability. I even stepped back on the default behavior if we really want to retain only one method, but still let end users opt out of creating the table. Is it too hard for us to do that?
I see we're trying to think in terms of v1 vs v2, but at any moment we must ensure the interface is reasonable for both v1 and v2. That said, if DataStreamWriterV2 is marked as a blocker for 3.1.0 I'm totally OK with it (as the lack must be addressed before releasing 3.1.0), but we're not expecting that, right? Please correct me if I'm mistaken - I'm happy to deal with SPARK-33638 in the QA period. If you strongly insist on having only one method which creates the table by default, with no workaround other than end users creating the table, let's just disable automatic table creation for v2 tables (throw an exception if the table doesn't exist), and mark SPARK-33638 as a blocker for 3.2.0. For sure, the lack of functionality must be documented in the javadoc.
Honestly, I'm fine with adding the necessary options (taking transforms and table properties) for creating V2 tables into DataStreamWriter. I don't think we need a new API like DataStreamWriterV2, because we haven't made or run into the same problems that
I didn't say this. I meant these users need to create the table before starting the query no matter which behavior we decide.
Could you give an example? For people familiar with DataFrameWriterV2, when they try to use APIs (such as
If you meant adding a new method
Totally agree that we should document the limitations.
OK, I agree with this. Probably I've messed everything up, as there are a bunch of inputs from different folks and I had to defend my position. My bad.
I would say that's not good UX, hence I hadn't even thought about it, but I agree it still makes sense. That could be a thing to tolerate, as we will document the limitations as well.
I'm not sure I understand. Could you please elaborate on which options you have in mind? I don't expect us to struggle with adding something in

There's already an inconsistency in DataStreamWriter - the configuration We should resolve the confusion, and the effects should be documented in the javadoc as well: creating the table vs. the table existing, DSv1 vs. DSv2 (all 4 situations should be documented).

So there are different opinions across different folks -
I see there's a strong claim about 2 - I'll step back on this as well, with the claim that we should stop adding more confusion to DataStreamWriter (DataStreamWriterV2 should be designed), and that SPARK-33638 is a blocker for 3.2.0. Even if we define it as a blocker for 3.2.0, it will be publicly available probably in the second half of next year at the earliest. There's a gap, but if we at least make a strong promise, I'm OK with that. I'm sorry, but a strong promise only means marking it as a blocker. We tend to defer something and promise to handle it later, but in many cases we struggle with other things and completely forget it. SPARK-27237 was one such case.
Agreed to make SPARK-33638 a blocker. I will keep working on the V2 table support after the 3.1 release. If there's something difficult in the design and we can't make it, I think we can revert the V1 support or the whole Already changed SPARK-33638 to a blocker. Do you agree to unblock this PR and merge it to 3.1 before the code freeze?
For SPARK-27237, my LGTM still stands. Maybe we can merge it first, since the 3.1 code freeze is 1 hour away... Let me leave a comment there.
I stepped back from my concerns, and it looks OK with minor comments.
The documentation issue still stands, but we can file a blocker issue to follow up, as it's not something that should make this wait for another minor release.
+1 with follow-up.
val tableName = "stream_test"
withTable(tableName) {
  // The file written by batch will not be seen after the table was written by a streaming
  // query. This is because we loads files from the metadata log instead of listing them
nit: load instead of loads, but as it's a nit, OK to fix in follow-up PR.
val tableName = "stream_test"
withTable(tableName) {
  // The file written by batch will not be seen after the table was written by a streaming
  // query. This is because we loads files from the metadata log instead of listing them
same here
val tableName = "stream_test"
withTable(tableName) {
  // The file written by batch will not be seen after the table was written by a streaming
  // query. This is because we loads files from the metadata log instead of listing them
same here
Let me make the follow-up items clear from my perspective:
Once this is merged, either I or @xuanyuanking could file a follow-up JIRA issue.
@HeartSaVioR Thanks for the detailed comment. I already filed a follow-up JIRA, SPARK-33659. Please check.
Thanks for the quick reaction. SPARK-33659 looks OK to me. Given the urgency of the branch cut, I'll merge this.
Thanks for the contribution! I merged into master. |
…toTable API

### What changes were proposed in this pull request?
Follow-up work for #30521, documenting the following behaviors in the API doc:
- Figure out the effects when configurations (provider/partitionBy) conflict with the existing table.
- Document the lack of functionality on creating a v2 table, and guide that users should ensure a table is created beforehand to avoid an unintended/insufficient table being created.

### Why are the changes needed?
We don't have full support for the V2 table created in the API now. (TODO: SPARK-33638)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Document only.

Closes #30885 from xuanyuanking/SPARK-33659.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(The same follow-up commit was cherry-picked as 86c1cfc to branch-3.1, signed off by HyukjinKwon.)
What changes were proposed in this pull request?
After SPARK-32896, we have a table API for the stream writer, but it only supports DataSource V2 tables. Here we add support for V1 tables in the API and create the table if it does not exist by default.
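A minimal end-to-end sketch of the proposed behavior (table name, trigger, and paths are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val stream = spark.readStream.format("rate").load()

// "stream_test" is created with the parquet (V1 file) provider if it does not
// already exist, then written to by the streaming query.
val query = stream.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt/stream_test")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .toTable("stream_test")

// The same table is then readable through the normal batch path.
spark.read.table("stream_test").show()
```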
Why are the changes needed?
Make the API cover more use cases, especially file-provider-based tables.
Does this PR introduce any user-facing change?
Yes, new features added.
How was this patch tested?
Add new UTs.