Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
Expand Down Expand Up @@ -61,6 +63,7 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
protected int batchSize;
protected final S inputSchema;

private final boolean validateORCAfterClose;
private final boolean selfTuningWriter;
private int selfTuneRowsBetweenCheck;
private double rowBatchMemoryUsageFactor;
Expand Down Expand Up @@ -94,6 +97,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.inputSchema = builder.getSchema();
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false);
this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand Down Expand Up @@ -258,7 +262,18 @@ public void close()
public void commit()
throws IOException {
closeInternal();
// Validate the ORC file after the writer closes. Defaults to false, since validation introduces extra load on the FS and decreases performance
if(this.validateORCAfterClose) {
try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) {
} catch (Exception e) {
log.error("Found error when validating staging ORC file {} during commit phase. "
+ "Will delete the malformed file and terminate the commit", this.stagingFile, e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do this validation here, or — if files are being published at a folder level in a different step — should we have the reader validate all the files going into publish? My concern is that even if we validate and delete files at the writer step, this still assumes the writer will shut down via a happy path rather than suddenly. If it shuts down due to a container error code (e.g. 143) and does not go through the cleanup process, the malformed file can still be moved to another folder later down the line.

throw e;
}
}
super.commit();

if (this.selfTuningWriter) {
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(getEstimatedRecordSizeBytes()));
properties.setProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
*/
public class GobblinOrcWriterConfigs {
public static final String ORC_WRITER_PREFIX = "orcWriter.";
/**
 * Configuration for enabling validation of the ORC file to detect malformation. If enabled, the writer
 * will delete the malformed ORC file and rethrow the exception during the commit phase.
 */
public static final String ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE = ORC_WRITER_PREFIX + "validate.file.after.close";
/**
* Default buffer size in the ORC Writer before sending the records to the native ORC Writer
*/
Expand Down