-
Notifications
You must be signed in to change notification settings - Fork 3k
Core: Add position and equality delta writer interfaces #3176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.io; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.StructLike; | ||
| import org.apache.iceberg.deletes.PositionDelete; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
|
|
||
| /** | ||
| * A {@link PositionDeltaWriter} that forwards inserted rows to a data writer and position deletes | ||
| * to a delete writer, and combines the results of both writers once it has been closed. | ||
| * | ||
| * @param <T> the row type | ||
| */ | ||
| public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> { | ||
|
|
||
| private final PartitioningWriter<T, DataWriteResult> dataWriter; | ||
| private final PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter; | ||
| // a single instance that is re-populated via set(...) on every delete call before being written | ||
| private final PositionDelete<T> positionDelete; | ||
|
|
||
| // set to true only after both underlying writers have been closed successfully | ||
| private boolean closed; | ||
|
|
||
| /** | ||
| * Creates a delta writer on top of the given data and delete writers. | ||
| * | ||
| * @param dataWriter the writer that receives inserted rows, must not be null | ||
| * @param deleteWriter the writer that receives position deletes, must not be null | ||
| */ | ||
| public BasePositionDeltaWriter(PartitioningWriter<T, DataWriteResult> dataWriter, | ||
| PartitioningWriter<PositionDelete<T>, DeleteWriteResult> deleteWriter) { | ||
| Preconditions.checkArgument(dataWriter != null, "Data writer cannot be null"); | ||
| Preconditions.checkArgument(deleteWriter != null, "Delete writer cannot be null"); | ||
|
|
||
| this.dataWriter = dataWriter; | ||
| this.deleteWriter = deleteWriter; | ||
| this.positionDelete = PositionDelete.create(); | ||
| } | ||
|
|
||
| /** | ||
| * Writes the row to the data writer for the provided spec/partition. | ||
| */ | ||
| @Override | ||
| public void insert(T row, PartitionSpec spec, StructLike partition) { | ||
| dataWriter.write(row, spec, partition); | ||
| } | ||
|
|
||
| /** | ||
| * Populates the shared position delete with the given path, position and row, then writes it | ||
| * to the delete writer for the provided spec/partition. | ||
| */ | ||
| @Override | ||
| public void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) { | ||
| positionDelete.set(path, pos, row); | ||
| deleteWriter.write(positionDelete, spec, partition); | ||
| } | ||
|
|
||
| /** | ||
| * Combines the data files, delete files and referenced data files reported by the underlying | ||
| * writers into a single result. | ||
| * <p> | ||
| * Must be called only after {@link #close()} has completed successfully; otherwise the | ||
| * {@code Preconditions.checkState} check below fails. | ||
| * | ||
| * @return the combined write result | ||
| */ | ||
| @Override | ||
| public WriteResult result() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am using the existing |
||
| Preconditions.checkState(closed, "Cannot get result from unclosed writer"); | ||
|
|
||
| DataWriteResult dataWriteResult = dataWriter.result(); | ||
| DeleteWriteResult deleteWriteResult = deleteWriter.result(); | ||
|
|
||
| return WriteResult.builder() | ||
| .addDataFiles(dataWriteResult.dataFiles()) | ||
| .addDeleteFiles(deleteWriteResult.deleteFiles()) | ||
| .addReferencedDataFiles(deleteWriteResult.referencedDataFiles()) | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
| * Closes the data writer and then the delete writer. | ||
| * <p> | ||
| * The delete writer is closed even if closing the data writer throws, so a failure in the first | ||
| * close does not leak the second writer. The writer is marked as closed only when both closes | ||
| * succeed; calling this method again after a successful close is a no-op. | ||
| * | ||
| * @throws IOException if closing an underlying writer fails | ||
| */ | ||
| @Override | ||
| public void close() throws IOException { | ||
| if (!closed) { | ||
| try { | ||
| dataWriter.close(); | ||
| } finally { | ||
| // always attempt to close the delete writer, even if the data writer failed to close | ||
| deleteWriter.close(); | ||
| } | ||
|
|
||
| this.closed = true; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.io; | ||
|
|
||
| import java.io.Closeable; | ||
| import org.apache.iceberg.DataFile; | ||
| import org.apache.iceberg.DeleteFile; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.StructLike; | ||
|
|
||
| /** | ||
| * A writer capable of writing data and equality deletes that may belong to different specs and partitions. | ||
| * <p> | ||
| * Rows are added with {@code insert}. Deletes are expressed either as a full row ({@code delete}) | ||
| * or as a key that holds values only for the equality fields ({@code deleteKey}). The writer is | ||
| * {@link Closeable}, and the value returned by {@code result()} is valid only after the writer | ||
| * has been closed. | ||
| * | ||
| * @param <T> the row type | ||
| */ | ||
| public interface EqualityDeltaWriter<T> extends Closeable { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one will be implemented by the CDC writer that I will submit in a separate PR. It is large. |
||
|
|
||
| /** | ||
| * Inserts a row into the provided spec/partition. | ||
| * | ||
| * @param row a data record | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| void insert(T row, PartitionSpec spec, StructLike partition); | ||
|
|
||
| /** | ||
| * Deletes a row from the provided spec/partition. | ||
| * <p> | ||
| * This method assumes the delete record has the same schema as the rows that will be inserted. | ||
| * When only the values of the equality fields are available, use {@code deleteKey} instead. | ||
| * | ||
| * @param row a delete record | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| void delete(T row, PartitionSpec spec, StructLike partition); | ||
|
|
||
| /** | ||
| * Deletes a key from the provided spec/partition. | ||
| * <p> | ||
| * This method assumes the delete key contains values only for equality fields. | ||
| * The key is passed using the same type {@code T} as data rows. | ||
| * | ||
| * @param key a delete key | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| void deleteKey(T key, PartitionSpec spec, StructLike partition); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rdblue, this is |
||
|
|
||
| /** | ||
| * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. | ||
| * The result is valid only after the writer is closed, so callers must close the writer first. | ||
| * | ||
| * @return the writer result | ||
| */ | ||
| WriteResult result(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.io; | ||
|
|
||
| import java.io.Closeable; | ||
| import org.apache.iceberg.DataFile; | ||
| import org.apache.iceberg.DeleteFile; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.StructLike; | ||
|
|
||
| /** | ||
| * A writer capable of writing data and position deletes that may belong to different specs and partitions. | ||
| * <p> | ||
| * Rows are added with {@code insert}. A position delete identifies a row by data file path and | ||
| * position, and may optionally carry the deleted row. The writer is {@link Closeable}, and the | ||
| * value returned by {@code result()} is valid only after the writer has been closed. | ||
| * | ||
| * @param <T> the row type | ||
| */ | ||
| public interface PositionDeltaWriter<T> extends Closeable { | ||
|
|
||
| /** | ||
| * Inserts a row into the provided spec/partition. | ||
| * | ||
| * @param row a data record | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| void insert(T row, PartitionSpec spec, StructLike partition); | ||
|
|
||
| /** | ||
| * Deletes a position in the provided spec/partition. | ||
| * <p> | ||
| * This is a convenience overload that delegates to the row-accepting {@code delete} method | ||
| * with a {@code null} row. | ||
| * | ||
| * @param path a data file path | ||
| * @param pos a position | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) { | ||
| delete(path, pos, null, spec, partition); | ||
| } | ||
|
|
||
| /** | ||
| * Deletes a position in the provided spec/partition and records the deleted row in the delete file. | ||
| * | ||
| * @param path a data file path | ||
| * @param pos a position | ||
| * @param row a deleted row; the overload without a row argument passes null here | ||
| * @param spec a partition spec | ||
| * @param partition a partition or null if the spec is unpartitioned | ||
| */ | ||
| void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rdblue, I kept the optional |
||
|
|
||
| /** | ||
| * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s. | ||
| * The result is valid only after the writer is closed, so callers must close the writer first. | ||
| * | ||
| * @return the writer result | ||
| */ | ||
| WriteResult result(); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the writer to use for Spark merge-on-read.