Core: Add WriterFactory #2873
Conversation
public <T> DataWriter<T> build() throws IOException {
  Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
  Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
Extra validation I originally had in the writer factory impl, but I think it makes sense to move it here.
spark/src/main/java/org/apache/iceberg/spark/source/SparkWriterFactory.java
private final Schema positionDeleteRowSchema;
private final SortOrder positionDeleteSortOrder;

protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema,
What will be the common usage pattern of this method? The defaults of these parameters come from the table, and if some configuration is set, then the values are overwritten? If so, maybe we would like to have a builder which helps with it?
This is an abstract class that should be extended by query engine integrations. There is a SparkWriterFactory that extends this class, and that one has a builder.
Would this builder be very different for different implementations, or would it be worth factoring out the common parts into a base builder? Or will we have only a few implementations, so it is not worth the effort?
I can try prototyping a common builder tomorrow.
Well, it does not seem very clean as we need to provide an accessor method for each argument, which makes the implementation kind of bulky.
I'd probably keep it separate for now.
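For reference, here is a minimal sketch of what an engine-specific builder around BaseWriterFactory could look like. The class and helper names below (EngineWriterFactory, builderFor) are hypothetical and only illustrate the pattern discussed above: defaults are read from the table and can be overridden before build().

import java.util.Locale;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

// Hypothetical engine-side factory; the real SparkWriterFactory in this PR may
// expose different options and defaults.
class EngineWriterFactory {

  static Builder builderFor(Table table) {
    return new Builder(table);
  }

  static class Builder {
    private final Table table;
    private FileFormat dataFileFormat;
    private Schema dataSchema;

    private Builder(Table table) {
      this.table = table;
      // Defaults come from the table; callers may override them before build().
      String formatName = table.properties().getOrDefault("write.format.default", "parquet");
      this.dataFileFormat = FileFormat.valueOf(formatName.toUpperCase(Locale.ROOT));
      this.dataSchema = table.schema();
    }

    Builder dataFileFormat(FileFormat format) {
      this.dataFileFormat = format;
      return this;
    }

    Builder dataSchema(Schema schema) {
      this.dataSchema = schema;
      return this;
    }

    EngineWriterFactory build() {
      // A real implementation would pass these values (and the delete-related
      // ones) to the BaseWriterFactory constructor quoted above.
      return new EngineWriterFactory();
    }
  }
}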
@Test
public void testPositionDeleteWriterWithRow() throws IOException {
  Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
Do we know why ORC deletes are left out? Any serious blockers, or was there just no interest in implementing them yet?
Thanks, Peter
Yes, I think it's just because we still don't provide ORC positional/equality delete writers, but I'm not sure whether there are any potential blockers that prevent us from providing them.
No blockers that I know about. Just that I didn't have the time to update ORC for the feature when I added it.
* Creates a new {@link DataWriter}.
*
* @param file the output file
* @param spec the partition spec written data belongs to
Is it possible that we will write data/delete files to an existing historical partition spec? In almost all cases we will produce data/delete files with the latest partition spec, so when opening new writers across parallel tasks there seems to be no need to pass an extra partition spec with the same value.
There is actually a use case for that in Spark and I think it applies to other engines too. Imagine we have a table with multiple specs. In Spark, we plan to project _spec and _partition metadata columns. Whenever we write a delete file, we have to use the partition spec the referenced data rows belong to.
I think I can understand the point that we want to write the delete files and data files in the same partition for a given partition spec. But I still don't get what the use case is for Spark writing data files or delete files into a historical partition spec. Are there any other issues or PRs that I missed?
Consider the following sequence of actions:
- create table (ts, category, data) partitioned by days(ts)
- day 1 adds some data files
- alter table add partition field category
- day 2 adds some data files
- delete some data where day=1 and data=xxx; in this case the delete writer should use the old partition spec. If it wrote using the latest spec, it would produce 1 delete file per category.
OK, @jackye1995's comment answered my question perfectly. Makes sense!
@jackye1995's example is correct. The reason why we need to project _spec and _partition for all rows is that we may need to write using multiple specs within a txn. In the considered example, we need to use the old spec for deleting records in DAY 1 and we may need to use the current spec for deleting records in DAY 2 (all records for DAY 2 are actually written using the new spec).
To sum up, the spec for deletes should match the data spec of rows we reference.
Another thing to keep in mind is that when encoding deletes, we need to encode them for all of the partition specs that are in use in the table. In @jackye1995's example, I think the predicate needs to be added to spec=1/day=1 and spec=2/day=1/category=* because deletes aren't scoped to just the current spec and there could be data in the new spec.
@rdblue, that applies to equality deletes and upsert use cases, right? In case of MERGE INTO, we know the source row spec id and its partition value by querying _spec and _partition metadata columns?
One more thing: the deletes can also be added to the unpartitioned spec for global deletes. That avoids the need to find all of the categories that exist in this example.
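To make the scenario above concrete, here is a rough sketch (DeleteSpecSelection and specForDelete are made-up names) of how a writer could resolve the spec that a referenced row was written with, instead of always using the current spec. The resolved spec and the row's _partition value would then be passed to the writer factory method.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

class DeleteSpecSelection {
  // specId would come from the projected _spec metadata column of the row being
  // deleted; the matching _partition value would be passed along with it.
  static PartitionSpec specForDelete(Table table, int specId) {
    // Table.specs() keeps every spec the table has ever used, keyed by spec ID,
    // so a delete for a day-1 row can target the old days(ts)-only spec instead
    // of the current days(ts)/category spec.
    PartitionSpec spec = table.specs().get(specId);
    if (spec == null) {
      throw new IllegalArgumentException("Unknown partition spec ID: " + specId);
    }
    return spec;
  }
}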
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();

// TODO: build and pass a correct metrics config for equality deletes
To simplify the configuration keys, I think the equality delete writer could use the same metrics config as the data writer, because in theory the equality delete files have a column set that is a subset of the user-defined table schema columns.
But for positional delete writers it's a different story. Because their columns are file_path and offset, the file_path shouldn't be truncated even if people set write.metadata.metrics.default=truncate(16), since we want to filter the data files to join as precisely as possible for efficiency.
I'm OK with proposing a separate PR to address these metrics config issues.
I agree about using the data metric configuration for equality deletes. It wasn't there in SparkAppenderFactory but I think we can add it now. I'll do that in this PR and then we can address position deletes later.
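A minimal sketch of the change being agreed on here, assuming MetricsConfig.fromProperties is the entry point (the exact plumbing into the Parquet delete writer builders may differ):

import java.util.Map;
import org.apache.iceberg.MetricsConfig;

class EqualityDeleteMetrics {
  // Equality delete columns are a subset of the table schema, so the table's own
  // metrics configuration can be reused as-is. Position deletes still need
  // separate handling (file_path must never be truncated), hence the TODO above.
  static MetricsConfig forEqualityDeletes(Map<String, String> tableProperties) {
    return MetricsConfig.fromProperties(tableProperties);
  }
}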
}
@Test
public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
Thanks for adding this new unit test, looks good to me!
SortOrder dataSortOrder, FileFormat deleteFileFormat,
int[] equalityFieldIds, Schema equalityDeleteRowSchema,
SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema,
SortOrder positionDeleteSortOrder) {
Why is positionDeleteSortOrder a configurable input? Shouldn't it always be sorted based on the spec definition?
I am not sure a position delete is necessarily sorted. I thought it can be unsorted too, even though we will probably ensure it is sorted while writing from Spark.
I think the spec says it is:
The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
If not sorted, I believe that Deletes.streamingFilter algorithm would break:
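For illustration only, here is a simplified stand-in (not the actual Deletes.streamingFilter code) showing why the deleted positions must arrive in sorted order: the filter advances both streams in lockstep and never looks back, so an out-of-order delete would be silently skipped.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Simplified sketch of a streaming position filter: emits row positions that are
// not present in the stream of deleted positions. Both inputs must be ascending.
class StreamingPositionFilter implements Iterator<Long> {
  private final Iterator<Long> rowPositions;     // ascending row positions
  private final Iterator<Long> deletedPositions; // ascending deleted positions
  private long nextDelete;
  private boolean hasDelete;
  private Long nextRow;

  StreamingPositionFilter(Iterator<Long> rowPositions, Iterator<Long> deletedPositions) {
    this.rowPositions = rowPositions;
    this.deletedPositions = deletedPositions;
    advanceDelete();
    advance();
  }

  private void advanceDelete() {
    hasDelete = deletedPositions.hasNext();
    nextDelete = hasDelete ? deletedPositions.next() : -1L;
  }

  private void advance() {
    nextRow = null;
    while (rowPositions.hasNext()) {
      long pos = rowPositions.next();
      // Discard deletes that are behind the current row; they can never match again.
      while (hasDelete && nextDelete < pos) {
        advanceDelete();
      }
      if (hasDelete && nextDelete == pos) {
        continue; // row is deleted, skip it
      }
      nextRow = pos;
      return;
    }
  }

  @Override
  public boolean hasNext() {
    return nextRow != null;
  }

  @Override
  public Long next() {
    if (nextRow == null) {
      throw new NoSuchElementException();
    }
    long result = nextRow;
    advance();
    return result;
  }
}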
@jackye1995 is correct, the positional delete files are always sorted by the pair <file_path, row_offset>. Please take a look at this PR: https://github.com/apache/iceberg/pull/1858/files#diff-179b5fea5d3aef7c16dd6104c17e1dc53ac9067f13695f3469d21e96c323eb97R126. So for positional delete files we don't need an extra SortOrder now.
My bad, you are right. Will we ever want to include data columns from the row in the sort order of position deletes?
Thoughts, @openinx @jackye1995?
No, I don't think we would ever need the position delete order to change. The order is fixed so that we can always merge deletes while reading (why it's ordered by _pos), and so that we can merge delete files using streams as well (that's why it's first ordered by _file).
Agreed with @rdblue, we don't need the positionDeleteSortOrder; this needs to be removed in the next PR update.
I'll drop it for now then.
int[] equalityFieldIds, Schema equalityDeleteRowSchema,
SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema,
SortOrder positionDeleteSortOrder) {
  this.table = table;
Since table is a mutable instance, its lifecycle is bound to the catalog connection's lifecycle. Take the Hive catalog as an example. If someone creates a table like the following:

Table table;
try (Catalog catalog = loadCatalog) {
  table = catalog.loadTable(...);
}

and then we pass the table through the writer path, what concerns me is that people may use this table instance to access the latest table state (for example table.refresh() to get the latest partition spec or SortOrder etc.) for their purposes, which will break because the catalog has been closed. That's why we did not pass an Iceberg Table into the underlying writer path: it's hard to check whether the catalog connection has been closed or not. So I recommend passing just the detailed parameters we need to construct the delete/data writers, such as table properties, schema, partitionSpec, sortOrder, etc., rather than providing a whole Table instance.
I think the Table passed in here is supposed to be an instance of SerializableTable, and refreshing is not allowed, so the problem described should not exist. But maybe we should make this explicit, at least in the doc of the end factory builder.
I think it's helpful to hold a table in the classes that extend this class, to easily retrieve attributes like the table schema and table properties when initializing this BaseWriterFactory constructor. But within this class itself, it might be clearer to explicitly state the required attributes (I believe in this case, just the table property map) rather than passing the entire table object, which duplicates some of the fields already being passed in. That way, people who modify this class later won't retrieve those attributes from the table object.
@jackye1995 is correct. We have introduced SerializableTable so that we can pass a read-only table object and have access to needed fields. Previously, we had to modify a lot of places every time we needed to pass a new entity to the writer. Passing a read-only object is cleaner.
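A rough sketch of that pattern, assuming SerializableTable.copyOf is the driver-side entry point:

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

class WriterFactorySetup {
  // On the driver: capture a read-only, serializable view of the table and ship it
  // to tasks. It still exposes schema(), spec(), specs(), properties(), io(), etc.,
  // but refresh() is unsupported, so nothing in the writer path can end up
  // depending on an open catalog connection.
  static Table serializableCopy(Table table) {
    return SerializableTable.copyOf(table);
  }
}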
openinx
left a comment
Almost looks good to me now. I think it's ready to go once we fix the newly introduced minor comments. @aokolnychyi, you can decide whether it's OK to get this merged.
Thanks for the initial review round, I'll update the PR later today.
@openinx, I've updated the PR to set the metrics config for equality deletes in Parquet. Unfortunately,
One open question is about position delete sort order here.
return parquetBuilder.buildEqualityWriter();

default:
  throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);
Do we want to fail or should we just fall back to Avro?
I'd say we had better be explicit here. This will be validated early enough, and I think it is safer to rely on the user to pick the format for deletes.
That being said, I expect we will implement ORC support soon, so this should be just temporary.
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();

// TODO: build and pass a correct metrics config for position deletes
Hm. Seems like we should move MetricsConfig to use column IDs by default?
Yeah, we have been coming back to this over and over again.
rdblue
left a comment
Looks good to me other than the position delete sort order.
Thanks for reviewing, everyone!
This PR adds a new interface called WriterFactory for creating data and delete writers. This change also includes BaseWriterFactory to be shared by query engine integrations and a Spark writer factory.
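Pieced together from the snippets quoted in the conversation, the factory shape is roughly the following; the method list and parameter order are inferred from the review rather than copied from the merged code:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;

// Inferred sketch of the WriterFactory interface added by this PR: one method per
// writer kind, each taking the output file plus the spec and partition the
// written rows belong to.
interface WriterFactorySketch<T> {
  DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
}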