@@ -87,7 +87,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() t
.setBasePath(service.getCfg().targetBasePath)
.build();
String instantTime = InProcessTimeGenerator.createNewInstantTime();
InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();
InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
return Pair.of(inputBatch.getSchemaProvider(), Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) inputBatch.getBatch().get()));
}

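For reference, the caller-side effect of this change: readFromSource now returns the InputBatch directly instead of a Pair<InputBatch, Boolean>, so the .getLeft() unwrapping and the separate empty-batch flag disappear. A minimal before/after sketch, reusing the names from the test helper above (service, instantTime, metaClient):

// Before: the batch arrived wrapped in a Pair alongside an "is empty" flag.
// InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();

// After: the InputBatch is returned as-is; emptiness is derived later from write statistics.
InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().get();  // accessor unchanged
String checkpoint = inputBatch.getCheckpointForNextBatch();
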
@@ -64,7 +64,7 @@ public class SparkSampleWritesUtils {

private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
LOG.debug("Skip overwriting record size estimate as it's disabled.");
return Option.empty();
@@ -76,7 +76,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
}
try {
String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime);
if (result.getLeft()) {
long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
LOG.info("Overwriting record size estimate to " + avgSize);
@@ -90,7 +90,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
return Option.empty();
}

private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime)
private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig, String instantTime)
throws IOException {
final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
HoodieTableMetaClient.withPropertyBuilder()
@@ -109,25 +109,31 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRD
.withAutoCommit(true)
.withPath(sampleWritesBasePath)
.build();
Pair<Boolean, String> emptyRes = Pair.of(false, null);
try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
List<HoodieRecord> samples = records.coalesce(1).take(size);
sampleWriteClient.startCommitWithTime(instantTime);
JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.trace("Global error :", ws.getGlobalError());
ws.getErrors().forEach((key, throwable) ->
LOG.trace(String.format("Error for key: %s", key), throwable));
});
return recordsOpt.map(records -> {
List<HoodieRecord> samples = records.coalesce(1).take(size);
if (samples.isEmpty()) {
return emptyRes;
}
return Pair.of(false, null);
} else {
return Pair.of(true, sampleWritesBasePath);
}
sampleWriteClient.startCommitWithTime(instantTime);
JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Printing out the top 100 errors");
writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
LOG.trace("Global error :", ws.getGlobalError());
ws.getErrors().forEach((key, throwable) ->
LOG.trace(String.format("Error for key: %s", key), throwable));
});
}
return emptyRes;
} else {
return Pair.of(true, sampleWritesBasePath);
}
}).orElse(emptyRes);
}
}

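A hedged caller-side sketch of the new Option-based signature: the records RDD may legitimately be absent now, so callers wrap it before the call (Option.ofNullable from org.apache.hudi.common.util.Option is assumed here; jsc, records, and initialWriteConfig are placeholder names). It mirrors how reInitWriteClient consumes this API further down in this diff:

// Wrap a possibly-null RDD; an empty source batch simply yields Option.empty().
Option<JavaRDD<HoodieRecord>> recordsOpt = Option.ofNullable(records);

// Either an adjusted write config carrying the sampled record-size estimate, or the original config.
HoodieWriteConfig writeConfig = SparkSampleWritesUtils
    .getWriteConfigWithRecordSizeEstimate(jsc, recordsOpt, initialWriteConfig)
    .orElse(initialWriteConfig);

Inside doSampleWrites, the same Option is consumed with recordsOpt.map(...).orElse(emptyRes), so an absent or empty sample set falls through to the no-estimate result instead of failing.
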
@@ -402,32 +402,26 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
.build();
String instantTime = metaClient.createNewInstantTime();

Pair<InputBatch,Boolean> inputBatchIsEmptyPair = readFromSource(instantTime, metaClient);
InputBatch inputBatch = readFromSource(instantTime, metaClient);

if (inputBatchIsEmptyPair != null) {
final JavaRDD<HoodieRecord> recordsFromSource;
if (useRowWriter) {
recordsFromSource = hoodieSparkContext.emptyRDD();
} else {
recordsFromSource = (JavaRDD<HoodieRecord>) inputBatchIsEmptyPair.getKey().getBatch().get();
}
if (inputBatch != null) {

// this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
// compactor
if (writeClient == null) {
this.schemaProvider = inputBatchIsEmptyPair.getKey().getSchemaProvider();
this.schemaProvider = inputBatch.getSchemaProvider();
// Setup HoodieWriteClient and compaction now that we decided on schema
setupWriteClient(recordsFromSource);
setupWriteClient(inputBatch.getBatch());
} else {
Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
|| (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
LOG.info("Seeing new schema. Source: {0}, Target: {1}", sourceStr, targetStr);
// We need to recreate write client with new schema and register them.
reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
if (newSourceSchema != null) {
processedSchema.addSchema(newSourceSchema);
}
@@ -454,7 +448,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
}
}

result = writeToSinkAndDoMetaSync(instantTime, inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics, overallTimerContext);
result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext);
}

metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
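
To make the net effect of this hunk easier to follow, here is a condensed, illustrative sketch of the new syncOnce flow (not the literal method body; the schema-evolution branch, error handling, and metrics are elided):

InputBatch inputBatch = readFromSource(instantTime, metaClient);   // was Pair<InputBatch, Boolean>
if (inputBatch != null) {
  if (writeClient == null) {
    this.schemaProvider = inputBatch.getSchemaProvider();
    setupWriteClient(inputBatch.getBatch());                       // Option<JavaRDD<HoodieRecord>> passed as-is
  } else {
    // on schema change: reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
  }
  result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext);
}

The pre-materialized recordsFromSource RDD is gone; the Option-wrapped batch travels untouched until a concrete RDD or Dataset is actually needed.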
@@ -484,7 +478,7 @@ private Option<String> getLastPendingCompactionInstant(Option<HoodieTimeline> co
* @throws Exception in case of any Exception
*/

public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
public InputBatch readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
Option<String> resumeCheckpointStr = Option.empty();
if (commitsTimelineOpt.isPresent()) {
@@ -499,7 +493,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM

int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
Pair<InputBatch, Boolean> sourceDataToSync = null;
InputBatch sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
sourceDataToSync = fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -519,7 +513,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
return sourceDataToSync;
}

private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
HoodieTableMetaClient metaClient) {
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -544,17 +538,14 @@ private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String

// handle empty batch with change in checkpoint
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair = handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
return preparedInputBatchIsEmptyPair;
}


if (useRowWriter) { // no additional processing required for row writer.
return Pair.of(inputBatch, false);
return inputBatch;
} else {
JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider,
recordType, autoGenerateRecordKeys, instantTime);
return Pair.of(new InputBatch(Option.of(records), checkpointStr, schemaProvider), false);
return new InputBatch(Option.of(records), checkpointStr, schemaProvider);
}
}

@@ -652,33 +643,6 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
}
}

/**
* Handles empty batch from input.
* @param useRowWriter true if row write code path.
* @param inputBatch {@link InputBatch} instance to use.
* @param checkpointForNextBatch checkpiont to use for next batch.
* @param schemaProvider {@link SchemaProvider} instance of interest.
* @return a Pair of InputBatch and boolean. boolean value is set to true on empty batch.
*/
private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter, InputBatch inputBatch,
String checkpointForNextBatch, SchemaProvider schemaProvider) {
hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
if (useRowWriter) {
Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
if ((!rowDatasetOptional.isPresent()) || (rowDatasetOptional.get().isEmpty())) {
LOG.info("No new data, perform empty commit.");
return Pair.of(new InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch, schemaProvider), true);
}
} else {
Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
LOG.info("No new data, perform empty commit.");
return Pair.of(new InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch, schemaProvider), true);
}
}
return Pair.of(inputBatch, false);
}

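With handleEmptyBatch removed there is no up-front probe of the input at all: an absent batch is defaulted at the point of use, and the "no new data" case is recognized afterwards from the commit's write statistics. An illustrative sketch of the defaulting pattern (the same orElse idiom used in writeToSink further down; inputBatch and hoodieSparkContext as in this class):

// Old (removed above): inspect the batch, log "No new data, perform empty commit." and return an isEmpty flag.
// New: default the absent batch and let the write path produce an empty commit naturally.
JavaRDD<HoodieRecord> records =
    (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
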
/**
* Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider.
*
@@ -801,24 +765,28 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
*
* @param instantTime instant time to use for ingest.
* @param inputBatch input batch that contains the records, checkpoint, and schema provider
* @param inputIsEmpty true if input batch is empty.
* @param metrics Metrics
* @param overallTimerContext Timer Context
* @return Option Compaction instant if one is scheduled
*/
private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
HoodieIngestionMetrics metrics,
Timer.Context overallTimerContext) {
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();

// process write status
long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
long totalSuccessfulRecords = totalRecords - totalErrorRecords;
LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
if (totalRecords == 0) {
LOG.info("No new data, perform empty commit.");
}
boolean hasErrors = totalErrorRecords > 0;
if (!hasErrors || cfg.commitOnErrors) {
HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
@@ -863,8 +831,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
}

if (!isEmpty || cfg.forceEmptyMetaSync) {
if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
runMetaSync();
} else {
LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
}
} else {
LOG.info("Commit " + instantTime + " failed!");
@@ -924,22 +894,20 @@ private String startCommit(String instantTime, boolean retryEnabled) {
throw lastException;
}

private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch, String instantTime, boolean inputIsEmpty) {
private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime) {
WriteClientWriteResult writeClientWriteResult = null;
instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
boolean isEmpty = inputIsEmpty;

if (useRowWriter) {
Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
} else {
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().get();
JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
// filter dupes if needed
if (cfg.filterDupes) {
records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
isEmpty = records.isEmpty();
}

HoodieWriteResult writeResult = null;
@@ -973,7 +941,7 @@ private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch,
throw new HoodieStreamerException("Unknown operation : " + cfg.operation);
}
}
return Pair.of(writeClientWriteResult, isEmpty);
return writeClientWriteResult;
}
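
Dropping the isEmpty flag also removes an extra Spark action: the dedupe branch no longer re-evaluates records.isEmpty() just to keep the flag accurate. Emptiness and the meta-sync decision are instead derived from the write statuses the commit already produces; a condensed sketch of that gating, taken from writeToSinkAndDoMetaSync above (variable names as in that method):

long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
long totalSuccessfulRecords = totalRecords - totalErrorRecords;

// Meta sync keys off observed write statistics rather than an input-emptiness flag.
if (totalSuccessfulRecords > 0 || cfg.forceEmptyMetaSync) {
  runMetaSync();
}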

private String getSyncClassShortName(String syncClassName) {
@@ -1028,23 +996,23 @@ public void runMetaSync() {
* SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
* this constraint.
*/
private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
if ((null != schemaProvider)) {
Schema sourceSchema = schemaProvider.getSourceSchema();
Schema targetSchema = schemaProvider.getTargetSchema();
reInitWriteClient(sourceSchema, targetSchema, records);
reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
}
}

private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
}
registerAvroSchemas(sourceSchema, targetSchema);
final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
.getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
.getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig)
.orElse(initialWriteConfig);

if (writeConfig.isEmbeddedTimelineServerEnabled()) {