diff --git a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java index a0f8c73a690d8..f17fc3a310a7f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieStorageWriterFactory.java @@ -18,6 +18,9 @@ package org.apache.hudi.io.storage; +import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; @@ -25,6 +28,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.BloomFilter; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.parquet.avro.AvroSchemaConverter; @@ -34,9 +38,12 @@ public class HoodieStorageWriterFactory { public static HoodieStorageWriter getStorageWriter( String commitTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema) throws IOException { - //TODO - based on the metadata choose the implementation of HoodieStorageWriter - // Currently only parquet is supported - return newParquetStorageWriter(commitTime, path, config, schema, hoodieTable); + final String name = path.getName(); + final String extension = FSUtils.isLogFile(path) ? HOODIE_LOG.getFileExtension() : FSUtils.getFileExtension(name); + if (PARQUET.getFileExtension().equals(extension)) { + return newParquetStorageWriter(commitTime, path, config, schema, hoodieTable); + } + throw new UnsupportedOperationException(extension + " format not supported yet."); } private static parquetWriter = + HoodieStorageWriterFactory.getStorageWriter( + commitTime, parquetPath, table, cfg, HoodieTestDataGenerator.avroSchema); + Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter); + + // other file format exception. + final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); + try { + HoodieStorageWriter logWriter = + HoodieStorageWriterFactory.getStorageWriter( + commitTime, logPath, table, cfg, HoodieTestDataGenerator.avroSchema); + fail("should fail since log storage writer is not supported yet."); + } catch (Exception e) { + Assert.assertTrue(e instanceof UnsupportedOperationException); + Assert.assertTrue(e.getMessage().contains("format not supported yet.")); + } + } +}