Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public static final String TRIP_FLATTENED_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;

public static final String TRIP_NESTED_EXAMPLE_SCHEMA =
TRIP_SCHEMA_PREFIX + FARE_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;

public static final String TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+ "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
Expand All @@ -139,6 +142,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {


public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static final Schema NESTED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA);
public static final TypeDescription ORC_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
Expand All @@ -159,13 +163,21 @@ public HoodieTestDataGenerator(long seed) {
this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
}

public HoodieTestDataGenerator(String schema, long seed) {
this(schema, seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
}

public HoodieTestDataGenerator(long seed, String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this(TRIP_EXAMPLE_SCHEMA, seed, partitionPaths, keyPartitionMap);
}

public HoodieTestDataGenerator(String schema, long seed, String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this.rand = new Random(seed);
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeysBySchema = new HashMap<>();
this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
this.existingKeysBySchema.put(schema, keyPartitionMap);
this.numKeysBySchema = new HashMap<>();
this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
this.numKeysBySchema.put(schema, keyPartitionMap.size());

logger.info(String.format("Test DataGenerator's seed (%s)", seed));
}
Expand Down Expand Up @@ -223,6 +235,8 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi
return generatePayloadForTripSchema(key, commitTime);
} else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
return generatePayloadForShortTripSchema(key, commitTime);
} else if (TRIP_NESTED_EXAMPLE_SCHEMA.equals(schemaStr)) {
return generateNestedExampleRandomValue(key, commitTime);
}

return null;
Expand Down Expand Up @@ -255,6 +269,11 @@ private RawTripTestPayload generateRandomValue(
return generateRandomValue(key, instantTime, isFlattened, 0);
}

private RawTripTestPayload generateNestedExampleRandomValue(
HoodieKey key, String instantTime) throws IOException {
return generateNestedExampleRandomValue(key, instantTime, 0);
}

private RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException {
GenericRecord rec = generateGenericRecord(
Expand All @@ -263,6 +282,14 @@ private RawTripTestPayload generateRandomValue(
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}

private RawTripTestPayload generateNestedExampleRandomValue(
HoodieKey key, String instantTime, int ts) throws IOException {
GenericRecord rec = generateNestedExampleGenericRecord(
key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts,
false);
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}

/**
* Generates a new avro record with TRIP_SCHEMA, retaining the key if optionally provided.
*/
Expand Down Expand Up @@ -298,10 +325,11 @@ public GenericRecord generateGenericRecord(String rowKey, String partitionPath,
return generateGenericRecord(rowKey, partitionPath, riderName, driverName, timestamp, false, false);
}

public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
long timestamp, boolean isDeleteRecord,
boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);

/**
* Populate rec with values for TRIP_SCHEMA_PREFIX
*/
private void generateTripPrefixValues(GenericRecord rec, String rowKey, String partitionPath, String riderName, String driverName, long timestamp) {
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
rec.put("partition_path", partitionPath);
Expand All @@ -311,47 +339,108 @@ public GenericRecord generateGenericRecord(String rowKey, String partitionPath,
rec.put("begin_lon", rand.nextDouble());
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
if (isFlattened) {
rec.put("fare", rand.nextDouble() * 100);
rec.put("currency", "USD");
} else {
rec.put("distance_in_meters", rand.nextInt());
rec.put("seconds_since_epoch", rand.nextLong());
rec.put("weight", rand.nextFloat());
byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes));
long randomMillis = genRandomTimeMillis(rand);
Instant instant = Instant.ofEpochMilli(randomMillis);
rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_ts", randomMillis);

BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6));
rec.put("height", genericFixed);

rec.put("city_to_state", Collections.singletonMap("LA", "CA"));

GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", rand.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);

GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema());
Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
GenericRecord tipRecord = new GenericData.Record(tipSchema);
tipRecord.put("amount", rand.nextDouble() * 100);
tipRecord.put("currency", "USD");
tipHistoryArray.add(tipRecord);
rec.put("tip_history", tipHistoryArray);
}
}

/**
* Populate rec with values for FARE_FLATTENED_SCHEMA
*/
private void generateFareFlattenedValues(GenericRecord rec) {
rec.put("fare", rand.nextDouble() * 100);
rec.put("currency", "USD");
}

/**
* Populate rec with values for EXTRA_TYPE_SCHEMA
*/
private void generateExtraSchemaValues(GenericRecord rec) {
rec.put("distance_in_meters", rand.nextInt());
rec.put("seconds_since_epoch", rand.nextLong());
rec.put("weight", rand.nextFloat());
byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes));
long randomMillis = genRandomTimeMillis(rand);
Instant instant = Instant.ofEpochMilli(randomMillis);
rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_ts", randomMillis);

BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6));
rec.put("height", genericFixed);
}

/**
* Populate rec with values for MAP_TYPE_SCHEMA
*/
private void generateMapTypeValues(GenericRecord rec) {
rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
}

/**
* Populate rec with values for FARE_NESTED_SCHEMA
*/
private void generateFareNestedValues(GenericRecord rec) {
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", rand.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);
}

/**
* Populate rec with values for TIP_NESTED_SCHEMA
*/
private void generateTipNestedValues(GenericRecord rec) {
GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema());
Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
GenericRecord tipRecord = new GenericData.Record(tipSchema);
tipRecord.put("amount", rand.nextDouble() * 100);
tipRecord.put("currency", "USD");
tipHistoryArray.add(tipRecord);
rec.put("tip_history", tipHistoryArray);
}

/**
* Populate rec with values for TRIP_SCHEMA_SUFFIX
*/
private void generateTripSuffixValues(GenericRecord rec, boolean isDeleteRecord) {
if (isDeleteRecord) {
rec.put("_hoodie_is_deleted", true);
} else {
rec.put("_hoodie_is_deleted", false);
}
}


/**
* Generate record conforming to TRIP_EXAMPLE_SCHEMA or TRIP_FLATTENED_SCHEMA if isFlattened is true
*/
public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
long timestamp, boolean isDeleteRecord,
boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
generateTripPrefixValues(rec, rowKey, partitionPath, riderName, driverName, timestamp);
if (isFlattened) {
generateFareFlattenedValues(rec);
} else {
generateExtraSchemaValues(rec);
generateMapTypeValues(rec);
generateFareNestedValues(rec);
generateTipNestedValues(rec);
}
generateTripSuffixValues(rec, isDeleteRecord);
return rec;
}

/**
* Generate record conforming to TRIP_NESTED_EXAMPLE_SCHEMA
*/
public GenericRecord generateNestedExampleGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
long timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(NESTED_AVRO_SCHEMA);
generateTripPrefixValues(rec, rowKey, partitionPath, riderName, driverName, timestamp);
generateFareNestedValues(rec);
generateTripSuffixValues(rec, isDeleteRecord);
return rec;
}

Expand Down Expand Up @@ -474,13 +563,17 @@ public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime, Integer
}

/**
* Generates new inserts with nested schema, uniformly across the partition paths above.
* Generates new inserts for TRIP_EXAMPLE_SCHEMA with nested schema, uniformly across the partition paths above.
* It also updates the list of existing keys.
*/
public List<HoodieRecord> generateInserts(String instantTime, Integer n) {
return generateInserts(instantTime, n, false);
}

public List<HoodieRecord> generateInsertsNestedExample(String instantTime, Integer n) {
return generateInsertsStream(instantTime, n, false, TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
}

/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
Expand Down Expand Up @@ -721,6 +814,10 @@ public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer n) {
return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
}

public List<HoodieRecord> generateUniqueUpdatesNestedExample(String instantTime, Integer n) {
return generateUniqueUpdatesStream(instantTime, n, TRIP_NESTED_EXAMPLE_SCHEMA).collect(Collectors.toList());
}

public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String instantTime, Integer n, String schemaStr) {
return generateUniqueUpdatesStream(instantTime, n, schemaStr).collect(Collectors.toList());
}
Expand Down
Loading