diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index b086a6c9edbab..8f44b8b7d0b34 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -37,6 +37,7 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -77,6 +78,7 @@
 import com.codahale.metrics.Timer;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -794,7 +796,7 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
         .withProps(props);
 
     if (schema != null) {
-      builder.withSchema(schema.toString());
+      builder.withSchema(getSchemaForWriteConfig(schema).toString());
     }
 
     HoodieWriteConfig config = builder.build();
@@ -829,6 +831,25 @@ private HoodieWriteConfig getHoodieClientConfig(Schema schema) {
     return config;
   }
 
+  private Schema getSchemaForWriteConfig(Schema targetSchema) {
+    Schema newWriteSchema = targetSchema;
+    try {
+      if (targetSchema != null) {
+        // check if targetSchema is equal to NULL schema
+        if (SchemaCompatibility.checkReaderWriterCompatibility(targetSchema, InputBatch.NULL_SCHEMA).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
+            && SchemaCompatibility.checkReaderWriterCompatibility(InputBatch.NULL_SCHEMA, targetSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE) {
+          // target schema is null. fetch schema from commit metadata and use it
+          HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
+          TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+          newWriteSchema = schemaResolver.getTableAvroSchema(false);
+        }
+      }
+      return newWriteSchema;
+    } catch (Exception e) {
+      throw new HoodieException("Failed to fetch schema from table.");
+    }
+  }
+
   /**
    * Register Avro Schemas.
    *
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
index b2f6f784ca98b..04e3a574dc5c0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/InputBatch.java
@@ -28,6 +28,7 @@
 public class InputBatch<T> {
 
+  public static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
   private final Option<T> batch;
   private final String checkpointForNextBatch;
   private final SchemaProvider schemaProvider;
@@ -69,7 +70,7 @@ public NullSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
 
     @Override
     public Schema getSourceSchema() {
-      return Schema.create(Schema.Type.NULL);
+      return NULL_SCHEMA;
     }
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 2707e8392cb33..ad94ada59b2fd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1518,12 +1518,6 @@ private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTrans
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "");
   }
 
-  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
-                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
-    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
-        "partition_path");
-  }
-
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps, String partitionPath) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
@@ -1562,7 +1556,13 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
-    prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null, testEmptyBatch ? "1" : "");
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 10;
+    boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
+
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
@@ -1570,21 +1570,38 @@ private void testParquetDFSSource(boolean useSchemaProvider, List transf
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
-    testNum++;
+    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
 
     if (testEmptyBatch) {
       prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
-      // parquet source to return empty batch
       deltaStreamer.sync();
       // since we mimic'ed empty batch, total records should be same as first sync().
-      TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+      TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
       // validate table schema fetches valid schema from last but one commit.
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
       assertNotEquals(tableSchemaResolver.getTableAvroSchema(), Schema.create(Schema.Type.NULL).toString());
     }
+
+    // proceed w/ non empty batch.
+    prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
+    // validate commit metadata for all completed commits to have valid schema in extra metadata.
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+    metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry -> assertValidSchemaInCommitMetadata(entry, metaClient));
+    testNum++;
+  }
+
+  private void assertValidSchemaInCommitMetadata(HoodieInstant instant, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+      assertFalse(StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)));
+    } catch (IOException ioException) {
+      throw new HoodieException("Failed to parse commit metadata for " + instant.toString());
+    }
   }
 
   private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
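
The heart of the DeltaSync change is the two-way Avro compatibility check that decides whether the schema handed to the write config is the NULL placeholder (what NullSchemaProvider returns for an empty batch) or a real schema. The following standalone sketch illustrates that check using only the Avro API; the class name, the sample record schema, and the main method are illustrative assumptions, not part of the patch.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class NullSchemaCheckSketch {

  // Same sentinel the patch exposes as InputBatch.NULL_SCHEMA.
  private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

  // Mirrors the condition in getSchemaForWriteConfig: only a schema that is
  // reader/writer compatible with NULL in both directions is treated as "no schema".
  static boolean isNullSchema(Schema schema) {
    return SchemaCompatibility.checkReaderWriterCompatibility(schema, NULL_SCHEMA).getType()
            == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
        && SchemaCompatibility.checkReaderWriterCompatibility(NULL_SCHEMA, schema).getType()
            == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
  }

  public static void main(String[] args) {
    // Hypothetical record schema standing in for a real source/target schema.
    Schema recordSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    System.out.println(isNullSchema(NULL_SCHEMA));   // true  -> DeltaSync falls back to TableSchemaResolver
    System.out.println(isNullSchema(recordSchema));  // false -> provided schema is used as-is
  }
}

When the check fires, getSchemaForWriteConfig swaps in the schema resolved from the latest commit, so an empty batch no longer propagates a NULL schema into the commit metadata; that is what the new assertions in testParquetDFSSource and assertValidSchemaInCommitMetadata verify.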