@@ -18,42 +18,60 @@

package org.apache.hudi.client.bootstrap;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;

public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaProvider {
public HoodieSparkBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
super(writeConfig);
}

@Override
protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
Schema schema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> {
Path filePath = FileStatusUtils.toPath(fs.getPath());
String extension = FSUtils.getFileExtension(filePath.getName());
if (PARQUET.getFileExtension().equals(extension)) {
return getBootstrapSourceSchemaParquet(writeConfig, context, filePath);
} else if (ORC.getFileExtension().equals(extension)) {
return getBootstrapSourceSchemaOrc(writeConfig, context, filePath);
} else {
throw new HoodieException("Unsupported base file format for bootstrap: " + extension);
}
}).filter(Objects::nonNull).findAny()
.orElseThrow(() -> new HoodieException("Could not determine schema from the data files."));
return schema;
}

private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);

ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
@@ -65,4 +83,19 @@

return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
}

private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
Reader orcReader;
try {
orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(context.getHadoopConf().get()));
} catch (IOException e) {
throw new HoodieException("Could not determine schema from the data file " + filePath, e);
}
TypeDescription orcSchema = orcReader.getSchema();
String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;
return AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, structName, recordNamespace, true);
}

}
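As a reviewer aid, here is the new ORC schema path in isolation: a self-contained sketch that derives the `TypeDescription` from one file and converts it to Avro the way `getBootstrapSourceSchemaOrc` above does. The `conf`/`filePath` inputs and the `tbl` naming are stand-ins, not values from this PR.

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;

class OrcSchemaSketch {
  // Mirrors getBootstrapSourceSchemaOrc(...): ORC TypeDescription -> Avro Schema.
  static Schema orcToAvro(Configuration conf, Path filePath) throws Exception {
    Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf));
    TypeDescription orcSchema = orcReader.getSchema();
    // "tbl_record" / "hoodie.tbl" stand in for the sanitized-table-name convention above.
    return AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "tbl_record", "hoodie.tbl", true);
  }
}
```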
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;

import java.io.IOException;

public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataHandler {
private static final Logger LOG = LogManager.getLogger(BaseBootstrapMetadataHandler.class);
protected HoodieWriteConfig config;
protected HoodieTable table;
protected HoodieFileStatus srcFileStatus;

public BaseBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
this.config = config;
this.table = table;
this.srcFileStatus = srcFileStatus;
}

public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator) {
Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());
HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
try {
Schema avroSchema = getAvroSchema(sourceFilePath);
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
keyGenerator.getRecordKeyFieldNames());
LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);
executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
}

BootstrapWriteStatus writeStatus = (BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);
BootstrapFileMapping bootstrapFileMapping = new BootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());
writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);
return writeStatus;
}

abstract Schema getAvroSchema(Path sourceFilePath) throws IOException;

abstract void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception;
}
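To make the contract concrete, a minimal hypothetical subclass (not part of this PR; it assumes same-package placement, since the abstract methods are package-private). The real implementations are the Parquet and ORC handlers below.

```java
package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Hypothetical no-op handler: shows the two extension points a concrete
// format handler must provide (schema extraction and the record-copy loop).
class NoOpBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
  NoOpBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
    super(config, table, srcFileStatus);
  }

  @Override
  Schema getAvroSchema(Path sourceFilePath) throws IOException {
    // A real handler derives this from the source file (Parquet footer, ORC TypeDescription).
    return Schema.create(Schema.Type.NULL);
  }

  @Override
  void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle, Path sourceFilePath,
      KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) {
    // A real handler streams records into bootstrapHandle; closing it finalizes the skeleton file.
    bootstrapHandle.close();
  }
}
```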
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.keygen.KeyGeneratorInterface;

/**
* Handler that performs metadata-only bootstrap of a source data file.
*/
public interface BootstrapMetadataHandler {
/**
* Execute metadata-only bootstrap.
* @param srcPartitionPath source partition path.
* @param partitionPath destination partition path.
* @param keyGenerator key generator to use.
* @return the {@link BootstrapWriteStatus} which has the result of execution.
*/
BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String partitionPath, KeyGeneratorInterface keyGenerator);
}


@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;

import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;

public class MetadataBootstrapHandlerFactory {

public static BootstrapMetadataHandler getMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath());

String extension = FSUtils.getFileExtension(sourceFilePath.getName());
if (ORC.getFileExtension().equals(extension)) {
return new OrcBootstrapMetadataHandler(config, table, srcFileStatus);
} else if (PARQUET.getFileExtension().equals(extension)) {
return new ParquetBootstrapMetadataHandler(config, table, srcFileStatus);
} else {
throw new HoodieIOException("Bootstrap Metadata Handler not implemented for base file format " + extension);
}
}
}
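A hedged sketch of the intended call site; the wrapper class and all parameter values are assumed to come from the surrounding bootstrap action, not from this PR:

```java
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.bootstrap.BootstrapMetadataHandler;
import org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory;

class BootstrapCallSiteSketch {
  // Pick the format-specific handler for one source file and run metadata-only bootstrap.
  static BootstrapWriteStatus bootstrapOneFile(HoodieWriteConfig config, HoodieTable table,
      HoodieFileStatus srcFileStatus, String srcPartitionPath, String partitionPath,
      KeyGeneratorInterface keyGenerator) {
    BootstrapMetadataHandler handler =
        MetadataBootstrapHandlerFactory.getMetadataHandler(config, table, srcFileStatus);
    return handler.runMetadataBootstrap(srcPartitionPath, partitionPath, keyGenerator);
  }
}
```

Keeping the dispatch in the factory means callers stay format-agnostic; adding a new base file format only requires a new handler plus one branch here.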
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.io.IOException;

class OrcBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
private static final Logger LOG = LogManager.getLogger(OrcBootstrapMetadataHandler.class);

public OrcBootstrapMetadataHandler(HoodieWriteConfig config, HoodieTable table, HoodieFileStatus srcFileStatus) {
super(config, table, srcFileStatus);
}

@Override
Schema getAvroSchema(Path sourceFilePath) throws IOException {
Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
TypeDescription orcSchema = orcReader.getSchema();
return AvroOrcUtils.createAvroSchema(orcSchema);
}

@Override
void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle, Path sourceFilePath, KeyGeneratorInterface keyGenerator,
String partitionPath, Schema avroSchema) throws Exception {
BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null;
Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
TypeDescription orcSchema = orcReader.getSchema();
try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
new OrcReaderIterator<>(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
});
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
} finally {
bootstrapHandle.close();
if (null != wrapper) {
wrapper.shutdownNow();
}
}
}
}
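Finally, a sketch of the reader pipeline the executor wraps above: iterating an ORC file as Avro `GenericRecord`s via `OrcReaderIterator`. The generic parameter and the println sink are illustrative assumptions; the reader calls are the same ones used in `executeBootstrap`.

```java
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.OrcReaderIterator;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

class OrcIterationSketch {
  // Sketch: read every row of an ORC file as an Avro GenericRecord.
  static void dumpRecords(Configuration conf, Path orcFile) throws Exception {
    Reader orcReader = OrcFile.createReader(orcFile, OrcFile.readerOptions(conf));
    TypeDescription orcSchema = orcReader.getSchema();
    Schema avroSchema = AvroOrcUtils.createAvroSchema(orcSchema);
    try (RecordReader rows = orcReader.rows(new Reader.Options(conf).schema(orcSchema))) {
      OrcReaderIterator<GenericRecord> it = new OrcReaderIterator<>(rows, avroSchema, orcSchema);
      while (it.hasNext()) {
        System.out.println(it.next());
      }
    }
  }
}
```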
