Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
Expand Down Expand Up @@ -61,6 +62,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected int batchSize;
protected final S inputSchema;

private final boolean validateORCDuringCommit;
private final boolean selfTuningWriter;
private int selfTuneRowsBetweenCheck;
private double rowBatchMemoryUsageFactor;
Expand Down Expand Up @@ -94,6 +96,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCDuringCommit = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_DURING_COMMIT, false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any concern not defaulting to True ?
I feel the validation should be "default". unless i miss something obvious

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are concerns about an extra HDFS call, and what that would do to HDFS load. Internally we will enable it everywhere but we wouldn't want anyone to accidentally start having increased load, so usually we keep things disabled by default for backward compatibility

this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand Down Expand Up @@ -258,7 +261,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
if(this.validateORCDuringCommit) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3818 (comment)

This issue is still present. We want to move this to close function and not just commit because we flush the buffer there too and if it's malformed then we want to catch it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I think the current issue is caused during the close sequence, so we need to destroy the file

Caused by: java.nio.channels.ClosedChannelException
	at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.throwException4Close(DataStreamer.java:324)
	at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:152)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:105)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.hadoop.fs.RetryingOutputStream.lambda$write$1(RetryingOutputStream.java:237)
	at org.apache.hadoop.fs.RetryPolicy.lambda$run$0(RetryPolicy.java:137)
	at org.apache.hadoop.fs.NoOpRetryPolicy.run(NoOpRetryPolicy.java:36)
	at org.apache.hadoop.fs.RetryPolicy.run(RetryPolicy.java:136)
	at org.apache.hadoop.fs.RetryingOutputStream.runWithRetries(RetryingOutputStream.java:301)
	at org.apache.hadoop.fs.RetryingOutputStream.write(RetryingOutputStream.java:234)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.orc.impl.PhysicalFsWriter$DirectStream.output(PhysicalFsWriter.java:314)
	at org.apache.orc.impl.OutStream.outputBuffer(OutStream.java:163)
	at org.apache.orc.impl.OutStream.flush(OutStream.java:359)
	at org.apache.orc.impl.PhysicalFsWriter.writeFileFooter(PhysicalFsWriter.java:457)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im curious whats the validation and how it works. Does it validate on the header or something else.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does all sorts of validations. I attached below 1 example. You can dig around the class for a bunch of others

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, the current issues we see are from writing a bad orc file and then moving it to the taskoutput directory where the file is effectively committed.

We do NOT want to modify the behavior of the base data publisher because its such a widely used class with very wide implications. But the current behavior of the base data publisher is to read all the files in the output dir and use runners to move them all in parallel. It has nothing to do with who originally wrote the file, it will blindly move all of them at that point.

The base data publisher is not a good place to do validation either because it does not care about the data being moved, it's agnostic to data formats.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try {
OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment that this is increasing HDFS load because we open again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add reader to the try with statement so that it is always closed

} catch (Exception e) {
log.error("Found error when validating ORC file during commit phase", e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do this validation here or if its being published at a folder level at a different step, is when we have the reader validate all the files going into publish? My concern is that even if we are validating and deleting files at the writer step, this still assumes that this writer will shut down using a happy path and not suddenly. If it shuts down due to some container error code e.g. 143 and not go through the cleanup process then it can still be moved to another folder later down the line?

log.error("Delete the malformed ORC file after close the writer: {}", this.stagingFile);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this an 'info' log?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also make the statement past tense.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more an error situation because ideally it shouldn't happen, so we need to delete the file and terminate the commit/ingestion

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 269 is error message, agreed.
This is info because you are giving information that you have deleted the malformed file.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's an info as we delete the file. But we can log the message at the ERROR level in case user disabled for info level, as this is critical information we'd like it to speak loud.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if thats the case you can modify the error message to be like at line 269.

"Found error when validating ORC file during commit phase. Deleting the malformed ORC file and closing the writer."

The tense of the statement is important in logs to understand whats being done and what is already done.

throw e;
}
}
super.commit();

if (this.selfTuningWriter) {
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes()));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
*/
public class GobblinOrcWriterConfigs {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
/**
* Configuration for enabling validation of ORC file to detect malformation. If enabled, will throw exception and
* delete malformed ORC file during commit
*/
public static final String ORC_WRITER_VALIDATE_FILE_DURING_COMMIT = ORC_WRITER_PREFIX + "validate.commit.file";
/**
* Default buffer size in the ORC Writer before sending the records to the native ORC Writer
*/
Expand Down