@@ -71,7 +71,6 @@ public class HoodieHiveUtils {
   public static final String DEFAULT_SCAN_MODE = SNAPSHOT_SCAN_MODE;
   public static final int DEFAULT_MAX_COMMITS = 1;
   public static final int MAX_COMMIT_ALL = -1;
-  public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
   public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
   public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = "last_replication_timestamp";
 
@@ -46,6 +46,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
 import org.apache.hudi.hadoop.HoodieHFileInputFormat;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
@@ -68,6 +69,7 @@
 
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 
 public class HoodieInputFormatUtils {
 
@@ -324,14 +326,24 @@ public static Map<Path, HoodieTableMetaClient> getTableMetaClientByPartitionPath
    * Extract HoodieTableMetaClient from a partition path (not base path)
    */
   public static HoodieTableMetaClient getTableMetaClientForBasePathUnchecked(Configuration conf, Path partitionPath) throws IOException {
+    Path baseDir = partitionPath;
     FileSystem fs = partitionPath.getFileSystem(conf);
-    int levels = HoodieHiveUtils.DEFAULT_LEVELS_TO_BASEPATH;
     if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
       HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
       metadata.readFromFS();
-      levels = metadata.getPartitionDepth();
+      int levels = metadata.getPartitionDepth();
+      baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
+    } else {
+      for (int i = 0; i < partitionPath.depth(); i++) {
+        if (fs.exists(new Path(baseDir, METAFOLDER_NAME))) {
+          break;
+        } else if (i == partitionPath.depth() - 1) {
+          throw new TableNotFoundException(partitionPath.toString());
+        } else {
+          baseDir = baseDir.getParent();
+        }
+      }
     }
-    Path baseDir = HoodieHiveUtils.getNthParent(partitionPath, levels);
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
     return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
   }
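
Taken together, the change above drops the fixed three-level walk from partition path to base path and instead searches upward for the .hoodie metafolder, throwing TableNotFoundException if no ancestor of the partition path contains one. A minimal standalone sketch of that upward walk, using plain java.nio instead of the Hadoop FileSystem API and a made-up partition layout, might look like the following; the class and method names here are illustrative only.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class MetafolderWalkSketch {
      // Marker directory that identifies a Hudi table's base path.
      private static final String METAFOLDER_NAME = ".hoodie";

      // Walk up from a partition path until a directory containing ".hoodie" is found;
      // fail if the filesystem root is reached without finding it.
      static Path findBasePath(Path partitionPath) {
        Path current = partitionPath;
        while (current != null) {
          if (Files.isDirectory(current.resolve(METAFOLDER_NAME))) {
            return current;
          }
          current = current.getParent();
        }
        throw new IllegalStateException("No Hudi table found above " + partitionPath);
      }

      public static void main(String[] args) {
        // Hypothetical layout: /tmp/trips/.hoodie exists, partitions live under /tmp/trips/yyyy/mm/dd.
        Path partition = Paths.get("/tmp/trips/2021/03/15");
        System.out.println("Base path: " + findBasePath(partition));
      }
    }

Like the new implementation, the sketch makes no assumption about partition depth, which is what allows listStatus and getSplits to work on a freshly initialized table that has no partitions or commits yet, as the new tests below verify.
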
@@ -162,6 +162,21 @@ public void testInputFormatLoad() throws IOException {
     assertEquals(10, inputSplits.length);
   }
 
+  @Test
+  public void testInputFormatLoadWithEmptyTable() throws IOException {
+    // initialize an empty hoodie table
+    String basePathStr = "/tmp/test_empty_table";
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePathStr, HoodieTableType.COPY_ON_WRITE,
+        baseFileFormat);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, basePathStr);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length);
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
+    assertEquals(0, inputSplits.length);
+  }
+
   @Test
   public void testInputFormatUpdates() throws IOException {
     // initial commit
@@ -167,6 +167,21 @@ public void testInputFormatLoad() throws IOException {
     assertEquals(10, files.length);
   }
 
+  @Test
+  public void testInputFormatLoadWithEmptyTable() throws IOException {
+    // initialize an empty hoodie table
+    String basePathStr = "/tmp/test_empty_table";
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePathStr, HoodieTableType.COPY_ON_WRITE,
+        baseFileFormat);
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, basePathStr);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length);
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
+    assertEquals(0, inputSplits.length);
+  }
+
   @Test
   public void testInputFormatUpdates() throws IOException {
     // initial commit
@@ -56,6 +56,12 @@ public class TestInputPathHandler {
   // non Hoodie table
   public static final String TRIPS_STATS_TEST_NAME = "trips_stats";
 
+  // empty snapshot table
+  public static final String EMPTY_SNAPSHOT_TEST_NAME = "empty_snapshot";
+
+  // empty incremental table
+  public static final String EMPTY_INCREMENTAL_TEST_NAME = "empty_incremental";
+
   @TempDir
   static java.nio.file.Path parentPath;
 
@@ -67,6 +73,8 @@ public class TestInputPathHandler {
   private static String basePathTable2 = null;
   private static String basePathTable3 = null;
   private static String basePathTable4 = null; // non hoodie Path
+  private static String basePathTable5 = null;
+  private static String basePathTable6 = null;
   private static List<String> incrementalTables;
   private static List<Path> incrementalPaths;
   private static List<Path> snapshotPaths;
@@ -110,6 +118,9 @@ static void initTables() throws IOException {
     basePathTable2 = parentPath.resolve(MODEL_TRIPS_TEST_NAME).toAbsolutePath().toString();
     basePathTable3 = parentPath.resolve(ETL_TRIPS_TEST_NAME).toAbsolutePath().toString();
     basePathTable4 = parentPath.resolve(TRIPS_STATS_TEST_NAME).toAbsolutePath().toString();
+    String tempPath = "/tmp/";
+    basePathTable5 = tempPath + EMPTY_SNAPSHOT_TEST_NAME;
+    basePathTable6 = tempPath + EMPTY_INCREMENTAL_TEST_NAME;
 
     dfs.mkdirs(new Path(basePathTable1));
     initTableType(dfs.getConf(), basePathTable1, RAW_TRIPS_TEST_NAME, HoodieTableType.MERGE_ON_READ);
@@ -126,13 +137,20 @@ static void initTables() throws IOException {
     dfs.mkdirs(new Path(basePathTable4));
     nonHoodiePaths.addAll(generatePartitions(dfs, basePathTable4));
 
+    initTableType(dfs.getConf(), basePathTable5, EMPTY_SNAPSHOT_TEST_NAME, HoodieTableType.COPY_ON_WRITE);
+    snapshotPaths.add(new Path(basePathTable5));
+
+    initTableType(dfs.getConf(), basePathTable6, EMPTY_INCREMENTAL_TEST_NAME, HoodieTableType.MERGE_ON_READ);
+    incrementalPaths.add(new Path(basePathTable6));
+
     inputPaths.addAll(incrementalPaths);
     inputPaths.addAll(snapshotPaths);
     inputPaths.addAll(nonHoodiePaths);
 
     incrementalTables = new ArrayList<>();
     incrementalTables.add(RAW_TRIPS_TEST_NAME);
     incrementalTables.add(MODEL_TRIPS_TEST_NAME);
+    incrementalTables.add(EMPTY_INCREMENTAL_TEST_NAME);
   }
 
   static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,