Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;


/**
* Class to be used in tests to keep generating test inserts and updates against a corpus.
* <p>
Expand All @@ -73,14 +74,15 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3;
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);

Expand Down Expand Up @@ -117,6 +119,15 @@ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commi
return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}

/**
 * Generates a new avro record of the above schema format for a delete.
 */
public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
  // Delete records carry a zero timestamp and the _hoodie_is_deleted flag set to true.
  GenericRecord deleteRecord =
      generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0, true);
  return new TestRawTripPayload(deleteRecord.toString(), key.getRecordKey(), key.getPartitionPath(),
      TRIP_EXAMPLE_SCHEMA);
}

/**
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/
Expand All @@ -126,7 +137,12 @@ public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commit
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
double timestamp) {
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
Expand All @@ -137,12 +153,18 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
rec.put("fare", rand.nextDouble() * 100);
if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
} else {
rec.put("_hoodie_is_deleted", false);
}
return rec;
}

public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> {
HoodieTimeline.makeRequestedCommitFileName(commitTime))
.forEach(f -> {
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
FSDataOutputStream os = null;
Expand Down Expand Up @@ -176,7 +198,7 @@ public static void createCompactionRequestedFile(String basePath, String commitT
}

public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
Configuration configuration) throws IOException {
Configuration configuration) throws IOException {
Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
FileSystem fs = FSUtils.getFs(basePath, configuration);
Expand Down Expand Up @@ -332,7 +354,7 @@ public List<HoodieRecord> generateUpdatesWithDiffPartition(String commitTime, Li
* list
*
* @param commitTime Commit Timestamp
* @param n Number of updates (including dups)
* @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
Expand All @@ -349,7 +371,7 @@ public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws I
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
*
* @param commitTime Commit Timestamp
* @param n Number of unique records
* @param n Number of unique records
* @return list of hoodie record updates
*/
public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) {
Expand All @@ -370,7 +392,7 @@ public List<HoodieKey> generateUniqueDeletes(Integer n) {
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
*
* @param commitTime Commit Timestamp
* @param n Number of unique records
* @param n Number of unique records
* @return stream of hoodie record updates
*/
public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
Expand Down Expand Up @@ -418,11 +440,46 @@ public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index);
}
existingKeys.remove(kp);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the remove be with the key rather than the value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you are right. I figured this recently. working on the fix.

numExistingKeys--;
used.add(kp);
return kp.key;
});
}

/**
 * Generates deduped delete records previously inserted, randomly distributed across the keys above.
 *
 * @param commitTime Commit Timestamp
 * @param n Number of unique records
 * @return stream of hoodie records for delete
 * @throws IllegalArgumentException if more unique deletes are requested than keys exist
 */
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
  final Set<KeyPartition> used = new HashSet<>();

  if (n > numExistingKeys) {
    throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
  }

  return IntStream.range(0, n).boxed().map(i -> {
    // Pick a random starting point across the full range of remaining keys. nextInt's
    // bound is exclusive, so nextInt(numExistingKeys) (not numExistingKeys - 1) is needed
    // for the last key to be directly selectable; it also handles numExistingKeys == 1.
    int index = rand.nextInt(numExistingKeys);
    KeyPartition kp = existingKeys.get(index);
    // Linear-probe forward (with wrap-around) from the chosen slot until an unused key is found.
    while (used.contains(kp)) {
      index = (index + 1) % numExistingKeys;
      kp = existingKeys.get(index);
    }
    // Remove by value (not by index): the probe loop may have moved past the original
    // index, so removing the object itself is the safe option.
    existingKeys.remove(kp);
    numExistingKeys--;
    used.add(kp);
    try {
      return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
    } catch (IOException e) {
      throw new HoodieIOException(e.getMessage(), e);
    }
  });
}

/**
 * Returns the partition paths this data generator produces records for.
 * Note: exposes the internal array directly — callers mutating the returned
 * array will affect this generator.
 */
public String[] getPartitionPaths() {
return partitionPaths;
}
Expand Down
7 changes: 4 additions & 3 deletions hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* Base class for all AVRO record based payloads, that can be ordered based on a field.
*/
public abstract class BaseAvroPayload implements Serializable {

/**
* Avro data extracted from the source converted to bytes.
*/
Expand All @@ -43,8 +42,10 @@ public abstract class BaseAvroPayload implements Serializable {
protected final Comparable orderingVal;

/**
* @param record
* @param orderingVal
* Instantiate {@link BaseAvroPayload}.
*
* @param record Generic record for the payload.
* @param orderingVal {@link Comparable} to be used in pre combine.
*/
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
try {
Expand Down
16 changes: 8 additions & 8 deletions hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName) {

/**
* Create a key generator class via reflection, passing in any configs needed.
*
* <p>
* If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
* corresponding key generator class; otherwise, use the default key generator class specified in {@code
* DataSourceWriteOptions}.
Expand All @@ -125,7 +125,7 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
Expand All @@ -140,7 +140,7 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
}

public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
String tblName, Map<String, String> parameters) {
String tblName, Map<String, String> parameters) {

// inline compaction is on by default for MOR
boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
Expand All @@ -162,7 +162,7 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String
}

public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
String commitTime, String operation) {
String commitTime, String operation) {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
return client.bulkInsert(hoodieRecords, commitTime);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
Expand All @@ -174,19 +174,19 @@ public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, Ja
}

public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
String commitTime) {
String commitTime) {
return client.delete(hoodieKeys, commitTime);
}

public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
String payloadClass) throws IOException {
String payloadClass) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
return new HoodieRecord<>(hKey, payload);
}

@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
HoodieReadClient client = null;
try {
client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
Expand All @@ -205,7 +205,7 @@ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRD

@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

/**
* @param record
* @param orderingVal
*
*/
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
Expand All @@ -61,8 +60,15 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we change the codes in combineAndGetUpdateValue(), but row doesn't have _hoodie_delete_marker column, we need to Option.of(genericRecord) twice.
Should we put the codes in getinsertvalue? like this:

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    // combining strategy here trivially ignores currentValue on disk and writes this record
    return getInsertValue(schema);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    GenericRecord baseRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
    Object deleteMarker = baseRecord.get("_hoodie_delete_marker");
    if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
      return Option.empty();
    }
    return Option.of(baseRecord);
  }
```

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this in getInsertValue() means even inserts with the flag set will be deleted.. Not sure if this is intended behavior.. We only want to delete if updating and marker set?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have a performance concern here? Option.of should be very cheap right.. In any case, we can achieve the effect of what you mean, by simply hanging onto to the original Option[GenenricRecord]?

Option<IndexedRecord> val = getInsertValue(schema)
GenericRecord genericRecord = (GenericRecord) val.get();
...

Copy link
Copy Markdown
Contributor

@cdmikechen cdmikechen Dec 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar

Doing this in getInsertValue() means even inserts with the flag set will be deleted.. Not sure if this is intended behavior.. We only want to delete if updating and marker set?

If this is in a Kappa architecture, it works. But in a Lambda-style architecture, where data sometimes has to be rebuilt, the whole data change log may be ingested via bulk insert.
Of course, this is just my assumption. Maybe our test cases haven't hit this situation yet. If I am overthinking it and it cannot occur in actual cases, please ignore my review.

do you have a performance concern here? Option.of should be very cheap right.. In any case, we can achieve the effect of what you mean, by simply hanging onto to the original Option[GenenricRecord]?

Yes, Option.of may create another object. I personally feel that if an object already exists, we should reuse it rather than create a new one, unless there is a specific need.

GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get();
// combining strategy here trivially ignores currentValue on disk and writes this record
return getInsertValue(schema);
Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
return Option.empty();
} else {
return Option.of(genericRecord);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ public class DeltaSync implements Serializable {
private final HoodieTableType tableType;

public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {

this.cfg = cfg;
this.jssc = jssc;
Expand Down Expand Up @@ -288,7 +288,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource
// default to RowBasedSchemaProvider
schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
dataAndCheckpoint.getSchemaProvider())
dataAndCheckpoint.getSchemaProvider())
: this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
Expand Down Expand Up @@ -316,22 +316,22 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});

return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}

/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
*
* @param records Input Records
* @param records Input Records
* @param checkpointStr Checkpoint String
* @param metrics Metrics
* @param metrics Metrics
* @return Option Compaction instant if one is scheduled
*/
private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {

Option<String> scheduledCompactionInstant = Option.empty();

// filter dupes if needed
if (cfg.filterDupes) {
// turn upserts to insert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public SourceFormatAdapter(Source source) {

/**
* Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
*
* @param lastCkptStr
* @param sourceLimit
* @return
*/
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
Expand All @@ -78,10 +74,6 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String

/**
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format
*
* @param lastCkptStr
* @param sourceLimit
* @return
*/
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
Expand All @@ -95,7 +87,8 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
.ofNullable(
r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession()))
source.getSparkSession())
)
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
Expand Down
Loading