diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index d4665282be12f..8809213582acc 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -87,7 +87,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<GenericRecord>>> fetchSource() t
         .setBasePath(service.getCfg().targetBasePath)
         .build();
     String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();
+    InputBatch inputBatch = service.readFromSource(instantTime, metaClient);
     return Pair.of(inputBatch.getSchemaProvider(), Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<GenericRecord>) inputBatch.getBatch().get()));
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index ad1de230f4149..a6f9513a14e3c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -70,63 +70,63 @@ public class HoodieStreamerUtils {
    * Takes care of dropping columns, precombine, auto key generation.
    * Both AVRO and SPARK record types are supported.
    */
-  static JavaRDD<HoodieRecord> createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional,
+  static Option<JavaRDD<HoodieRecord>> createHoodieRecords(HoodieStreamer.Config cfg, TypedProperties props, Option<JavaRDD<GenericRecord>> avroRDDOptional,
                                                    SchemaProvider schemaProvider, HoodieRecord.HoodieRecordType recordType,
                                                    boolean autoGenerateRecordKeys, String instantTime) {
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
     Set<String> partitionColumns = getPartitionColumns(props);
-    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+    return avroRDDOptional.map(avroRDD -> {
+      JavaRDD<HoodieRecord> records;
+      SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
+      SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns(props) ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+      if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
+        records = avroRDD.mapPartitions(
+            (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) genericRecordIterator -> {
+              if (autoGenerateRecordKeys) {
+                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
+                props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+              }
+              BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+              List<HoodieRecord> avroRecords = new ArrayList<>();
+              while (genericRecordIterator.hasNext()) {
+                GenericRecord genRec = genericRecordIterator.next();
+                HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
+                GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
+                HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
+                    (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
+                        KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+                        Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+                    : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
+                avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
+              }
+              return avroRecords.iterator();
+            });
+      } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+        // TODO we should remove it if we can read InternalRow from source.
+        records = avroRDD.mapPartitions(itr -> {
+          if (autoGenerateRecordKeys) {
+            props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
+            props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
+          }
+          BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
+          StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
+          StructType targetStructType = isDropPartitionColumns(props) ? AvroConversionUtils
+              .convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType;
+          HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), baseStructType);
-    JavaRDD<HoodieRecord> records;
-    SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
-    SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns(props) ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
-    if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
-      records = avroRDD.mapPartitions(
-          (FlatMapFunction<Iterator<GenericRecord>, HoodieRecord>) genericRecordIterator -> {
-            if (autoGenerateRecordKeys) {
-              props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
-              props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
-            }
-            BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-            List<HoodieRecord> avroRecords = new ArrayList<>();
-            while (genericRecordIterator.hasNext()) {
-              GenericRecord genRec = genericRecordIterator.next();
-              HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
-              GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
-              HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
-                      KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                      Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
-                  : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
-              avroRecords.add(new HoodieAvroRecord<>(hoodieKey, payload));
-            }
-            return avroRecords.iterator();
+          return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec -> {
+            InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
+            String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString();
+            String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString();
+            return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
+                HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false);
           });
-    } else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
-      // TODO we should remove it if we can read InternalRow from source.
-      records = avroRDD.mapPartitions(itr -> {
-        if (autoGenerateRecordKeys) {
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
-          props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
-        }
-        BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-        StructType baseStructType = AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
-        StructType targetStructType = isDropPartitionColumns(props) ? AvroConversionUtils
-            .convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(), partitionColumns)) : baseStructType;
-        HoodieAvroDeserializer deserializer = SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedAvroSchema.get(), baseStructType);
-
-        return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), rec -> {
-          InternalRow row = (InternalRow) deserializer.deserialize(rec).get();
-          String recordKey = builtinKeyGenerator.getRecordKey(row, baseStructType).toString();
-          String partitionPath = builtinKeyGenerator.getPartitionPath(row, baseStructType).toString();
-          return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath),
-              HoodieInternalRowUtils.getCachedUnsafeProjection(baseStructType, targetStructType).apply(row), targetStructType, false);
         });
-      });
-    } else {
-      throw new UnsupportedOperationException(recordType.name());
-    }
-    return records;
+      } else {
+        throw new UnsupportedOperationException(recordType.name());
+      }
+      return records;
+    });
   }

   /**
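The HoodieStreamerUtils change above replaces an unguarded avroRDDOptional.get() with avroRDDOptional.map(...), so an absent source batch now comes back as an empty Option<JavaRDD<HoodieRecord>> instead of throwing. A minimal, self-contained sketch of the same pattern, using java.util.Optional and plain lists purely for illustration (the Hudi code uses org.apache.hudi.common.util.Option and Spark RDDs):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class OptionalBatchSketch {

  // Before: the caller must guarantee the batch is present, or get() throws NoSuchElementException.
  static List<String> toRecordsUnsafe(Optional<List<String>> batch) {
    return batch.get().stream().map(String::toUpperCase).collect(Collectors.toList());
  }

  // After: absence propagates as Optional.empty(), and the caller decides what "no batch" means.
  static Optional<List<String>> toRecords(Optional<List<String>> batch) {
    return batch.map(records -> records.stream().map(String::toUpperCase).collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    System.out.println(toRecords(Optional.empty()));                     // Optional.empty
    System.out.println(toRecords(Optional.of(Arrays.asList("a", "b")))); // Optional[[A, B]]
  }
}
```

The caller then decides what absence means; in this patch, StreamSync passes the Option through to InputBatch and only resolves it at write time with orElse.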
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
index 6c87f53a56522..0fd7a41ab5563 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SparkSampleWritesUtils.java
@@ -64,7 +64,7 @@ public class SparkSampleWritesUtils {

   private static final Logger LOG = LoggerFactory.getLogger(SparkSampleWritesUtils.class);

-  public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig) {
+  public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig) {
     if (!writeConfig.getBoolean(SAMPLE_WRITES_ENABLED)) {
       LOG.debug("Skip overwriting record size estimate as it's disabled.");
       return Option.empty();
@@ -76,7 +76,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
     }
     try {
       String instantTime = getInstantFromTemporalAccessor(Instant.now().atZone(ZoneId.systemDefault()));
-      Pair<Boolean, String> result = doSampleWrites(jsc, records, writeConfig, instantTime);
+      Pair<Boolean, String> result = doSampleWrites(jsc, recordsOpt, writeConfig, instantTime);
       if (result.getLeft()) {
         long avgSize = getAvgSizeFromSampleWrites(jsc, result.getRight());
         LOG.info("Overwriting record size estimate to " + avgSize);
@@ -90,7 +90,7 @@ public static Option<HoodieWriteConfig> getWriteConfigWithRecordSizeEstimate(Jav
     return Option.empty();
   }

-  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRDD<HoodieRecord> records, HoodieWriteConfig writeConfig, String instantTime)
+  private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieWriteConfig writeConfig, String instantTime)
       throws IOException {
     final String sampleWritesBasePath = getSampleWritesBasePath(jsc, writeConfig, instantTime);
     HoodieTableMetaClient.withPropertyBuilder()
@@ -109,25 +109,31 @@ private static Pair<Boolean, String> doSampleWrites(JavaSparkContext jsc, JavaRD
         .withAutoCommit(true)
         .withPath(sampleWritesBasePath)
         .build();
+    Pair<Boolean, String> emptyRes = Pair.of(false, null);
     try (SparkRDDWriteClient sampleWriteClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), sampleWriteConfig, Option.empty())) {
       int size = writeConfig.getIntOrDefault(SAMPLE_WRITES_SIZE);
-      List<HoodieRecord> samples = records.coalesce(1).take(size);
-      sampleWriteClient.startCommitWithTime(instantTime);
-      JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
-      if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
-        LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Printing out the top 100 errors");
-          writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
-            LOG.trace("Global error :", ws.getGlobalError());
-            ws.getErrors().forEach((key, throwable) ->
-                LOG.trace(String.format("Error for key: %s", key), throwable));
-          });
+      return recordsOpt.map(records -> {
+        List<HoodieRecord> samples = records.coalesce(1).take(size);
+        if (samples.isEmpty()) {
+          return emptyRes;
         }
-        return Pair.of(false, null);
-      } else {
-        return Pair.of(true, sampleWritesBasePath);
-      }
+        sampleWriteClient.startCommitWithTime(instantTime);
+        JavaRDD<WriteStatus> writeStatusRDD = sampleWriteClient.bulkInsert(jsc.parallelize(samples, 1), instantTime);
+        if (writeStatusRDD.filter(WriteStatus::hasErrors).count() > 0) {
+          LOG.error(String.format("sample writes for table %s failed with errors.", writeConfig.getTableName()));
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Printing out the top 100 errors");
+            writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+              LOG.trace("Global error :", ws.getGlobalError());
+              ws.getErrors().forEach((key, throwable) ->
+                  LOG.trace(String.format("Error for key: %s", key), throwable));
+            });
+          }
+          return emptyRes;
+        } else {
+          return Pair.of(true, sampleWritesBasePath);
+        }
+      }).orElse(emptyRes);
     }
   }
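In SparkSampleWritesUtils, doSampleWrites now wraps its body in recordsOpt.map(...).orElse(emptyRes) and returns early when the coalesced sample is empty, so an absent or empty batch never starts a sample commit. The shape of that guard, reduced to plain Java with java.util.Optional and byte arrays standing in for records (the names and fallback constant here are hypothetical, not Hudi APIs):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class SampleWriteGuardSketch {

  private static final long FALLBACK_ESTIMATE = 1024L; // hypothetical default, not a Hudi constant

  // Mirrors the guarded flow: absent input or an empty sample keeps the fallback,
  // only a non-empty sample produces a real estimate (the stand-in for a successful sample write).
  static long estimateAvgRecordSize(Optional<List<byte[]>> recordsOpt, int sampleSize) {
    return recordsOpt.map(records -> {
      List<byte[]> sample = records.subList(0, Math.min(sampleSize, records.size())); // ~ coalesce(1).take(size)
      if (sample.isEmpty()) {
        return FALLBACK_ESTIMATE;
      }
      long totalBytes = sample.stream().mapToLong(r -> r.length).sum();
      return totalBytes / sample.size();
    }).orElse(FALLBACK_ESTIMATE);
  }

  public static void main(String[] args) {
    System.out.println(estimateAvgRecordSize(Optional.empty(), 10)); // 1024
    System.out.println(estimateAvgRecordSize(Optional.of(Arrays.asList(new byte[100], new byte[300])), 10)); // 200
  }
}
```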
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index fc0160f57e23a..294ba3fe13c0d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -402,32 +402,26 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
         .build();
     String instantTime = metaClient.createNewInstantTime();
-    Pair<InputBatch, Boolean> inputBatchIsEmptyPair = readFromSource(instantTime, metaClient);
+    InputBatch inputBatch = readFromSource(instantTime, metaClient);
-    if (inputBatchIsEmptyPair != null) {
-      final JavaRDD<HoodieRecord> recordsFromSource;
-      if (useRowWriter) {
-        recordsFromSource = hoodieSparkContext.emptyRDD();
-      } else {
-        recordsFromSource = (JavaRDD<HoodieRecord>) inputBatchIsEmptyPair.getKey().getBatch().get();
-      }
+    if (inputBatch != null) {
       // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
       // compactor
       if (writeClient == null) {
-        this.schemaProvider = inputBatchIsEmptyPair.getKey().getSchemaProvider();
+        this.schemaProvider = inputBatch.getSchemaProvider();
         // Setup HoodieWriteClient and compaction now that we decided on schema
-        setupWriteClient(recordsFromSource);
+        setupWriteClient(inputBatch.getBatch());
       } else {
-        Schema newSourceSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getSourceSchema();
-        Schema newTargetSchema = inputBatchIsEmptyPair.getKey().getSchemaProvider().getTargetSchema();
+        Schema newSourceSchema = inputBatch.getSchemaProvider().getSourceSchema();
+        Schema newTargetSchema = inputBatch.getSchemaProvider().getTargetSchema();
         if ((newSourceSchema != null && !processedSchema.isSchemaPresent(newSourceSchema))
             || (newTargetSchema != null && !processedSchema.isSchemaPresent(newTargetSchema))) {
           String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : newSourceSchema.toString(true);
           String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : newTargetSchema.toString(true);
           LOG.info("Seeing new schema. Source: {0}, Target: {1}", sourceStr, targetStr);
           // We need to recreate write client with new schema and register them.
-          reInitWriteClient(newSourceSchema, newTargetSchema, recordsFromSource);
+          reInitWriteClient(newSourceSchema, newTargetSchema, inputBatch.getBatch());
           if (newSourceSchema != null) {
             processedSchema.addSchema(newSourceSchema);
           }
@@ -454,7 +448,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
         }
       }
-      result = writeToSinkAndDoMetaSync(instantTime, inputBatchIsEmptyPair.getKey(), inputBatchIsEmptyPair.getValue(), metrics, overallTimerContext);
+      result = writeToSinkAndDoMetaSync(instantTime, inputBatch, metrics, overallTimerContext);
     }
     metrics.updateStreamerSyncMetrics(System.currentTimeMillis());
@@ -484,7 +478,7 @@ private Option<String> getLastPendingCompactionInstant(Option co
    * @throws Exception in case of any Exception
    */
-  public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
+  public InputBatch readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<String> resumeCheckpointStr = Option.empty();
     if (commitsTimelineOpt.isPresent()) {
@@ -499,7 +493,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
     int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
     int curRetryCount = 0;
-    Pair<InputBatch, Boolean> sourceDataToSync = null;
+    InputBatch sourceDataToSync = null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
         sourceDataToSync = fetchFromSourceAndPrepareRecords(resumeCheckpointStr, instantTime, metaClient);
@@ -519,7 +513,7 @@ public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableM
     return sourceDataToSync;
   }
-  private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
+  private InputBatch fetchFromSourceAndPrepareRecords(Option<String> resumeCheckpointStr, String instantTime,
                                                       HoodieTableMetaClient metaClient) {
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -544,17 +538,14 @@ private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option
-    Pair<InputBatch, Boolean> preparedInputBatchIsEmptyPair = handleEmptyBatch(useRowWriter, inputBatch, checkpointStr, schemaProvider);
-    if (preparedInputBatchIsEmptyPair.getValue()) { // return if empty batch
-      return preparedInputBatchIsEmptyPair;
-    }
+
     if (useRowWriter) { // no additional processing required for row writer.
-      return Pair.of(inputBatch, false);
+      return inputBatch;
     } else {
-      JavaRDD<HoodieRecord> records = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider,
+      Option<JavaRDD<HoodieRecord>> recordsOpt = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider,
           recordType, autoGenerateRecordKeys, instantTime);
-      return Pair.of(new InputBatch(Option.of(records), checkpointStr, schemaProvider), false);
+      return new InputBatch(recordsOpt, checkpointStr, schemaProvider);
     }
   }
@@ -652,33 +643,6 @@ private InputBatch fetchNextBatchFromSource(Option<String> resumeCheckpointStr,
     }
   }
-  /**
-   * Handles empty batch from input.
-   * @param useRowWriter true if row write code path.
-   * @param inputBatch {@link InputBatch} instance to use.
-   * @param checkpointForNextBatch checkpiont to use for next batch.
-   * @param schemaProvider {@link SchemaProvider} instance of interest.
-   * @return a Pair of InputBatch and boolean. boolean value is set to true on empty batch.
-   */
-  private Pair<InputBatch, Boolean> handleEmptyBatch(boolean useRowWriter, InputBatch inputBatch,
-                                                     String checkpointForNextBatch, SchemaProvider schemaProvider) {
-    hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
-    if (useRowWriter) {
-      Option<Dataset<Row>> rowDatasetOptional = inputBatch.getBatch();
-      if ((!rowDatasetOptional.isPresent()) || (rowDatasetOptional.get().isEmpty())) {
-        LOG.info("No new data, perform empty commit.");
-        return Pair.of(new InputBatch<>(Option.of(sparkSession.emptyDataFrame()), checkpointForNextBatch, schemaProvider), true);
-      }
-    } else {
-      Option<JavaRDD<GenericRecord>> avroRDDOptional = inputBatch.getBatch();
-      if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-        LOG.info("No new data, perform empty commit.");
-        return Pair.of(new InputBatch(Option.of(hoodieSparkContext.emptyRDD()), checkpointForNextBatch, schemaProvider), true);
-      }
-    }
-    return Pair.of(inputBatch, false);
-  }
-
   /**
    * Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider.
    *
@@ -801,24 +765,28 @@ private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
    *
    * @param instantTime instant time to use for ingest.
    * @param inputBatch input batch that contains the records, checkpoint, and schema provider
-   * @param inputIsEmpty true if input batch is empty.
    * @param metrics Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch, boolean inputIsEmpty,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
                                                                               HoodieIngestionMetrics metrics, Timer.Context overallTimerContext) {
     Option<String> scheduledCompactionInstant = Option.empty();
     // write to hudi and fetch result
-    Pair<WriteClientWriteResult, Boolean> writeClientWriteResultIsEmptyPair = writeToSink(inputBatch, instantTime, inputIsEmpty);
-    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResultIsEmptyPair.getKey().getWriteStatusRDD();
-    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResultIsEmptyPair.getKey().getPartitionToReplacedFileIds();
-    boolean isEmpty = writeClientWriteResultIsEmptyPair.getRight();
+    WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime);
+    JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
+    Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
     // process write status
     long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
     long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+    LOG.info(String.format("instantTime=%s, totalRecords=%d, totalErrorRecords=%d, totalSuccessfulRecords=%d",
+        instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
+    if (totalRecords == 0) {
+      LOG.info("No new data, perform empty commit.");
+    }
     boolean hasErrors = totalErrorRecords > 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
@@ -863,8 +831,10 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Stri
         scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
       }
-      if (!isEmpty || cfg.forceEmptyMetaSync) {
+      if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
         runMetaSync();
+      } else {
+        LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
       }
     } else {
       LOG.info("Commit " + instantTime + " failed!");
@@ -924,22 +894,20 @@ private String startCommit(String instantTime, boolean retryEnabled) {
     throw lastException;
   }
-  private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch, String instantTime, boolean inputIsEmpty) {
+  private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime) {
     WriteClientWriteResult writeClientWriteResult = null;
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
-    boolean isEmpty = inputIsEmpty;
     if (useRowWriter) {
-      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().get();
+      Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
       BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
-      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().get();
+      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElse(hoodieSparkContext.emptyRDD());
       // filter dupes if needed
       if (cfg.filterDupes) {
        records = DataSourceUtils.dropDuplicates(hoodieSparkContext.jsc(), records, writeClient.getConfig());
-        isEmpty = records.isEmpty();
       }
       HoodieWriteResult writeResult = null;
@@ -973,7 +941,7 @@ private Pair<WriteClientWriteResult, Boolean> writeToSink(InputBatch inputBatch,
           throw new HoodieStreamerException("Unknown operation : " + cfg.operation);
       }
     }
-    return Pair.of(writeClientWriteResult, isEmpty);
+    return writeClientWriteResult;
   }
   private String getSyncClassShortName(String syncClassName) {
@@ -1028,15 +996,15 @@ public void runMetaSync() {
    * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
    * this constraint.
    */
-  private void setupWriteClient(JavaRDD<HoodieRecord> records) throws IOException {
+  private void setupWriteClient(Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
     if ((null != schemaProvider)) {
       Schema sourceSchema = schemaProvider.getSourceSchema();
       Schema targetSchema = schemaProvider.getTargetSchema();
-      reInitWriteClient(sourceSchema, targetSchema, records);
+      reInitWriteClient(sourceSchema, targetSchema, recordsOpt);
     }
   }
-  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD<HoodieRecord> records) throws IOException {
+  private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
     if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
       targetSchema = HoodieAvroUtils.removeFields(targetSchema, HoodieStreamerUtils.getPartitionColumns(props));
@@ -1044,7 +1012,7 @@ private void reInitWriteClient(Schema sourceSchema, Schema targetSchema, JavaRDD
     registerAvroSchemas(sourceSchema, targetSchema);
     final HoodieWriteConfig initialWriteConfig = getHoodieClientConfig(targetSchema);
     final HoodieWriteConfig writeConfig = SparkSampleWritesUtils
-        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), records, initialWriteConfig)
+        .getWriteConfigWithRecordSizeEstimate(hoodieSparkContext.jsc(), recordsOpt, initialWriteConfig)
         .orElse(initialWriteConfig);
     if (writeConfig.isEmbeddedTimelineServerEnabled()) {
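With handleEmptyBatch removed, StreamSync no longer short-circuits on an empty input: writeToSink falls back to an empty RDD via orElse, the commit still goes through, and emptiness is detected afterwards from the write statuses (totalRecords == 0 logs the empty-commit message, and meta sync runs only when totalSuccessfulRecords > 0 or forceEmptyMetaSync is set). A compact sketch of that gate; the method and variable names here are illustrative, not actual StreamSync members:

```java
public class MetaSyncGateSketch {

  // Post-write gate: emptiness is derived from write statuses rather than checked up front.
  static boolean shouldRunMetaSync(long totalRecords, long totalErrorRecords, boolean forceEmptyMetaSync) {
    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
    if (totalRecords == 0) {
      System.out.println("No new data, perform empty commit.");
    }
    return totalSuccessfulRecords > 0 || forceEmptyMetaSync;
  }

  public static void main(String[] args) {
    System.out.println(shouldRunMetaSync(0, 0, false));   // false: empty commit, meta sync skipped
    System.out.println(shouldRunMetaSync(100, 5, false)); // true: 95 successful records
  }
}
```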
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index db7cb54fe7696..9c985dd0cf593 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -466,8 +466,8 @@ public void testNonNullableColumnDrop(String tableType,
           .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
       assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant) > 0);
     } catch (Exception e) {
-      assertTrue(e.getMessage().contains("java.lang.NullPointerException")
-          || e.getMessage().contains("Incoming batch schema is not compatible with the table's one"));
+      assertTrue(containsErrorMessage(e, "java.lang.NullPointerException",
+          "Incoming batch schema is not compatible with the table's one"));
     }
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
index e1676219ca0a5..2706a97e5d5c0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSparkSampleWritesUtils.java
@@ -80,7 +80,7 @@ public void skipOverwriteRecordSizeEstimateWhenTimelineNonEmpty() throws Excepti
         .withPath(basePath())
         .build();
     JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 1), 1);
-    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
     assertFalse(writeConfigOpt.isPresent());
     assertEquals(originalRecordSize, originalWriteConfig.getCopyOnWriteRecordSizeEstimate(), "Original record size estimate should not be changed.");
   }
@@ -100,7 +100,7 @@ public void overwriteRecordSizeEstimateForEmptyTable() {
     String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(1);
     JavaRDD<HoodieRecord> records = jsc().parallelize(dataGen.generateInserts(commitTime, 2000), 2);
-    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), records, originalWriteConfig);
+    Option<HoodieWriteConfig> writeConfigOpt = SparkSampleWritesUtils.getWriteConfigWithRecordSizeEstimate(jsc(), Option.of(records), originalWriteConfig);
     assertTrue(writeConfigOpt.isPresent());
     assertEquals(779.0, writeConfigOpt.get().getCopyOnWriteRecordSizeEstimate(), 10.0);
   }
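The TestHoodieDeltaStreamerSchemaEvolutionQuick change above relies on a containsErrorMessage helper that is not shown in this excerpt (presumably added elsewhere in the patch). One plausible shape for it, equivalent to what the old inline assertion checked; this is an assumption, not the actual helper:

```java
// Hypothetical sketch; the real helper added by this PR may differ.
private static boolean containsErrorMessage(Throwable e, String... candidates) {
  String message = e.getMessage();
  if (message == null) {
    return false;
  }
  for (String candidate : candidates) {
    if (message.contains(candidate)) {
      return true;
    }
  }
  return false;
}
```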