diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2d71c4d738886..bf707bc39f13e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1051,7 +1051,7 @@ public int getMaxConsistencyCheckIntervalMs() {
   }
 
   public BulkInsertSortMode getBulkInsertSortMode() {
-    String sortMode = getString(BULK_INSERT_SORT_MODE);
+    String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 91d1f4e4e4fa2..54c1c9f5ac05d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -25,7 +29,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
@@ -58,11 +62,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -246,21 +245,28 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
    */
   private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
-    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
-      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-          HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
-          iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
+    SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
+    //       closure, as this might lead to issues attempting to serialize its nested fields
+    return jsc.parallelize(clusteringOps, clusteringOps.size())
+        .mapPartitions(clusteringOpsPartition -> {
+          List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
+          clusteringOpsPartition.forEachRemaining(clusteringOp -> {
+            try {
+              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
+              iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
+            } catch (IOException e) {
+              throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+                  + " and " + clusteringOp.getDeltaFilePaths(), e);
+            }
+          });
 
-      return new ConcatenatingIterator<>(iteratorsForPartition);
-    }).map(this::transform);
+          return new ConcatenatingIterator<>(iteratorsForPartition);
+        })
+        .map(record -> transform(record, writeConfig));
   }
 
   /**
@@ -279,12 +285,12 @@ private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>>
   /**
    * Transform IndexedRecord into HoodieRecord.
    */
-  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+  private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) {
     GenericRecord record = (GenericRecord) indexedRecord;
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-    if (!getWriteConfig().populateMetaFields()) {
+    if (!writeConfig.populateMetaFields()) {
       try {
-        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
       } catch (IOException e) {
         throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
       }
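
Context on the closure-capture fix above (not part of the patch): the NOTE in readRecordsForGroupBaseFiles refers to the fact that a Java lambda which reads an instance member captures the enclosing "this", so Spark has to serialize the whole strategy object, including nested fields (such as a raw Hadoop Configuration) that are not serializable. Copying only what the lambda needs into effectively-final locals, here a SerializableConfiguration and the HoodieWriteConfig, keeps the shipped closure small and serializable. Below is a minimal, hypothetical sketch of that pattern using plain Spark Java APIs; ClosureCaptureSketch, JobSettings, and NonSerializableState are invented names for illustration and are not Hudi classes.

// Hypothetical sketch (not Hudi code): why capturing "this" in a Spark closure fails
// when the enclosing object holds non-serializable fields, and how copying the needed
// values into local variables first (the pattern used in the patch) avoids it.
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ClosureCaptureSketch implements Serializable {

  // Stand-in for a non-serializable member, e.g. a raw org.apache.hadoop.conf.Configuration.
  static class NonSerializableState {
    final byte[] buffer = new byte[1024];
  }

  // Stand-in for the small, serializable settings the closure actually needs
  // (the role HoodieWriteConfig plays in the patched method).
  static class JobSettings implements Serializable {
    final String prefix;

    JobSettings(String prefix) {
      this.prefix = prefix;
    }
  }

  private final NonSerializableState state = new NonSerializableState();
  private final JobSettings settings = new JobSettings("record-");

  // BROKEN: referencing the field "settings" makes the lambda capture "this";
  // Spark then tries to serialize the whole object and fails on "state"
  // (SparkException: Task not serializable, caused by NotSerializableException).
  List<String> mapCapturingThis(JavaSparkContext jsc, List<Integer> ids) {
    return jsc.parallelize(ids)
        .map(id -> settings.prefix + id)
        .collect();
  }

  // OK: copy what the closure needs into an effectively-final local first,
  // so only the small serializable JobSettings is shipped to the executors.
  List<String> mapCapturingLocals(JavaSparkContext jsc, List<Integer> ids) {
    JobSettings localSettings = this.settings;
    return jsc.parallelize(ids)
        .map(id -> localSettings.prefix + id)
        .collect();
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("closure-capture-sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      ClosureCaptureSketch sketch = new ClosureCaptureSketch();
      // Prints [record-1, record-2, record-3]; calling mapCapturingThis instead
      // would fail at task-serialization time even in local mode.
      System.out.println(sketch.mapCapturingLocals(jsc, Arrays.asList(1, 2, 3)));
    }
  }
}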