@@ -55,11 +55,11 @@ public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
+ "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

private static final Random RAND = new Random(46474747);
@@ -130,12 +130,36 @@ public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer
});
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public List<HoodieRecord<T>> generateInsertsOnPartition(String commitTime, Integer n, String partitionPath) {
return generateInsertsStreamOnPartition(commitTime, n, partitionPath).collect(Collectors.toList());
}

/**
* Generates new inserts, across a single partition path. It also updates the list of existing keys.
*/
public Stream<HoodieRecord<T>> generateInsertsStreamOnPartition(String commitTime, Integer n, String partitionPath) {
int currSize = getNumExistingKeys();

return IntStream.range(0, n).boxed().map(i -> {
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
existingKeys.put(currSize + i, kp);
numExistingKeys++;
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
});
}
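
A minimal usage sketch for the new single-partition generator (not part of the diff; imports as in this file, with the count and partition constant mirroring how insertOverwriteData uses it below):

    HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
    String commitTime = Long.toString(System.currentTimeMillis());
    // All 20 generated keys land in the third partition, and the generator's key registry grows by 20.
    List<HoodieRecord<HoodieAvroPayload>> inserts =
        dataGen.generateInsertsOnPartition(commitTime, 20, HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH);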

/**
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
* list
*
* @param commitTime Commit Timestamp
* @param n Number of updates (including dups)
* @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
@@ -148,15 +172,32 @@ public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
return updates;
}

/**
* Generates new updates, one for each of the existing keys, with no duplicates in the returned list.
*
* @param commitTime Commit Timestamp
* @return list of hoodie record updates
*/
public List<HoodieRecord<T>> generateUniqueUpdates(String commitTime) {
List<HoodieRecord<T>> updates = new ArrayList<>();
for (int i = 0; i < numExistingKeys; i++) {
KeyPartition kp = existingKeys.get(i);
HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}
return updates;
}
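
Unlike generateUpdates(commitTime, n), which picks keys at random and may repeat them, generateUniqueUpdates emits exactly one update per known key, so the expected table state after an upsert is deterministic. Continuing the sketch above:

    List<HoodieRecord<HoodieAvroPayload>> randomUpdates = dataGen.generateUpdates(commitTime, 10);   // 10 rows, keys may repeat
    List<HoodieRecord<HoodieAvroPayload>> uniqueUpdates = dataGen.generateUniqueUpdates(commitTime); // one row per existing key
    // uniqueUpdates.size() == dataGen.getNumExistingKeys(), so a snapshot taken after upserting
    // these rows must contain every updated row exactly once.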

public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) {
return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
}

private Option<String> convertToString(HoodieRecord<T> record) {
try {
String str = HoodieAvroUtils
.bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema)
.toString();
.bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema)
.toString();
str = "{" + str.substring(str.indexOf("\"ts\":"));
return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}"));
} catch (IOException e) {
@@ -166,7 +207,7 @@ private Option<String> convertToString(HoodieRecord<T> record) {

public List<String> convertToStringList(List<HoodieRecord<T>> records) {
return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get)
.collect(Collectors.toList());
.collect(Collectors.toList());
}
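
For orientation, each string produced by convertToString is a single JSON document with the partition path appended as an extra field, which is how spark.read().json later surfaces partitionpath as a column. A rough sketch (field values are illustrative, not taken from the diff):

    List<String> json = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 1));
    // json.get(0) looks roughly like:
    // {"ts": <long>, "uuid": "<random uuid>", "rider": "<string>", "driver": "<string>",
    //  "begin_lat": <double>, ..., "fare": <double>, "partitionpath": "<the record's partition path>"}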

public int getNumExistingKeys() {

@@ -25,6 +25,7 @@
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -65,30 +66,51 @@ public static void main(String[] args) {
public static void runQuickstart(JavaSparkContext jsc, SparkSession spark, String tableName, String tablePath) {
final HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

insertData(spark, jsc, tablePath, tableName, dataGen);
String snapshotQuery = "SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table";

Dataset<Row> insertDf = insertData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
assert insertDf.except(spark.sql(snapshotQuery)).count() == 0;

updateData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotBeforeUpdate = spark.sql(snapshotQuery);
Dataset<Row> updateDf = updateData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterUpdate = spark.sql(snapshotQuery);
assert snapshotAfterUpdate.intersect(updateDf).count() == updateDf.count();
assert snapshotAfterUpdate.except(updateDf).except(snapshotBeforeUpdate).count() == 0;

incrementalQuery(spark, tablePath, tableName);
pointInTimeQuery(spark, tablePath, tableName);

delete(spark, tablePath, tableName);
Dataset<Row> snapshotBeforeDelete = snapshotAfterUpdate;
Dataset<Row> deleteDf = delete(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterDelete = spark.sql(snapshotQuery);
assert snapshotAfterDelete.intersect(deleteDf).count() == 0;
assert snapshotBeforeDelete.except(deleteDf).except(snapshotAfterDelete).count() == 0;

insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotBeforeOverwrite = snapshotAfterDelete;
Dataset<Row> overwriteDf = insertOverwriteData(spark, jsc, tablePath, tableName, dataGen);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> withoutThirdPartitionDf = snapshotBeforeOverwrite.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'");
Dataset<Row> expectedDf = withoutThirdPartitionDf.union(overwriteDf);
Dataset<Row> snapshotAfterOverwrite = spark.sql(snapshotQuery);
assert snapshotAfterOverwrite.except(expectedDf).count() == 0;


Dataset<Row> snapshotBeforeDeleteByPartition = snapshotAfterOverwrite;
deleteByPartition(spark, tablePath, tableName);
queryData(spark, jsc, tablePath, tableName, dataGen);
Dataset<Row> snapshotAfterDeleteByPartition = spark.sql(snapshotQuery);
assert snapshotAfterDeleteByPartition.intersect(snapshotBeforeDeleteByPartition.filter("partitionpath == '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'")).count() == 0;
assert snapshotAfterDeleteByPartition.count() == snapshotBeforeDeleteByPartition.filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'").count();
}
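
Note that these checks are plain Java assert statements, so they only fire when the JVM runs with assertions enabled (-ea). The repeated except/count pattern could also be pulled into a small helper; a hypothetical sketch, not part of this PR:

    // Hypothetical helper: two datasets hold the same rows when neither side has rows the other lacks.
    private static void assertSameRows(Dataset<Row> expected, Dataset<Row> actual) {
      assert expected.except(actual).count() == 0;
      assert actual.except(expected).count() == 0;
    }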

/**
* Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
*/
public static void insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> insertData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));
@@ -101,15 +123,16 @@ public static void insertData(SparkSession spark, JavaSparkContext jsc, String t
.option(TBL_NAME.key(), tableName)
.mode(Overwrite)
.save(tablePath);
return df;
}

/**
* Generate new records, load them into a {@link Dataset} and insert-overwrite it into the Hudi dataset
*/
public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> insertOverwriteData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
String commitTime = Long.toString(System.currentTimeMillis());
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
List<String> inserts = dataGen.convertToStringList(dataGen.generateInsertsOnPartition(commitTime, 20, HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH));
Dataset<Row> df = spark.read().json(jsc.parallelize(inserts, 1));

df.write().format("org.apache.hudi")
@@ -121,9 +144,9 @@ public static void insertOverwriteData(SparkSession spark, JavaSparkContext jsc,
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
return df;
}
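
Generating the overwrite batch on a single partition matters because INSERT_OVERWRITE replaces only the partitions that receive new records, leaving the others untouched. That is the expectation runQuickstart now encodes, roughly:

    // Rows outside the third partition survive the overwrite; the third partition ends up
    // holding only the freshly generated batch.
    Dataset<Row> untouched = snapshotBeforeOverwrite
        .filter("partitionpath != '" + HoodieExampleDataGenerator.DEFAULT_THIRD_PARTITION_PATH + "'");
    Dataset<Row> expected = untouched.union(overwriteDf);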


/**
* Load the data files into a DataFrame.
*/
@@ -157,11 +180,11 @@ public static void queryData(SparkSession spark, JavaSparkContext jsc, String ta
* This is similar to inserting new data. Generate updates to existing trips using the data generator,
* load into a DataFrame and write DataFrame into the hudi dataset.
*/
public static void updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {

String commitTime = Long.toString(System.currentTimeMillis());
List<String> updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10));
List<String> updates = dataGen.convertToStringList(dataGen.generateUniqueUpdates(commitTime));
Dataset<Row> df = spark.read().json(jsc.parallelize(updates, 1));
df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -171,16 +194,18 @@ public static void updateData(SparkSession spark, JavaSparkContext jsc, String t
.option(TBL_NAME.key(), tableName)
.mode(Append)
.save(tablePath);
return df;
}

/**
* Delete two existing records from the table.
*/
public static void delete(SparkSession spark, String tablePath, String tableName) {
public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {

Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(tablePath + "/*/*/*/*");
roViewDF.createOrReplaceTempView("hudi_ro_table");
Dataset<Row> df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2");
Dataset<Row> toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2");
Dataset<Row> df = toBeDeletedDf.select("uuid", "partitionpath", "ts");

df.write().format("org.apache.hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs())
@@ -191,10 +216,11 @@ public static void delete(SparkSession spark, String tablePath, String tableName
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.mode(Append)
.save(tablePath);
return toBeDeletedDf;
}

/**
* Delete the data of a single or multiple partitions.
* Delete the data of the first partition.
*/
public static void deleteByPartition(SparkSession spark, String tablePath, String tableName) {
Dataset<Row> df = spark.emptyDataFrame();
@@ -204,9 +230,8 @@ public static void deleteByPartition(SparkSession spark, String tablePath, Strin
.option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid")
.option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE.value())
.option("hoodie.datasource.write.partitions.to.delete",
String.join(", ", HoodieExampleDataGenerator.DEFAULT_PARTITION_PATHS))
.option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
.option("hoodie.datasource.write.partitions.to.delete", HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.mode(Append)
.save(tablePath);
}
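
The switch from DELETE to DELETE_PARTITION is the substantive fix here: a record-level DELETE expects the incoming DataFrame to carry the keys of the records to remove, whereas DELETE_PARTITION can be driven entirely by hoodie.datasource.write.partitions.to.delete, which is why an empty DataFrame suffices. A compact sketch of the partition-level form (other write options as in the method above):

    spark.emptyDataFrame().write().format("org.apache.hudi")
        .option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value())
        // Only the first partition is dropped; the other two default partitions keep their data.
        .option("hoodie.datasource.write.partitions.to.delete", HoodieExampleDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
        .mode(Append)
        .save(tablePath);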
@@ -223,7 +248,7 @@ public static void incrementalQuery(SparkSession spark, String tablePath, String
.map((Function<Row, String>) row -> row.getString(0))
.take(50);

String beginTime = commits.get(commits.size() - 2); // commit time we are interested in
String beginTime = commits.get(commits.size() - 1); // commit time we are interested in

// incrementally query data
Dataset<Row> incViewDF = spark
@@ -250,7 +275,7 @@ public static void pointInTimeQuery(SparkSession spark, String tablePath, String
.map((Function<Row, String>) row -> row.getString(0))
.take(50);
String beginTime = "000"; // Represents all commits > this time.
String endTime = commits.get(commits.size() - 2); // commit time we are interested in
String endTime = commits.get(commits.size() - 1); // commit time we are interested in

//incrementally query data
Dataset<Row> incViewDF = spark.read().format("org.apache.hudi")