diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 3c876e75c28f5..ea558c39275da 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -18,21 +18,18 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.BootstrapMode;
-import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
 import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
 import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
+import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
 import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
 import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
-import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
@@ -49,45 +46,27 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
-import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
-import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
+import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
 import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -296,57 +275,6 @@ protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
         HoodieJavaRDD.of(inputRecordsRDD), Option.empty(), extraMetadata);
   }
 
-  private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, String partitionPath,
-                                                       HoodieFileStatus srcFileStatus, KeyGeneratorInterface keyGenerator) {
-
-    Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
-    HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
-        table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
-    Schema avroSchema = null;
-    try {
-      ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), sourceFilePath,
-          ParquetMetadataConverter.NO_FILTER);
-      MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
-      avroSchema = new AvroSchemaConverter().convert(parquetSchema);
-      Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
-          keyGenerator.getRecordKeyFieldNames());
-      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
-      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
-      AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
-
-      BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
-      try (ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build()) {
-        wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
-            new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
-              String recKey = keyGenerator.getKey(inp).getRecordKey();
-              GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
-              gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
-              BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
-              HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload);
-              return rec;
-            });
-        wrapper.execute();
-      } catch (Exception e) {
-        throw new HoodieException(e);
-      } finally {
-        bootstrapHandle.close();
-        if (null != wrapper) {
-          wrapper.shutdownNow();
-        }
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException(e.getMessage(), e);
-    }
-
-    BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
-    BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
-        config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
-        srcFileStatus, writeStatus.getFileId());
-    writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
-    return writeStatus;
-  }
-
   /**
    * Return Bootstrap Mode selections for partitions listed and figure out bootstrap Schema.
   * @return
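
For context on what was deleted: handleMetadataBootstrap performed a key-only read of each source parquet file, deriving the file's Avro schema from the footer and projecting it down to the record-key fields so that only key columns are decoded. The standalone sketch below reproduces that read path with plain parquet-avro APIs; it is an illustration, not Hudi code. The input path and the single key column "id" are assumptions standing in for the key generator's field list and HoodieAvroUtils.generateProjectionSchema.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

public class RecordKeyProjectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path sourceFilePath = new Path(args[0]); // any parquet file with a top-level "id" column

    // Derive the full Avro schema from the parquet footer, as the removed method did.
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, sourceFilePath, ParquetMetadataConverter.NO_FILTER);
    MessageType parquetSchema = footer.getFileMetaData().getSchema();
    Schema fullSchema = new AvroSchemaConverter().convert(parquetSchema);

    // Project down to the record-key field only; "id" is an assumed key column.
    Schema keySchema = SchemaBuilder.record(fullSchema.getName()).fields()
        .name("id").type(fullSchema.getField("id").schema()).noDefault()
        .endRecord();
    AvroReadSupport.setAvroReadSchema(conf, keySchema);
    AvroReadSupport.setRequestedProjection(conf, keySchema);

    // Stream the file; parquet prunes every other column, so only key bytes are decoded.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(sourceFilePath).withConf(conf).build()) {
      for (GenericRecord rec = reader.read(); rec != null; rec = reader.read()) {
        System.out.println(rec.get("id"));
      }
    }
  }
}

This projection is what keeps metadata-only bootstrap cheap: the skeleton file needs only record keys, never the full rows.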
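The removed method also wired the projected reader into a SparkBoundedInMemoryExecutor, which decouples reading from writing through a bounded buffer so a slow writer back-pressures the reader instead of buffering the whole file. The sketch below illustrates that producer/consumer pattern with a plain BlockingQueue; all names and the sentinel value are illustrative, not Hudi's API.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class BoundedPipelineSketch {
  private static final String POISON = "\u0000EOF"; // end-of-input sentinel

  public static void main(String[] args) throws Exception {
    List<String> sourceRows = Arrays.asList("rec-1", "rec-2", "rec-3"); // stand-in for a ParquetReaderIterator
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);          // bounded, like the in-memory executor
    Function<String, String> transform = row -> "key(" + row + ")";    // stand-in for key extraction

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.submit(() -> { // producer: reads and transforms records
      try {
        for (String row : sourceRows) {
          queue.put(transform.apply(row)); // blocks when the queue is full
        }
        queue.put(POISON);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    pool.submit(() -> { // consumer: writes records out
      try {
        for (String rec = queue.take(); !POISON.equals(rec); rec = queue.take()) {
          System.out.println("writing " + rec); // stand-in for BootstrapRecordConsumer
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    pool.shutdown();
  }
}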