Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
Expand Down Expand Up @@ -58,18 +59,22 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
// Always reports that records within each output partition are sorted
// (this partitioner unconditionally returns true; sorting is performed by repartitionRecords).
// NOTE(review): the claim about repartitionRecords is inferred from the hunk header — confirm against full file.
public boolean arePartitionRecordsSorted() {
return true;
}
private static String getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,

private static Object getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] sortColumns,
SerializableSchema schema) {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
StringBuilder sb = new StringBuilder();
for (String col : sortColumns) {
sb.append(genericRecord.get(col));
}
if (sortColumns.length == 1) {
return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true);
} else {
StringBuilder sb = new StringBuilder();
for (String col : sortColumns) {
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
}

return sb.toString();
return sb.toString();
}
} catch (IOException e) {
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ private void readObject(ObjectInputStream in) throws IOException {

// create a public write method for unit test
public void writeObjectTo(ObjectOutputStream out) throws IOException {
  // Note: writeUTF cannot support string length > 64K. So use writeObject which has small overhead (relatively).
  out.writeObject(schema.toString());
}

// create a public read method for unit test
public void readObjectFrom(ObjectInputStream in) throws IOException {
  try {
    // Counterpart of writeObjectTo: the schema was written with writeObject (not writeUTF)
    // to allow schema strings larger than 64K.
    schema = new Schema.Parser().parse(in.readObject().toString());
  } catch (ClassNotFoundException e) {
    // Surface stream corruption/class-resolution failures as IOException, preserving the cause.
    throw new IOException("unable to parse schema", e);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void testSerDeser() throws IOException {
verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS);
}

@Test
public void testLargeSchema() throws IOException {
  // Round-trip a schema whose JSON exceeds the 64K writeUTF limit through ser/de.
  String largeSchemaJson = generateLargeSchema();
  Schema largeSchema = new Schema.Parser().parse(largeSchemaJson);
  verifySchema(largeSchema);
}

private void verifySchema(Schema schema) throws IOException {
SerializableSchema serializableSchema = new SerializableSchema(schema);
assertEquals(schema, serializableSchema.get());
Expand All @@ -65,4 +70,20 @@ private void verifySchema(Schema schema) throws IOException {
newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten)));
assertEquals(schema, newSchema.get());
}

// generate large schemas (>64K which is limitation of ObjectOutputStream#writeUTF) to validate it can be serialized
private String generateLargeSchema() {
  final int targetLength = 80 * 1024;
  StringBuilder sb = new StringBuilder(HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX);
  // Keep appending synthetic nested-record fields until the JSON is comfortably past 64K.
  for (int fieldNum = 1; sb.length() < targetLength; fieldNum++) {
    String fieldName = "field" + fieldNum;
    sb.append("{\"name\": \"").append(fieldName)
        .append("\",\"type\": {\"type\":\"record\", \"name\":\"").append(fieldName)
        .append("\",\"fields\": [")
        .append("{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},");
  }

  sb.append(HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX);
  return sb.toString();
}
}