
Conversation

@aokolnychyi (Contributor) commented Aug 5, 2021

This PR has the following contributions:

  • Support for writing to multiple specs.
  • Support for StructLike in partitioned writers. Previously, we always assumed we could derive PartitionKey from the data. That may not be the case for some operations, as partitions may come from a metadata column.
  • A new DeltaWriter interface for writing data and deletes.
  • CDC writer that keeps track of records across partitions and can delete a key inserted in any partition.
  • More types of delete writers (e.g. clustered/fanout equality/position deletes).
  • Composition over inheritance to simplify reuse of writers.

New interfaces are added to core with an example of how they can be consumed in Spark 3.

Writer

The first major proposed API is the Writer interface, which defines a contract for writing a number of files of a single type within one spec/partition. The existing DataWriter, EqualityDeleteWriter, and PositionDeleteWriter classes are the simplest implementations of that API.
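For orientation, here is a minimal sketch of what that contract could look like, based on the Writer<T, R> shape used elsewhere in this PR (a row type T, a result type R, a write method, and Closeable); it is illustrative only and the exact interface in the PR may differ.

import java.io.Closeable;
import java.io.IOException;

public interface Writer<T, R> extends Closeable {
  // write a single row or delete record into the writer's spec/partition
  void write(T row) throws IOException;

  // the result (e.g. the produced files) available after the writer is closed
  R result();
}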

Then we have RollingWriter, which implements Writer and wraps another writer to split the incoming records into multiple files within one spec/partition. RollingDataWriter, RollingEqualityDeleteWriter, and RollingPositionDeleteWriter are the concrete implementations.

PartitionAwareWriter

All Writer implementations are limited to writing to a single spec/partition. To support writes to multiple specs and partitions, we have PartitionAwareWriter. In Iceberg, we support two types of writes: fanout and clustered. That’s why I am proposing to add ClusteredWriter and FanoutWriter. ClusteredWriter will write to multiple specs and partitions while ensuring the incoming data is properly clustered, whereas FanoutWriter will keep a number of writers open and will not require a particular order of data. ClusteredWriter is very similar to our existing PartitionedWriter, but it also detects changes in the spec, not only in partition values.
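Sketching the multi-spec entry point for context (illustrative only; the result-reporting method is omitted and the actual interface may differ), the key difference from Writer is that the caller supplies the target spec and partition with every row.

import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

public interface PartitionAwareWriter<T, R> extends Closeable {
  // the caller passes the target spec and partition along with each row;
  // ClusteredWriter requires these to arrive grouped, FanoutWriter does not
  void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
}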

DeltaTaskWriter

This PR also introduces a new DeltaTaskWriter interface that will be used by query engine integrations.
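As a rough sketch of the delete/insert contract (assembled from the method signatures that appear in the review below; the interface name, the default implementation, and Closeable are assumptions here):

import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

public interface DeltaWriter<T> extends Closeable {
  void insert(T row, PartitionSpec spec, StructLike partition) throws IOException;

  // equality delete
  void delete(T row, PartitionSpec spec, StructLike partition) throws IOException;

  // position delete with the persisted row
  void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException;

  // position delete without persisting the row (assumed default)
  default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) throws IOException {
    delete(path, pos, null, spec, partition);
  }
}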

@aokolnychyi (Contributor Author):

cc @openinx @stevenzwu @RussellSpitzer @rdblue @cwsteinbach @danielcweeks @kbendick @karuppayya @flyrain @pvary @jackye1995 @yyanyy @szehon-ho @rymurr @jun-he

This PR is still a work in progress, but I'd like to get some early feedback. I've benchmarked the new writers locally using async-profiler. I'll clean up the benchmarking code later and push it here. Once we agree on the APIs, I'll split the PR into smaller chunks and add tests.

As it is a large change, I'd advise starting with Writer, then PartitionAwareWriter, then TaskWriter and DeltaTaskWriter.

import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class CDCTaskWriter<T> extends BaseDeltaTaskWriter<T> {
Contributor Author:

This is essentially a copy of the existing BaseEqualityDeltaWriter but using new abstractions.

/**
* A writer capable of writing to multiple specs and partitions ensuring the incoming records are properly clustered.
*/
public abstract class ClusteredWriter<T, R> implements PartitionAwareWriter<T, R> {
Contributor Author:

Naming suggestions are welcome.

Contributor:

I like "clustered" since that's the assumption.

Contributor:

The naming has a slight conflict with the "sorted" position delete writer. That writer keeps deletes in memory and sorts them prior to writing them out. That's for the CDC use case where position deletes may be in any order, rather than the MERGE use case where we should be able to ask the engine to produce deletes in the expected file/pos order.

Contributor Author:

Yeah, any ideas here? The main distinction is that ClusteredWriter writes to multiple specs/partitions and just checks the data is clustered. That writer actually sorts position deletes and writes only to a single spec/partition.

import org.apache.iceberg.util.StructLikeSet;

/**
* A writer capable of writing to multiple specs and partitions ensuring the incoming records are properly clustered.
Contributor Author:

This one is similar to our existing PartitionedWriter but with some notable differences I'll mention below.


@Override
public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
if (!spec.equals(currentSpec)) {
Contributor Author:

We support writing across multiple specs now. Background here.

protected abstract R aggregatedResult();

@Override
public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
aokolnychyi (Contributor Author), Aug 5, 2021:

Instead of deriving a partition struct through PartitionKey, the new writer accepts StructLike. In some cases, this will come from a metadata column instead.

currentPartition = partition != null ? StructCopy.copy(partition) : null;
currentWriter = newWriter(currentSpec, currentPartition);

} else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
Contributor Author:

This place is important as it is invoked for every single row. Previously, we used equals in PartitionKey. In this PR, I am using our struct comparator as the passed StructLike may not necessarily be PartitionKey.

My benchmarks show this is on par with the previous implementation.

}

// copy the partition key as the key object is reused
currentPartition = partition != null ? StructCopy.copy(partition) : null;
Contributor Author:

Using StructCopy instead of PartitionKey#copy for the same reasons as above. This is less critical and is invoked only when we detect a partition change (i.e. NOT for every row).


currentSpec = spec;
partitionComparator = Comparators.forType(partitionType);
completedPartitions = StructLikeSet.create(partitionType);
Contributor Author:

Using StructLikeSet instead of a regular Set as we may get arbitrary StructLike implementations. This set is not going to be checked for every row so I don't worry a lot about performance here.

Contributor:

I think the original relied on PartitionKey to handle hashing and so it was safe. That was before we introduced StructLikeSet, which is probably the easier way to go.

Contributor Author:

Using PartitionKey is not even an option. For delete files, the partition struct will come from a metadata column, not by applying PartitionKey on data. It seems StructLikeSet is a nice solution and since we are not calling it for every single row, it should not be much more expensive.

private Writer<T, R> writer(PartitionSpec spec, StructLike partition) {
Map<StructLike, Writer<T, R>> specWriters = writers.computeIfAbsent(
spec.specId(),
id -> StructLikeMap.create(spec.partitionType()));
Contributor Author:

Using StructLikeMap here.

import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;

public class MixedDeltaTaskWriter<T> extends BaseDeltaTaskWriter<T> {
Contributor Author:

Naming TBD.

Contributor Author:

This is what we are going to use in Spark to write deltas during MERGE INTO.

/**
* A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions.
*/
public interface PartitionAwareWriter<T, R> extends Closeable {
Contributor Author:

Initially, I made this extend Writer. However, that required introducing a PartitionAwareRow that would wrap a row, spec, and partition. Benchmarks showed that we spent some extra time wrapping every single row. I think performance is more important in this case.
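For illustration, a hypothetical sketch of what such a wrapper could look like; even a reused, mutable wrapper has to be set for every incoming row, which is the per-row overhead the benchmarks flagged.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

class PartitionAwareRow<T> {
  private T row;
  private PartitionSpec spec;
  private StructLike partition;

  // called once per incoming row before handing the wrapper to the writer
  PartitionAwareRow<T> set(T row, PartitionSpec spec, StructLike partition) {
    this.row = row;
    this.spec = spec;
    this.partition = partition;
    return this;
  }

  T row() { return row; }
  PartitionSpec spec() { return spec; }
  StructLike partition() { return partition; }
}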

/**
* A rolling writer capable of splitting incoming data/deletes into multiple files within one spec/partition.
*/
public abstract class RollingWriter<T, W extends Writer<T, R>, R> implements Writer<T, R> {
Contributor Author:

Similar to our existing BaseRollingWriter.

@aokolnychyi force-pushed the v2-writers-proto-v3 branch from 6a82e84 to 8fd5525 on August 6, 2021 00:09
PartitionSpec spec = table.spec();
FileIO io = table.io();

OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
Contributor Author:

This is an example of how to use the new writers in Spark 3.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

public interface V2TaskWriter<T> extends Closeable {
aokolnychyi (Contributor Author), Aug 6, 2021:

We have V2 in the name temporarily. We already have TaskWriter and I did not want to change that in this PR.

Contributor Author:

Maybe we should call it something else and keep the old hierarchy for now.

@rdblue requested review from openinx and rdblue on August 6, 2021 15:22
@aokolnychyi (Contributor Author):

Here are some benchmark numbers for writing 2.5 million records (flat schema, 7 columns). I am using bucketing with 32 buckets on an int column for partitioned writes.

Benchmark                                                                       Mode  Cnt   Score   Error  Units
TaskWriterParquetBenchmark.writePartitionedDataNewFanoutWriter                    ss    5  10.432 ± 0.382   s/op
TaskWriterParquetBenchmark.writePartitionedDataOldFanoutWriter                    ss    5  11.315 ± 0.345   s/op
TaskWriterParquetBenchmark.writePartitionedDataNewWriter                          ss    5  11.416 ± 0.994   s/op
TaskWriterParquetBenchmark.writePartitionedDataOldWriter                          ss    5  11.331 ± 0.238   s/op
TaskWriterParquetBenchmark.writePartitionedEqualityDeleteNewWriter                ss    5  11.795 ± 1.553   s/op
TaskWriterParquetBenchmark.writeUnpartitionedDataNewWriter                        ss    5  10.736 ± 1.058   s/op
TaskWriterParquetBenchmark.writeUnpartitionedDataOldWriter                        ss    5  10.501 ± 2.084   s/op
TaskWriterParquetBenchmark.writeUnpartitionedEqualityDeleteNewWriter              ss    5   9.935 ± 0.166   s/op
TaskWriterParquetBenchmark.writeUnpartitionedPositionDeleteWithoutRowNewWriter    ss    5   8.833 ± 0.791   s/op

Memory-wise it is very similar. Here is an example.

TaskWriterParquetBenchmark.writePartitionedDataNewWriter:·gc.alloc.rate                                            ss    5         177.302 ±        17.914  MB/sec
TaskWriterParquetBenchmark.writePartitionedDataNewWriter:·gc.churn.G1_Eden_Space                                   ss    5         136.865 ±        12.818  MB/sec
TaskWriterParquetBenchmark.writePartitionedDataNewWriter:·gc.churn.G1_Old_Gen                                      ss    5           5.411 ±         0.646  MB/sec
TaskWriterParquetBenchmark.writePartitionedDataOldWriter:·gc.alloc.rate                                            ss    5         177.730 ±        11.985  MB/sec
TaskWriterParquetBenchmark.writePartitionedDataOldWriter:·gc.churn.G1_Eden_Space                                   ss    5         137.768 ±        21.407  MB/sec
TaskWriterParquetBenchmark.writePartitionedDataOldWriter:·gc.churn.G1_Old_Gen                                      ss    5           5.420 ±         0.892  MB/sec

@Override
protected void add(DeleteWriteResult result) {
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
aokolnychyi (Contributor Author), Aug 6, 2021:

I noticed CharSequenceSet#addAll calls in flame graphs (nothing too bad). If we want to squeeze out a little bit of performance, we can replace addAll with union to avoid copying things. Alternatively, we can detect whether the set we add is also a CharSequenceSet. If so, there's no need to unwrap and wrap records again.

I am not sure it is going to be worth it, though.
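To make the idea concrete, here is an illustrative-only toy set (none of this reflects the real CharSequenceSet internals): merging another instance of the same set type can skip the per-element wrapping/copying step that a generic addAll performs.

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class ToyPathSet {
  private final Set<String> wrapped = new HashSet<>();  // stand-in for wrapped paths

  boolean add(CharSequence path) {
    return wrapped.add(path.toString());  // per-element "wrapping" step
  }

  boolean addAll(Collection<? extends CharSequence> paths) {
    boolean changed = false;
    for (CharSequence path : paths) {
      changed |= add(path);
    }
    return changed;
  }

  // fast path: the other set's elements are already wrapped, so merge them directly
  boolean addAll(ToyPathSet other) {
    return wrapped.addAll(other.wrapped);
  }
}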

void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException;

// position delete without persisting row
default void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition) throws IOException {
aokolnychyi (Contributor Author), Aug 6, 2021:

One potential optimization is to wrap query-engine-specific String representations as CharSequence. For example, we spend a noticeable amount of time converting UTF8String into a Java String whenever we retrieve the path from InternalRow. That should be investigated separately, though.

Contributor:

Agreed. UTF8String handling is something we should probably look into independently.

@szehon-ho (Member) left a comment:

Looks ok to me from my limited experience of Writers.

A bit of a bummer about not being able to use inheritance, but I saw that it's about performance and not having to wrap the row and partition spec.


void insert(T row, PartitionSpec spec, StructLike partition) throws IOException;

void abort() throws IOException;
Contributor:

What is the expectation for calling abort? Should the writer already be closed?

aokolnychyi (Contributor Author), Aug 10, 2021:

This essentially mimics the current interface we have. I am happy to reconsider having abort in our task writers. I guess the idea is to reuse the cleanup logic in all query engine integrations. We could have some utility methods instead.

Contributor Author:

I removed abort and it looks like we don't really need V2TaskWriter anymore. So I removed that interface completely and just kept DeltaWriter.

}

@Override
public void write(T row) throws IOException {
Contributor:

Do we need to throw IOException if the current writer does not?

Contributor Author:

Probably not. I guess IOException was just defined in the parent interface.

appender.add(row);
}

@Deprecated
Member:

We don't expose those deleteAll and delete methods in the public iceberg-api module, so is it necessary to keep those deprecated APIs for at least a release? I'm thinking that we could just remove them from this class.

Contributor Author:

I'll be up for that. I did not do this in this WIP PR to avoid touching more places. I think this is a low-level API which we can break.

Contributor:

It's a low-level API, but it could be used by external projects like Hive since this is the easiest way to correctly write Iceberg files. I think we should deprecate them like Anton did here.


@Override
public void write(PositionDelete<T> positionDelete) throws IOException {
pathSet.add(positionDelete.path());
Member:

Nit: I think it would be clearer to rename pathSet to referencedDataFiles. When I checked this variable at first glance, I was wondering: pathSet? Which path set? What is it used for? I did not get it until I checked the referencedDataFiles() method.

Contributor Author:

I agree. I'll do that rename when I split this into smaller PRs.

Contributor:

+1 for the rename.

import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;

public class MixedDeltaWriter<T> extends BaseDeltaWriter<T> {
Member:

What's the specific case that MixedDeltaWriter will be used for? If it's the batch UPDATE/DELETE case, then we shouldn't produce equality deletes; I think we could just leave delete(T row, PartitionSpec spec, StructLike partition) as unsupported, and we wouldn't even want to initialize the equalityDeleteWriter.

Contributor Author:

I agree.

Comment on lines +64 to +77
Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
spec.specId(),
id -> StructLikeMap.create(spec.partitionType()));
FileWriter<T, R> writer = specWriters.get(partition);

if (writer == null) {
// copy the partition key as the key object is reused
StructLike copiedPartition = partition != null ? StructCopy.copy(partition) : null;
writer = newWriter(spec, copiedPartition);
specWriters.put(copiedPartition, writer);
}

return writer;
}
Member:

Could we maintain the writers for different partition specs and partitions in a flat Map<K, FileWriter<T, R>> rather than a nested map, using a composite key of <partitionSpecId, partitionData>? I think that would make the writer selection strategy quite simple, and it would also simplify the ClusteredFileWriter's write method.

Contributor Author:

I can change that. No preference from my side here.

/**
* A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition.
*/
public class RollingEqualityDeleteWriter<T> extends RollingDeleteWriter<T, EqualityDeleteWriter<T>> {
Member:

Is it necessary to share the abstract RollingDeleteWriter between RollingEqualityDeleteWriter and RollingPositionDeleteWriter? The key common thing is referencedDataFiles, I think, but RollingEqualityDeleteWriter usually doesn't produce any referenced data files, right? So I think we could just make RollingEqualityDeleteWriter extend RollingFileWriter.

Contributor Author:

The idea was to avoid implementing addResult and aggregatedResult in both but now that I look at it, it is probably not worth a separate class. I'll remove it.


@Override
protected DataWriter<T> newWriter(EncryptedOutputFile file) {
return writerFactory.newDataWriter(file, spec(), partition());
Contributor:

The current BaseRollingWriter implementation passes the partition key to this method each time. It looks like the new classes expose spec() and partition() to avoid passing them here, since these 3 method implementations are the only place where those getters are called. I think I'd prefer dropping the 2 getter methods and just passing the spec and partition in here. What do you think?

Contributor:

Alternatively, these implementations could keep their own copies of spec and partition in the constructor to avoid it. But I'd rather not do that.

Contributor Author:

I did not pass the spec and partition on purpose as our rolling writers, as opposed to other more sophisticated writers, can write only to a single partition. If I see the spec and partition being passed, I won't know it is always the same value.

Contributor Author:

I don't have a strong opinion here. If you feel it is not worth it, I can surely adapt.

if (partition == null) {
this.currentFile = fileFactory.newOutputFile();
} else {
this.currentFile = fileFactory.newOutputFile(partition);
Contributor:

This should call newOutputFile(spec, partition), not this one that uses the default spec.

Contributor:

In fact, this version should probably be deprecated so that we can remove it once we move over to these writers.

Contributor Author:

Good catch, I think I did this PR before we had the new method. I'll update.


private boolean shouldRollToNewFile() {
// TODO: ORC file now not support target file size before closed
return !fileFormat.equals(FileFormat.ORC) &&
Contributor:

With the new structure, does this still make sense? The file format is only passed through to this point for this check, but couldn't we just use a check before creating this class?

if (fileFormat == FileFormat.ORC) {
  return new DataWriter(...);
} else {
  return new RollingDataWriter(...);
}

Contributor:

(Not urgent to fix in this PR)

Contributor Author:

I remember I tried this and then reverted it. Looking at it now, I don't see why it did not work before. I'll update.

io.deleteFile(currentFile.encryptingOutputFile());
} else {
R result = currentWriter.result();
addResult(result);
Contributor:

Minor: do we need result or can we just call addResult(currentWriter.result())?

@Override
public void write(PositionDelete<T> positionDelete) throws IOException {
pathSet.add(positionDelete.path());
appender.add(positionDelete);
Contributor:

This class makes an assumption similar to the ClusteredFileWriter classes. Should we name it OrderedPositionDeleteWriter instead?

Contributor Author:

That will definitely be more descriptive. Do you think it will break anyone? This class has existed for a while and we reference it in multiple places such as WriterFactory. I think we either keep the original name and deprecate the old methods here and in EqualityDeleteWriter, or just drop the old methods and rename as needed.

}

if (completedSpecs.contains(spec.specId())) {
throw new IllegalStateException("Already closed files for spec: " + spec.specId());
Contributor:

This doesn't need to be a requirement. We could pass (spec1, part1), (spec2, part3), (spec1, part2) and nothing would really break other than this. Do you think it is likely that clustering by spec is going to be the case most of the time? I'm not sure if this is something we should worry about removing.

Contributor Author:

I did this from the performance perspective to not keep a file open for each seen spec. Right now, I keep only a single file open at a time. I know we will cluster by spec in Spark merge-on-read. Do you have use cases in mind where we won't cluster by spec?

partitionComparator = Comparators.forType(partitionType);
completedPartitions = StructLikeSet.create(partitionType);
// copy the partition key as the key object is reused
currentPartition = partition != null ? StructCopy.copy(partition) : null;
Contributor:

Can we move the null handling into StructCopy instead of doing it here?

currentWriter.close();

R result = currentWriter.result();
addResult(result);
Contributor:

Useless variable, result?


@Override
protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
return new RollingDataWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSizeInBytes, spec, partition);
Contributor:

This is where it would be easy to do the check to see if fileFormat is ORC and skip creating a rolling writer if it is.

id -> StructLikeMap.create(spec.partitionType()));
FileWriter<T, R> writer = specWriters.get(partition);

if (writer == null) {
Contributor:

Can we not use computeIfAbsent for the StructLikeMap as well?

Contributor:

Nevermind, it's the copy.

void delete(T row, PartitionSpec spec, StructLike partition) throws IOException;

// position delete with persisting row
void delete(CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) throws IOException;
Contributor:

Other APIs put the optional row last. What about moving spec and partition before row? Would that be too inconsistent with others?
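For reference, a sketch of the reordered signatures being discussed (illustrative only, with the optional row moved last):

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

interface ReorderedDeltaWriter<T> {
  // equality delete: the row is required, so it stays first
  void delete(T row, PartitionSpec spec, StructLike partition) throws IOException;

  // position delete: the optional persisted row moves to the end
  void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition, T row) throws IOException;
}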

import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;

public class MixedDeltaWriter<T> extends BaseDeltaWriter<T> {
Contributor:

I think this needs Javadoc and a better name that shows this only accepts position deletes.

import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.StructProjection;

public class CDCWriter<T> extends BaseDeltaWriter<T> {
Contributor:

This also needs some Javadoc to explain the context.

this.dataWriter = dataWriter;
this.equalityDeleteWriter = equalityDeleteWriter;
this.positionDeleteWriter = positionDeleteWriter;
this.positionDelete = new PositionDelete<>();
Contributor:

Nit: PositionDelete.create()

The constructor should probably be made private.


public CDCWriter(FanoutDataWriter<T> dataWriter,
PartitionAwareFileWriter<T, DeleteWriteResult> equalityDeleteWriter,
PartitionAwareFileWriter<PositionDelete<T>, DeleteWriteResult> positionDeleteWriter,
Contributor:

I think that this needs to be FanoutSortedPositionDeleteWriter because the position deletes could be in any order.

return path;
}

public long rowOffset() {
Contributor:

This is called position nearly everywhere else. Why call it rowOffset here?

}
}

private static class PartitionAwarePathOffset {
Contributor:

Could this extend PositionDelete so it can be passed directly into the delete writer?

}

@Override
public void delete(T row, PartitionSpec spec, StructLike partition) throws IOException {
Contributor:

I think we need to clearly document that this assumes that the row to delete has the same schema as the rows that will be inserted. We could also have a directDelete method that passes just the equality delete columns (key). That's worth considering if you want to split the interface for equality and position delete use cases.
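A hypothetical sketch of that split (neither the interface name nor the key type is from the PR; directDelete is the reviewer's suggested name and StructLike is assumed for the key):

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

interface EqualityDeleteView<T> {
  // assumes the row to delete has the same schema as the rows that will be inserted
  void delete(T row, PartitionSpec spec, StructLike partition) throws IOException;

  // passes only the equality delete columns (the key), not a full row
  void directDelete(StructLike key, PartitionSpec spec, StructLike partition) throws IOException;
}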

}
}

protected abstract void closeWriters() throws IOException;
Contributor:

This base class is really strange to me because the actual close is abstract, but then methods to close writers are in this implementation. I think I'd probably see if I could remove this class.

@aokolnychyi (Contributor Author):

I am closing this one in favor of smaller PRs.
