@@ -25,10 +25,11 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
@@ -64,20 +65,17 @@ public class HDFSParquetImporter implements Serializable {

private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);

public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
.withZone(ZoneId.systemDefault());
private final Config cfg;
private transient FileSystem fs;
/**
* Bag of properties with source, hoodie client, key generator etc.
*/
private TypedProperties props;

public HDFSParquetImporter(Config cfg) throws IOException {
public HDFSParquetImporter(Config cfg) {
this.cfg = cfg;
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating Cleaner with configs : " + props.toString());
}

public static void main(String[] args) throws Exception {
@@ -98,8 +96,11 @@ public static void main(String[] args) throws Exception {

}

public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Starting data import with configs : " + props.toString());
int ret = -1;
try {
// Verify that targetPath is not present.
@@ -110,7 +111,7 @@ public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
logger.error(t);
log.error(t);
}
return ret;
}
@@ -141,7 +142,7 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
JavaRDD<WriteStatus> writeResponse = load(client, instantTime, hoodieRecords);
return UtilHelpers.handleErrors(jsc, instantTime, writeResponse);
} catch (Throwable t) {
logger.error("Error occurred.", t);
log.error("Error occurred.", t);
}
return -1;
}
@@ -159,8 +160,7 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
return jsc
.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
job.getConfiguration())
// To reduce large number of
// tasks.
// To reduce large number of tasks.
.coalesce(16 * cfg.parallelism).map(entry -> {
GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField = genericRecord.get(cfg.partitionKey);
@@ -172,16 +172,16 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
}
String partitionPath = partitionField.toString();
logger.info("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
log.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
if (partitionField instanceof Number) {
try {
long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
partitionPath = PARTITION_FORMATTER.format(new Date(ts));
partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
} catch (NumberFormatException nfe) {
logger.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
log.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
}
}
return new HoodieRecord<>(new HoodieKey((String) rowField, partitionPath),
return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
new HoodieJsonPayload(genericRecord.toString()));
});
}
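
Side note on the formatter swap in this file: a minimal standalone sketch (the class name and sample values below are illustrative, not from the PR) showing that the new thread-safe `DateTimeFormatter`/`Instant` pair produces the same `yyyy/MM/dd` partition path the old `SimpleDateFormat`/`Date` pair did.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class PartitionPathSketch {

  // Same pattern and zone as the importer's new PARTITION_FORMATTER;
  // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
  private static final DateTimeFormatter PARTITION_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneId.systemDefault());

  public static void main(String[] args) {
    // A numeric partition field interpreted as epoch seconds, as in the diff above.
    double partitionField = 1546300800.0;        // 2019-01-01T00:00:00Z
    long ts = (long) (partitionField * 1000L);   // convert to epoch millis
    System.out.println(PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts)));
    // Prints 2019/01/01 when the system zone is UTC; other zones may shift the date.
  }
}
```
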
@@ -195,11 +195,31 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
* @param <T> Type
*/
protected <T extends HoodieRecordPayload> JavaRDD<WriteStatus> load(HoodieWriteClient client, String instantTime,
JavaRDD<HoodieRecord<T>> hoodieRecords) {
if (cfg.command.toLowerCase().equals("insert")) {
return client.insert(hoodieRecords, instantTime);
JavaRDD<HoodieRecord<T>> hoodieRecords) throws Exception {
switch (cfg.command.toLowerCase()) {
case "upsert": {
return client.upsert(hoodieRecords, instantTime);
}
case "bulkinsert": {
return client.bulkInsert(hoodieRecords, instantTime);
}
default: {
return client.insert(hoodieRecords, instantTime);
}
}
}

public static class CommandValidator implements IValueValidator<String> {

List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");

@Override
public void validate(String name, String value) throws ParameterException {
if (value == null || !validCommands.contains(value.toLowerCase())) {
throw new ParameterException(
String.format("Invalid command: value:%s: supported commands:%s", value, validCommands));
}
}
return client.upsert(hoodieRecords, instantTime);
}

public static class FormatValidator implements IValueValidator<String> {
@@ -217,8 +237,8 @@ public void validate(String name, String value) throws ParameterException {

public static class Config implements Serializable {

@Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert",
required = false)
@Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert",
required = false, validateValueWith = CommandValidator.class)
public String command = "INSERT";
@Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input dataset", required = true)
public String srcPath = null;
@@ -233,7 +253,7 @@ public static class Config implements Serializable {
public String rowKey = null;
@Parameter(names = {"--partition-key-field", "-pk"}, description = "Partition key field name", required = true)
public String partitionKey = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert(default)/upsert/bulkinsert", required = true)
public int parallelism = 1;
@Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
public String schemaFile = null;
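
For reviewers, a small self-contained sketch of the new command handling: the validator body is copied from the diff above, while the demo `main` and the wrapper class name are illustrative only. After this change, `--command` values outside insert/upsert/bulkinsert are rejected at argument parsing (case-insensitively), and `load()` then dispatches to `client.upsert`, `client.bulkInsert`, or `client.insert` (the default).

```java
import com.beust.jcommander.IValueValidator;
import com.beust.jcommander.ParameterException;
import java.util.Arrays;
import java.util.List;

public class CommandValidatorSketch {

  // Same logic as the new HDFSParquetImporter.CommandValidator in the diff.
  static class CommandValidator implements IValueValidator<String> {

    List<String> validCommands = Arrays.asList("insert", "upsert", "bulkinsert");

    @Override
    public void validate(String name, String value) throws ParameterException {
      if (value == null || !validCommands.contains(value.toLowerCase())) {
        throw new ParameterException(
            String.format("Invalid command: value:%s: supported commands:%s", value, validCommands));
      }
    }
  }

  public static void main(String[] args) {
    CommandValidator validator = new CommandValidator();
    validator.validate("--command", "BULKINSERT"); // passes: matching is case-insensitive
    try {
      validator.validate("--command", "delete");   // rejected with a ParameterException
    } catch (ParameterException e) {
      System.out.println(e.getMessage());
    }
  }
}
```
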
@@ -190,7 +190,9 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withParallelism(parallelism, parallelism)
HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(parallelism, parallelism)
.withBulkInsertParallelism(parallelism)
.withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withProps(properties).build();
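
In the second file, the write config now also carries the bulk insert parallelism, reusing the importer's existing `--parallelism` value rather than adding a new flag. A consolidation of how the builder chain reads after this change is below; the builder calls are copied from the diff, while the wrapper method name is made up for the sketch and imports/package paths of this codebase are assumed.

```java
// Illustrative helper only: the full builder chain as it stands after this change.
static HoodieWriteConfig buildWriteConfig(String basePath, int parallelism, String schemaStr,
    HoodieCompactionConfig compactionConfig, Properties properties) {
  return HoodieWriteConfig.newBuilder().withPath(basePath)
      .withParallelism(parallelism, parallelism)   // insert/upsert parallelism (unchanged)
      .withBulkInsertParallelism(parallelism)      // new: used when --command bulkinsert is chosen
      .withSchema(schemaStr).combineInput(true, true).withCompactionConfig(compactionConfig)
      .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
      .withProps(properties).build();
}
```
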