@@ -20,6 +20,7 @@
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.deltastreamer.DeltaSync;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -38,30 +39,30 @@ public HoodieDeltaStreamerWrapper(Config cfg, JavaSparkContext jssc) throws Exce
     super(cfg, jssc);
   }
 
-  public JavaRDD<WriteStatus> upsert(Operation operation) throws Exception {
+  public JavaRDD<WriteStatus> upsert(WriteOperationType operation) throws Exception {
     cfg.operation = operation;
     return deltaSyncService.get().getDeltaSync().syncOnce().getRight();
   }
 
   public JavaRDD<WriteStatus> insert() throws Exception {
-    return upsert(Operation.INSERT);
+    return upsert(WriteOperationType.INSERT);
   }
 
   public JavaRDD<WriteStatus> bulkInsert() throws
       Exception {
-    return upsert(Operation.BULK_INSERT);
+    return upsert(WriteOperationType.BULK_INSERT);
   }
 
   public void scheduleCompact() throws Exception {
     // Since we don't support scheduleCompact() operation in delta-streamer, assume upsert without any data that will
     // trigger scheduling compaction
-    upsert(Operation.UPSERT);
+    upsert(WriteOperationType.UPSERT);
   }
 
   public JavaRDD<WriteStatus> compact() throws Exception {
     // Since we don't support compact() operation in delta-streamer, assume upsert without any data that will trigger
     // inline compaction
-    return upsert(Operation.UPSERT);
+    return upsert(WriteOperationType.UPSERT);
   }
 
   public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
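With the local `HoodieDeltaStreamer.Operation` enum gone, the wrapper above routes every write mode (insert, bulk insert, and even the compaction triggers) through a single `upsert(WriteOperationType)` call, so the enum value alone selects the behaviour. A minimal sketch of that dispatch-by-enum pattern, assuming only the shared `WriteOperationType` enum from hudi-common; the `WriteOpDispatchSketch` class and its `run` helper are hypothetical and not part of this PR:

```java
import org.apache.hudi.common.model.WriteOperationType;

// Sketch only: the dispatch-by-enum pattern the wrapper uses after this PR.
public class WriteOpDispatchSketch {

  static void run(WriteOperationType op) {
    // In the real wrapper every branch ends up in upsert(op); here we just log the route.
    switch (op) {
      case INSERT:
        System.out.println("insert() -> upsert(INSERT)");
        break;
      case BULK_INSERT:
        System.out.println("bulkInsert() -> upsert(BULK_INSERT)");
        break;
      default:
        System.out.println("upsert(" + op + ")");
    }
  }

  public static void main(String[] args) {
    // valueOf accepts the same strings the --op flag takes: UPSERT, INSERT, BULK_INSERT.
    run(WriteOperationType.valueOf("BULK_INSERT"));
  }
}
```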
@@ -25,6 +25,7 @@
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -38,7 +39,6 @@
 import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
 import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
-import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -126,7 +126,7 @@ public Option<String> startCommit() {
 
   public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
     if (cfg.useDeltaStreamer) {
-      return deltaStreamerWrapper.upsert(Operation.UPSERT);
+      return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
     } else {
       Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
       lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -378,7 +379,7 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
       return Pair.of(schemaProvider, Pair.of(checkpointStr, jssc.emptyRDD()));
     }
 
-    boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(HoodieDeltaStreamer.Operation.UPSERT);
+    boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
       HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
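The retyped `shouldCombine` flag above keeps the existing rule: records are combined by key before writing when `--filter-dupes` is enabled or the operation is an upsert. A small standalone sketch of that predicate; the `ShouldCombineSketch` class is illustrative, not Hudi code:

```java
import org.apache.hudi.common.model.WriteOperationType;

// Sketch of the shouldCombine rule from the hunk above.
public class ShouldCombineSketch {

  static boolean shouldCombine(boolean filterDupes, WriteOperationType op) {
    return filterDupes || op.equals(WriteOperationType.UPSERT);
  }

  public static void main(String[] args) {
    System.out.println(shouldCombine(false, WriteOperationType.UPSERT));      // true: upserts always combine
    System.out.println(shouldCombine(false, WriteOperationType.INSERT));      // false: plain inserts skip combining
    System.out.println(shouldCombine(true, WriteOperationType.BULK_INSERT));  // true: dedup forces combining
  }
}
```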
@@ -29,6 +29,7 @@
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -192,15 +193,11 @@ private boolean onDeltaSyncShutdown(boolean error) {
     return true;
   }
 
-  public enum Operation {
-    UPSERT, INSERT, BULK_INSERT
-  }
-
-  protected static class OperationConverter implements IStringConverter<Operation> {
+  protected static class OperationConverter implements IStringConverter<WriteOperationType> {
 
     @Override
-    public Operation convert(String value) throws ParameterException {
-      return Operation.valueOf(value);
+    public WriteOperationType convert(String value) throws ParameterException {
+      return WriteOperationType.valueOf(value);
     }
   }
 
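Dropping the three-value `Operation` enum means the JCommander converter above now resolves the raw `--op` string directly against `WriteOperationType`. A standalone sketch of that resolution, without the JCommander plumbing; the `OpStringResolutionSketch` class is illustrative. Note that `valueOf` rejects any string that is not an exact constant name, so an unsupported `--op` value fails at parse time:

```java
import org.apache.hudi.common.model.WriteOperationType;

// Sketch: what OperationConverter.convert(...) boils down to after this PR.
public class OpStringResolutionSketch {

  public static void main(String[] args) {
    // Exact constant names resolve directly.
    WriteOperationType op = WriteOperationType.valueOf("BULK_INSERT");
    System.out.println(op); // BULK_INSERT

    try {
      WriteOperationType.valueOf("NOT_AN_OP");
    } catch (IllegalArgumentException e) {
      // Anything that is not a WriteOperationType constant is rejected here,
      // which surfaces as a parameter error when the CLI flag is parsed.
      System.out.println("rejected: NOT_AN_OP");
    }
  }
}
```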
@@ -272,7 +269,7 @@ public static class Config implements Serializable {
 
     @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
         + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
-    public Operation operation = Operation.UPSERT;
+    public WriteOperationType operation = WriteOperationType.UPSERT;
 
     @Parameter(names = {"--filter-dupes"},
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
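The `--op` flag itself is unchanged; only the field type behind it moves to `WriteOperationType`. A minimal sketch of how JCommander binds such a field; the `MiniConfig` class is a stand-in for illustration, not Hudi's `Config`, and it relies on JCommander's built-in enum binding, whereas the real config keeps the explicit `OperationConverter`:

```java
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hudi.common.model.WriteOperationType;

// Sketch: a stand-in config showing the --op string landing in a WriteOperationType field.
public class MiniConfig {

  @Parameter(names = {"--op"})
  public WriteOperationType operation = WriteOperationType.UPSERT;

  @Parameter(names = {"--filter-dupes"})
  public Boolean filterDupes = false;

  public static void main(String[] args) {
    MiniConfig cfg = new MiniConfig();
    JCommander.newBuilder().addObject(cfg).build().parse("--op", "BULK_INSERT");
    System.out.println(cfg.operation + ", filterDupes=" + cfg.filterDupes);
    // prints: BULK_INSERT, filterDupes=false
  }
}
```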
@@ -552,7 +549,7 @@ public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Config
       }
     }
 
-    ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
+    ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != WriteOperationType.UPSERT,
         "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
 
     this.props = properties.get();
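The guard above is only retyped, not weakened: `--filter-dupes` combined with `--op UPSERT` is still rejected, because dropping records that already exist in the table would silently discard genuine updates. A standalone sketch of the check; the `FilterDupesGuardSketch` class is illustrative:

```java
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.ValidationUtils;

// Sketch of the filter-dupes/upsert guard in isolation.
public class FilterDupesGuardSketch {

  static void validate(boolean filterDupes, WriteOperationType operation) {
    ValidationUtils.checkArgument(!filterDupes || operation != WriteOperationType.UPSERT,
        "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
  }

  public static void main(String[] args) {
    validate(true, WriteOperationType.INSERT);   // ok: dedup on pure inserts is safe
    validate(false, WriteOperationType.UPSERT);  // ok: upsert without dedup
    validate(true, WriteOperationType.UPSERT);   // throws IllegalArgumentException
  }
}
```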
@@ -22,6 +22,7 @@
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.config.TypedProperties;
@@ -69,7 +70,7 @@ public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) throw
     this.jssc = jssc;
     String commonPropsFile = config.propsFilePath;
     String configFolder = config.configFolder;
-    ValidationUtils.checkArgument(!config.filterDupes || config.operation != HoodieDeltaStreamer.Operation.UPSERT,
+    ValidationUtils.checkArgument(!config.filterDupes || config.operation != WriteOperationType.UPSERT,
         "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
     FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
     configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? configFolder.substring(0, configFolder.length() - 1) : configFolder;
@@ -268,7 +269,7 @@ public static class Config implements Serializable {
 
     @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
         + "is purely new data/inserts to gain speed)", converter = HoodieDeltaStreamer.OperationConverter.class)
-    public HoodieDeltaStreamer.Operation operation = HoodieDeltaStreamer.Operation.UPSERT;
+    public WriteOperationType operation = WriteOperationType.UPSERT;
 
     @Parameter(names = {"--filter-dupes"},
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")