Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/demo/config/test-suite/target.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"fields" : [
{
"name" : "timestamp",
"type" : "double"
"type" : "long"
}, {
"name" : "_row_key",
"type" : "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void init() {

// generate test data
partitions = Arrays.asList("2018", "2019", "2020");
double timestamp = new Double(Instant.now().toEpochMilli()).longValue();
long timestamp = Instant.now().toEpochMilli();
for (int i = 0; i < partitions.size(); i++) {
Dataset<Row> df = TestBootstrap.generateTestRawTripDataset(timestamp,
i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, sqlContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ private void verifyResultData(List<GenericRecord> expectData) {

List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
List<HoodieTripModel> result = readData.stream().map(row ->
new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
new HoodieTripModel(row.getLong(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
row.getDouble(5), row.getDouble(6), row.getDouble(7)))
.collect(Collectors.toList());

List<HoodieTripModel> expected = expectData.stream().map(g ->
new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
new HoodieTripModel(Long.parseLong(g.get("timestamp").toString()),
g.get("_row_key").toString(),
g.get("rider").toString(),
g.get("driver").toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ public void testFileSizeUpsertRecords() throws Exception {

@Test
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
Schema schema = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/testDataGeneratorSchema.txt");
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schema.toString())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()).build();
metaClient = HoodieTableMetaClient.reload(metaClient);
Expand Down
128 changes: 128 additions & 0 deletions hudi-client/src/test/resources/testDataGeneratorSchema.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type" : "record",
"name" : "triprec",
"fields" : [
{
"name" : "timestamp",
"type" : "long"
}, {
"name" : "_row_key",
"type" : "string"
}, {
"name" : "rider",
"type" : "string"
}, {
"name" : "driver",
"type" : "string"
}, {
"name" : "begin_lat",
"type" : "double"
}, {
"name" : "begin_lon",
"type" : "double"
}, {
"name" : "end_lat",
"type" : "double"
}, {
"name" : "end_lon",
"type" : "double"
}, {
"name" : "distance_in_meters",
"type" : "int"
}, {
"name" : "seconds_since_epoch",
"type" : "long"
}, {
"name" : "weight",
"type" : "float"
},{
"name" : "nation",
"type" : "bytes"
},{
"name" : "current_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
},{
"name" : "current_ts",
"type" : {
"type" : "long"
}
},{
"name" : "height",
"type" : {
"type" : "fixed",
"name" : "abc",
"size" : 5,
"logicalType" : "decimal",
"precision" : 10,
"scale": 6
}
}, {
"name" :"city_to_state",
"type" : {
"type" : "map",
"values": "string"
}
},
{
"name" : "fare",
"type" : {
"type" : "record",
"name" : "fare",
"fields" : [
{
"name" : "amount",
"type" : "double"
},
{
"name" : "currency",
"type" : "string"
}
]
}
},
{
"name" : "tip_history",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "tip_history",
"fields" : [
{
"name" : "amount",
"type" : "double"
},
{
"name" : "currency",
"type" : "string"
}
]
}
}
},
{
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class HoodieTestDataGenerator {
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3;
public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"timestamp\",\"type\": \"long\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
Expand All @@ -116,14 +116,14 @@ public class HoodieTestDataGenerator {
TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;

public static final String TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+ "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
public static final String SHORT_TRIP_SCHEMA = "{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":["
+ "{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";


Expand Down Expand Up @@ -204,7 +204,7 @@ public static RawTripTestPayload generateRandomValue(HoodieKey key, String insta
public static RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened) throws IOException {
GenericRecord rec = generateGenericRecord(
key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0.0,
key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0,
false, isFlattened);
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
}
Expand All @@ -213,20 +213,20 @@ public static RawTripTestPayload generateRandomValue(
* Generates a new avro record with TRIP_SCHEMA, retaining the key if optionally provided.
*/
public RawTripTestPayload generatePayloadForTripSchema(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_SCHEMA);
}

public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, String commitTime) throws IOException {
GenericRecord rec = generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
GenericRecord rec = generateRecordForShortTripSchema(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
}

/**
* Generates a new avro record of the above schema format for a delete.
*/
public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0.0,
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0,
true, false);
return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true);
}
Expand All @@ -235,17 +235,17 @@ public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/
public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String instantTime) {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0.0);
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, 0);
return new HoodieAvroPayload(Option.of(rec));
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
long timestamp) {
return generateGenericRecord(rowKey, riderName, driverName, timestamp, false, false);
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord,
long timestamp, boolean isDeleteRecord,
boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
rec.put("_row_key", rowKey);
Expand Down Expand Up @@ -303,7 +303,7 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
/*
Generate random record using TRIP_SCHEMA
*/
public GenericRecord generateRecordForTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
public GenericRecord generateRecordForTripSchema(String rowKey, String riderName, String driverName, long timestamp) {
GenericRecord rec = new GenericData.Record(AVRO_TRIP_SCHEMA);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
Expand All @@ -314,7 +314,7 @@ public GenericRecord generateRecordForTripSchema(String rowKey, String riderName
return rec;
}

public GenericRecord generateRecordForShortTripSchema(String rowKey, String riderName, String driverName, double timestamp) {
public GenericRecord generateRecordForShortTripSchema(String rowKey, String riderName, String driverName, long timestamp) {
GenericRecord rec = new GenericData.Record(AVRO_SHORT_TRIP_SCHEMA);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
Expand Down Expand Up @@ -780,7 +780,7 @@ public List<GenericRecord> generateGenericRecords(int numRecords) {
List<GenericRecord> list = new ArrayList<>();
IntStream.range(0, numRecords).forEach(i -> {
list.add(generateGenericRecord(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID()
.toString(), RAND.nextDouble()));
.toString(), RAND.nextLong()));
});
return list;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ public HoodieExampleDataGenerator(String[] partitionPaths, Map<Integer, KeyParti
*/
@SuppressWarnings("unchecked")
public T generateRandomValue(HoodieKey key, String commitTime) {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
return (T) new HoodieAvroPayload(Option.of(rec));
}

public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
long timestamp) {
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("uuid", rowKey);
rec.put("ts", timestamp);
Expand Down
4 changes: 2 additions & 2 deletions hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public int getNumExistingKeys() {
}

public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp) {
long timestamp) {
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("uuid", rowKey);
rec.put("ts", timestamp);
Expand All @@ -118,7 +118,7 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
public static OverwriteWithLatestAvroPayload generateRandomValue(HoodieKey key, String riderDriverSuffix)
throws IOException {
GenericRecord rec =
generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" + riderDriverSuffix, 0.0);
generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" + riderDriverSuffix, 0);
return new OverwriteWithLatestAvroPayload(Option.of(rec));
}

Expand Down
Loading