
Conversation

@aokolnychyi
Contributor

@aokolnychyi aokolnychyi commented Sep 21, 2021

This PR adds the PartitioningWriter interface and two implementations:

  • ClusteredWriter
  • FanoutWriter

It is a subset of changes in PR #2945.

@aokolnychyi
Contributor Author

}

private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
Contributor Author

@openinx, I've tried to address this comment. However, this would require maintaining a map of StructLike wrappers by spec. I am not sure that would be cleaner.

Member

I think it's okay now; no need to address that comment.

Contributor

We could make a PartitionMap class that works like PartitionSet for this. No need to do it right now though. I agree that we should move forward with this implementation and update it later if needed.

Contributor Author

Yeah, PartitionMap could be a solution here.
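The PartitionMap idea floated above could be sketched roughly like this. This is a hypothetical class, not Iceberg's actual API; a real implementation would use StructLikeMap for the inner map so partition structs compare by value.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a PartitionMap: a two-level lookup keyed by spec ID
// and then by partition value. Not Iceberg's actual API.
class PartitionMap<V> {
  // specId -> (partition -> value); List<Object> stands in for a partition
  // struct because it already compares by value.
  private final Map<Integer, Map<List<Object>, V>> bySpec = new HashMap<>();

  V computeIfAbsent(int specId, List<Object> partition, Function<List<Object>, V> factory) {
    return bySpec
        .computeIfAbsent(specId, id -> new HashMap<>())
        .computeIfAbsent(partition, factory);
  }
}
```

Keying the outer map by spec ID means callers never need to wrap a partition with the matching spec's comparator themselves, which is the bookkeeping the comment above was worried about.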

writer.write(row);
}

private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
Contributor Author

Compared to the old implementation, we have an extra computeIfAbsent call and use StructLikeMap instead of a regular map. The performance hit seems negligible according to the benchmark results.

@aokolnychyi
Contributor Author

I went ahead and added benchmarks to this PR.

Benchmark                                                                Mode  Cnt   Score   Error  Units
ParquetWritersBenchmark.writePartitionedClusteredDataWriter                ss    5  10.076 ± 0.261   s/op
ParquetWritersBenchmark.writePartitionedLegacyDataWriter                   ss    5  10.124 ± 0.500   s/op

ParquetWritersBenchmark.writePartitionedFanoutDataWriter                   ss    5  10.082 ± 0.371   s/op
ParquetWritersBenchmark.writePartitionedLegacyFanoutDataWriter             ss    5   9.971 ± 0.322   s/op

ParquetWritersBenchmark.writeUnpartitionedClusteredDataWriter              ss    5   9.075 ± 0.458   s/op
ParquetWritersBenchmark.writeUnpartitionedLegacyDataWriter                 ss    5   8.981 ± 0.292   s/op

ParquetWritersBenchmark.writePartitionedClusteredEqualityDeleteWriter      ss    5  10.136 ± 0.389   s/op
ParquetWritersBenchmark.writeUnpartitionedClusteredPositionDeleteWriter    ss    5   7.462 ± 0.690   s/op
Benchmark                                                             Mode  Cnt   Score   Error  Units
AvroWritersBenchmark.writePartitionedClusteredDataWriter                ss    5  11.114 ± 0.108   s/op
AvroWritersBenchmark.writePartitionedLegacyDataWriter                   ss    5  11.094 ± 0.422   s/op

AvroWritersBenchmark.writePartitionedFanoutDataWriter                   ss    5  11.223 ± 0.316   s/op
AvroWritersBenchmark.writePartitionedLegacyFanoutDataWriter             ss    5  11.029 ± 0.283   s/op

AvroWritersBenchmark.writeUnpartitionedClusteredDataWriter              ss    5  10.716 ± 0.295   s/op
AvroWritersBenchmark.writeUnpartitionedLegacyDataWriter                 ss    5  10.602 ± 0.509   s/op

AvroWritersBenchmark.writePartitionedClusteredEqualityDeleteWriter      ss    5  10.115 ± 0.215   s/op
AvroWritersBenchmark.writeUnpartitionedClusteredPositionDeleteWriter    ss    5   7.447 ± 0.526   s/op

}

ext {
jmhVersion = '1.21'
Contributor Author

Somehow, this did not seem to have any effect. I had to move it to the jmh block.

writerFactory, fileFactory, table.io(),
fileFormat, TARGET_FILE_SIZE);

writer.close();
Member

Why do we close the writer twice?

Contributor Author

To make sure it is idempotent. Spark may call close multiple times.
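The idempotent-close pattern being described can be sketched minimally as follows (hypothetical class, not the actual Iceberg writer):

```java
import java.io.Closeable;

// Hypothetical sketch: a writer whose close() is safe to call multiple times,
// since engines like Spark may invoke it more than once.
class IdempotentWriter implements Closeable {
  private boolean closed = false;
  int closeCount = 0; // exposed for illustration only

  @Override
  public void close() {
    if (closed) {
      return; // subsequent calls are no-ops
    }
    closed = true;
    closeCount++;
    // release underlying resources exactly once here
  }
}
```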


DeleteWriteResult result = writer.result();
Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size());
Member

What's the difference between this check and the one beneath it?

Contributor Author

Just checking that referencesDataFiles is consistent with the number of referenced data files reported.

@openinx
Member

openinx commented Sep 23, 2021

Thanks @aokolnychyi for pinging me, let me take a look today!

}

if (completedSpecIds.contains(spec.specId())) {
throw new IllegalStateException("Already closed files for spec: " + spec.specId());
Member

I'd prefer to include the partition spec together with the spec ID (rather than only the spec ID) in the IllegalStateException message. I've seen many users ask what is wrong when they hit the message Already closed files for partition ..., when it's just a sorting issue. What I am trying to say is: it's quite hard for Iceberg beginners to grasp the meaning of Already closed files for spec: 3 if we keep the current message.

Contributor Author

Maybe, it is the right time to add a longer error message that will clarify what happened. I'll look into that.

Contributor

Yeah, I agree with @openinx here. This is a good opportunity to improve that error message. Now that this is the clustered writer, we can say that incoming records need to be clustered by partition. You can use PartitionSet for this so it's really easy to track.

Contributor

Also better to use a string representation of the spec rather than the spec ID.

Contributor Author

The new exception looks like this:

java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'data=aaa' in spec [
  1000: data: identity(2)
]

currentPartition = StructCopy.copy(partition);
currentWriter = newWriter(currentSpec, currentPartition);

} else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
Member

It will throw a NullPointerException if the partition is null because the partitionComparator cannot compare null values, right? I remember we use a null partition value to unify the partitioned and unpartitioned writer code paths.

Contributor Author

You are right that the comparator will throw an NPE, but I think partition != currentPartition prevents us from calling the comparator whenever at least one value is null. The partition can be null only for unpartitioned specs. As long as we are writing unpartitioned records, partition != currentPartition will be false.

Whenever partition != currentPartition and at least one of them is null, it means we are changing the spec. If so, it will be handled by the if block above and we won't call the comparator at all.

Contributor

I agree. If currentPartition is null (as it is initialized) and a non-null partition is passed in, then the first check is true and the second check runs, which will pass both to the comparator. If we don't think that the comparator can handle null then we should update this.

Contributor Author

@rdblue, in the use case you mention, this if branch won't be invoked because the one above it handles that case. This branch is only tested once we have written at least one record and the new record belongs to the same spec as the previous one. That means if one partition is null, the other must be too, so partition != currentPartition is false and the comparator is not used.

This is something that will be invoked for every row so I would like to avoid any extra checks.

Contributor

Okay, got it. That sounds fine.
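The null-safety argument in this thread can be demonstrated with a stripped-down version of the check (hypothetical helper; the real code compares StructLike partitions):

```java
import java.util.Comparator;

// Hypothetical distillation of the partition-change check: the comparator is
// consulted only when the reference check passes, so two null partitions
// (the unpartitioned case) never reach the comparator.
class PartitionCheck {
  static <T> boolean partitionChanged(T partition, T current, Comparator<T> comparator) {
    return partition != current && comparator.compare(partition, current) != 0;
  }
}
```

With both partitions null, `partition != current` is false and the comparator is never invoked; the case where only one side is null implies a spec change, which the earlier branch handles before this check runs.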


@Override
protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
// TODO: support ORC rolling writers
Member

Filed a separate issue for this: #3169

Contributor Author

Thank you!

Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
spec.specId(),
id -> StructLikeMap.create(spec.partitionType()));
FileWriter<T, R> writer = specWriters.get(partition);
Member

For the fanout write path, this is one of the hottest lines because it needs to compare the partition field values for every row. For an unpartitioned table, we also need to get the null key from the specWriters map. In the old implementation, we didn't need to fetch the writer from the map for unpartitioned tables. Is there any performance regression when comparing the two?

Contributor Author

@aokolnychyi aokolnychyi Sep 23, 2021

If I am not mistaken, we only use the fanout writer for partitioned tables. Even in the old implementation.

You are right about this being the place that needs attention. Like I said here, we have an extra computeIfAbsent call and use StructLikeMap instead of a regular map with PartitionKey. While the performance hit seems negligible according to the benchmark results I posted, I'd like to optimize this as much as possible.

One thing to consider is the performance of equals and hashCode in StructLikeWrapper vs PartitionKey. It is relatively simple and efficient in PartitionKey where we compare/iterate through object array. In the wrapper, these methods are more involved but don't seem drastically expensive.

One optimization idea is to introduce a cache of Comparators and JavaHash objects we use in the wrapper. At this point, we will create a comparator and a java hash for every partition we add to StructLikeMap. Even if we write to 1k partitions, I am not sure the difference is noticeable.

Another optimization idea can be to introduce a new interface to indicate when a StructLike is backed by an array of values. If two structs implement that interface, we can just compare the arrays in StructLikeWrapper.

I am going to do a separate benchmark for HashMap with PartitionKey and StructLikeMap with PartitionKey.

Contributor Author

@aokolnychyi aokolnychyi Sep 23, 2021

I did more benchmarks for 2.5 million records and 1000k partitions. I was using get/put methods heavily.

StructLikeMap<String> map = StructLikeMap.create(SPEC.partitionType());

PartitionKey partitionKey = new PartitionKey(SPEC, SCHEMA);
StructType dataSparkType = SparkSchemaUtil.convert(SCHEMA);
InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);

for (InternalRow row : rows) {
  partitionKey.partition(internalRowWrapper.wrap(row));
  String res = map.get(partitionKey);
  if (res == null) {
    map.put(StructCopy.copy(partitionKey), "XXX");
  }
}

blackhole.consume(map);

Performance numbers came out very close, both time- and memory-wise.

Benchmark                          Mode  Cnt  Score   Error  Units
MapBenchmark.hashMap                 ss    5  0.274 ± 0.066   s/op
MapBenchmark.structLikeMap           ss    5  0.358 ± 0.056   s/op

Given such a minor difference for 2.5 million records, I'd say we should be good without any optimizations.

Contributor

This may be because we cache the hash value in StructLikeWrapper.
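The caching being referred to can be illustrated with a minimal wrapper (hypothetical class; StructLikeWrapper's actual implementation differs):

```java
import java.util.Arrays;

// Hypothetical sketch of hash-code caching: compute the hash once on first
// use and reuse it on every subsequent map lookup.
class CachedHashWrapper {
  private final Object[] values;
  private Integer hash = null; // lazily computed and cached

  CachedHashWrapper(Object... values) {
    this.values = values;
  }

  @Override
  public int hashCode() {
    if (hash == null) {
      hash = Arrays.hashCode(values);
    }
    return hash;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof CachedHashWrapper
        && Arrays.equals(values, ((CachedHashWrapper) other).values);
  }
}
```

Since a map lookup hashes the key exactly once per probe, caching mainly pays off when the same wrapper instance is probed repeatedly, as a writer's current partition key is.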

Member

Thanks for the detailed explanation and performance report, @aokolnychyi!

Member

@RussellSpitzer RussellSpitzer left a comment

Looks good to me, just a few comments

if (completedPartitions.contains(partition)) {
String path = spec.partitionToPath(partition);
String errMsg = String.format("Already closed files for partition '%s' in spec %d", path, spec.specId());
throw new IllegalStateException(errMsg);
Contributor

Nit: do we need a variable for errMsg?

Contributor Author

I am not a big fan of splitting lines so I added an extra variable. This place changed a little bit. Let me know what you currently think.

() -> {
try {
writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
} catch (IOException e) {
Contributor

@rdblue rdblue Sep 23, 2021

I don't think that write should throw IOException. We always wrap IOException in UncheckedIOException so it makes no sense for us to throw it from the writer interface.

I think I missed this when reviewing the FileWriter interfaces.

Contributor Author

Somehow, I assumed our delete writers throw one. I'll update FileWriter and PartitioningWriter interfaces.

Contributor Author

Okay, I think I remember now. Classes like PartitioningWriter close other writers, and close throws a checked exception. I'll need to wrap such places and rethrow UncheckedIOException.
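The wrapping described above might look like this (hypothetical helper, not the follow-up's actual code):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch: rethrow the checked IOException from close() as
// UncheckedIOException so that callers such as PartitioningWriter need not
// declare IOException themselves.
class CloseUtil {
  static void closeUnchecked(Closeable closeable) {
    try {
      closeable.close();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to close writer", e);
    }
  }
}
```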

Contributor Author

I'll do that in a follow-up.

@aokolnychyi aokolnychyi merged commit 11f327a into apache:master Sep 23, 2021
@aokolnychyi
Contributor Author

Thanks for reviewing, @RussellSpitzer @openinx @rdblue!

