diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java index 6e82f42411e0e..1d2b4e0edaa1a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java @@ -18,25 +18,35 @@ package org.apache.hudi.client.bootstrap; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.AvroOrcUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; import org.apache.parquet.schema.MessageType; import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.StructType; +import java.io.IOException; import java.util.List; import java.util.Objects; +import static org.apache.hudi.common.model.HoodieFileFormat.ORC; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaProvider { public HoodieSparkBootstrapSchemaProvider(HoodieWriteConfig writeConfig) { super(writeConfig); @@ -44,16 +54,24 @@ public HoodieSparkBootstrapSchemaProvider(HoodieWriteConfig writeConfig) { @Override protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List>> partitions) { - MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> { - try { - Path filePath = FileStatusUtils.toPath(fs.getPath()); - return new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath); - } catch (Exception ex) { - return null; - } - }).filter(Objects::nonNull).findAny() + Schema schema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> { + Path filePath = FileStatusUtils.toPath(fs.getPath()); + String extension = FSUtils.getFileExtension(filePath.getName()); + if (PARQUET.getFileExtension().equals(extension)) { + return getBootstrapSourceSchemaParquet(writeConfig, context, filePath); + } else if (ORC.getFileExtension().equals(extension)) { + return getBootstrapSourceSchemaOrc(writeConfig, context, filePath); + } else { + throw new HoodieException("Could not determine schema from the data files."); + } + } + ).filter(Objects::nonNull).findAny() .orElseThrow(() -> new HoodieException("Could not determine schema from the data files.")); + return schema; + } + private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) { + MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath); ParquetToSparkSchemaConverter converter = new 
ParquetToSparkSchemaConverter( Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()), @@ -65,4 +83,19 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, + table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier()); + try { + Schema avroSchema = getAvroSchema(sourceFilePath); + Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, + keyGenerator.getRecordKeyFieldNames()); + LOG.info("Schema to be used for reading record Keys :" + recordKeySchema); + AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema); + AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema); + executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } + + BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0); + BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping( + config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath, + srcFileStatus, writeStatus.getFileId()); + writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping); + return writeStatus; + } + + abstract Schema getAvroSchema(Path sourceFilePath) throws IOException; + + abstract void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, + Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception; +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapMetadataHandler.java new file mode 100644 index 0000000000000..75daca739c8f5 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapMetadataHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.bootstrap; + +import org.apache.hudi.client.bootstrap.BootstrapWriteStatus; +import org.apache.hudi.keygen.KeyGeneratorInterface; + +/** + * Bootstrap metadata handler to assist in bootstrapping only metadata. + */ +public interface BootstrapMetadataHandler { + /** + * Execute bootstrap with only metadata. + * @param srcPartitionPath source partition path. + * @param partitionPath destination partition path. + * @param keyGenerator key generator to use. + * @return the {@link BootstrapWriteStatus} which has the result of execution. 
+ */ + BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator); +} + + diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java new file mode 100644 index 0000000000000..533e7ad27a8fa --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/MetadataBootstrapHandlerFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.bootstrap; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bootstrap.FileStatusUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.avro.model.HoodieFileStatus; +import static org.apache.hudi.common.model.HoodieFileFormat.ORC; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + +public class MetadataBootstrapHandlerFactory { + + public static BootstrapMetadataHandler getMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) { + Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath()); + + String extension = FSUtils.getFileExtension(sourceFilePath.toString()); + BootstrapMetadataHandler bootstrapMetadataHandler; + if (ORC.getFileExtension().equals(extension)) { + return new OrcBootstrapMetadataHandler(config, table, srcFileStatus); + } else if (PARQUET.getFileExtension().equals(extension)) { + return new ParquetBootstrapMetadataHandler(config, table, srcFileStatus); + } else { + throw new HoodieIOException("Bootstrap Metadata Handler not implemented for base file format " + extension); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java new file mode 100644 index 0000000000000..9587c5b30cb74 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.bootstrap; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.bootstrap.BootstrapRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.OrcReaderIterator; +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; +import org.apache.hudi.io.HoodieBootstrapHandle; +import org.apache.hudi.keygen.KeyGeneratorInterface; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +import java.io.IOException; + +class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler { + private static final Logger LOG = LogManager.getLogger(OrcBootstrapMetadataHandler.class); + + public OrcBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) { + super(config, table, srcFileStatus); + } + + @Override + Schema getAvroSchema(Path sourceFilePath) throws IOException { + Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf())); + TypeDescription orcSchema = orcReader.getSchema(); + return AvroOrcUtils.createAvroSchema(orcSchema); + } + + @Override + void executeBootstrap(HoodieBootstrapHandle bootstrapHandle, Path sourceFilePath, KeyGeneratorInterface keyGenerator, + String partitionPath, Schema avroSchema) throws Exception { + BoundedInMemoryExecutor wrapper = null; + Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf())); + TypeDescription orcSchema = orcReader.getSchema(); + try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) { + wrapper = new SparkBoundedInMemoryExecutor(config, + new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> { + String recKey = keyGenerator.getKey(inp).getRecordKey(); + GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); + gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey); + BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); + HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload); + return rec; + }); + wrapper.execute(); + } catch (Exception e) { + throw new HoodieException(e); + } finally { + 
bootstrapHandle.close(); + if (null != wrapper) { + wrapper.shutdownNow(); + } + } + } +} + diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java new file mode 100644 index 0000000000000..058c2d4267abb --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.bootstrap; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.bootstrap.BootstrapRecordPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; +import org.apache.hudi.io.HoodieBootstrapHandle; +import org.apache.hudi.keygen.KeyGeneratorInterface; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; + +class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler { + private static final Logger LOG = LogManager.getLogger(ParquetBootstrapMetadataHandler.class); + + public ParquetBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) { + super(config, table, srcFileStatus); + } + + @Override + Schema getAvroSchema(Path sourceFilePath) throws IOException { + ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), sourceFilePath, + ParquetMetadataConverter.NO_FILTER); + MessageType parquetSchema = readFooter.getFileMetaData().getSchema(); + return new AvroSchemaConverter().convert(parquetSchema); + } + + @Override + void 
executeBootstrap(HoodieBootstrapHandle bootstrapHandle, + Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception { + BoundedInMemoryExecutor wrapper = null; + try { + ParquetReader reader = + AvroParquetReader.builder(sourceFilePath).withConf(table.getHadoopConf()).build(); + wrapper = new SparkBoundedInMemoryExecutor(config, + new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> { + String recKey = keyGenerator.getKey(inp).getRecordKey(); + GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); + gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey); + BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); + HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload); + return rec; + }); + wrapper.execute(); + } catch (Exception e) { + throw new HoodieException(e); + } finally { + bootstrapHandle.close(); + if (null != wrapper) { + wrapper.shutdownNow(); + } + } + } +} + diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java index bdeb041b31479..7486d07e1335c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java @@ -100,6 +100,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler; + public class SparkBootstrapCommitActionExecutor> extends BaseCommitActionExecutor>, JavaRDD, JavaRDD, HoodieBootstrapWriteMetadata> { @@ -398,8 +400,8 @@ private JavaRDD runMetadataBootstrap(List handleMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(), - partitionFsPair.getRight().getRight(), keyGenerator)); + .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(), + partitionFsPair.getRight().getLeft(), keyGenerator)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index 0f1f49fedc860..de2e345a86989 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -52,6 +52,7 @@ import org.apache.orc.storage.serde2.io.DateWritable; import org.apache.hudi.exception.HoodieIOException; import org.apache.orc.TypeDescription; +import static org.apache.avro.JsonProperties.NULL_VALUE; /** * Methods including addToVector, addUnionValue, createOrcSchema are originally from @@ -796,4 +797,78 @@ private static Schema getActualSchemaType(Schema unionSchema) { return Schema.createUnion(nonNullMembers); } } + + public static Schema createAvroSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) { + Schema avroSchema = createAvroSchemaWithNamespace(orcSchema,recordName,namespace); + List fields = new ArrayList(); + List fieldList = avroSchema.getFields(); + for (Field field : fieldList) { + Schema fieldSchema = field.schema(); + Schema nullableSchema = 
Schema.createUnion(Schema.create(Schema.Type.NULL),fieldSchema); + if (nullable) { + fields.add(new Schema.Field(field.name(), nullableSchema, null, NULL_VALUE)); + } else { + fields.add(new Schema.Field(field.name(), fieldSchema, null, (Object) null)); + } + } + Schema schema = Schema.createRecord(recordName, null, null, false); + schema.setFields(fields); + return schema; + } + + private static Schema createAvroSchemaWithNamespace(TypeDescription orcSchema, String recordName, String namespace) { + switch (orcSchema.getCategory()) { + case BOOLEAN: + return Schema.create(Schema.Type.BOOLEAN); + case BYTE: + // tinyint (8 bit), use int to hold it + return Schema.create(Schema.Type.INT); + case SHORT: + // smallint (16 bit), use int to hold it + return Schema.create(Schema.Type.INT); + case INT: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but there is no way to distinguish + return Schema.create(Schema.Type.INT); + case LONG: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but there is no way to distinguish + return Schema.create(Schema.Type.LONG); + case FLOAT: + return Schema.create(Schema.Type.FLOAT); + case DOUBLE: + return Schema.create(Schema.Type.DOUBLE); + case VARCHAR: + case CHAR: + case STRING: + return Schema.create(Schema.Type.STRING); + case DATE: + Schema date = Schema.create(Schema.Type.INT); + LogicalTypes.date().addToSchema(date); + return date; + case TIMESTAMP: + Schema timestamp = Schema.create(Schema.Type.LONG); + LogicalTypes.timestampMillis().addToSchema(timestamp); + return timestamp; + case BINARY: + return Schema.create(Schema.Type.BYTES); + case DECIMAL: + Schema decimal = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(orcSchema.getPrecision(), orcSchema.getScale()).addToSchema(decimal); + return decimal; + case LIST: + return Schema.createArray(createAvroSchemaWithNamespace(orcSchema.getChildren().get(0), recordName, "")); + case MAP: + return Schema.createMap(createAvroSchemaWithNamespace(orcSchema.getChildren().get(1), recordName, "")); + case STRUCT: + List childFields = new ArrayList<>(); + for (int i = 0; i < orcSchema.getChildren().size(); i++) { + TypeDescription childType = orcSchema.getChildren().get(i); + String childName = orcSchema.getFieldNames().get(i); + childFields.add(new Field(childName, createAvroSchemaWithNamespace(childType, childName, ""), null, null)); + } + return Schema.createRecord(recordName, null, namespace, false, childFields); + default: + throw new IllegalStateException(String.format("Unrecognized ORC type: %s", orcSchema.getCategory().getName())); + + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index bf2473913585d..e418043fe0ecd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -19,8 +19,6 @@ package org.apache.hudi.common.util; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -50,8 +48,6 @@ import org.apache.orc.RecordReader; import org.apache.orc.TypeDescription; -import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_SCHEMA_METADATA_KEY; - /** * Utility functions for ORC files. 
*/ @@ -226,9 +222,8 @@ public Map readFooter(Configuration conf, boolean required, public Schema readAvroSchema(Configuration conf, Path orcFilePath) { try { Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); - ByteBuffer schemaBuffer = reader.getMetadataValue(HOODIE_AVRO_SCHEMA_METADATA_KEY); - String schemaText = StandardCharsets.UTF_8.decode(schemaBuffer).toString(); - return new Schema.Parser().parse(schemaText); + TypeDescription orcSchema = reader.getSchema(); + return AvroOrcUtils.createAvroSchema(orcSchema); } catch (IOException io) { throw new HoodieIOException("Unable to get Avro schema for ORC file:" + orcFilePath, io); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 67b147aa63645..d03dca0c81887 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -111,6 +111,13 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } + public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException { + Properties props = new Properties(); + props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath); + props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.name()); + return init(getDefaultHadoopConf(), basePath, tableType, props); + } + public static T serializeDeserialize(T object, Class clazz) { // Using Kryo as the default serializer in Spark Jobs Kryo kryo = new Kryo(); diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java new file mode 100644 index 0000000000000..e385e476dd26e --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieSparkBootstrapExample.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.spark; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.config.HoodieBootstrapConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.examples.common.HoodieExampleSparkUtils; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.Dataset; + + + +public class HoodieSparkBootstrapExample { + + private static String tableType = HoodieTableType.MERGE_ON_READ.name(); + + + public static void main(String[] args) throws Exception { + if (args.length < 5) { + System.err.println("Usage: HoodieSparkBootstrapExample <recordKey> <tableName> <partitionPath> <preCombineField> <basePath>"); + System.exit(1); + } + String recordKey = args[0]; + String tableName = args[1]; + String partitionPath = args[2]; + String preCombineField = args[3]; + String basePath = args[4]; + + SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example"); + + SparkSession spark = SparkSession + .builder() + .appName("Java Spark SQL basic example") + .config("spark.some.config.option", "some-value") + .enableHiveSupport() + .getOrCreate(); + + Dataset df = spark.emptyDataFrame(); + + df.write().format("hudi").option(HoodieWriteConfig.TBL_NAME.key(), tableName) + .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL()) + .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), partitionPath) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), preCombineField) + .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.ORC.name()) + .option(HoodieBootstrapConfig.BASE_PATH.key(), basePath) + .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key(), NonpartitionedKeyGenerator.class.getCanonicalName()) + .mode(SaveMode.Overwrite).save("/hudi/" + tableName); + + df.count(); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java new file mode 100644 index 0000000000000..560b590183cf5 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.bootstrap; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.bootstrap.FileStatusUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.List; + +public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBootstrapDataProvider> { + + private final transient SparkSession sparkSession; + + public SparkFullBootstrapDataProviderBase(TypedProperties props, + HoodieSparkEngineContext context) { + super(props, context); + this.sparkSession = SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate(); + } + + @Override + public JavaRDD generateInputRecords(String tableName, String sourceBasePath, + List>> partitionPathsWithFiles) { + String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue) + .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) + .toArray(String[]::new); + + Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths); + try { + KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); + String structName = tableName + "_record"; + String namespace = "hoodie." + tableName; + RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, + Option.empty()); + return genericRecords.toJavaRDD().map(gr -> { + String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( + gr, props.getString("hoodie.datasource.write.precombine.field"), false); + try { + return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), + props.getString("hoodie.datasource.write.payload.class")); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + + protected abstract String getFormat(); +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkOrcBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkOrcBootstrapDataProvider.java new file mode 100644 index 0000000000000..9176d19366625 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkOrcBootstrapDataProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.bootstrap; + +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.TypedProperties; + +/** + * Spark Data frame based bootstrap input provider. + */ +public class SparkOrcBootstrapDataProvider extends SparkFullBootstrapDataProviderBase { + + public SparkOrcBootstrapDataProvider(TypedProperties props, + HoodieSparkEngineContext context) { + super(props, context); + } + + @Override + protected String getFormat() { + return "orc"; + } +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java index 6051317460c65..e3bdbfe0aa888 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkParquetBootstrapDataProvider.java @@ -18,69 +18,21 @@ package org.apache.hudi.bootstrap; -import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.HoodieSparkUtils; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieFileStatus; -import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.bootstrap.FileStatusUtils; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.keygen.KeyGenerator; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.SparkSession; - -import java.io.IOException; -import java.util.List; /** * Spark Data frame based bootstrap input provider. 
*/ -public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataProvider> { - - private final transient SparkSession sparkSession; +public class SparkParquetBootstrapDataProvider extends SparkFullBootstrapDataProviderBase { public SparkParquetBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) { super(props, context); - this.sparkSession = SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate(); } @Override - public JavaRDD generateInputRecords(String tableName, String sourceBasePath, - List>> partitionPathsWithFiles) { - String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue) - .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) - .toArray(String[]::new); - - Dataset inputDataset = sparkSession.read().parquet(filePaths); - try { - KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - String structName = tableName + "_record"; - String namespace = "hoodie." + tableName; - RDD genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false, - Option.empty()); - return genericRecords.toJavaRDD().map(gr -> { - String orderingVal = HoodieAvroUtils.getNestedFieldValAsString( - gr, props.getString("hoodie.datasource.write.precombine.field"), false); - try { - return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr), - props.getString("hoodie.datasource.write.payload.class")); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } - }); - } catch (IOException ioe) { - throw new HoodieIOException(ioe.getMessage(), ioe); - } + protected String getFormat() { + return "parquet"; } } \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 1d0e8af1647f6..ddbd7fc06a95b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -377,6 +377,7 @@ object HoodieSparkSqlWriter { val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME) val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean + val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.valueOf(tableType)) @@ -386,6 +387,7 @@ object HoodieSparkSqlWriter { .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) .setBootstrapIndexClass(bootstrapIndexClass) + .setBaseFileFormat(baseFileFormat) .setBootstrapBasePath(bootstrapBasePath) .setPartitionFields(partitionColumns) .setPopulateMetaFields(populateMetaFields) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java new file mode 100644 index 0000000000000..fba09091add50 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -0,0 
+1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; +import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; +import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.bootstrap.FileStatusUtils; +import org.apache.hudi.common.bootstrap.index.BootstrapIndex; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.OrcReaderIterator; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieBootstrapConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.hudi.index.HoodieIndex.IndexType; +import org.apache.hudi.keygen.NonpartitionedKeyGenerator; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.table.action.bootstrap.BootstrapUtils; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.spark.api.java.JavaRDD; +import 
org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.api.java.UDF1; +import org.apache.spark.sql.types.DataTypes; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord; +import static org.apache.spark.sql.functions.callUDF; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests Bootstrap Client functionality. + */ +@Tag("functional") +public class TestOrcBootstrap extends HoodieClientTestBase { + + + public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double," + + "struct,array>,boolean"; + @TempDir + public java.nio.file.Path tmpFolder; + + protected String bootstrapBasePath = null; + + private HoodieParquetInputFormat roInputFormat; + private JobConf roJobConf; + + private HoodieParquetRealtimeInputFormat rtInputFormat; + private JobConf rtJobConf; + private SparkSession spark; + + @BeforeEach + public void setUp() throws Exception { + bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data"; + initPath(); + initSparkContexts(); + initTestDataGenerator(); + initMetaClient(); + reloadInputFormats(); + } + + @AfterEach + public void tearDown() throws IOException { + cleanupSparkContexts(); + cleanupClients(); + cleanupTestDataGenerator(); + } + + private void reloadInputFormats() { + // initialize parquet input format + roInputFormat = new HoodieParquetInputFormat(); + roJobConf = new JobConf(jsc.hadoopConfiguration()); + roInputFormat.setConf(roJobConf); + + } + + public Schema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List partitionPaths, + String srcPath) throws Exception { + boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); + Dataset df = generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext); + df.printSchema(); + if (isPartitioned) { + df.write().partitionBy("datestr").format("orc").mode(SaveMode.Overwrite).save(srcPath); + } else { + df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath); + } + String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny()) + .orElse(null).get().getPath()).toString(); + Reader orcReader = OrcFile.createReader(new Path(filePath), OrcFile.readerOptions(metaClient.getHadoopConf())); + + TypeDescription orcSchema = orcReader.getSchema(); + + return 
AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); + } + + @Test + public void testMetadataBootstrapUnpartitionedCOW() throws Exception { + testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE); + } + + @Test + public void testMetadataBootstrapWithUpdatesCOW() throws Exception { + testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE); + } + + private enum EffectiveMode { + FULL_BOOTSTRAP_MODE, + METADATA_BOOTSTRAP_MODE, + MIXED_BOOTSTRAP_MODE + } + + private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception { + + if (deltaCommit) { + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, bootstrapBasePath, HoodieFileFormat.ORC); + } else { + metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, HoodieFileFormat.ORC); + } + + int totalRecords = 100; + String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName() + : NonpartitionedKeyGenerator.class.getCanonicalName(); + final String bootstrapModeSelectorClass; + final String bootstrapCommitInstantTs; + final boolean checkNumRawFiles; + final boolean isBootstrapIndexCreated; + final int numInstantsAfterBootstrap; + final List bootstrapInstants; + switch (mode) { + case FULL_BOOTSTRAP_MODE: + bootstrapModeSelectorClass = FullRecordBootstrapModeSelector.class.getCanonicalName(); + bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS; + checkNumRawFiles = false; + isBootstrapIndexCreated = false; + numInstantsAfterBootstrap = 1; + bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs); + break; + case METADATA_BOOTSTRAP_MODE: + bootstrapModeSelectorClass = MetadataOnlyBootstrapModeSelector.class.getCanonicalName(); + bootstrapCommitInstantTs = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS; + checkNumRawFiles = true; + isBootstrapIndexCreated = true; + numInstantsAfterBootstrap = 1; + bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs); + break; + default: + bootstrapModeSelectorClass = TestRandomBootstapModeSelector.class.getName(); + bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS; + checkNumRawFiles = false; + isBootstrapIndexCreated = true; + numInstantsAfterBootstrap = 2; + bootstrapInstants = Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS); + break; + } + List partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03"); + long timestamp = Instant.now().toEpochMilli(); + Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath); + HoodieWriteConfig config = getConfigBuilder(schema.toString()) + .withAutoCommit(true) + .withSchema(schema.toString()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1) + .build()) + .withBootstrapConfig(HoodieBootstrapConfig.newBuilder() + .withBootstrapBasePath(bootstrapBasePath) + .withBootstrapKeyGenClass(keyGeneratorClass) + .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName()) + .withBootstrapParallelism(3) + .withBootstrapModeSelector(bootstrapModeSelectorClass).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()) + .build(); + SparkRDDWriteClient client = new SparkRDDWriteClient(context, config); + client.bootstrap(Option.empty()); + checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, 
numInstantsAfterBootstrap, + numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants); + + // Rollback Bootstrap + if (deltaCommit) { + FileCreateUtils.deleteDeltaCommit(metaClient.getBasePath(), bootstrapCommitInstantTs); + } else { + FileCreateUtils.deleteCommit(metaClient.getBasePath(), bootstrapCommitInstantTs); + } + client.rollbackFailedBootstrap(); + metaClient.reloadActiveTimeline(); + assertEquals(0, metaClient.getCommitsTimeline().countInstants()); + assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context) + .stream().flatMap(f -> f.getValue().stream()).count()); + + BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient); + assertFalse(index.useIndex()); + + // Run bootstrap again + client = new SparkRDDWriteClient(context, config); + client.bootstrap(Option.empty()); + + metaClient.reloadActiveTimeline(); + index = BootstrapIndex.getBootstrapIndex(metaClient); + if (isBootstrapIndexCreated) { + assertTrue(index.useIndex()); + } else { + assertFalse(index.useIndex()); + } + + checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap, + numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants); + + // Upsert case + long updateTimestamp = Instant.now().toEpochMilli(); + String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2"; + generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath); + JavaRDD updateBatch = + generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context), + schema); + String newInstantTs = client.startCommit(); + client.upsert(updateBatch, newInstantTs); + checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1, + updateTimestamp, deltaCommit ? 
timestamp : updateTimestamp, deltaCommit); + + if (deltaCommit) { + Option compactionInstant = client.scheduleCompaction(Option.empty()); + assertTrue(compactionInstant.isPresent()); + client.compact(compactionInstant.get()); + checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles, + numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit, + Arrays.asList(compactionInstant.get())); + } + } + + @Test + public void testMetadataBootstrapWithUpdatesMOR() throws Exception { + testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE); + } + + @Test + public void testFullBootstrapOnlyCOW() throws Exception { + testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE); + } + + @Test + public void testFullBootstrapWithUpdatesMOR() throws Exception { + testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE); + } + + @Test + public void testMetaAndFullBootstrapCOW() throws Exception { + testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE); + } + + @Test + public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception { + testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE); + } + + private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles, + int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit) throws Exception { + checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants, + expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant)); + } + + private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles, + int expNumInstants, int numVersions, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, + List instantsWithValidRecords) throws Exception { + metaClient.reloadActiveTimeline(); + assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants()); + assertEquals(instant, metaClient.getActiveTimeline() + .getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp()); + + Dataset bootstrapped = sqlContext.read().format("orc").load(basePath); + Dataset original = sqlContext.read().format("orc").load(bootstrapBasePath); + bootstrapped.registerTempTable("bootstrapped"); + original.registerTempTable("original"); + if (checkNumRawFiles) { + List files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + bootstrapBasePath, context).stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList()); + assertEquals(files.size() * numVersions, + sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count()); + } + + if (!isDeltaCommit) { + String predicate = String.join(", ", + instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList())); + assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN " + + "(" + predicate + ")").count()); + Dataset missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not " + + "in (select _hoodie_record_key from bootstrapped)"); + assertEquals(0, missingOriginal.count()); + Dataset missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a " + + "where a._hoodie_record_key not in (select _row_key from original)"); + assertEquals(0, missingBootstrapped.count()); + //sqlContext.sql("select * from bootstrapped").show(10, 
false); + } + + + } + + public static class TestFullBootstrapDataProvider extends FullRecordBootstrapDataProvider> { + + public TestFullBootstrapDataProvider(TypedProperties props, HoodieSparkEngineContext context) { + super(props, context); + } + + @Override + public JavaRDD generateInputRecords(String tableName, String sourceBasePath, + List>> partitionPaths) { + String[] filePaths = partitionPaths.stream().map(Pair::getValue) + .flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString())) + .toArray(String[]::new); + + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); + + String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream()) + .findAny().get().getPath()).toString(); + try { + Reader orcReader = OrcFile.createReader( + new Path(filePath), new OrcFile.ReaderOptions(jsc.hadoopConfiguration())); + + TypeDescription orcSchema = orcReader.getSchema(); + Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); + return generateInputBatch(jsc, partitionPaths, avroSchema); + + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + } + + private static JavaRDD generateInputBatch(JavaSparkContext jsc, + List>> partitionPaths, Schema writerSchema) { + List> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream() + .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList()); + return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> { + try { + Configuration conf = jsc.hadoopConfiguration(); + AvroReadSupport.setAvroReadSchema(conf, writerSchema); + Reader orcReader = OrcFile.createReader( + p.getValue(), + new OrcFile.ReaderOptions(jsc.hadoopConfiguration())); + RecordReader recordReader = orcReader.rows(); + + TypeDescription orcSchema = orcReader.getSchema(); + + Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); + + Iterator recIterator = new OrcReaderIterator(recordReader, avroSchema, orcSchema); + + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> { + try { + String key = gr.get("_row_key").toString(); + String pPath = p.getKey(); + return new HoodieRecord<>(new HoodieKey(key, pPath), new RawTripTestPayload(gr.toString(), key, pPath, + HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)); + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + }); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + }).collect(Collectors.toList())); + } + + public static class TestRandomBootstapModeSelector extends BootstrapModeSelector { + private int currIdx = new Random().nextInt(2); + + public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) { + super(writeConfig); + } + + @Override + public Map> select(List>> partitions) { + List> selections = new ArrayList<>(); + partitions.stream().forEach(p -> { + final BootstrapMode mode; + if (currIdx == 0) { + mode = BootstrapMode.METADATA_ONLY; + } else { + mode = BootstrapMode.FULL_RECORD; + } + currIdx = (currIdx + 1) % 2; + selections.add(Pair.of(mode, p.getKey())); + }); + return selections.stream().collect(Collectors.groupingBy(Pair::getKey, mapping(Pair::getValue, toList()))); + } + } + + public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, 
IndexType.BLOOM) + .withExternalSchemaTrasformation(true); + TypedProperties properties = new TypedProperties(); + properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key"); + properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "datestr"); + builder = builder.withProps(properties); + return builder; + } + + public static Dataset generateTestRawTripDataset(long timestamp, int from, int to, List partitionPaths, + JavaSparkContext jsc, SQLContext sqlContext) { + boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty(); + final List records = new ArrayList<>(); + IntStream.range(from, to).forEach(i -> { + String id = "" + i; + records.add(generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, + timestamp, false, false).toString()); + }); + if (isPartitioned) { + sqlContext.udf().register("partgen", + (UDF1) (val) -> PartitionPathEncodeUtils.escapePathName(partitionPaths.get( + Integer.parseInt(val.split("_")[1]) % partitionPaths.size())), + DataTypes.StringType); + } + JavaRDD rdd = jsc.parallelize(records); + Dataset df = sqlContext.read().json(rdd); + if (isPartitioned) { + df = df.withColumn("datestr", callUDF("partgen", new Column("_row_key"))); + // Order the columns to ensure generated avro schema aligns with Hive schema + df = df.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", + "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted", "datestr"); + } else { + // Order the columns to ensure generated avro schema aligns with Hive schema + df = df.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", + "end_lat", "end_lon", "fare", "tip_history", "_hoodie_is_deleted"); + } + return df; + } +}
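
A minimal standalone sketch (not part of the patch) of the ORC-to-Avro schema path this diff introduces. The class name and the command-line argument are hypothetical; the sketch only uses calls that appear in the diff above (OrcFile.createReader, Reader#getSchema, AvroOrcUtils.createAvroSchema, and the new AvroOrcUtils.createAvroSchemaWithDefaultValue), mirroring OrcBootstrapMetadataHandler#getAvroSchema and the schema derivation in TestOrcBootstrap.

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;

public class OrcSchemaProbe {
  public static void main(String[] args) throws Exception {
    // Hypothetical input: path to any ORC file, e.g. one of the bootstrap source files.
    Path orcPath = new Path(args[0]);
    Reader orcReader = OrcFile.createReader(orcPath, OrcFile.readerOptions(new Configuration()));
    TypeDescription orcSchema = orcReader.getSchema();

    // Direct conversion, as done by OrcBootstrapMetadataHandler#getAvroSchema
    // and the updated OrcUtils#readAvroSchema.
    Schema avroSchema = AvroOrcUtils.createAvroSchema(orcSchema);

    // Conversion with nullable fields and null defaults, as used by TestOrcBootstrap
    // when deriving a writer schema from the source ORC files.
    Schema nullableAvroSchema =
        AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "orc_probe_record", null, true);

    System.out.println(avroSchema.toString(true));
    System.out.println(nullableAvroSchema.toString(true));
  }
}

Running this against a bootstrap source ORC file prints both the plain Avro schema and the nullable-with-defaults variant that the tests feed into HoodieWriteConfig, which is a quick way to check how a given ORC type maps onto Avro under the rules added in AvroOrcUtils.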