@@ -30,6 +30,7 @@
@Order(Ordered.HIGHEST_PRECEDENCE)
public class HoodieHistoryFileNameProvider extends DefaultHistoryFileNameProvider {

@Override
public String getHistoryFileName() {
return "hoodie-cmd.log";
}
@@ -50,14 +50,17 @@ public class HoodieSplashScreen extends DefaultBannerProvider {
+ "* *" + OsUtils.LINE_SEPARATOR
+ "===================================================================" + OsUtils.LINE_SEPARATOR;

@Override
public String getBanner() {
return screen;
}

@Override
public String getVersion() {
return "1.0";
}

@Override
public String getWelcomeMessage() {
return "Welcome to Apache Hudi CLI. Please type help if you are looking for help. ";
}
@@ -72,6 +72,7 @@ protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig clientCon
/**
* Releases any resources used by the client.
*/
@Override
public void close() {
stopEmbeddedServerView(true);
}
@@ -973,6 +973,7 @@ private void rollbackInternal(String commitToRollback) {
/**
* Releases any resources used by the client.
*/
@Override
public void close() {
// Stop timeline-server if running
super.close();
@@ -48,6 +48,7 @@ public SparkBoundedInMemoryExecutor(final HoodieWriteConfig hoodieConfig, Bounde
this.sparkThreadTaskContext = TaskContext.get();
}

@Override
public void preExecute() {
// Passing parent thread's TaskContext to newly launched thread for it to access original TaskContext properties.
TaskContext$.MODULE$.setTaskContext(sparkThreadTaskContext);
@@ -88,6 +88,7 @@ public int hashCode() {
return Objects.hashCode(fileId, minRecordKey, maxRecordKey);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {");
sb.append(" fileId=").append(fileId);
@@ -154,6 +154,7 @@ private Connection getHBaseConnection() {
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
hbaseConnection.close();
@@ -167,6 +168,7 @@ public void run() {
/**
* Ensure that any resources used for indexing are released here.
*/
@Override
public void close() {
this.hBaseIndexQPSResourceAllocator.releaseQPSResources();
}
@@ -94,6 +94,7 @@ public boolean canWrite(HoodieRecord record) {
/**
* Perform the actual writing of the given record into the backing file.
*/
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
Option recordMetadata = record.getData().getMetadata();
try {
@@ -92,6 +92,7 @@ public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}

@Override
public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
@@ -103,6 +104,7 @@ public Path makeNewPath(String partitionPath) {
return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, writeToken, fileId));
}

@Override
public Schema getWriterSchema() {
return writerSchema;
}
@@ -113,20 +115,23 @@ public Schema getWriterSchema() {
* - Whether it belongs to the same partitionPath as existing records - Whether the current file written bytes lt max
* file size
*/
@Override
public boolean canWrite(HoodieRecord record) {
return false;
}

/**
* Perform the actual writing of the given record into the backing file.
*/
@Override
public void write(HoodieRecord record, Option<IndexedRecord> insertValue) {
// NO_OP
}

/**
* Perform the actual writing of the given record into the backing file.
*/
@Override
public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<Exception> exception) {
Option recordMetadata = record.getData().getMetadata();
if (exception.isPresent() && exception.get() instanceof Throwable) {
@@ -141,6 +146,7 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<
/**
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
@Override
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchema);
}
@@ -93,6 +93,7 @@ public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOEx
writeSupport.add(record.getRecordKey());
}

@Override
public boolean canWrite() {
return fs.getBytesWritten(file) < maxFileSize;
}
@@ -279,6 +279,7 @@ public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Inte
* @param jsc JavaSparkContext
* @return Cleaner Plan
*/
@Override
public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) {
try {
HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config);
@@ -108,6 +108,7 @@ public HoodieLogFile getLogFile() {
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
close();
@@ -107,6 +107,7 @@ public FileSystem getFs() {
return fs;
}

@Override
public HoodieLogFile getLogFile() {
return logFile;
}
@@ -212,6 +213,7 @@ private void flush() throws IOException {
output.hsync();
}

@Override
public long getCurrentSize() throws IOException {
if (output == null) {
throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
@@ -145,6 +145,7 @@ public HoodieTimeline getCommitsTimeline() {
* timeline * With Async compaction a requested/inflight compaction-instant is a valid baseInstant for a file-slice as
* there could be delta-commits with that baseInstant.
*/
@Override
public HoodieTimeline getCommitsAndCompactionTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION));
}
@@ -182,6 +182,7 @@ protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieDataFile> dataFileS
/**
* Clears the partition Map and reset view states.
*/
@Override
public final void reset() {
try {
writeLock.lock();
@@ -380,6 +381,7 @@ public final Option<HoodieDataFile> getDataFileOn(String partitionStr, String in
/**
* Get Latest data file for a partition and file-Id.
*/
@Override
public final Option<HoodieDataFile> getLatestDataFile(String partitionStr, String fileId) {
try {
readLock.lock();
@@ -434,6 +436,7 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
/**
* Get Latest File Slice for a given fileId in a given partition.
*/
@Override
public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fileId) {
try {
readLock.lock();
@@ -205,13 +205,15 @@ public Stream<HoodieFileGroup> fetchAllStoredFileGroups() {
});
}

@Override
public void close() {
closed = true;
super.reset();
partitionToFileGroupsMap = null;
fgIdToPendingCompaction = null;
}

@Override
public boolean isClosed() {
return closed;
}
@@ -76,6 +76,7 @@ protected Map<String, List<HoodieFileGroup>> createPartitionToFileGroups() {
}
}

@Override
protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingCompactionMap(
Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction) {
try {
@@ -91,6 +92,7 @@ protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileId
}
}

@Override
public Stream<HoodieFileGroup> getAllFileGroups() {
return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream()
.flatMap(fg -> ((List<HoodieFileGroup>) fg).stream());
@@ -124,6 +124,7 @@ public static long getObjectSize(Object obj) throws UnsupportedOperationExceptio

private final LoadingCache<Class<?>, ClassSizeInfo> classSizeInfos =
CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, ClassSizeInfo>() {
@Override
public ClassSizeInfo load(Class<?> clazz) {
return new ClassSizeInfo(clazz);
}
@@ -125,6 +125,7 @@ private void initFile(File writeOnlyFile) throws IOException {
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
if (writeOnlyFileHandle != null) {
@@ -115,6 +115,7 @@ private void close() {

private void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
close();
}
@@ -173,10 +173,12 @@ private Map<HoodieTableMetaClient, List<FileStatus>> groupFileStatus(FileStatus[
return grouped;
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}
@@ -805,6 +805,7 @@ public static class HoodieCombineFileInputFormatShim<K, V> extends CombineFileIn

public HoodieCombineFileInputFormatShim() {}

@Override
public Path[] getInputPathsShim(JobConf conf) {
try {
return FileInputFormat.getInputPaths(conf);
@@ -813,6 +814,7 @@ public Path[] getInputPathsShim(JobConf conf) {
}
}

@Override
public void createPool(JobConf conf, PathFilter... filters) {
super.createPool(conf, filters);
}
@@ -822,6 +824,7 @@ public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporte
throw new IOException("CombineFileInputFormat.getRecordReader not needed.");
}

@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
LOG.info("Listing status in HoodieCombineHiveInputFormat.HoodieCombineFileInputFormatShim");
List<FileStatus> result;
@@ -851,6 +854,7 @@ protected List<FileStatus> listStatus(JobContext job) throws IOException {
return result;
}

@Override
public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
long minSize = job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 0L);
if (job.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L) == 0L) {
@@ -879,10 +883,12 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti
return (CombineFileSplit[]) inputSplitShims.toArray(new HadoopShimsSecure.InputSplitShim[inputSplitShims.size()]);
}

@Override
public HadoopShimsSecure.InputSplitShim getInputSplitShim() throws IOException {
return new HadoopShimsSecure.InputSplitShim();
}

@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException {
return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
@@ -554,6 +554,7 @@ private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
/**
* Start Compaction Service.
*/
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
List<CompletableFuture<Boolean>> compactionFutures =
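Every hunk in this change is the same mechanical edit: adding the @Override annotation to a method that implements or overrides a supertype method. As a minimal sketch of what the annotation enforces (Base and Sub below are hypothetical classes, not code from this PR): a misspelled or mismatched signature compiles silently as a brand-new method, whereas the annotated version is rejected by javac with "method does not override or implement a method from a supertype".

class Base {
  public String name() {
    return "base";
  }
}

class Sub extends Base {
  @Override
  public String name() { // compiles: genuinely overrides Base.name()
    return "sub";
  }

  // public String nmae() { return "sub"; }   // typo compiles silently as a new method...
  // @Override
  // public String nmae() { return "sub"; }   // ...but fails to compile once annotated
}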