diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 65c20004c81ed..209531dd2e090 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -58,18 +59,22 @@ public JavaRDD> repartitionRecords(JavaRDD> reco public boolean arePartitionRecordsSorted() { return true; } - - private static String getRecordSortColumnValues(HoodieRecord record, + + private static Object getRecordSortColumnValues(HoodieRecord record, String[] sortColumns, SerializableSchema schema) { try { GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get(); - StringBuilder sb = new StringBuilder(); - for (String col : sortColumns) { - sb.append(genericRecord.get(col)); - } + if (sortColumns.length == 1) { + return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true); + } else { + StringBuilder sb = new StringBuilder(); + for (String col : sortColumns) { + sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true)); + } - return sb.toString(); + return sb.toString(); + } } catch (IOException e) { throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java index 8f6da70844e07..4f6de8ba5f3c3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -62,12 +62,17 @@ private void readObject(ObjectInputStream in) throws IOException { // create a public write method for unit test public void writeObjectTo(ObjectOutputStream out) throws IOException { - out.writeUTF(schema.toString()); + // Note: writeUTF cannot support string length > 64K. So use writeObject which has small overhead (relatively). + out.writeObject(schema.toString()); } // create a public read method for unit test public void readObjectFrom(ObjectInputStream in) throws IOException { - schema = new Schema.Parser().parse(in.readUTF()); + try { + schema = new Schema.Parser().parse(in.readObject().toString()); + } catch (ClassNotFoundException e) { + throw new IOException("unable to parse schema", e); + } } @Override diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java index 72843a453c24c..03421a3005f04 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java @@ -49,6 +49,11 @@ public void testSerDeser() throws IOException { verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS); } + @Test + public void testLargeSchema() throws IOException { + verifySchema(new Schema.Parser().parse(generateLargeSchema())); + } + private void verifySchema(Schema schema) throws IOException { SerializableSchema serializableSchema = new SerializableSchema(schema); assertEquals(schema, serializableSchema.get()); @@ -65,4 +70,20 @@ private void verifySchema(Schema schema) throws IOException { newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten))); assertEquals(schema, newSchema.get()); } + + // generate large schemas (>64K which is limitation of ObjectOutputStream#writeUTF) to validate it can be serialized + private String generateLargeSchema() { + StringBuilder schema = new StringBuilder(); + schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX); + int fieldNum = 1; + while (schema.length() < 80 * 1024) { + String fieldName = "field" + fieldNum; + schema.append("{\"name\": \"" + fieldName + "\",\"type\": {\"type\":\"record\", \"name\":\"" + fieldName + "\",\"fields\": [" + + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"); + fieldNum++; + } + + schema.append(HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX); + return schema.toString(); + } }