@@ -243,9 +243,21 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
} finally {
this.txnManager.endTransaction(Option.of(inflightInstant));
}
-    // Do this outside of the lock, since compaction and clustering can be time-consuming and we don't need a lock for the entire execution period
-    runTableServicesInline(table, metadata, extraMetadata);

+    // If hoodie.fail.writes.on.inline.table.service.exception is false, do not fail the commit: catch the exception and log a warning instead
+    try {
+      // Do this outside of the lock, since compaction and clustering can be time-consuming and we don't need a lock for the entire execution period
+      runTableServicesInline(table, metadata, extraMetadata);
+    } catch (Exception e) {
+      if (config.isFailOnInlineTableServiceExceptionEnabled()) {
+        throw e;
+      }
+      LOG.warn("Inline compaction or clustering failed with exception: " + e.getMessage()
+          + ". Continuing since \"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
+    }

emitCommitMetrics(instantTime, metadata, commitActionType);

// callback if needed.
if (config.writeCommitCallbackOn()) {
if (null == commitCallback) {
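For orientation, here is a minimal sketch of how a writer would opt into the new warn-and-continue behavior. The three property keys below are the ones this PR's own test exercises; everything else (how the Properties object reaches the write client) is assumed setup, not part of this change.

import java.util.Properties;

Properties props = new Properties();
// Enable inline clustering so the table service actually runs with each write.
props.setProperty("hoodie.clustering.inline", "true");
props.setProperty("hoodie.clustering.inline.max.commits", "1");
// New in this PR: log a warning and keep the commit instead of failing the write
// when inline compaction/clustering throws. The default remains "true" (fail).
props.setProperty("hoodie.fail.writes.on.inline.table.service.exception", "false");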
@@ -386,6 +386,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails.");

+  public static final ConfigProperty<String> FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION = ConfigProperty
+      .key("hoodie.fail.writes.on.inline.table.service.exception")
+      .defaultValue("true")
+      .withDocumentation("Table services such as compaction and clustering can fail and prevent syncing to "
+          + "the metaclient. Set this to true to fail writes when table services fail.");

public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L)
@@ -1151,6 +1157,10 @@ public boolean isFailOnTimelineArchivingEnabled() {
return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
}

+  public boolean isFailOnInlineTableServiceExceptionEnabled() {
+    return getBoolean(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION);
+  }

public int getMaxConsistencyChecks() {
return getInt(MAX_CONSISTENCY_CHECKS);
}
@@ -2355,6 +2365,11 @@ public Builder withDeleteParallelism(int parallelism) {
return this;
}

+  public Builder withFailureOnInlineTableServiceException(boolean fail) {
+    writeConfig.setValue(FAIL_ON_INLINE_TABLE_SERVICE_EXCEPTION, String.valueOf(fail));
+    return this;
+  }

public Builder withParallelism(int insertShuffleParallelism, int upsertShuffleParallelism) {
writeConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(insertShuffleParallelism));
writeConfig.setValue(UPSERT_PARALLELISM_VALUE, String.valueOf(upsertShuffleParallelism));
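The same flag can also be set through the builder method added above. A sketch, assuming the usual builder boilerplate (basePath is a placeholder for the table's base path, not part of this change):

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)                               // assumed: Hudi table base path
    .withFailureOnInlineTableServiceException(false)  // new in this PR; defaults to true
    .build();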
@@ -83,6 +83,7 @@
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -1754,30 +1755,62 @@ private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig cluste
return allRecords.getLeft().getLeft();
}

+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail) throws IOException {
+    try {
+      Properties properties = new Properties();
+      properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception", String.valueOf(shouldFail));
+      properties.setProperty("hoodie.auto.commit", "false");
+      properties.setProperty("hoodie.clustering.inline.max.commits", "1");
+      properties.setProperty("hoodie.clustering.inline", "true");
+      testInsertTwoBatches(true, "2015/03/16", properties, true);
+      assertFalse(shouldFail);
+    } catch (HoodieException e) {
+      assertEquals(CLUSTERING_FAILURE, e.getMessage());
+      assertTrue(shouldFail);
+    }
+  }

private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
return testInsertTwoBatches(populateMetaFields, "2015/03/16");
}

+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
+    return testInsertTwoBatches(populateMetaFields, partitionPath, new Properties(), false);
+  }

/**
* This method returns the following three items:
* 1. List of all HoodieRecords written in the two batches of inserts.
* 2. Commit instants of the two batches.
* 3. List of new file group ids that were written in the two batches.
*/
-  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath) throws IOException {
+  private Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> testInsertTwoBatches(boolean populateMetaFields, String partitionPath, Properties props,
+                                                                                                    boolean failInlineClustering) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
-        populateMetaFields ? new Properties() : getPropertiesForKeyGen());
-    SparkRDDWriteClient client = getHoodieWriteClient(config);
+        populateMetaFields ? props : getPropertiesForKeyGen());
+    SparkRDDWriteClient client;
+    if (failInlineClustering) {
+      if (null != writeClient) {
+        writeClient.close();
+        writeClient = null;
+      }
+      client = new WriteClientBrokenClustering(context, config);
+    } else {
+      client = getHoodieWriteClient(config);
+    }

dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
-    List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
+    List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields, failInlineClustering);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);

String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
-    List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
+    List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
@@ -2075,11 +2108,20 @@ private void verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes,
}

private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
+    return writeAndVerifyBatch(client, inserts, commitTime, populateMetaFields, false);
+  }
+
+  private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields, boolean autoCommitOff) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
-    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
+    JavaRDD<WriteStatus> statusRDD = client.upsert(insertRecordsRDD1, commitTime);
+    if (autoCommitOff) {
+      client.commit(commitTime, statusRDD);
+    }
+    List<WriteStatus> statuses = statusRDD.collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.getConfig());

return statuses;
}

@@ -2755,4 +2797,18 @@ protected void validateRecordsBeforeAndAfter(final Dataset<Row> before, final Da
}
}

+  public static class WriteClientBrokenClustering<T extends HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
+
+    public WriteClientBrokenClustering(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+      super(context, clientConfig);
+    }
+
+    @Override
+    protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+      throw new HoodieException(CLUSTERING_FAILURE);
+    }
+
+  }
+
+  public static final String CLUSTERING_FAILURE = "CLUSTERING FAILURE";
}
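To summarize the test strategy: WriteClientBrokenClustering forces every inline clustering attempt to throw, and the parameterized test asserts that the commit either propagates the HoodieException (flag true, the default) or merely logs it and proceeds (flag false). A condensed sketch of the same wiring, with engineContext and basePath as assumed placeholders and inline-clustering settings omitted for brevity:

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)                              // assumed placeholder
    .withFailureOnInlineTableServiceException(false) // warn-and-continue mode
    .build();
// With the flag off, the simulated CLUSTERING_FAILURE is only logged and the write
// succeeds; with the default of true, the HoodieException reaches the caller.
SparkRDDWriteClient client = new WriteClientBrokenClustering(engineContext, cfg);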