Spark: Positional deletes creates partitioned path on unpartitioned tables #7685
Full stack trace:
@@ -435,7 +443,10 @@ public void delete(InternalRow metadata, InternalRow id) throws IOException {

    InternalRow partition = metadata.getStruct(partitionOrdinal, partitionRowWrapper.size());
    StructProjection partitionProjection = partitionProjections.get(specId);
    partitionProjection.wrap(partitionRowWrapper.wrap(partition));

    if (partitionProjection != null) {
What about adding a separate interface for partition projections? That way, we can avoid an extra if branch in code that gets invoked in a tight loop. We would also handle other places. This PR updates DELETE commands, but we have a similar problem in UPDATE and MERGE, as those can write deletes against an unpartitioned spec.
interface PartitionProjection extends StructLike {
  static PartitionProjection create(PartitionSpec spec, StructType commonPartitionType) {
    ...
  }

  PartitionProjection wrap(StructLike newStruct);

  // classes nested in an interface are implicitly public and static
  class UnpartitionedSpecProjection implements PartitionProjection {
    // something that always returns null
  }

  class PartitionedSpecProjection implements PartitionProjection {
    // something that delegates to StructProjection and uses spec.partitionType()
  }
}
The naming above is totally random.
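A self-contained sketch of the same idea, for illustration only. It does not use Iceberg's `StructLike`, `PartitionSpec`, or `StructProjection`: partitions are plain `Object[]` arrays, a spec is reduced to the positions it selects from the common partition tuple, and every name (`PartitionProjectionSketch`, `sourcePositions`, and so on) is hypothetical. It assumes, as suggested above, that the unpartitioned variant returns null from `wrap` so callers need no extra branch.

```java
final class PartitionProjectionSketch {
  private PartitionProjectionSketch() {}

  /** Hypothetical projection from the common partition tuple to one spec's partition. */
  interface PartitionProjection {
    /** Returns this projection over the given tuple, or null for an unpartitioned spec. */
    PartitionProjection wrap(Object[] commonPartition);

    Object get(int pos);

    /** An empty position array stands in for an unpartitioned spec. */
    static PartitionProjection create(int[] sourcePositions) {
      return sourcePositions.length == 0
          ? new UnpartitionedSpecProjection()
          : new PartitionedSpecProjection(sourcePositions);
    }
  }

  private static final class UnpartitionedSpecProjection implements PartitionProjection {
    @Override
    public PartitionProjection wrap(Object[] commonPartition) {
      return null; // an unpartitioned spec never has a partition tuple
    }

    @Override
    public Object get(int pos) {
      throw new UnsupportedOperationException("Unpartitioned spec has no partition fields");
    }
  }

  private static final class PartitionedSpecProjection implements PartitionProjection {
    private final int[] sourcePositions;
    private Object[] wrapped; // set by wrap(); get() must not be called before that

    private PartitionedSpecProjection(int[] sourcePositions) {
      this.sourcePositions = sourcePositions;
    }

    @Override
    public PartitionProjection wrap(Object[] commonPartition) {
      this.wrapped = commonPartition;
      return this;
    }

    @Override
    public Object get(int pos) {
      return wrapped[sourcePositions[pos]];
    }
  }

  public static void main(String[] args) {
    Object[] common = {"2023-05-01", 7};
    PartitionProjection partitioned = PartitionProjection.create(new int[] {1});
    PartitionProjection unpartitioned = PartitionProjection.create(new int[0]);
    System.out.println(partitioned.wrap(common).get(0)); // second field of the common tuple
    System.out.println(unpartitioned.wrap(common)); // null
  }
}
```

The branch on the spec happens once in `create`, not on every row, which is the point of the suggestion.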
I thought @stevenzwu already made the change that projections of null just return null, or did I remember that wrong?
Thanks @RussellSpitzer for jumping in. It looks like it is related to nested structs: #7507
I was able to reproduce this on master. @aokolnychyi I've updated the PR, let me know what you think
@RussellSpitzer PR #7507 limited its scope to nested null structs only. Originally, I was also thinking about failing the wrap call if the root struct is null, to avoid the if-null check on the root struct object. Ryan made a valid point that get methods can be called before wrap is ever called, hence we still need the null check. Ryan also thinks a null root struct value may be possible in a collection. Hence we kept the old behavior of StructProjection wrapping a null root value.
Thanks for the context @stevenzwu. I think this is different since it affects the partitions. cc @rdblue
@Override
public PartitionProjection wrap(StructLike newStruct) {
  return null;
I traced through the code. This seems to be the relevant fix.
It tries to influence the OutputFileFactory, which then calls the LocationProvider interface.
public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
  String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
  OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
  return encryptionManager.encrypt(rawOutputFile);
}
The location provider implementation (e.g. the default one) would produce an empty string as the partition path for an unpartitioned table.
@Override
public String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename) {
  return String.format("%s/%s/%s", dataLocation, spec.partitionToPath(partitionData), filename);
}
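To make the failure mode concrete, here is a minimal sketch of just the formatting step. It does not use the Iceberg classes: `DataLocationSketch`, `partitionedLocation`, and `unpartitionedLocation` are hypothetical names, the base path is made up, and the sketch assumes that an empty partition struct on an unpartitioned spec yields an empty partition path, as described above.

```java
final class DataLocationSketch {
  private DataLocationSketch() {}

  // Same format string as the quoted newDataLocation(spec, partitionData, filename);
  // partitionPath stands in for spec.partitionToPath(partitionData).
  static String partitionedLocation(String dataLocation, String partitionPath, String filename) {
    return String.format("%s/%s/%s", dataLocation, partitionPath, filename);
  }

  // Assumption: the partition-less overload simply joins the data location and the file name.
  static String unpartitionedLocation(String dataLocation, String filename) {
    return String.format("%s/%s", dataLocation, filename);
  }

  public static void main(String[] args) {
    String base = "s3://warehouse/default/tbl/data";
    // Empty partition path: prints s3://warehouse/default/tbl/data//deletes.parquet
    System.out.println(partitionedLocation(base, "", "deletes.parquet"));
    // Partition-less path: prints s3://warehouse/default/tbl/data/deletes.parquet
    System.out.println(unpartitionedLocation(base, "deletes.parquet"));
  }
}
```

The empty middle segment is what produces the `//` in the reported path.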
I am wondering if the fix should be done in OutputFileFactory. If the spec is unpartitioned, it should call the other newDataLocation method, the one without a partition, from the LocationProvider interface.
  /**
   * Return a fully-qualified data file location for the given filename.
   *
   * @param filename a file name
   * @return a fully-qualified location URI for a data file
   */
  String newDataLocation(String filename);

  /**
   * Return a fully-qualified data file location for the given partition and filename.
   *
   * @param spec a partition spec
   * @param partitionData a tuple of partition data for data in the file, matching the given spec
   * @param filename a file name
   * @return a fully-qualified location URI for a data file
   */
  String newDataLocation(PartitionSpec spec, StructLike partitionData, String filename);
Basically, I am saying the 2nd newOutputFile from OutputFileFactory can behave the same as the first method if the partition object is unpartitioned.
  /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */
  public EncryptedOutputFile newOutputFile() {
    OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
    return encryptionManager.encrypt(file);
  }

  /** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */
  public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
    String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
    OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
    return encryptionManager.encrypt(rawOutputFile);
  }
@stevenzwu that's similar to what I had before: ca63c4b
I think we expect the struct to be null when there is no partition. It looks like this is expected in many parts of the code. I'm fine either way, but I'd love to get the opinion of @rdblue or @aokolnychyi
thanks for the context. I missed the earlier part of the discussion.
what about the one-liner fix in the first commit? does it cause failure in some other code paths?
880cb49
The other fix also fixed it for the positional deletes. But as Anton mentioned, the fix that we have right now also covers the UPDATE and MERGE path.
I wonder if passing an empty struct instead of null will have any impact in other places. For instance, the DataFiles and FileMetadata builders explicitly nullify partitions if the spec is unpartitioned, so even if we pass an empty struct, they will use null if the spec is unpartitioned (which is good). Let me check other places. If there is no issue, maybe we can go ahead and fix the output factory/writer.
If we want always null, I think adding PartitionProjection is reasonable to avoid if/else in tight loops and to reuse this logic in other places. That said, we know our file builders will always set null for unpartitioned tables. We only need to fix our writers.
What if we fix RollingFileWriter? It currently checks partition == null.
  private EncryptedOutputFile newFile() {
    if (partition == null) {
      return fileFactory.newOutputFile();
    } else {
      return fileFactory.newOutputFile(spec, partition);
    }
  }
We could replace it with spec.isUnpartitioned().
I also don't mind adding an extra branch to OutputFileFactory to be safe, but I guess that's optional.
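A rough sketch of why the spec-based check matters, using stand-in types only. `WriterCheckSketch`, `Spec`, and `PathFactory` are hypothetical: the factory returns path strings where the real one returns files, and the partition is modelled as a list of path segments. The sketch assumes the situation reported in this PR, where the partition is an empty but non-null struct on an unpartitioned spec.

```java
import java.util.List;

final class WriterCheckSketch {
  private WriterCheckSketch() {}

  /** Hypothetical stand-in for PartitionSpec. */
  interface Spec {
    boolean isUnpartitioned();
  }

  /** Hypothetical stand-in for OutputFileFactory that returns paths instead of files. */
  static final class PathFactory {
    String newOutputFile() {
      return "data/file.parquet";
    }

    String newOutputFile(Spec spec, List<String> partition) {
      // An empty partition joins to "", which leaves an empty path segment.
      return "data/" + String.join("/", partition) + "/file.parquet";
    }
  }

  /** Null-only check, as in the writer before the change. */
  static String newFileNullCheck(PathFactory factory, Spec spec, List<String> partition) {
    if (partition == null) {
      return factory.newOutputFile();
    } else {
      return factory.newOutputFile(spec, partition);
    }
  }

  /** Spec-aware check, as proposed. */
  static String newFileSpecCheck(PathFactory factory, Spec spec, List<String> partition) {
    if (spec.isUnpartitioned() || partition == null) {
      return factory.newOutputFile();
    } else {
      return factory.newOutputFile(spec, partition);
    }
  }

  public static void main(String[] args) {
    PathFactory factory = new PathFactory();
    Spec unpartitioned = () -> true;
    List<String> emptyStruct = List.of(); // non-null, but carries no partition values

    System.out.println(newFileNullCheck(factory, unpartitioned, emptyStruct)); // data//file.parquet
    System.out.println(newFileSpecCheck(factory, unpartitioned, emptyStruct)); // data/file.parquet
  }
}
```

Keeping `partition == null` in the condition preserves the existing behavior for callers that already pass null.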
Fixing the writers seems to be a smaller change, so maybe we can start with that?
@aokolnychyi That works for me. Let me update the PR
Let me catch up today. Sorry for the delay.
I was doing some work on the Python side: apache#6775

But ran into an issue when creating some integration tests for testing the positional deletes. I ended up with double slashes:

s3://warehouse/default/test_positional_mor_deletes/data//00000-32-70be11f7-3c4b-40e0-b35a-334e97ef6554-00001-deletes.parquet

It looks like the struct is not null, but the table is not partitioned. It therefore creates a partitioned path, and with the empty struct we end up with a double slash `//` that Minio doesn't like.

Outputfactory.java

```java
public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition) {
  // partition is a StructCopy
  String newDataLocation = locations.newDataLocation(spec, partition, generateFilename());
  OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
  return encryptionManager.encrypt(rawOutputFile);
}
```

ClusteredWriter.java

```java
// copy the partition key as the key object may be reused
this.currentPartition = StructCopy.copy(partition); // partition is a StructProjection
this.currentWriter = newWriter(currentSpec, currentPartition);
```

Resolves apache#7678
@@ -109,7 +109,7 @@ protected void openCurrentWriter() {
   }

   private EncryptedOutputFile newFile() {
-    if (partition == null) {
+    if (spec.isUnpartitioned() || partition == null) {
This check needs to be added to other writers, like FanoutWriter and ClusteredWriter. They all have the same pattern.
    return partition == null
        ? fileFactory.newOutputFile()
        : fileFactory.newOutputFile(spec, partition);
  }
Originally I was wondering if this check should be done in the method below of the OutputFileFactory class. But I see it is not appropriate, as the Javadoc clearly indicates that this method should be used for partitioned writes. Hence the caller needs to separate unpartitioned from partitioned writes.
/** Generates an {@link EncryptedOutputFile} for partitioned writes in a given spec. */
public EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partition)
I'm fine with adding those as well. WDYT @aokolnychyi
@@ -137,7 +137,7 @@ protected EncryptedOutputFile newOutputFile(
     Preconditions.checkArgument(
         spec.isUnpartitioned() || partition != null,
         "Partition must not be null when creating output file for partitioned spec");
-    return partition == null
+    return spec.isUnpartitioned() || partition == null
nit: not caused by this PR, but since we are touching the code, should we use the same if-else style?
I am fine with either if-else or the ternary operator. I know @aokolnychyi prefers if-else for multi-line statements based on a previous review.
Great catch @stevenzwu. Consistency is king 👍🏻
Let me take another look in a moment.
Thanks, @Fokko! Thanks for reviewing, @stevenzwu! Could we also follow up to remove |
Thanks for the review @aokolnychyi and @stevenzwu!
These are public, I think we have to deprecate them first.
I was doing some work on the Python side:
#6775
But ran into an issue when creating some integration tests for testing the positional deletes. I ended up with double slashes:
It looks like the Struct is not-null, but the table is not partitioned, therefore it tries to create a partitioned path, but with the empty struct we'll end up with a double slash // that Minio doesn't like.
Outputfactory.java
ClusteredWriter.java
I still have to dig into why there is a StructProjection.
Resolves #7678
Can be reproduced by running the following: