Core: Add WriterFactory #2873

Changes from all commits
@@ -321,6 +321,8 @@ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {

    public <T> DataWriter<T> build() throws IOException {
      Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
          "Partition must not be null when creating data writer for partitioned spec");

Contributor (Author): Extra validation I originally had in the write factory impl, but I think it makes sense to move it here.
      FileAppender<T> fileAppender = appenderBuilder.build();
      return new DataWriter<>(fileAppender, FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder);
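For illustration, a hedged sketch (not part of this PR) of what the new validation prevents; the helper class and variables are hypothetical, and the builder calls are the ones shown in the diff above. The precondition checks run before the appender is built, so the failure is immediate.

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.io.OutputFile;

// Hypothetical illustration: building a data writer for a partitioned spec
// without supplying a partition now fails fast with IllegalArgumentException.
public class ValidationSketch {
  static void failsWithoutPartition(OutputFile outputFile, Schema schema, PartitionSpec partitionedSpec)
      throws IOException {
    Avro.writeData(outputFile)
        .schema(schema)
        .withSpec(partitionedSpec)
        // .withPartition(...) intentionally omitted
        .build(); // throws: "Partition must not be null when creating data writer for partitioned spec"
  }
}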
@@ -428,6 +430,10 @@ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
      Preconditions.checkState(createWriterFunc != null,
          "Cannot create equality delete file unless createWriterFunc is set");
      Preconditions.checkArgument(spec != null,
          "Spec must not be null when creating equality delete writer");
      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
          "Partition must not be null for partitioned writes");

      meta("delete-type", "equality");
      meta("delete-field-ids", IntStream.of(equalityFieldIds)
@@ -446,6 +452,10 @@ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {

    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
      Preconditions.checkArgument(spec != null,
          "Spec must not be null when creating position delete writer");
      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
          "Partition must not be null for partitioned writes");

      meta("delete-type", "position");
@@ -0,0 +1,62 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.io;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;

/**
 * A factory for creating data and delete writers.
 */
public interface WriterFactory<T> {

  /**
   * Creates a new {@link DataWriter}.
   *
   * @param file the output file
   * @param spec the partition spec written data belongs to

Member: Is it possible that we will write data/delete files to an existing historical partition spec? In almost all cases we will produce data/delete files in the latest partition spec, so when opening new writers across parallel tasks there seems to be no need to pass an extra partition spec with the same value.

Contributor (Author): There is actually a use case for that in Spark and I think it applies to other engines too. Imagine we have a table with multiple specs. In Spark, we plan to project …

Member: I think I can understand the point that we want to write the delete files and data files in the same partition for a given partition spec. But I still don't get what the use case is where Spark will write data files or delete files into a historical partition spec. Are there any other issues or PRs that I missed?

Contributor: Consider the following sequence of actions: …

Member: OK, @jackye1995's comment answered my question perfectly. Makes sense!

Contributor (Author): @jackye1995's example is correct. The reason why we need to project …

Contributor (Author): To sum up, the spec for deletes should match the data spec of the rows we reference.

Contributor: Another thing to keep in mind is that when encoding deletes, we need to encode them for all of the partition specs that are in use in the table. In @jackye1995's example, I think the predicate needs to be added to …

Contributor (Author): @rdblue, that applies to equality deletes and upsert use cases, right? In case of MERGE INTO, we know the source row spec id and its partition value by querying …

Contributor: One more thing: the deletes can also be added to the unpartitioned spec for global deletes. That avoids the need to find all of the categories that exist in this example.

   * @param partition the partition written data belongs to or null if the spec is unpartitioned
   * @return the constructed data writer
   */
  DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  /**
   * Creates a new {@link EqualityDeleteWriter}.
   *
   * @param file the output file
   * @param spec the partition spec written deletes belong to
   * @param partition the partition written deletes belong to or null if the spec is unpartitioned
   * @return the constructed equality delete writer
   */
  EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);

  /**
   * Creates a new {@link PositionDeleteWriter}.
   *
   * @param file the output file
   * @param spec the partition spec written deletes belong to
   * @param partition the partition written deletes belong to or null if the spec is unpartitioned
   * @return the constructed position delete writer
   */
  PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition);
}
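To make the spec discussion above concrete, here is a hedged usage sketch, not part of this PR: a hypothetical helper that opens a position delete writer against the spec the referenced rows were written with, looked up by spec id via the existing Table.specs() API.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.WriterFactory;

// Hypothetical helper, for illustration only.
public class HistoricalSpecDeletes {
  private HistoricalSpecDeletes() {
  }

  // Deletes must be written with the spec of the data files whose rows they
  // reference, which may be an older spec than table.spec().
  public static <T> PositionDeleteWriter<T> newWriterForSpec(
      Table table, WriterFactory<T> factory, EncryptedOutputFile file,
      int specId, StructLike partition) {
    PartitionSpec spec = table.specs().get(specId); // specs() maps spec id to spec, including historical ones
    return factory.newPositionDeleteWriter(file, spec, partition);
  }
}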
@@ -0,0 +1,253 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.data;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.WriterFactory;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

/**
 * A base writer factory to be extended by query engine integrations.
 */
public abstract class BaseWriterFactory<T> implements WriterFactory<T> {
  private final Table table;
  private final FileFormat dataFileFormat;
  private final Schema dataSchema;
  private final SortOrder dataSortOrder;
  private final FileFormat deleteFileFormat;
  private final int[] equalityFieldIds;
  private final Schema equalityDeleteRowSchema;
  private final SortOrder equalityDeleteSortOrder;
  private final Schema positionDeleteRowSchema;

  protected BaseWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema,

Contributor: What will be the common usage pattern of this method? Do the defaults of these parameters come from the table, with the values overwritten if some configuration is set? If so, maybe we would like to have a builder that helps with it?

Contributor (Author): This is an abstract class that should be extended by query engine integrations. There is …

Contributor: Would this builder be very different for different implementations, or would it be worth factoring the common parts out into a base builder? Or will we have only a few implementations, so it is not worth the effort?

Contributor (Author): I can try prototyping a common builder tomorrow.

Contributor (Author): Well, it does not seem very clean, as we need to provide an accessor method for each argument, which makes the implementation kind of bulky.

Contributor (Author): I'd probably keep it separate for now.

      SortOrder dataSortOrder, FileFormat deleteFileFormat,
      int[] equalityFieldIds, Schema equalityDeleteRowSchema,
      SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) {
    this.table = table;

Member: Since

    Table table;
    try (Catalog catalog = loadCatalog) {
      table = catalog.loadTable(...);
    }

And then we pass the …

Contributor: I think the …

Contributor: I think it's helpful to hold a …

Contributor (Author): @jackye1995 is correct. We have introduced …

    this.dataFileFormat = dataFileFormat;
    this.dataSchema = dataSchema;
    this.dataSortOrder = dataSortOrder;
    this.deleteFileFormat = deleteFileFormat;
    this.equalityFieldIds = equalityFieldIds;
    this.equalityDeleteRowSchema = equalityDeleteRowSchema;
    this.equalityDeleteSortOrder = equalityDeleteSortOrder;
    this.positionDeleteRowSchema = positionDeleteRowSchema;
  }
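On the serialization thread above, a hedged sketch, assuming the SerializableTable utility (with a static copyOf method) is available in this codebase, of how an engine might snapshot the table before constructing a factory that gets shipped to parallel tasks.

import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

// Hypothetical helper; assumes SerializableTable.copyOf exists in this version.
public class WriterFactoryTables {
  private WriterFactoryTables() {
  }

  // Snapshot the table so a factory holding it can be serialized to tasks
  // without capturing a live, catalog-backed Table.
  public static Table forTasks(Table table) {
    return SerializableTable.copyOf(table);
  }
}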

  protected abstract void configureDataWrite(Avro.DataWriteBuilder builder);
  protected abstract void configureEqualityDelete(Avro.DeleteWriteBuilder builder);
  protected abstract void configurePositionDelete(Avro.DeleteWriteBuilder builder);

  protected abstract void configureDataWrite(Parquet.DataWriteBuilder builder);
  protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
  protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);

  // TODO: provide ways to configure ORC delete writers once we support them
  protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
  @Override
  public DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
    OutputFile outputFile = file.encryptingOutputFile();
    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table.properties();
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);

    try {
      switch (dataFileFormat) {
        case AVRO:
          Avro.DataWriteBuilder avroBuilder = Avro.writeData(outputFile)
              .schema(dataSchema)
              .setAll(properties)
              .metricsConfig(metricsConfig)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .withSortOrder(dataSortOrder)
              .overwrite();

          configureDataWrite(avroBuilder);

          return avroBuilder.build();

        case PARQUET:
          Parquet.DataWriteBuilder parquetBuilder = Parquet.writeData(outputFile)
              .schema(dataSchema)
              .setAll(properties)
              .metricsConfig(metricsConfig)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .withSortOrder(dataSortOrder)
              .overwrite();

          configureDataWrite(parquetBuilder);

          return parquetBuilder.build();

        case ORC:
          ORC.DataWriteBuilder orcBuilder = ORC.writeData(outputFile)
              .schema(dataSchema)
              .setAll(properties)
              .metricsConfig(metricsConfig)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .withSortOrder(dataSortOrder)
              .overwrite();

          configureDataWrite(orcBuilder);

          return orcBuilder.build();

        default:
          throw new UnsupportedOperationException("Unsupported data file format: " + dataFileFormat);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
  @Override
  public EqualityDeleteWriter<T> newEqualityDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
                                                         StructLike partition) {
    OutputFile outputFile = file.encryptingOutputFile();
    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table.properties();
    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);

    try {
      switch (deleteFileFormat) {
        case AVRO:
          // TODO: support metrics configs in Avro equality delete writer

          Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
              .setAll(properties)
              .rowSchema(equalityDeleteRowSchema)
              .equalityFieldIds(equalityFieldIds)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .withSortOrder(equalityDeleteSortOrder)
              .overwrite();

          configureEqualityDelete(avroBuilder);

          return avroBuilder.buildEqualityWriter();

        case PARQUET:
          Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
              .setAll(properties)
              .metricsConfig(metricsConfig)
              .rowSchema(equalityDeleteRowSchema)
              .equalityFieldIds(equalityFieldIds)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .withSortOrder(equalityDeleteSortOrder)
              .overwrite();

          configureEqualityDelete(parquetBuilder);

          return parquetBuilder.buildEqualityWriter();

        default:
          throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);

Contributor: Do we want to fail or should we just fall back to Avro?

Contributor (Author): I'd say we'd better be explicit here. This will be validated early enough, and I think it is safer to rely on the user to pick the format for deletes. That being said, I expect we will implement the ORC support soon. It should be just temporary.

      }
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create new equality delete writer", e);
    }
  }
  @Override
  public PositionDeleteWriter<T> newPositionDeleteWriter(EncryptedOutputFile file, PartitionSpec spec,
                                                         StructLike partition) {
    OutputFile outputFile = file.encryptingOutputFile();
    EncryptionKeyMetadata keyMetadata = file.keyMetadata();
    Map<String, String> properties = table.properties();

    // TODO: build and pass a correct metrics config for position deletes

Contributor: Hm. Seems like we should move …

Contributor (Author): Yeah, we have been coming back to this over and over again.

    try {
      switch (deleteFileFormat) {
        case AVRO:
          Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
              .setAll(properties)
              .rowSchema(positionDeleteRowSchema)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .overwrite();

          configurePositionDelete(avroBuilder);

          return avroBuilder.buildPositionWriter();

        case PARQUET:
          Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
              .setAll(properties)
              .rowSchema(positionDeleteRowSchema)
              .withSpec(spec)
              .withPartition(partition)
              .withKeyMetadata(keyMetadata)
              .overwrite();

          configurePositionDelete(parquetBuilder);

          return parquetBuilder.buildPositionWriter();

        default:
          throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat);
      }

    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create new position delete writer", e);
    }
  }
  protected Schema dataSchema() {
    return dataSchema;
  }

  protected Schema equalityDeleteRowSchema() {
    return equalityDeleteRowSchema;
  }

  protected Schema positionDeleteRowSchema() {
    return positionDeleteRowSchema;
  }
}
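As one illustration of extending the base class, here is a hedged sketch of a factory for Iceberg generic Records. The writer functions (DataWriter::create, GenericParquetWriter::buildWriter, GenericOrcWriter::buildWriter) come from the iceberg-data module, but treat their exact wiring here as an assumption rather than this PR's own code; a Spark or Flink integration would do the same with its engine row type.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

// Hypothetical subclass for generic Records; the configure hooks plug in the
// generic writer functions for each file format.
public class GenericRecordWriterFactory extends BaseWriterFactory<Record> {

  protected GenericRecordWriterFactory(Table table, FileFormat dataFileFormat, Schema dataSchema,
                                       SortOrder dataSortOrder, FileFormat deleteFileFormat,
                                       int[] equalityFieldIds, Schema equalityDeleteRowSchema,
                                       SortOrder equalityDeleteSortOrder, Schema positionDeleteRowSchema) {
    super(table, dataFileFormat, dataSchema, dataSortOrder, deleteFileFormat,
        equalityFieldIds, equalityDeleteRowSchema, equalityDeleteSortOrder, positionDeleteRowSchema);
  }

  @Override
  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
    builder.createWriterFunc(DataWriter::create);
  }

  @Override
  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
    builder.createWriterFunc(DataWriter::create);
  }

  @Override
  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
    builder.createWriterFunc(DataWriter::create);
  }

  @Override
  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
    builder.createWriterFunc(GenericParquetWriter::buildWriter);
  }

  @Override
  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
    builder.createWriterFunc(GenericParquetWriter::buildWriter);
  }

  @Override
  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
    builder.createWriterFunc(GenericParquetWriter::buildWriter);
  }

  @Override
  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
    builder.createWriterFunc(GenericOrcWriter::buildWriter);
  }
}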