From 34a530a85a4a1ce41dc10881e79a83e9931a0649 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 14:15:26 -0800
Subject: [PATCH 1/6] Avoid including whole `MultipleSparkJobExecutionStrategy`
 object into the closure for Spark to serialize

---
 .../MultipleSparkJobExecutionStrategy.java    | 37 +++++++++++--------
 1 file changed, 21 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 91d1f4e4e4fa2..4dc29449632c4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -246,21 +246,26 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
    */
   private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
-    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
-      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-          HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
-          iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
+    HoodieWriteConfig writeConfig = getWriteConfig();
+    return jsc.parallelize(clusteringOps, clusteringOps.size())
+        .mapPartitions(clusteringOpsPartition -> {
+          List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
+          clusteringOpsPartition.forEachRemaining(clusteringOp -> {
+            try {
+              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+              iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
+            } catch (IOException e) {
+              throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+                  + " and " + clusteringOp.getDeltaFilePaths(), e);
+            }
+          });
 
-      return new ConcatenatingIterator<>(iteratorsForPartition);
-    }).map(this::transform);
+          return new ConcatenatingIterator<>(iteratorsForPartition);
+        })
+        // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
+        //       closure, as this might lead to issues attempting to serialize its nested fields
+        .map(record -> transform(record, writeConfig));
   }
 
   /**
@@ -279,10 +284,10 @@ private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>>
   /**
    * Transform IndexedRecord into HoodieRecord.
    */
-  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+  private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) {
     GenericRecord record = (GenericRecord) indexedRecord;
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-    if (!getWriteConfig().populateMetaFields()) {
+    if (!writeConfig.populateMetaFields()) {
       try {
         keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
       } catch (IOException e) {
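For context on the patch above: in Java, a lambda that reads an instance field or calls an instance method captures the enclosing `this`, so Spark must serialize the whole strategy object (and every field it holds) before shipping the `mapPartitions`/`map` closures to executors. What follows is a minimal, self-contained sketch of that pattern with made-up names (`ClosureCaptureExample` and `MyConfig` are illustrative, not Hudi code); hoisting the needed value into a local variable, as the patch does with `writeConfig`, keeps the captured closure small and serializable.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;

public class ClosureCaptureExample {

  // Small serializable value standing in for HoodieWriteConfig in this sketch.
  static class MyConfig implements Serializable {
    final String prefix;

    MyConfig(String prefix) {
      this.prefix = prefix;
    }
  }

  private final MyConfig config = new MyConfig("row-");

  // Problematic: reading the field goes through "this", so Spark tries to
  // serialize the whole (non-serializable) ClosureCaptureExample instance.
  JavaRDD<String> tagCapturingThis(JavaRDD<Integer> input) {
    return input.map(i -> config.prefix + i);
  }

  // Preferred: hoist the field into a local first; the lambda then captures
  // only the small serializable MyConfig value.
  JavaRDD<String> tagCapturingLocal(JavaRDD<Integer> input) {
    MyConfig localConfig = config;
    return input.map(i -> localConfig.prefix + i);
  }

  public static void main(String[] args) {
    try (JavaSparkContext jsc = new JavaSparkContext("local[1]", "closure-capture-example")) {
      JavaRDD<Integer> input = jsc.parallelize(Arrays.asList(1, 2, 3));
      // Prints [row-1, row-2, row-3]; calling tagCapturingThis(input).collect()
      // instead fails with "Task not serializable".
      System.out.println(new ClosureCaptureExample().tagCapturingLocal(input).collect());
    }
  }
}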
From 00009bdcca154d461a6af1a028eb3d953f1ea03e Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 14:20:58 -0800
Subject: [PATCH 2/6] Fixing compilation

---
 .../run/strategy/MultipleSparkJobExecutionStrategy.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 4dc29449632c4..9ceb1b7a816d5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -289,7 +289,7 @@ private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, Hoodie
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
     if (!writeConfig.populateMetaFields()) {
       try {
-        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
       } catch (IOException e) {
         throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
       }

From 06cc6b411f893feeb78b4a0f9362d038b5ed602d Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 14:23:48 -0800
Subject: [PATCH 3/6] Extracted Hadoop Conf as well

---
 .../run/strategy/MultipleSparkJobExecutionStrategy.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9ceb1b7a816d5..1485b3e50d775 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -246,14 +247,18 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
    */
   private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
+    Configuration hadoopConf = getHoodieTable().getHadoopConf();
     HoodieWriteConfig writeConfig = getWriteConfig();
+
+    // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
+    //       closure, as this might lead to issues attempting to serialize its nested fields
     return jsc.parallelize(clusteringOps, clusteringOps.size())
         .mapPartitions(clusteringOpsPartition -> {
           List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
             try {
               Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
-              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, new Path(clusteringOp.getDataFilePath()));
               iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
             } catch (IOException e) {
               throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
@@ -263,8 +268,6 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext j
 
           return new ConcatenatingIterator<>(iteratorsForPartition);
         })
-        // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
-        //       closure, as this might lead to issues attempting to serialize its nested fields
         .map(record -> transform(record, writeConfig));
   }
 
From 3cd70db10fdc7848b014106e83f4b76c5123077b Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 14:24:31 -0800
Subject: [PATCH 4/6] Fixing compilation

---
 .../strategy/MultipleSparkJobExecutionStrategy.java | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 1485b3e50d775..8b1df5dff2f3a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,7 +18,11 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -26,7 +30,6 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
@@ -59,11 +62,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
From 47dcbc4a33c91cfe152b87f95b84631dfcbc9345 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 17:51:16 -0800
Subject: [PATCH 5/6] Fixed serialization issues

---
 .../run/strategy/MultipleSparkJobExecutionStrategy.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 8b1df5dff2f3a..54c1c9f5ac05d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -21,7 +21,6 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -30,6 +29,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
@@ -245,7 +245,7 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
    */
   private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
-    Configuration hadoopConf = getHoodieTable().getHadoopConf();
+    SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
 
     // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
@@ -256,7 +256,7 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext j
           clusteringOpsPartition.forEachRemaining(clusteringOp -> {
             try {
               Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
-              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, new Path(clusteringOp.getDataFilePath()));
+              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
               iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
             } catch (IOException e) {
               throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
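For context on the serialization fix above: Hadoop's `Configuration` is not `java.io.Serializable`, which is why the raw `Configuration` local extracted in patch 3 still broke once it was captured by the closure, and why it is wrapped in Hudi's `SerializableConfiguration` here. Below is a minimal sketch of that wrapper pattern, leaning on the fact that `Configuration` is a Hadoop `Writable`; `SerializableConfSketch` is a hypothetical stand-in, not Hudi's actual implementation.

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableConfSketch implements Serializable {

  // Marked transient because Configuration itself cannot go through
  // default Java serialization.
  private transient Configuration conf;

  public SerializableConfSketch(Configuration conf) {
    this.conf = conf;
  }

  // Accessor used on the executor side after deserialization.
  public Configuration get() {
    return conf;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    // Writable-based serialization of the configuration entries.
    conf.write(out);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    // Rebuild the Configuration from the stream on the receiving side.
    conf = new Configuration(false);
    conf.readFields(in);
  }
}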
From de9ad1511f673c1802b1638a57bd30cad33df485 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 4 Mar 2022 17:51:42 -0800
Subject: [PATCH 6/6] Fixed `getBulkInsertSortMode` to properly fallback to
 default

---
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2d71c4d738886..bf707bc39f13e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1051,7 +1051,7 @@ public int getMaxConsistencyCheckIntervalMs() {
   }
 
   public BulkInsertSortMode getBulkInsertSortMode() {
-    String sortMode = getString(BULK_INSERT_SORT_MODE);
+    String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
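For context on the last patch: `getString(BULK_INSERT_SORT_MODE)` returns null when the property was never set explicitly, so the following `toUpperCase()` throws a `NullPointerException`, while `getStringOrDefault` falls back to the property's declared default. The sketch below mimics that lookup-with-fallback behavior with a made-up `StringProperty` holder; the key string and default value are illustrative rather than copied from `HoodieWriteConfig`.

import java.util.HashMap;
import java.util.Map;

public class ConfigFallbackSketch {

  enum BulkInsertSortMode { NONE, GLOBAL_SORT, PARTITION_SORT }

  // Stand-in for a ConfigProperty<String>: a key plus a declared default value.
  static class StringProperty {
    final String key;
    final String defaultValue;

    StringProperty(String key, String defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }
  }

  // Key and default are illustrative, not copied from HoodieWriteConfig.
  static final StringProperty BULK_INSERT_SORT_MODE =
      new StringProperty("hoodie.bulkinsert.sort.mode", BulkInsertSortMode.GLOBAL_SORT.name());

  private final Map<String, String> props = new HashMap<>();

  String getString(StringProperty property) {
    // Returns null if the key was never set, which breaks callers downstream.
    return props.get(property.key);
  }

  String getStringOrDefault(StringProperty property) {
    // Falls back to the property's declared default when the key is absent.
    return props.getOrDefault(property.key, property.defaultValue);
  }

  BulkInsertSortMode getBulkInsertSortMode() {
    String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
    return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
  }

  public static void main(String[] args) {
    // Prints GLOBAL_SORT (the default) even though nothing was configured.
    System.out.println(new ConfigFallbackSketch().getBulkInsertSortMode());
  }
}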