diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 66538e0d97d62..d350777cf70c8 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -85,11 +85,6 @@ <groupId>log4j</groupId> <artifactId>log4j</artifactId> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java index 8457b908f39fa..dd108be023b08 100644 --- a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieClient.java @@ -26,9 +26,9 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -39,7 +39,7 @@ */ public abstract class AbstractHoodieClient implements Serializable, AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieClient.class); + private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class); protected final transient FileSystem fs; protected final transient JavaSparkContext jsc; diff --git a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java index 00e0f751ee29c..56a47b73263e6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/CompactionAdminClient.java @@ -45,9 +45,9 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; @@ -65,7 +65,7 @@ */ public class CompactionAdminClient extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class); + private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class); public CompactionAdminClient(JavaSparkContext jsc, String basePath) { super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build()); @@ -358,14 +358,13 @@ private List runRenamingOps(HoodieTableMetaClient metaClient, if (!dryRun) { return jsc.parallelize(renameActions, parallelism).map(lfPair -> { try { - LOG.info("RENAME {} => {}", lfPair.getLeft().getPath(), lfPair.getRight().getPath()); + LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath()); renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight()); return new RenameOpResult(lfPair, true, Option.empty()); } catch (IOException e) { LOG.error("Error renaming log file", e); - LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. " - + "Try running \"compaction repair {} \" to recover from failure ***\n\n\n", - lfPair.getLeft().getBaseCommitTime()); + LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. 
Try running \"compaction repair " + + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n"); return new RenameOpResult(lfPair, false, Option.of(e)); } }).collect(); @@ -396,7 +395,7 @@ protected List> getRenamingActionsForUnschedu HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant); if (plan.getOperations() != null) { LOG.info( - "Number of Compaction Operations :{} for instant :{}", plan.getOperations().size(), compactionInstant); + "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant); List ops = plan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList()); return jsc.parallelize(ops, parallelism).flatMap(op -> { @@ -410,7 +409,7 @@ protected List> getRenamingActionsForUnschedu } }).collect(); } - LOG.warn("No operations for compaction instant : {}", compactionInstant); + LOG.warn("No operations for compaction instant : " + compactionInstant); return new ArrayList<>(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java index 68503c67eabce..9411782bc238a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java @@ -39,16 +39,16 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; public class HoodieCleanClient extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class); private final transient HoodieMetrics metrics; public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { @@ -85,7 +85,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept // If there are inflight(failed) or previously requested clean operation, first perform them table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> { - LOG.info("There were previously unfinished cleaner operations. Finishing Instant={}", hoodieInstant); + LOG.info("There were previously unfinished cleaner operations. 
Finishing Instant=" + hoodieInstant); runClean(table, hoodieInstant); }); @@ -122,7 +122,7 @@ protected Option scheduleClean(String startCleanTime) { // Save to both aux and timeline folder try { table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan)); - LOG.info("Requesting Cleaning with instant time {}", cleanInstant); + LOG.info("Requesting Cleaning with instant time " + cleanInstant); } catch (IOException e) { LOG.error("Got exception when saving cleaner requested file", e); throw new HoodieIOException(e.getMessage(), e); @@ -173,20 +173,20 @@ private HoodieCleanMetadata runClean(HoodieTable table, HoodieInstant cleanIn Option durationInMs = Option.empty(); if (context != null) { durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - LOG.info("cleanerElaspsedTime (Minutes): {}", durationInMs.get() / (1000 * 60)); + LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60)); } HoodieTableMetaClient metaClient = createMetaClient(true); // Create the metadata and save it HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats); - LOG.info("Cleaned {} files. Earliest Retained : {}", metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain()); metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted()); table.getActiveTimeline().transitionCleanInflightToComplete( new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()), AvroUtils.serializeCleanMetadata(metadata)); - LOG.info("Marked clean started on {} as complete", cleanInstant.getTimestamp()); + LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete"); return metadata; } catch (IOException e) { throw new HoodieIOException("Failed to clean up after commit", e); diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java index f309f40573af1..3c4290c89020d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieReadClient.java @@ -35,6 +35,8 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -49,8 +51,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -58,7 +58,7 @@ */ public class HoodieReadClient extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieReadClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class); /** * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 09e3f58be7935..efb6d20115ff8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -67,6 +67,8 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -84,8 +86,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -96,7 +96,7 @@ */ public class HoodieWriteClient extends AbstractHoodieClient { - private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteClient.class); + private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class); private static final String UPDATE_STR = "update"; private static final String LOOKUP_STR = "lookup"; private final boolean rollbackPending; @@ -399,13 +399,13 @@ private JavaRDD bulkInsertInternal(JavaRDD> deduped private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD, String actionType) { if (config.shouldAutoCommit()) { - LOG.info("Auto commit enabled: Committing {}", commitTime); + LOG.info("Auto commit enabled: Committing " + commitTime); boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType); if (!commitResult) { throw new HoodieCommitException("Failed to commit " + commitTime); } } else { - LOG.info("Auto commit disabled for {}", commitTime); + LOG.info("Auto commit disabled for " + commitTime); } } @@ -454,13 +454,13 @@ private JavaRDD upsertRecordsInternal(JavaRDD> prep if (preppedRecords.getStorageLevel() == StorageLevel.NONE()) { preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER()); } else { - LOG.info("RDD PreppedRecords was persisted at: {}", preppedRecords.getStorageLevel()); + LOG.info("RDD PreppedRecords was persisted at: " + preppedRecords.getStorageLevel()); } WorkloadProfile profile = null; if (hoodieTable.isWorkloadProfileNeeded()) { profile = new WorkloadProfile(preppedRecords); - LOG.info("Workload profile : {}", profile); + LOG.info("Workload profile :" + profile); saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime); } @@ -526,7 +526,7 @@ public boolean commit(String commitTime, JavaRDD writeStatuses, private boolean commit(String commitTime, JavaRDD writeStatuses, Option> extraMetadata, String actionType) { - LOG.info("Commiting {}", commitTime); + LOG.info("Commiting " + commitTime); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); @@ -573,7 +573,7 @@ private boolean commit(String commitTime, JavaRDD writeStatuses, metadata, actionType); writeContext = null; } - LOG.info("Committed {}", commitTime); + LOG.info("Committed " + commitTime); } catch (IOException e) { throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime, e); @@ -607,7 +607,7 @@ public boolean savepoint(String user, String comment) { } String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp(); - LOG.info("Savepointing latest commit {}", latestCommit); + LOG.info("Savepointing latest commit " + latestCommit); return savepoint(latestCommit, user, comment); } @@ -658,7 +658,7 @@ public boolean savepoint(String commitTime, String user, String comment) { config.shouldAssumeDatePartitioning())) .mapToPair((PairFunction>) partitionPath -> { // Scan all partitions files with this commit time - 
LOG.info("Collecting latest files in partition path {}", partitionPath); + LOG.info("Collecting latest files in partition path " + partitionPath); ReadOptimizedView view = table.getROFileSystemView(); List latestFiles = view.getLatestDataFilesBeforeOrOn(partitionPath, commitTime) .map(HoodieDataFile::getFileName).collect(Collectors.toList()); @@ -672,7 +672,7 @@ public boolean savepoint(String commitTime, String user, String comment) { table.getActiveTimeline() .saveAsComplete(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, commitTime), AvroUtils.serializeSavepointMetadata(metadata)); - LOG.info("Savepoint {} created", commitTime); + LOG.info("Savepoint " + commitTime + " created"); return true; } catch (IOException e) { throw new HoodieSavepointException("Failed to savepoint " + commitTime, e); @@ -696,13 +696,13 @@ public void deleteSavepoint(String savepointTime) { HoodieInstant savePoint = new HoodieInstant(false, HoodieTimeline.SAVEPOINT_ACTION, savepointTime); boolean isSavepointPresent = table.getCompletedSavepointTimeline().containsInstant(savePoint); if (!isSavepointPresent) { - LOG.warn("No savepoint present {}", savepointTime); + LOG.warn("No savepoint present " + savepointTime); return; } activeTimeline.revertToInflight(savePoint); activeTimeline.deleteInflight(new HoodieInstant(true, HoodieTimeline.SAVEPOINT_ACTION, savepointTime)); - LOG.info("Savepoint {} deleted", savepointTime); + LOG.info("Savepoint " + savepointTime + " deleted"); } /** @@ -730,7 +730,7 @@ private void deleteRequestedCompaction(String compactionTime) { } else { throw new IllegalArgumentException("Compaction is not in requested state " + compactionTime); } - LOG.info("Compaction {} deleted", compactionTime); + LOG.info("Compaction " + compactionTime + " deleted"); } /** @@ -758,7 +758,7 @@ public boolean rollbackToSavepoint(String savepointTime) { List commitsToRollback = commitTimeline.findInstantsAfter(savepointTime, Integer.MAX_VALUE).getInstants() .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - LOG.info("Rolling back commits {}", commitsToRollback); + LOG.info("Rolling back commits " + commitsToRollback); restoreToInstant(savepointTime); @@ -818,7 +818,7 @@ public void restoreToInstant(final String instantTime) throws HoodieRollbackExce // delete these files when it does not see a corresponding instant file under .hoodie List statsForCompaction = doRollbackAndGetStats(instant); instantsToStats.put(instant.getTimestamp(), statsForCompaction); - LOG.info("Deleted compaction instant {}", instant); + LOG.info("Deleted compaction instant " + instant); break; default: throw new IllegalArgumentException("invalid action name " + instant.getAction()); @@ -859,7 +859,7 @@ private List doRollbackAndGetStats(final HoodieInstant insta if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { // nothing to rollback - LOG.info("No commits to rollback {}", commitToRollback); + LOG.info("No commits to rollback " + commitToRollback); } // Make sure only the last n commits are being rolled back @@ -880,13 +880,13 @@ private List doRollbackAndGetStats(final HoodieInstant insta List stats = table.rollback(jsc, instantToRollback, true); - LOG.info("Deleted inflight commits {}", commitToRollback); + LOG.info("Deleted inflight commits " + commitToRollback); // cleanup index entries if (!index.rollbackCommit(commitToRollback)) { throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); } - LOG.info("Index rolled back for commits {}", 
commitToRollback); + LOG.info("Index rolled back for commits " + commitToRollback); return stats; } @@ -907,7 +907,7 @@ private void finishRollback(final Timer.Context context, List { @@ -1047,7 +1047,7 @@ private void startCommit(String instantTime) { */ public Option scheduleCompaction(Option> extraMetadata) throws IOException { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - LOG.info("Generate a new instant time {}", instantTime); + LOG.info("Generate a new instant time " + instantTime); boolean notEmpty = scheduleCompactionAtInstant(instantTime, extraMetadata); return notEmpty ? Option.of(instantTime) : Option.empty(); } @@ -1291,9 +1291,9 @@ protected void commitCompaction(JavaRDD compactedStatuses, HoodieTa + config.getBasePath() + " at time " + compactionCommitTime, e); } } - LOG.info("Compacted successfully on commit {}", compactionCommitTime); + LOG.info("Compacted successfully on commit " + compactionCommitTime); } else { - LOG.info("Compaction did not run for commit {}", compactionCommitTime); + LOG.info("Compaction did not run for commit " + compactionCommitTime); } } @@ -1304,7 +1304,7 @@ private void finalizeWrite(HoodieTable table, String instantTime, List durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop())); durationInMs.ifPresent(duration -> { - LOG.info("Finalize write elapsed time (milliseconds): {}", duration); + LOG.info("Finalize write elapsed time (milliseconds): " + duration); metrics.updateFinalizeWriteMetrics(duration, stats.size()); }); } @@ -1344,7 +1344,7 @@ private HoodieCommitMetadata doCompactionCommit(HoodieTable table, JavaRDD extends HoodieIndex { private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path"); private static final int SLEEP_TIME_MILLISECONDS = 100; - private static final Logger LOG = LoggerFactory.getLogger(HBaseIndex.class); + private static final Logger LOG = LogManager.getLogger(HBaseIndex.class); private static Connection hbaseConnection = null; private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null; private float qpsFraction; @@ -115,7 +115,7 @@ private void init(HoodieWriteConfig config) { @VisibleForTesting public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) { try { - LOG.info("createQPSResourceAllocator : {}", config.getHBaseQPSResourceAllocatorClass()); + LOG.info("createQPSResourceAllocator :" + config.getHBaseQPSResourceAllocatorClass()); return (HBaseIndexQPSResourceAllocator) ReflectionUtils .loadClass(config.getHBaseQPSResourceAllocatorClass(), config); } catch (Exception e) { @@ -320,7 +320,7 @@ private Function2, Iterator> updateL doPutsAndDeletes(hTable, puts, deletes); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); - LOG.error("Error updating index for {}", writeStatus, e); + LOG.error(we); writeStatus.setGlobalError(we); } writeStatusList.add(writeStatus); @@ -370,7 +370,7 @@ public JavaRDD updateLocation(JavaRDD writeStatusRDD, HoodieTable hoodieTable) { final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config); setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc); - LOG.info("multiPutBatchSize: before HBase puts {}", multiPutBatchSize); + LOG.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize); JavaRDD writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true); // caching the index updated status RDD writeStatusJavaRDD = 
writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel()); @@ -398,15 +398,15 @@ private void setPutBatchSize(JavaRDD writeStatusRDD, this.numRegionServersForTable = getNumRegionServersAliveForTable(); final float desiredQPSFraction = hBaseIndexQPSResourceAllocator.calculateQPSFractionForPutsTime(numPuts, this.numRegionServersForTable); - LOG.info("Desired QPSFraction : {}", desiredQPSFraction); - LOG.info("Number HBase puts : {}", numPuts); - LOG.info("HBase Puts Parallelism : {}", hbasePutsParallelism); + LOG.info("Desired QPSFraction :" + desiredQPSFraction); + LOG.info("Number HBase puts :" + numPuts); + LOG.info("Hbase Puts Parallelism :" + hbasePutsParallelism); final float availableQpsFraction = hBaseIndexQPSResourceAllocator.acquireQPSResources(desiredQPSFraction, numPuts); LOG.info("Allocated QPS Fraction :" + availableQpsFraction); multiPutBatchSize = putBatchSizeCalculator.getBatchSize(numRegionServersForTable, maxQpsPerRegionServer, hbasePutsParallelism, maxExecutors, SLEEP_TIME_MILLISECONDS, availableQpsFraction); - LOG.info("multiPutBatchSize : {}", multiPutBatchSize); + LOG.info("multiPutBatchSize :" + multiPutBatchSize); } } @@ -420,7 +420,7 @@ public Tuple2 getHBasePutAccessParallelism(final JavaRDD extends HoodieWriteHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieAppendHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class); // This acts as the sequenceID for records written private static AtomicLong recordIndex = new AtomicLong(1); private final String fileId; @@ -123,7 +123,7 @@ private void init(HoodieRecord record) { } else { // This means there is no base data file, start appending to a new log file fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId)); - LOG.info("New InsertHandle for partition : {}", partitionPath); + LOG.info("New InsertHandle for partition :" + partitionPath); } writeStatus.getStat().setPrevCommit(baseInstantTime); writeStatus.setFileId(fileId); @@ -137,7 +137,7 @@ private void init(HoodieRecord record) { ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion()); ((HoodieDeltaWriteStat) writeStatus.getStat()).setLogOffset(writer.getCurrentSize()); } catch (Exception e) { - LOG.error("Error in update task at commit {}", instantTime, e); + LOG.error("Error in update task at commit " + instantTime, e); writeStatus.setGlobalError(e); throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit " + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + partitionPath, e); @@ -179,7 +179,7 @@ private Option getIndexedRecord(HoodieRecord hoodieRecord) { hoodieRecord.deflate(); return avroRecord; } catch (Exception e) { - LOG.error("Error writing record {}", hoodieRecord, e); + LOG.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return Option.empty(); @@ -232,7 +232,7 @@ public void write(HoodieRecord record, Option insertValue) { // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, t, recordMetadata); - LOG.error("Error writing record {}", record, t); + LOG.error("Error writing record " + record, t); } } @@ -259,8 +259,8 @@ public WriteStatus close() { runtimeStats.setTotalUpsertTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); - LOG.info("AppendHandle for partitionPath {} fileID {}, took {} ms.", 
stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime()); + LOG.info(String.format("AppendHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalUpsertTime())); return writeStatus; } catch (IOException e) { @@ -308,7 +308,7 @@ private void flushToDiskIfRequired(HoodieRecord record) { if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) { // Recompute averageRecordSize before writing a new block and update existing value with // avg of new and old - LOG.info("AvgRecordSize => {}", averageRecordSize); + LOG.info("AvgRecordSize => " + averageRecordSize); averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2; doAppend(header); estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java index d75df4b8edb9e..9c319c881b288 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCleanHelper.java @@ -40,8 +40,8 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; @@ -62,7 +62,7 @@ */ public class HoodieCleanHelper> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanHelper.class); + private static final Logger LOG = LogManager.getLogger(HoodieCleanHelper.class); private final SyncableFileSystemView fileSystemView; private final HoodieTimeline commitTimeline; @@ -101,7 +101,8 @@ public List getPartitionPathsToClean(Option newInstantToR if ((cleanMetadata.getEarliestCommitToRetain() != null) && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) { LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed " - + "since last cleaned at {}. New Instant to retain : {}", cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain); + + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain() + + ". New Instant to retain : " + newInstantToRetain); return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(), HoodieTimeline.GREATER_OR_EQUAL) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), newInstantToRetain.get().getTimestamp(), HoodieTimeline.LESSER)).flatMap(instant -> { @@ -126,7 +127,8 @@ public List getPartitionPathsToClean(Option newInstantToR * single file (i.e run it with versionsRetained = 1) */ private List getFilesToCleanKeepingLatestVersions(String partitionPath) { - LOG.info("Cleaning {}, retaining latest {} file versions. ", partitionPath, config.getCleanerFileVersionsRetained()); + LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + + " file versions. 
"); List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -185,7 +187,7 @@ private List getFilesToCleanKeepingLatestVersions(String partitionPath) */ private List getFilesToCleanKeepingLatestCommits(String partitionPath) { int commitsRetained = config.getCleanerCommitsRetained(); - LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained); + LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints @@ -272,7 +274,7 @@ public List getDeletePaths(String partitionPath) { } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - LOG.info("{} patterns used to delete in partition path: {}", deletePaths.size(), partitionPath); + LOG.info(deletePaths.size() + " patterns used to delete in partition path:" + partitionPath); return deletePaths; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java index 9baad754d6086..bafbc8dfa912d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCommitArchiveLog.java @@ -54,9 +54,9 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -72,7 +72,7 @@ */ public class HoodieCommitArchiveLog { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitArchiveLog.class); + private static final Logger LOG = LogManager.getLogger(HoodieCommitArchiveLog.class); private final Path archiveFilePath; private final HoodieTableMetaClient metaClient; @@ -119,9 +119,9 @@ public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException boolean success = true; if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); - LOG.info("Archiving instants {}", instantsToArchive); + LOG.info("Archiving instants " + instantsToArchive); archive(instantsToArchive); - LOG.info("Deleting archived instants {}", instantsToArchive); + LOG.info("Deleting archived instants " + instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } else { LOG.info("No Instants to archive"); @@ -188,14 +188,14 @@ private Stream getInstantsToArchive(JavaSparkContext jsc) { } private boolean deleteArchivedInstants(List archivedInstants) throws IOException { - LOG.info("Deleting instants {}", archivedInstants); + LOG.info("Deleting instants " + archivedInstants); boolean success = true; for (HoodieInstant archivedInstant : archivedInstants) { Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName()); try { if (metaClient.getFs().exists(commitFile)) { success &= metaClient.getFs().delete(commitFile, false); - LOG.info("Archived and deleted instant file {}", commitFile); + LOG.info("Archived and deleted instant file " + commitFile); } } catch (IOException e) { throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e); @@ -205,7 +205,7 @@ private boolean deleteArchivedInstants(List 
archivedInstants) thr // Remove older meta-data from auxiliary path too Option latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); - LOG.info("Latest Committed Instant={}", latestCommitted); + LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); } @@ -233,7 +233,7 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); if (metaClient.getFs().exists(metaFile)) { success &= metaClient.getFs().delete(metaFile, false); - LOG.info("Deleted instant file in auxiliary metapath : {}", metaFile); + LOG.info("Deleted instant file in auxiliary metapath : " + metaFile); } } return success; @@ -243,7 +243,7 @@ public void archive(List instants) throws HoodieCommitException { try { HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); - LOG.info("Wrapper schema {}", wrapperSchema.toString()); + LOG.info("Wrapper schema " + wrapperSchema.toString()); List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { @@ -252,7 +252,7 @@ public void archive(List instants) throws HoodieCommitException { writeToFile(wrapperSchema, records); } } catch (Exception e) { - LOG.error("Failed to archive commits, commit file: {}", hoodieInstant.getFileName(), e); + LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e); if (this.config.isFailOnTimelineArchivingEnabled()) { throw e; } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index bece8810de4cb..095e0a0e35331 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -36,16 +36,16 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; public class HoodieCreateHandle extends HoodieWriteHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCreateHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); private final HoodieStorageWriter storageWriter; private final Path path; @@ -73,7 +73,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTab } catch (IOException e) { throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e); } - LOG.info("New CreateHandle for partition : {} with fileId {}", partitionPath, fileId); + LOG.info("New CreateHandle for partition :" + partitionPath + " with fileId " + fileId); } /** @@ -120,7 +120,7 @@ public void write(HoodieRecord record, Option avroRecord) { // Not throwing exception from here, since we don't want to fail the entire job // for a single record writeStatus.markFailure(record, 
t, recordMetadata); - LOG.error("Error writing record {}", record, t); + LOG.error("Error writing record " + record, t); } } @@ -152,7 +152,8 @@ public WriteStatus getWriteStatus() { */ @Override public WriteStatus close() { - LOG.info("Closing the file {} as we are done with all the records {}", writeStatus.getFileId(), recordsWritten); + LOG + .info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); try { storageWriter.close(); @@ -174,8 +175,8 @@ public WriteStatus close() { stat.setRuntimeStats(runtimeStats); writeStatus.setStat(stat); - LOG.info("CreateHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalCreateTime()); + LOG.info(String.format("CreateHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), + stat.getFileId(), runtimeStats.getTotalCreateTime())); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java index 45472bceabf14..9f3bdbbdd786f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java @@ -31,8 +31,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.HashSet; @@ -44,7 +44,7 @@ */ public class HoodieKeyLookupHandle extends HoodieReadHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieKeyLookupHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieKeyLookupHandle.class); private final HoodieTableType tableType; @@ -63,7 +63,7 @@ public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable hoodieTabl HoodieTimer timer = new HoodieTimer().startTimer(); this.bloomFilter = ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(), new Path(getLatestDataFile().getPath())); - LOG.info("Read bloom filter from {} in {} ms", partitionPathFilePair, timer.endTimer()); + LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFilePair, timer.endTimer())); } /** @@ -82,7 +82,7 @@ public static List checkCandidatesAgainstFile(Configuration configuratio LOG.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath, timer.endTimer(), candidateRecordKeys.size(), foundRecordKeys.size())); if (LOG.isDebugEnabled()) { - LOG.debug("Keys matching for file {} => {}", filePath, foundRecordKeys); + LOG.debug("Keys matching for file " + filePath + " => " + foundRecordKeys); } } } catch (Exception e) { @@ -98,7 +98,7 @@ public void addKey(String recordKey) { // check record key against bloom filter of current file & add to possible keys if needed if (bloomFilter.mightContain(recordKey)) { if (LOG.isDebugEnabled()) { - LOG.debug("Record key {} matches bloom filter in {}", recordKey, partitionPathFilePair); + LOG.debug("Record key " + recordKey + " matches bloom filter in " + partitionPathFilePair); } candidateRecordKeys.add(recordKey); } @@ -110,14 +110,15 @@ public void addKey(String recordKey) { */ public KeyLookupResult getLookupResult() { if (LOG.isDebugEnabled()) { - LOG.debug("#The candidate row keys for {} => {}", partitionPathFilePair, candidateRecordKeys); + LOG.debug("#The candidate row keys for " + partitionPathFilePair + " => " + candidateRecordKeys); } HoodieDataFile dataFile = getLatestDataFile(); List matchingKeys = checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath())); - LOG.info("Total records ({}), bloom filter candidates ({})/fp({}), actual matches ({})", totalKeysChecked, - candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()); + LOG.info( + String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked, + candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size())); return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(), dataFile.getCommitTime(), matchingKeys); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 0c801e7823ab4..518b883340043 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -43,9 +43,9 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashSet; @@ -56,7 +56,7 @@ @SuppressWarnings("Duplicates") public class HoodieMergeHandle extends HoodieWriteHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieMergeHandle.class); private Map> keyToNewRecords; private Set writtenRecordKeys; @@ -137,7 +137,7 @@ public void write(HoodieRecord record, Option avroRecord, Option< if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record {}", record, exception.get()); + LOG.error("Error writing record " + record, exception.get()); } else { write(record, avroRecord); } @@ -155,7 +155,7 @@ protected GenericRecord rewriteRecord(GenericRecord record) { * Extract old file path, initialize StorageWriter and WriteStatus. 
*/ private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) { - LOG.info("partitionPath: {}, fileId to be merged: {}", partitionPath, fileId); + LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId); this.writtenRecordKeys = new HashSet<>(); writeStatus.setStat(new HoodieWriteStat()); try { @@ -171,7 +171,8 @@ private void init(String fileId, String partitionPath, HoodieDataFile dataFileTo + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString(); newFilePath = new Path(config.getBasePath(), relativePath); - LOG.info("Merging new data into oldPath {}, as newPath {}", oldFilePath.toString(), newFilePath.toString()); + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); @@ -186,7 +187,7 @@ private void init(String fileId, String partitionPath, HoodieDataFile dataFileTo storageWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema); } catch (IOException io) { - LOG.error("Error in update task at commit {}", instantTime, io); + LOG.error("Error in update task at commit " + instantTime, io); writeStatus.setGlobalError(io); throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit " + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io); @@ -200,7 +201,7 @@ private String init(String fileId, Iterator> newRecordsItr) { try { // Load the new records in a map long memoryForMerge = config.getMaxMemoryPerPartitionMerge(); - LOG.info("MaxMemoryPerPartitionMerge => {}", memoryForMerge); + LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema)); } catch (IOException io) { @@ -217,10 +218,12 @@ private String init(String fileId, Iterator> newRecordsItr) { // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist keyToNewRecords.put(record.getRecordKey(), record); } - LOG.info("Number of entries in MemoryBasedMap => {}. Total size in bytes of MemoryBasedMap => {}. " - + "Number of entries in DiskBasedMap => {}. 
Size of file spilled to disk => {}", - ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), - ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); + LOG.info("Number of entries in MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries() + + ". Total size in bytes of MemoryBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize() + ". Number of entries in DiskBasedMap => " + + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries() + ". Size of file spilled to disk => " + + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes()); return partitionPath; } @@ -250,7 +253,7 @@ private boolean writeRecord(HoodieRecord hoodieRecord, Option hoodieRecord.deflate(); return true; } catch (Exception e) { - LOG.error("Error writing record {}", hoodieRecord, e); + LOG.error("Error writing record " + hoodieRecord, e); writeStatus.markFailure(hoodieRecord, e, recordMetadata); } return false; @@ -292,12 +295,12 @@ public void write(GenericRecord oldRecord) { try { storageWriter.writeAvro(key, oldRecord); } catch (ClassCastException e) { - LOG.error("Schema mismatch when rewriting old record {} from file {} to file {} with writerSchema {}", - oldRecord, getOldFilePath(), newFilePath, writerSchema.toString(true)); + LOG.error("Schema mismatch when rewriting old record " + oldRecord + " from file " + getOldFilePath() + + " to file " + newFilePath + " with writerSchema " + writerSchema.toString(true)); throw new HoodieUpsertException(errMsg, e); } catch (IOException e) { - LOG.error("Failed to merge old record into new file for key {} from old file {} to new file {}", - key, getOldFilePath(), newFilePath, e); + LOG.error("Failed to merge old record into new file for key " + key + " from old file " + getOldFilePath() + + " to new file " + newFilePath, e); throw new HoodieUpsertException(errMsg, e); } recordsWritten++; @@ -342,8 +345,6 @@ public WriteStatus close() { LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(), stat.getFileId(), runtimeStats.getTotalUpsertTime())); - LOG.info("MergeHandle for partitionPath {} fileID {}, took {} ms.", stat.getPartitionPath(), - stat.getFileId(), runtimeStats.getTotalUpsertTime()); return writeStatus; } catch (IOException e) { diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 50256bcf3853a..7a1939a4747dc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -36,9 +36,9 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.TaskContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; @@ -47,7 +47,7 @@ */ public abstract class HoodieWriteHandle extends HoodieIOHandle { - private static final Logger LOG = LoggerFactory.getLogger(HoodieWriteHandle.class); + private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class); protected final Schema originalSchema; protected final Schema writerSchema; protected HoodieTimer timer; @@ -97,7 +97,7 @@ public 
Path makeNewPath(String partitionPath) { protected void createMarkerFile(String partitionPath) { Path markerPath = makeNewMarkerPath(partitionPath); try { - LOG.info("Creating Marker Path={}", markerPath); + LOG.info("Creating Marker Path=" + markerPath); fs.create(markerPath, false).close(); } catch (IOException e) { throw new HoodieException("Failed to create marker file " + markerPath, e); @@ -147,7 +147,7 @@ public void write(HoodieRecord record, Option avroRecord, Option< if (exception.isPresent() && exception.get() instanceof Throwable) { // Not throwing exception from here, since we don't want to fail the entire job for a single record writeStatus.markFailure(record, exception.get(), recordMetadata); - LOG.error("Error writing record {}", record, exception.get()); + LOG.error("Error writing record " + record, exception.get()); } else { write(record, avroRecord); } diff --git a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java index 1d02d4ea319d2..6f976014a853b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/compact/HoodieRealtimeTableCompactor.java @@ -47,13 +47,13 @@ import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.util.AccumulatorV2; import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -74,7 +74,7 @@ */ public class HoodieRealtimeTableCompactor implements HoodieCompactor { - private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeTableCompactor.class); + private static final Logger LOG = LogManager.getLogger(HoodieRealtimeTableCompactor.class); // Accumulator to keep track of total log files for a dataset private AccumulatorV2 totalLogFiles; // Accumulator to keep track of total log file slices for a dataset @@ -92,7 +92,7 @@ public JavaRDD compact(JavaSparkContext jsc, HoodieCompactionPlan c HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc); List operations = compactionPlan.getOperations().stream() .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList()); - LOG.info("Compactor compacting {} files", operations); + LOG.info("Compactor compacting " + operations + " files"); return jsc.parallelize(operations, operations.size()) .map(s -> compact(table, metaClient, config, s, compactionInstantTime)).flatMap(List::iterator); @@ -103,8 +103,8 @@ private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, FileSystem fs = metaClient.getFs(); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); - LOG.info("Compacting base {} with delta files {} for commit {}", - operation.getDataFileName(), operation.getDeltaFileNames(), commitTime); + LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + + " for commit " + commitTime); // TODO - FIX THIS // Reads the entire avro file. Always only specific blocks should be read from the avro file // (failure recover). 
@@ -115,7 +115,7 @@ private List compact(HoodieCopyOnWriteTable hoodieCopyOnWriteTable, .getActiveTimeline().getTimelineOfActions(Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); - LOG.info("MaxMemoryPerCompaction => {}", config.getMaxMemoryPerCompaction()); + LOG.info("MaxMemoryPerCompaction => " + config.getMaxMemoryPerCompaction()); List logFiles = operation.getDeltaFileNames().stream().map( p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString()) @@ -176,7 +176,7 @@ public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieT // TODO : check if maxMemory is not greater than JVM or spark.executor memory // TODO - rollback any compactions in flight HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); - LOG.info("Compacting {} with commit {}", metaClient.getBasePath(), compactionCommitTime); + LOG.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime); List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning()); @@ -189,7 +189,7 @@ public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieT } RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); - LOG.info("Compaction looking for files to compact in {} partitions", partitionPaths); + LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) .flatMap((FlatMapFunction) partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) @@ -206,10 +206,10 @@ public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieT config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); }).filter(c -> !c.getDeltaFileNames().isEmpty()).collect(toList()).iterator()) .collect().stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); - LOG.info("Total of {} compactions are retrieved", operations.size()); - LOG.info("Total number of latest files slices {}", totalFileSlices.value()); - LOG.info("Total number of log files {}", totalLogFiles.value()); - LOG.info("Total number of file slices {}", totalFileSlices.value()); + LOG.info("Total of " + operations.size() + " compactions are retrieved"); + LOG.info("Total number of latest files slices " + totalFileSlices.value()); + LOG.info("Total number of log files " + totalLogFiles.value()); + LOG.info("Total number of file slices " + totalFileSlices.value()); // Filter the compactions with the passed in filter. This lets us choose most effective // compactions only HoodieCompactionPlan compactionPlan = config.getCompactionStrategy().generateCompactionPlan(config, operations, @@ -221,7 +221,7 @@ public HoodieCompactionPlan generateCompactionPlan(JavaSparkContext jsc, HoodieT + "Please fix your strategy implementation. 
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { - LOG.warn("After filtering, Nothing to compact for {}", metaClient.getBasePath()); + LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); } return compactionPlan; } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index c0dd905023f1d..b6fcd09e8facf 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -24,15 +24,15 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; /** * Wrapper for metrics-related operations. */ public class HoodieMetrics { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMetrics.class); + private static final Logger LOG = LogManager.getLogger(HoodieMetrics.class); // Some timers public String rollbackTimerName = null; public String cleanTimerName = null; @@ -155,7 +155,8 @@ public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, Hoo public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info("Sending rollback metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); + LOG.info( + String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted); } @@ -163,7 +164,8 @@ public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) { public void updateCleanMetrics(long durationInMs, int numFilesDeleted) { if (config.isMetricsOn()) { - LOG.info("Sending clean metrics (duration={}, numFilesDeleted={})", durationInMs, numFilesDeleted); + LOG.info( + String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted)); Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted); } @@ -171,7 +173,8 @@ public void updateCleanMetrics(long durationInMs, int numFilesDeleted) { public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) { if (config.isMetricsOn()) { - LOG.info("Sending finalize write metrics (duration={}, numFilesFinalized={})", durationInMs, numFilesFinalized); + LOG.info(String.format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", durationInMs, + numFilesFinalized)); Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs); Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized); } @@ -179,7 +182,7 @@ public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized public void updateIndexMetrics(final String action, final long durationInMs) { if (config.isMetricsOn()) { - LOG.info("Sending index metrics ({}.duration, {})", action, durationInMs); + LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs)); Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs); } } diff 
--git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java index a3e95fe4463e8..2559a4b4c2f51 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java @@ -22,8 +22,8 @@ import org.apache.hudi.exception.HoodieException; import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import javax.management.remote.JMXConnectorServer; import javax.management.remote.JMXConnectorServerFactory; @@ -38,7 +38,7 @@ */ public class JmxMetricsReporter extends MetricsReporter { - private static final Logger LOG = LoggerFactory.getLogger(JmxMetricsReporter.class); + private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class); private final JMXConnectorServer connector; public JmxMetricsReporter(HoodieWriteConfig config) { diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index 4d006840a4586..4b194416a3578 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -24,8 +24,8 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.google.common.io.Closeables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.Closeable; @@ -33,7 +33,7 @@ * This is the main class of the metrics system. */ public class Metrics { - private static final Logger LOG = LoggerFactory.getLogger(Metrics.class); + private static final Logger LOG = LogManager.getLogger(Metrics.class); private static volatile boolean initialized = false; private static Metrics metrics = null; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java index bb33a978af12b..aac6c708f54dc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java @@ -24,8 +24,8 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.graphite.Graphite; import com.codahale.metrics.graphite.GraphiteReporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import java.io.Closeable; import java.net.InetSocketAddress; @@ -36,7 +36,7 @@ */ public class MetricsGraphiteReporter extends MetricsReporter { - private static final Logger LOG = LoggerFactory.getLogger(MetricsGraphiteReporter.class); + private static final Logger LOG = LogManager.getLogger(MetricsGraphiteReporter.class); private final MetricRegistry registry; private final GraphiteReporter graphiteReporter; private final HoodieWriteConfig config; diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index a80c1efbe7b36..b9d433d9457f3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -21,15 +21,15 @@ import 
org.apache.hudi.config.HoodieWriteConfig; import com.codahale.metrics.MetricRegistry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; /** * Factory class for creating MetricsReporter. */ public class MetricsReporterFactory { - private static final Logger LOG = LoggerFactory.getLogger(MetricsReporterFactory.class); + private static final Logger LOG = LogManager.getLogger(MetricsReporterFactory.class); public static MetricsReporter createReporter(HoodieWriteConfig config, MetricRegistry registry) { MetricsReporterType type = config.getMetricsReporterType(); @@ -45,7 +45,7 @@ public static MetricsReporter createReporter(HoodieWriteConfig config, MetricReg reporter = new JmxMetricsReporter(config); break; default: - LOG.error("Reporter type[{}] is not supported.", type); + LOG.error("Reporter type[" + type + "] is not supported."); break; } return reporter; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 1ccf026d0caf5..f1f277bad7811 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -58,6 +58,8 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -79,8 +81,6 @@ import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; /** @@ -92,7 +92,7 @@ */ public class HoodieCopyOnWriteTable extends HoodieTable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieCopyOnWriteTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class); public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) { super(config, jsc); @@ -130,7 +130,7 @@ private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathSt try { boolean deleteResult = fs.delete(deletePath, false); if (deleteResult) { - LOG.debug("Cleaned file at path : {}", deletePath); + LOG.debug("Cleaned file at path :" + deletePath); } return deleteResult; } catch (FileNotFoundException fio) { @@ -172,7 +172,7 @@ public Iterator> handleUpdate(String commitTime, String fileId throws IOException { // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records if (!recordItr.hasNext()) { - LOG.info("Empty partition with fileId => {}", fileId); + LOG.info("Empty partition with fileId => " + fileId); return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } // these are updates @@ -212,8 +212,8 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle ups // TODO(vc): This needs to be revisited if (upsertHandle.getWriteStatus().getPartitionPath() == null) { - LOG.info("Upsert Handle has partition path as null {}, {}", upsertHandle.getOldFilePath(), - upsertHandle.getWriteStatus()); + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + + upsertHandle.getWriteStatus()); } return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); } @@ -291,7 
+291,8 @@ public HoodieCleanerPlan scheduleClean(JavaSparkContext jsc) { LOG.info("Nothing to clean here. It is already clean"); return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); } - LOG.info("Total Partitions to clean : {}, with policy {}", partitionsToClean.size(), config.getCleanerPolicy()); + LOG.info( + "Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); LOG.info("Using cleanerParallelism: " + cleanerParallelism); @@ -317,7 +318,7 @@ public List clean(JavaSparkContext jsc, HoodieInstant cleanInst int cleanerParallelism = Math.min( (int) (cleanerPlan.getFilesToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()), config.getCleanerParallelism()); - LOG.info("Using cleanerParallelism: {}", cleanerParallelism); + LOG.info("Using cleanerParallelism: " + cleanerParallelism); List> partitionCleanStats = jsc .parallelize(cleanerPlan.getFilesToBeDeletedPerPartition().entrySet().stream() .flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), y))) @@ -353,7 +354,7 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); if (instant.isCompleted()) { - LOG.info("Unpublishing instant {}", instant); + LOG.info("Unpublishing instant " + instant); instant = activeTimeline.revertToInflight(instant); } @@ -363,7 +364,7 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins String commit = instant.getTimestamp(); // delete all the data files for this commit - LOG.info("Clean out all parquet files generated for commit: {}", commit); + LOG.info("Clean out all parquet files generated for commit: " + commit); List rollbackRequests = generateRollbackRequests(instant); //TODO: We need to persist this as rollback workload and use it in case of partial failures @@ -371,7 +372,7 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins } // Delete Inflight instant if enabled deleteInflightAndRequestedInstant(deleteInstants, activeTimeline, instant); - LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return stats; } @@ -397,7 +398,7 @@ protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieAc // Remove the rolled back inflight commits if (deleteInstant) { - LOG.info("Deleting instant={}", instantToBeDeleted); + LOG.info("Deleting instant=" + instantToBeDeleted); activeTimeline.deletePending(instantToBeDeleted); if (instantToBeDeleted.isInflight() && !metaClient.getTimelineLayoutVersion().isNullVersion()) { // Delete corresponding requested instant @@ -405,9 +406,9 @@ protected void deleteInflightAndRequestedInstant(boolean deleteInstant, HoodieAc instantToBeDeleted.getTimestamp()); activeTimeline.deletePending(instantToBeDeleted); } - LOG.info("Deleted pending commit {}", instantToBeDeleted); + LOG.info("Deleted pending commit " + instantToBeDeleted); } else { - LOG.warn("Rollback finished without deleting inflight instant file. Instant={}", instantToBeDeleted); + LOG.warn("Rollback finished without deleting inflight instant file. 
Instant=" + instantToBeDeleted); } } @@ -575,10 +576,9 @@ class UpsertPartitioner extends Partitioner { assignUpdates(profile); assignInserts(profile); - LOG.info("Total Buckets :{}, buckets info => {}, \n" - + "Partition to insert buckets => {}, \n" - + "UpdateLocations mapped to buckets =>{}", - totalBuckets, bucketInfoMap, partitionPathToInsertBuckets, updateLocationToBucket); + LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" + + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" + + "UpdateLocations mapped to buckets =>" + updateLocationToBucket); } private void assignUpdates(WorkloadProfile profile) { @@ -606,13 +606,13 @@ private void assignInserts(WorkloadProfile profile) { long averageRecordSize = averageBytesPerRecord(metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(), config.getCopyOnWriteRecordSizeEstimate()); - LOG.info("AvgRecordSize => {}", averageRecordSize); + LOG.info("AvgRecordSize => " + averageRecordSize); for (String partitionPath : partitionPaths) { WorkloadStat pStat = profile.getWorkloadStat(partitionPath); if (pStat.getNumInserts() > 0) { List smallFiles = getSmallFiles(partitionPath); - LOG.info("For partitionPath : {} Small Files => {}", partitionPath, smallFiles); + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); long totalUnassignedInserts = pStat.getNumInserts(); List bucketNumbers = new ArrayList<>(); @@ -627,10 +627,10 @@ private void assignInserts(WorkloadProfile profile) { int bucket; if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); - LOG.info("Assigning {} inserts to existing update bucket {}", recordsToAppend, bucket); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { bucket = addUpdateBucket(smallFile.location.getFileId()); - LOG.info("Assigning {} inserts to new update bucket {}", recordsToAppend, bucket); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); @@ -646,8 +646,8 @@ private void assignInserts(WorkloadProfile profile) { } int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); - LOG.info("After small file assignment: unassignedInserts => {}, totalInsertBuckets => {}, " - + "recordsPerBucket => {}", totalUnassignedInserts, insertBuckets, insertRecordsPerBucket); + LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts + + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); recordsPerBucket.add(totalUnassignedInserts / insertBuckets); @@ -667,7 +667,7 @@ private void assignInserts(WorkloadProfile profile) { bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts(); insertBuckets.add(bkt); } - LOG.info("Total insert buckets for partition path {} => {}", partitionPath, insertBuckets); + LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); partitionPathToInsertBuckets.put(partitionPath, insertBuckets); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 8845772941e13..a654fcbf2bba6 100644 --- 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -44,11 +44,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; @@ -77,7 +77,7 @@ */ public class HoodieMergeOnReadTable extends HoodieCopyOnWriteTable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieMergeOnReadTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class); // UpsertPartitioner for MergeOnRead table type private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner; @@ -98,10 +98,10 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) { @Override public Iterator> handleUpdate(String commitTime, String fileId, Iterator> recordItr) throws IOException { - LOG.info("Merging updates for commit {} for file {}", commitTime, fileId); + LOG.info("Merging updates for commit " + commitTime + " for file " + fileId); if (!index.canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { - LOG.info("Small file corrections for updates for commit {} for file {}", commitTime, fileId); + LOG.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId); return super.handleUpdate(commitTime, fileId, recordItr); } else { HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); @@ -124,7 +124,7 @@ public Iterator> handleInsert(String commitTime, String idPfx, @Override public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String instantTime) { - LOG.info("Checking if compaction needs to be run on {}", config.getBasePath()); + LOG.info("Checking if compaction needs to be run on " + config.getBasePath()); Option lastCompaction = getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); String deltaCommitsSinceTs = "0"; @@ -135,12 +135,13 @@ public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String inst int deltaCommitsSinceLastCompaction = getActiveTimeline().getDeltaCommitTimeline() .findInstantsAfter(deltaCommitsSinceTs, Integer.MAX_VALUE).countInstants(); if (config.getInlineCompactDeltaCommitMax() > deltaCommitsSinceLastCompaction) { - LOG.info("Not running compaction as only {} delta commits was found since last compaction {}. Waiting for {}", - deltaCommitsSinceLastCompaction, deltaCommitsSinceTs, config.getInlineCompactDeltaCommitMax()); + LOG.info("Not running compaction as only " + deltaCommitsSinceLastCompaction + + " delta commits was found since last compaction " + deltaCommitsSinceTs + ". 
Waiting for " + + config.getInlineCompactDeltaCommitMax()); return new HoodieCompactionPlan(); } - LOG.info("Compacting merge on read table {}", config.getBasePath()); + LOG.info("Compacting merge on read table " + config.getBasePath()); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); try { return compactor.generateCompactionPlan(jsc, this, config, instantTime, @@ -170,11 +171,11 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins long startTime = System.currentTimeMillis(); String commit = instant.getTimestamp(); - LOG.error("Rolling back instant {}", instant); + LOG.error("Rolling back instant " + instant); // Atomically un-publish all non-inflight commits if (instant.isCompleted()) { - LOG.error("Un-publishing instant {}, deleteInstants={}", instant, deleteInstants); + LOG.error("Un-publishing instant " + instant + ", deleteInstants=" + deleteInstants); instant = this.getActiveTimeline().revertToInflight(instant); } @@ -190,7 +191,7 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins // For Requested State (like failure during index lookup), there is nothing to do rollback other than // deleting the timeline file if (!instant.isRequested()) { - LOG.info("Unpublished {}", commit); + LOG.info("Unpublished " + commit); List rollbackRequests = generateRollbackRequests(jsc, instant); // TODO: We need to persist this as rollback workload and use it in case of partial failures allRollbackStats = new RollbackExecutor(metaClient, config).performRollback(jsc, instant, rollbackRequests); @@ -199,7 +200,7 @@ public List rollback(JavaSparkContext jsc, HoodieInstant ins // Delete Inflight instants if enabled deleteInflightAndRequestedInstant(deleteInstants, this.getActiveTimeline(), instant); - LOG.info("Time(in ms) taken to finish rollback {}", (System.currentTimeMillis() - startTime)); + LOG.info("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return allRollbackStats; } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 5bb0ffa70d765..d2f5715644946 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -52,11 +52,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -73,7 +73,7 @@ */ public abstract class HoodieTable implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(HoodieTable.class); + private static final Logger LOG = LogManager.getLogger(HoodieTable.class); protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; @@ -324,7 +324,7 @@ protected void deleteMarkerDir(String instantTs) { Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs)); if (fs.exists(markerDir)) { // For append only case, we do not write to marker dir. 
Hence, the above check - LOG.info("Removing marker directory={}", markerDir); + LOG.info("Removing marker directory=" + markerDir); fs.delete(markerDir, true); } } catch (IOException ioe) { @@ -363,7 +363,7 @@ protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List deleteCleanedFiles(HoodieTableMetaClient metaCl */ private Map deleteCleanedFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, Map results, String commit, String partitionPath) throws IOException { - LOG.info("Cleaning path {}", partitionPath); + LOG.info("Cleaning path " + partitionPath); FileSystem fs = metaClient.getFs(); PathFilter filter = (path) -> { if (path.toString().contains(".parquet")) { @@ -208,7 +208,7 @@ private Map deleteCleanedFiles(HoodieTableMetaClient metaCl for (FileStatus file : toBeDeleted) { boolean success = fs.delete(file.getPath(), false); results.put(file, success); - LOG.info("Delete file {} \t {}", file.getPath(), success); + LOG.info("Delete file " + file.getPath() + "\t" + success); } return results; }
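The hunks above follow one mechanical pattern: the SLF4J logger (`LoggerFactory.getLogger`, parameterized `{}` messages) is replaced by a Log4j 1.x logger obtained from `LogManager.getLogger`, and the message arguments are folded back into the call via string concatenation or `String.format`. The sketch below illustrates that before/after pattern in isolation; the class name, path, and numbers are made up for illustration, and the `isDebugEnabled()` guard at the end is a common Log4j idiom shown for context rather than something this diff introduces.

```java
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Minimal sketch of the logging style used after this change.
 * Class name and message values are hypothetical.
 */
public class LoggingStyleExample {

  // Log4j 1.x logger, mirroring the LogManager.getLogger(...) declarations in the diff.
  private static final Logger LOG = LogManager.getLogger(LoggingStyleExample.class);

  public static void main(String[] args) {
    String basePath = "/tmp/hoodie/sample-table"; // hypothetical table path
    long durationMs = 42L;
    int numFilesDeleted = 7;

    // Before (SLF4J, removed by this diff): LOG.info("Compacting table {}", basePath);
    // After (Log4j): the message is built eagerly with concatenation.
    LOG.info("Compacting merge on read table " + basePath);

    // Where several values are interpolated, the diff switches to String.format.
    LOG.info(String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)",
        durationMs, numFilesDeleted));

    // Optional guard: since concatenation now happens before the logger call,
    // wrapping verbose debug messages in a level check avoids building strings
    // that get discarded. (Shown for context; the diff itself does not add this.)
    if (LOG.isDebugEnabled()) {
      LOG.debug("Cleaned file at path :" + basePath);
    }
  }
}
```

The trade-off of this migration is that, unlike SLF4J's deferred `{}` substitution, concatenated messages are always built even when the log level is disabled, which is why the guard shown above can matter on hot paths.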
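The `HoodieMetrics` hunks only touch the logging around `Metrics.registerGauge(...)`, but the imports in `Metrics.java` show that the layer underneath is Dropwizard Metrics (`com.codahale.metrics`). For readers who have not used that library, here is a hedged sketch of registering a simple numeric gauge against a `MetricRegistry`; the registry and metric names are illustrative and are not taken from Hudi's `Metrics` class.

```java
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

/**
 * Sketch of gauge registration with Dropwizard Metrics, the library that
 * Hudi's Metrics.registerGauge(...) presumably wraps. Names are illustrative.
 */
public class GaugeRegistrationExample {

  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();

    final long rollbackDurationMs = 1500L; // hypothetical measurement

    // Register a gauge reporting a single value, similar in spirit to the
    // "rollback.duration" gauge registered in updateRollbackMetrics above.
    registry.register(MetricRegistry.name("hoodie", "rollback", "duration"),
        (Gauge<Long>) () -> rollbackDurationMs);

    // A reporter attached to the same registry (Graphite, JMX, ...) would then
    // publish the value; that is the role MetricsGraphiteReporter and
    // JmxMetricsReporter play in the classes touched by this diff.
    System.out.println("Registered gauges: " + registry.getGauges().keySet());
  }
}
```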