Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected int batchSize;
protected final S inputSchema;

private final boolean validateORCDuringCommit;
private final boolean selfTuningWriter;
private int selfTuneRowsBetweenCheck;
private double rowBatchMemoryUsageFactor;
Expand Down Expand Up @@ -94,6 +95,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCDuringCommit = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_DURING_COMMIT, false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any concern not defaulting to True ?
I feel the validation should be "default". unless i miss something obvious

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are concerns about an extra HDFS call, and what that would do to HDFS load. Internally we will enable it everywhere but we wouldn't want anyone to accidentally start having increased load, so usually we keep things disabled by default for backward compatibility

this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand Down Expand Up @@ -259,6 +261,15 @@ public void commit()
throws IOException {
closeInternal();
super.commit();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line calls FsDataWriter and triggers the moving from staging to output directory. Is there a reason for us to do all that work and then do the validation?

I also wonder why this is part of the commit step and not part of the close step. close does not call this method, but it does do the flush.

If we close and the flushed file turns out not to be valid, we will miss the validation here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking maybe there's something wrong during moving. But given the issue is malformed files, so the issue should already be there after writer closed. So move the logic to after closeInternal() is called.

if(this.validateORCDuringCommit) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3818 (comment)

This issue is still present. We want to move this to close function and not just commit because we flush the buffer there too and if it's malformed then we want to catch it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think the current issue is caused during the close sequence, so we need to destroy the file

Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
	at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
	at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
	at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
	at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
	at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
	at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
	at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
	at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im curious whats the validation and how it works. Does it validate on the header or something else.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does all sorts of validations. I attached below 1 example. You can dig around the class for a bunch of others

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, the current issues we see are from writing a bad orc file and then moving it to the taskoutput directory where the file is effectively committed.

We do NOT want to modify the behavior of the base data publisher because its such a widely used class with very wide implications. But the current behavior of the base data publisher is to read all the files in the output dir and use runners to move them all in parallel. It has nothing to do with who originally wrote the file, it will blindly move all of them at that point.

The base data publisher is not a good place to do validation either because it does not care about the data being moved, it's agnostic to data formats.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try {
OrcFile.createReader(this.outputFile, new OrcFile.ReaderOptions(conf));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future readers, I think the below observation is nuanced and worth spelling out. It may even be worth a comment.

This will work for files of size [1,3] bytes. It will not catch empty files, which I think is a very very subtle thing. and I think that's okay as long as users are using native ORC readers.

https://github.com/apache/orc/blob/24beffb6fed6d408e25654e53c255f564c8bd8a9/java/core/src/java/org/apache/orc/impl/ReaderImpl.java#L797C1-L802

Since it seems like part of the standard, computing engines like trino support it https://trino.io/blog/2019/05/29/improved-hive-bucketing.html#whats-the-problem. For some time, presto did not support these empty files but now also does to follow the convention of hive

} catch (IOException ioException) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From reading the readerimpl, it can throw 2 checked exceptions,

Both of which extend IOException.

My question to you is what if there's any other runtime exception? Should we still delete? I lean toward yes. But maybe I am missing some edge case here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we should delete in case of other runtime or unchecked exception, for the sake of robustness. Changed to generic Exception

log.error("Found error when validating ORC file {} during commit phase", this.outputFile, ioException);
log.error("Delete the malformed ORC file is successful: {}", this.fs.delete(this.outputFile, false));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the severity of failing to delete this ORC file, do you think we should retry this operation?

Check for references to retryer in the code base for an easy out of the box impl

@homatthew homatthew Nov 2, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And is there a world where this operation fails because of Filesystem is closed error? Do we need to account for that edge case?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also notice that the parent class FsDataWriter uses HadoopUtils.deletePath. It's subtle, but perhaps we should use that method instead when deleting because they consider some edge case where it throws an io exception if the file exists but the file fails to delete. I'd imagine if the FileSystem delete covered that edge case, the util method would not have been created, but it's hard to validate when there will be an io exception because it is not documented anywhere.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using HadoopUtils.deletePath to delete the file now

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not sure about the retries. If fs delete file fails, we won't delete the file but also won't retry. This works when we call commit because we throw the IO exception to prevent the file from being moved. But we do not do this when we close the file in the close function, which calls closeInternal().

If we flush the buffer, we should check after that the file is valid

throw ioException;
}
}
if (this.selfTuningWriter) {
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes()));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
*/
public class GobblinOrcWriterConfigs {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
/**
* Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and
* delete malformed ORC file during commit
*/
public static final String ORC_WRITER_VALIDATE_FILE_DURING_COMMIT = ORC_WRITER_PREFIX + "validate.commit.file";
/**
* Default buffer size in the ORC Writer before sending the records to the native ORC Writer
*/
Expand Down