Merged
Changes from all commits
@@ -205,9 +205,7 @@ private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>
}
}
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
HTable hTable = null;
try {
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName))) {
List<Get> statements = new ArrayList<>();
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
// Do the tagging.
@@ -250,15 +248,6 @@ private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>
}
} catch (IOException e) {
throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
} finally {
if (hTable != null) {
try {
hTable.close();
} catch (IOException e) {
// Ignore
}
}

}
return taggedRecords.iterator();
};
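The two hunks above replace a manual try/finally close of the HBase table handle with try-with-resources. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical Resource class in place of the HBase Table handle; it is not Hudi or HBase code.

// Sketch: try-with-resources closes the handle automatically, so the old
// null-check plus close-in-finally block becomes unnecessary.
public class TryWithResourcesSketch {

  static class Resource implements AutoCloseable {
    String read() {
      return "data";
    }

    @Override
    public void close() {
      System.out.println("closed");
    }
  }

  public static void main(String[] args) {
    try (Resource r = new Resource()) {
      // The resource is closed when this block exits, whether it returns
      // normally or throws.
      System.out.println(r.read());
    } catch (RuntimeException e) {
      // Wrap into a domain-specific exception, as the index code does with
      // HoodieIndexException.
      throw new IllegalStateException("Failed while using resource", e);
    }
  }
}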
@@ -444,16 +433,14 @@ public static class HbasePutBatchSizeCalculator implements Serializable {
*/
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut,
int maxExecutors, int sleepTimeMs, float qpsFraction) {
int numRSAlive = numRegionServersForTable;
int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
int numTasks = numTasksDuringPut;
int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable);
LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
@@ -147,9 +147,9 @@ private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
.collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
if (i.getValue().size() > maxCommitsToKeep) {
return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
if (hoodieInstants.size() > maxCommitsToKeep) {
return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep);
} else {
return new ArrayList<HoodieInstant>();
}
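This hunk groups completed clean/rollback instants by action and, once a group grows past maxCommitsToKeep, selects everything except the newest minCommitsToKeep instants for archiving. A self-contained sketch of that trimming rule, using action/timestamp string pairs in place of HoodieInstant and assuming the instants arrive oldest first:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ArchiveTrimSketch {

  static List<String> instantsToArchive(List<String[]> instants, int minCommitsToKeep, int maxCommitsToKeep) {
    // Group [action, timestamp] pairs by action, as the diff does with HoodieInstant::getAction.
    Map<String, List<String[]>> byAction = instants.stream().collect(Collectors.groupingBy(i -> i[0]));
    return byAction.values().stream()
        .flatMap(group -> (group.size() > maxCommitsToKeep
            // Keep the newest minCommitsToKeep entries; the older prefix gets archived.
            ? group.subList(0, group.size() - minCommitsToKeep)
            : new ArrayList<String[]>()).stream())
        .map(i -> i[0] + "@" + i[1])
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String[]> instants = Arrays.asList(
        new String[]{"clean", "001"}, new String[]{"clean", "002"},
        new String[]{"clean", "003"}, new String[]{"clean", "004"},
        new String[]{"rollback", "005"});
    // Four clean instants exceed max=3, so the oldest two are archived; the lone rollback stays.
    System.out.println(instantsToArchive(instants, 2, 3)); // [clean@001, clean@002]
  }
}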
@@ -62,22 +62,22 @@ public abstract class CompactionStrategy implements Serializable {
public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Option<HoodieBaseFile> dataFile,
String partitionPath, List<HoodieLogFile> logFiles) {
Map<String, Double> metrics = Maps.newHashMap();
Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
// Total size of all the log files
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
.reduce((size1, size2) -> size1 + size2).orElse(0L);
.reduce(Long::sum).orElse(0L);
// Total read will be the base file + all the log files
Long totalIORead =
FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize);
// Total write will be similar to the size of the base file
Long totalIOWrite =
FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize);
// Total IO will the the IO for read + write
Long totalIO = totalIORead + totalIOWrite;
long totalIO = totalIORead + totalIOWrite;
// Save these metrics and we will use during the filter
metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue());
metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue());
metrics.put(TOTAL_IO_MB, totalIO.doubleValue());
metrics.put(TOTAL_IO_MB, (double) totalIO);
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
metrics.put(TOTAL_LOG_FILES, (double) logFiles.size());
return metrics;
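To make the IO estimate concrete, here is a worked sketch with assumed file sizes, using plain longs in place of the Hudi types; toMB only approximates the intent of FSUtils.getSizeInMB.

import java.util.Arrays;

public class CompactionIoSketch {

  static long toMB(long bytes) {
    return bytes / (1024 * 1024);
  }

  public static void main(String[] args) {
    long baseFileSize = 100L * 1024 * 1024;                            // 100 MB base parquet file
    long[] logFileSizes = {20L * 1024 * 1024, 30L * 1024 * 1024, -1L}; // -1 = unknown size, filtered out

    // Sum only the log files whose size is known, as the filter(size -> size >= 0) does.
    long totalLogFileSize = Arrays.stream(logFileSizes).filter(s -> s >= 0).sum();
    long totalIORead = toMB(baseFileSize + totalLogFileSize); // read the base file plus all logs
    long totalIOWrite = toMB(baseFileSize);                   // rewrite roughly one base file
    long totalIO = totalIORead + totalIOWrite;

    System.out.println("read=" + totalIORead + "MB write=" + totalIOWrite + "MB total=" + totalIO + "MB");
    // read=150MB write=100MB total=250MB
  }
}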
17 changes: 7 additions & 10 deletions hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -49,17 +49,14 @@ private Metrics(HoodieWriteConfig metricConfig) {
}
// reporter.start();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
reporter.report();
Closeables.close(reporter.getReporter(), true);
} catch (Exception e) {
e.printStackTrace();
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
reporter.report();
Closeables.close(reporter.getReporter(), true);
} catch (Exception e) {
e.printStackTrace();
}
});
}));
}

public static Metrics getInstance() {
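This hunk collapses the anonymous Thread subclass into a lambda passed to addShutdownHook. A minimal sketch of that shape, with a hypothetical Reporter standing in for the metrics reporter:

public class ShutdownHookSketch {

  interface Reporter extends AutoCloseable {
    void report();
  }

  public static void main(String[] args) {
    Reporter reporter = new Reporter() {
      @Override
      public void report() {
        System.out.println("flushing metrics");
      }

      @Override
      public void close() {
        System.out.println("reporter closed");
      }
    };

    // Thread takes a Runnable, so the @Override run() boilerplate collapses into a lambda.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        reporter.report();
        reporter.close();
      } catch (Exception e) {
        e.printStackTrace(); // best effort during JVM shutdown
      }
    }));
  }
}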
37 changes: 19 additions & 18 deletions hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -68,6 +68,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -106,7 +107,7 @@ private String insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, Ho
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {

/**
/*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
* in insert(), if the implementation diverges.)
*/
@@ -606,8 +607,8 @@ public void testUpgradeDowngrade() {
String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;

List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2);
List<String> successDeleteFiles1 = Arrays.asList(filePath1);
List<String> failedDeleteFiles1 = Arrays.asList(filePath2);
List<String> successDeleteFiles1 = Collections.singletonList(filePath1);
List<String> failedDeleteFiles1 = Collections.singletonList(filePath2);

// create partition1 clean stat.
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
@@ -630,7 +631,8 @@ public void testUpgradeDowngrade() {

// map with relative path.
Map<String, Tuple3> newExpected = new HashMap<>();
newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2)));
newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1),
Collections.singletonList(fileName2)));
newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));

HoodieCleanMetadata metadata =
@@ -1079,19 +1081,18 @@ private void testPendingCompactions(HoodieWriteConfig config, int expNumFilesDel
});

// Test for progress (Did we clean some files ?)
long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> {
return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
.map(fileIdWithCommitTime -> {
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than pending compaction",
HoodieTimeline.compareTimestamps(
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
return true;
}
return false;
});
}).filter(x -> x).count();
long numFilesUnderCompactionDeleted = hoodieCleanStats.stream()
.flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
.map(fileIdWithCommitTime -> {
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than pending compaction",
HoodieTimeline.compareTimestamps(
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
return true;
}
return false;
})).filter(x -> x).count();
long numDeleted =
hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum();
// Tighter check for regression
@@ -1123,7 +1124,7 @@ private List<String> createMarkerFiles(String commitTime, int numFiles) throws I
* @throws IOException in case of error
*/
private int getTotalTempFiles() throws IOException {
RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
int count = 0;
while (itr.hasNext()) {
count++;
@@ -210,13 +210,10 @@ public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInst
Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCompactionPlan workload = new HoodieCompactionPlan();
try {
try (FSDataOutputStream os = fs.create(commitFile, true)) {
HoodieCompactionPlan workload = new HoodieCompactionPlan();
// Write empty commit metadata
os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8));
} finally {
os.close();
}
}

@@ -225,13 +222,10 @@ public static void createSavepointFile(String basePath, String commitTime, Confi
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeSavePointFileName(commitTime));
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(commitFile, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
try {
try (FSDataOutputStream os = fs.create(commitFile, true)) {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// Write empty commit metadata
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} finally {
os.close();
}
}

@@ -42,6 +42,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -246,13 +247,17 @@ public void testTagLocation() throws Exception {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));

String filename0 =
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1),
schema, null, false);
String filename1 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(),
schema, null, false);
String filename2 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2),
schema, null, false);
String filename3 =
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4),
schema, null, false);

// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -265,21 +270,29 @@ public void testTagLocation() throws Exception {
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);

for (HoodieRecord record : taggedRecordRDD.collect()) {
if (record.getRecordKey().equals("000")) {
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
} else if (record.getRecordKey().equals("001")) {
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
} else if (record.getRecordKey().equals("002")) {
assertTrue(!record.isCurrentLocationKnown());
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
} else if (record.getRecordKey().equals("003")) {
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
} else if (record.getRecordKey().equals("004")) {
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
switch (record.getRecordKey()) {
case "000":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
break;
case "001":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
break;
case "002":
assertFalse(record.isCurrentLocationKnown());
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
break;
case "003":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
break;
case "004":
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
break;
default:
throw new IllegalArgumentException("Unknown Key: " + record.getRecordKey());
}
}
}
@@ -86,11 +86,8 @@ private byte[] compressData(String jsonData) throws IOException {
}

private String unCompressData(byte[] data) throws IOException {
InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data));
try {
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
return FileIOUtils.readAsUTFString(iis, dataSize);
} finally {
iis.close();
}
}
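The uncompress path now relies on try-with-resources to close the InflaterInputStream. A stand-alone sketch of the underlying java.util.zip round trip; this is a generic illustration, not the payload class from the diff.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressionSketch {

  static byte[] compress(String json) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DeflaterOutputStream dos = new DeflaterOutputStream(baos)) {
      dos.write(json.getBytes(StandardCharsets.UTF_8));
    } // closing the stream finishes the deflate block
    return baos.toByteArray();
  }

  static String unCompress(byte[] data) throws IOException {
    // The inflater stream is closed automatically, even if the read throws,
    // which is exactly what the try-with-resources change guarantees.
    try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[1024];
      int len;
      while ((len = iis.read(buffer)) != -1) {
        out.write(buffer, 0, len);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    String json = "{\"_row_key\":\"000\",\"time\":\"2016-04-01\"}";
    System.out.println(unCompress(compress(json)).equals(json)); // true
  }
}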

@@ -46,6 +46,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
* Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit
@@ -107,25 +108,22 @@ public HoodieLogFile getLogFile() {
* Close the inputstream if not closed when the JVM exits.
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
close();
} catch (Exception e) {
LOG.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
close();
} catch (Exception e) {
LOG.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception
}
});
}));
}

// TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows
// for max of Integer size
private HoodieLogBlock readBlock() throws IOException {

int blocksize = -1;
int type = -1;
int blocksize;
int type;
HoodieLogBlockType blockType = null;
Map<HeaderMetadataType, String> header = null;

@@ -190,7 +188,7 @@ private HoodieLogBlock readBlock() throws IOException {
// 9. Read the log block end position in the log file
long blockEndPos = inputStream.getPos();

switch (blockType) {
switch (Objects.requireNonNull(blockType)) {
// based on type read the block
case AVRO_DATA_BLOCK:
if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
@@ -278,10 +276,10 @@ public void close() throws IOException {
}
}

@Override
/**
/*
* hasNext is not idempotent. TODO - Fix this. It is okay for now - PR
*/
@Override
public boolean hasNext() {
try {
return readMagic();
@@ -315,10 +313,7 @@ private boolean hasNextMagic() throws IOException {
long pos = inputStream.getPos();
// 1. Read magic header from the start of the block
inputStream.readFully(MAGIC_BUFFER, 0, 6);
if (!Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC)) {
return false;
}
return true;
return Arrays.equals(MAGIC_BUFFER, HoodieLogFormat.MAGIC);
}
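hasNextMagic now returns the Arrays.equals comparison directly instead of the if/return-true/false shape. The sketch below illustrates the general peek-for-magic-bytes idea; the 6-byte marker value, the EOF handling, and the rewind in the finally block are assumptions for illustration, not HoodieLogFormat internals.

import java.io.EOFException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

public class MagicPeekSketch {

  private static final byte[] MAGIC = "#MAGIC".getBytes(); // hypothetical 6-byte marker

  static boolean hasNextMagic(RandomAccessFile in) throws IOException {
    long pos = in.getFilePointer();
    byte[] buffer = new byte[MAGIC.length];
    try {
      in.readFully(buffer, 0, MAGIC.length);
      // Returning the comparison directly replaces the old if/else that returned true/false.
      return Arrays.equals(buffer, MAGIC);
    } catch (EOFException eof) {
      return false; // fewer than MAGIC.length bytes remain
    } finally {
      // This sketch rewinds so callers can re-read from the block start.
      in.seek(pos);
    }
  }
}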

@Override
@@ -193,7 +193,7 @@ public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, Str
return partitions;
}

public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
public static List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
String markerDir) throws IOException {
List<String> dataFiles = new LinkedList<>();
processFiles(fs, markerDir, (status) -> {