Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
private int orcFileWriterRowsBetweenCheck;
private long orcStripeSize;
private int maxOrcBatchSize;
private int batchSizeRowCheckFactor;
private boolean enableLimitBufferSizeOrcStripe;

private int concurrentWriterTasks;
private long orcWriterStripeSizeBytes;
Expand All @@ -109,6 +111,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.typeDescription = getOrcSchema();
this.selfTuningWriter = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, false);
this.validateORCAfterClose = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_VALIDATE_FILE_AFTER_CLOSE, false);
this.batchSizeRowCheckFactor = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR);
this.maxOrcBatchSize = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_BATCH_SIZE);
this.batchSize = this.selfTuningWriter ?
Expand All @@ -133,6 +136,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
GobblinOrcWriterConfigs.DEFAULT_MIN_ORC_WRITER_ROWCHECK);
this.orcFileWriterMaxRowsBetweenCheck = properties.getPropAsInt(GobblinOrcWriterConfigs.ORC_WRITER_MAX_ROWCHECK,
GobblinOrcWriterConfigs.DEFAULT_MAX_ORC_WRITER_ROWCHECK);
this.enableLimitBufferSizeOrcStripe = properties.getPropAsBoolean(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, false);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this, do you plan to clean up the code afterwards?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah didn't want to introduce an unwanted regression that would be hard to roll back, can make this the static/default behavior if performance is good

// Create file-writer
this.writerConfig = new Configuration();
// Populate job Configurations into Conf as well so that configurations related to ORC writer can be tuned easily.
Expand Down Expand Up @@ -312,11 +316,18 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
this.currentOrcWriterMaxUnderlyingMemory = Math.max(this.currentOrcWriterMaxUnderlyingMemory, orcFileWriter.estimateMemory());
}
long maxMemoryInFileWriter = Math.max(currentOrcWriterMaxUnderlyingMemory, prevOrcWriterMaxUnderlyingMemory);

int newBatchSize = (int) ((this.availableMemory*1.0 / currentPartitionedWriters * this.rowBatchMemoryUsageFactor - maxMemoryInFileWriter
- this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);

if (this.enableLimitBufferSizeOrcStripe) {
// For large records, prevent the batch size from greatly exceeding the size of a stripe as the native ORC Writer will flush its buffer after a stripe is filled
int maxRecordsPerStripeSize = (int) Math.round(this.orcWriterStripeSizeBytes * 1.0 / averageSizePerRecord);
this.orcFileWriterMaxRowsBetweenCheck = Math.min(this.orcFileWriterMaxRowsBetweenCheck, maxRecordsPerStripeSize);
this.maxOrcBatchSize = Math.min(this.maxOrcBatchSize, maxRecordsPerStripeSize);
}
// Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components
newBatchSize = Math.min(Math.max(1, newBatchSize), this.maxOrcBatchSize);

if (Math.abs(newBatchSize - this.batchSize) > GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) {
// Add a factor when tuning up the batch size to prevent large sudden increases in memory usage
if (newBatchSize > this.batchSize) {
Expand All @@ -337,7 +348,7 @@ void tuneBatchSize(long averageSizePerRecord) throws IOException {
void initializeOrcFileWriter() {
try {
this.orcFileWriterRowsBetweenCheck = Math.max(
Math.min(this.batchSize * GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR, this.orcFileWriterMaxRowsBetweenCheck),
Math.min(this.batchSize * this.batchSizeRowCheckFactor, this.orcFileWriterMaxRowsBetweenCheck),
this.orcFileWriterMinRowsBetweenCheck
);
this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.orcFileWriterRowsBetweenCheck));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public class GobblinOrcWriterConfigs {
* Max buffer size of the Gobblin ORC Writer that it can be tuned to
*/
public static final String ORC_WRITER_AUTO_SELFTUNE_MAX_BATCH_SIZE = ORC_WRITER_PREFIX + "auto.selfTune.max.batch.size";
/**
* The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer size
*/
public static final String ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = "auto.selfTune.rowCheck.factor";

/**
* How often should the Gobblin ORC Writer check for tuning
*/
Expand All @@ -60,6 +65,12 @@ public class GobblinOrcWriterConfigs {
*/
public static final String ORC_WRITER_MAX_ROWCHECK = ORC_WRITER_PREFIX + "max.rows.between.memory.checks";

/**
* Enable a maximum buffer size of both the native ORC writer and the Gobblin ORC writer by the size of a stripe divided by the estimated
* size of each record. This is to capture the case when records are extremely large and cause large buffer sizes to dominate the memory usage
*/
public static final String ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE = ORC_WRITER_PREFIX + "auto.selfTune.max.buffer.orc.stripe";

public static final String ORC_WRITER_INSTRUMENTED = ORC_WRITER_PREFIX + "instrumented";

public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
Expand All @@ -70,10 +81,9 @@ public class GobblinOrcWriterConfigs {
*/
public static final int DEFAULT_CONCURRENT_WRITERS = 3;
public static final double DEFAULT_ORC_WRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 0.3;
/**
* The ratio of native ORC Writer buffer size to Gobblin ORC Writer buffer size
*/

public static final int DEFAULT_ORC_WRITER_BATCHSIZE_ROWCHECK_FACTOR = 5;

public static final int DEFAULT_MAX_ORC_WRITER_BATCH_SIZE = DEFAULT_ORC_WRITER_BATCH_SIZE;
public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,21 +268,23 @@ public void testSelfTuneRowBatchCalculation() throws Exception {
// Force a larger initial batchSize that can be tuned down
orcWriter.batchSize = 10;
orcWriter.rowBatch.ensureSize(10);
// Given that the available memory is very high, the resulting batchsize should be maxed out
orcWriter.availableMemory = 100000000;
// Given the amount of available memory and a low stripe size, and estimated rowBatchSize, the resulting batchsize should be maxed out
// Consider that the batch size incrementally increases based on the difference between target and current batchsize (10)
orcWriter.tuneBatchSize(10);
System.out.println(orcWriter.batchSize);
// Take into account that increases in batchsize are multiplied by a factor to prevent large jumps in batchsize
Assert.assertTrue(orcWriter.batchSize == (GobblinOrcWriterConfigs.DEFAULT_ORC_WRITER_BATCH_SIZE+10)/2);
Assert.assertEquals(orcWriter.batchSize, 505);
orcWriter.tuneBatchSize(10);
Assert.assertEquals(orcWriter.batchSize, 752);

orcWriter.availableMemory = 100;
orcWriter.tuneBatchSize(10);
// Given that the amount of available memory is low, the resulting batchsize should be 1
Assert.assertTrue(orcWriter.batchSize == 1);
Assert.assertEquals(orcWriter.batchSize,1);
orcWriter.availableMemory = 10000;
orcWriter.rowBatch.ensureSize(10000);
// Since the rowBatch is large, the resulting batchsize should still be 1 even with more memory
orcWriter.tuneBatchSize(10);
Assert.assertTrue(orcWriter.batchSize == 1);
Assert.assertEquals(orcWriter.batchSize, 1);
}

@Test
Expand Down Expand Up @@ -322,4 +324,42 @@ public void testStatePersistenceWhenClosingWriter() throws IOException {
Assert.assertNotNull(dummyState.getProp(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_NATIVE_WRITER_MEMORY));
Assert.assertNotNull(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute());
}
}

@Test
public void testSelfTuneRowBatchCalculationWithStripeMax() throws Exception {
// Verifies the new stripe-based cap on the self-tuned batch size: when
// ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE is on, tuneBatchSize() limits the batch to
// roughly (stripe size / estimated record size) even though availableMemory is huge.
Schema schema =
new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc"));
List<GenericRecord> recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json");

// Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder
FsDataWriterBuilder<Schema, GenericRecord> mockBuilder =
(FsDataWriterBuilder<Schema, GenericRecord>) Mockito.mock(FsDataWriterBuilder.class);
when(mockBuilder.getSchema()).thenReturn(schema);

State dummyState = new WorkUnit();
String stagingDir = Files.createTempDir().getAbsolutePath();
String outputDir = Files.createTempDir().getAbsolutePath();
dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir);
dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune");
dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir);
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true");
// Deliberately tiny stripe (100 bytes) so the stripe-derived cap dominates the tuning math.
dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100");
dummyState.setProp(GobblinOrcWriterConfigs.ORC_WRITER_ENABLE_BUFFER_LIMIT_ORC_STRIPE, "true");
when(mockBuilder.getFileName(dummyState)).thenReturn("file");

// Having a closer to manage the life-cycle of the writer object.
Closer closer = Closer.create();
GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState));
// Force a larger initial batchSize that can be tuned down
orcWriter.batchSize = 10;
orcWriter.rowBatch.ensureSize(10);
// Memory is effectively unlimited, so without the stripe cap the batch would tune far above 10.
orcWriter.availableMemory = 100000000;
// Since the stripe size is 100, the resulting batchsize should be 10 (100/10)
orcWriter.tuneBatchSize(10);
Assert.assertEquals(orcWriter.batchSize,10);

// Increasing the estimated record size should decrease the max batch size
// (stripe 100 / record size 100 => cap of 1).
orcWriter.tuneBatchSize(100);
Assert.assertEquals(orcWriter.batchSize,1);
}
}