HiveUtil.java
@@ -31,13 +31,13 @@
*/
public class HiveUtil {

private static final String driverName = "org.apache.hive.jdbc.HiveDriver";
private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

static {
try {
Class.forName(driverName);
Class.forName(DRIVER_NAME);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e);
throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
}
}


AbstractHoodieClient.java
@@ -39,7 +39,7 @@
*/
public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {

private static final Logger logger = LogManager.getLogger(AbstractHoodieClient.class);
private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);

protected final transient FileSystem fs;
protected final transient JavaSparkContext jsc;
@@ -79,7 +79,7 @@ public void close() {
private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) {
if (timelineServer.isPresent() && shouldStopTimelineServer) {
// Stop only if owner
logger.info("Stopping Timeline service !!");
LOG.info("Stopping Timeline service !!");
timelineServer.get().stop();
}

@@ -94,22 +94,22 @@ private synchronized void startEmbeddedServerView() {
if (config.isEmbeddedTimelineServerEnabled()) {
if (!timelineServer.isPresent()) {
// Run Embedded Timeline Server
logger.info("Starting Timeline service !!");
LOG.info("Starting Timeline service !!");
timelineServer = Option.of(new EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(),
config.getClientSpecifiedViewStorageConfig()));
try {
timelineServer.get().startServer();
// Allow executor to find this newly instantiated timeline service
config.setViewStorageConfig(timelineServer.get().getRemoteFileSystemViewConfig());
} catch (IOException e) {
logger.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e);
LOG.warn("Unable to start timeline service. Proceeding as if embedded server is disabled", e);
stopEmbeddedServerView(false);
}
} else {
logger.info("Timeline Server already running. Not restarting the service");
LOG.info("Timeline Server already running. Not restarting the service");
}
} else {
logger.info("Embedded Timeline Server is disabled. Not starting timeline service");
LOG.info("Embedded Timeline Server is disabled. Not starting timeline service");
}
}


HoodieReadClient.java
@@ -58,7 +58,7 @@
*/
public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {

private static final Logger logger = LogManager.getLogger(HoodieReadClient.class);
private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);

/**
* TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple

HoodieClientTestHarness.java
@@ -47,7 +47,7 @@
*/
public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {

private static final Logger logger = LoggerFactory.getLogger(HoodieClientTestHarness.class);
private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);

protected transient JavaSparkContext jsc = null;
protected transient SQLContext sqlContext;
@@ -119,13 +119,13 @@ protected void initSparkContexts() {
*/
protected void cleanupSparkContexts() {
if (sqlContext != null) {
logger.info("Clearing sql context cache of spark-session used in previous test-case");
LOG.info("Clearing sql context cache of spark-session used in previous test-case");
sqlContext.clearCache();
sqlContext = null;
}

if (jsc != null) {
logger.info("Closing spark context used in previous test-case");
LOG.info("Closing spark context used in previous test-case");
jsc.close();
jsc.stop();
jsc = null;
@@ -157,7 +157,7 @@ protected void initFileSystemWithDefaultConfiguration() {
*/
protected void cleanupFileSystem() throws IOException {
if (fs != null) {
logger.warn("Closing file-system instance used in previous test-run");
LOG.warn("Closing file-system instance used in previous test-run");
fs.close();
}
}

HoodieClientTestUtils.java
@@ -72,7 +72,7 @@
*/
public class HoodieClientTestUtils {

private static final transient Logger log = LogManager.getLogger(HoodieClientTestUtils.class);
private static final transient Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
private static final Random RANDOM = new Random();

public static List<WriteStatus> collectStatuses(Iterator<List<WriteStatus>> statusListItr) {
@@ -166,7 +166,7 @@ public static Dataset<Row> readCommit(String basePath, SQLContext sqlContext, Ho
try {
HashMap<String, String> paths =
getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
log.info("Path :" + paths.values());
LOG.info("Path :" + paths.values());
return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime));
} catch (Exception e) {

TestRawTripPayload.java
@@ -46,7 +46,7 @@
*/
public class TestRawTripPayload implements HoodieRecordPayload<TestRawTripPayload> {

private static final transient ObjectMapper mapper = new ObjectMapper();
private static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private String partitionPath;
private String rowKey;
private byte[] jsonDataCompressed;
@@ -71,7 +71,7 @@ public TestRawTripPayload(String jsonData, String rowKey, String partitionPath,
public TestRawTripPayload(String jsonData) throws IOException {
this.jsonDataCompressed = compressData(jsonData);
this.dataSize = jsonData.length();
Map<String, Object> jsonRecordMap = mapper.readValue(jsonData, Map.class);
Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
this.isDeleted = false;

MercifulJsonConverter.java
@@ -41,7 +41,7 @@
*/
public class MercifulJsonConverter {

private static final Map<Schema.Type, JsonToAvroFieldProcessor> fieldTypeProcessors = getFieldTypeProcessors();
private static final Map<Schema.Type, JsonToAvroFieldProcessor> FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();

private final ObjectMapper mapper;

@@ -125,7 +125,7 @@ private static Object convertJsonToAvroField(Object value, String name, Schema s
throw new HoodieJsonToAvroConversionException(null, name, schema);
}

JsonToAvroFieldProcessor processor = fieldTypeProcessors.get(schema.getType());
JsonToAvroFieldProcessor processor = FIELD_TYPE_PROCESSORS.get(schema.getType());
if (null != processor) {
return processor.convertToAvro(value, name, schema);
}

HoodieTableConfig.java
@@ -47,7 +47,7 @@
*/
public class HoodieTableConfig implements Serializable {

private static final transient Logger log = LogManager.getLogger(HoodieTableConfig.class);
private static final transient Logger LOG = LogManager.getLogger(HoodieTableConfig.class);

public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
@@ -67,7 +67,7 @@ public class HoodieTableConfig implements Serializable {
public HoodieTableConfig(FileSystem fs, String metaPath) {
Properties props = new Properties();
Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
log.info("Loading dataset properties from " + propertyPath);
LOG.info("Loading dataset properties from " + propertyPath);
try {
try (FSDataInputStream inputStream = fs.open(propertyPath)) {
props.load(inputStream);

HoodieTableMetaClient.java
@@ -64,7 +64,7 @@
*/
public class HoodieTableMetaClient implements Serializable {

private static final transient Logger log = LogManager.getLogger(HoodieTableMetaClient.class);
private static final transient Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
public static String METAFOLDER_NAME = ".hoodie";
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
@@ -92,7 +92,7 @@ public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadAc

public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
ConsistencyGuardConfig consistencyGuardConfig) throws DatasetNotFoundException {
log.info("Loading HoodieTableMetaClient from " + basePath);
LOG.info("Loading HoodieTableMetaClient from " + basePath);
this.basePath = basePath;
this.consistencyGuardConfig = consistencyGuardConfig;
this.hadoopConf = new SerializableConfiguration(conf);
@@ -103,10 +103,10 @@ public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadAc
DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir);
this.tableConfig = new HoodieTableConfig(fs, metaPath);
this.tableType = tableConfig.getTableType();
log.info("Finished Loading Table of type " + tableType + " from " + basePath);
LOG.info("Finished Loading Table of type " + tableType + " from " + basePath);
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
if (loadActiveTimelineOnLoad) {
log.info("Loading Active commit timeline for " + basePath);
LOG.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
}
}
@@ -303,7 +303,7 @@ public static HoodieTableMetaClient initTableType(Configuration hadoopConf, Stri
*/
public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration hadoopConf, String basePath,
Properties props) throws IOException {
log.info("Initializing " + basePath + " as hoodie dataset " + basePath);
LOG.info("Initializing " + basePath + " as hoodie dataset " + basePath);
Path basePathDir = new Path(basePath);
final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
if (!fs.exists(basePathDir)) {
@@ -340,7 +340,7 @@ public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration ha
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
return metaClient;
}


AbstractHoodieLogRecordScanner.java
@@ -66,7 +66,7 @@
*/
public abstract class AbstractHoodieLogRecordScanner {

private static final Logger log = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);

// Reader schema for the records
protected final Schema readerSchema;
@@ -131,7 +131,7 @@ public void scan() {
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
log.info("Scanning log file " + logFile);
LOG.info("Scanning log file " + logFile);
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the log file
@@ -145,7 +145,7 @@
}
switch (r.getBlockType()) {
case AVRO_DATA_BLOCK:
log.info("Reading a data block from file " + logFile.getPath());
LOG.info("Reading a data block from file " + logFile.getPath());
if (isNewInstantBlock(r) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
@@ -155,7 +155,7 @@
currentInstantLogBlocks.push(r);
break;
case DELETE_BLOCK:
log.info("Reading a delete block from file " + logFile.getPath());
LOG.info("Reading a delete block from file " + logFile.getPath());
if (isNewInstantBlock(r) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
@@ -177,7 +177,7 @@
// written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback
// both B1 & B2
log.info("Reading a command block from file " + logFile.getPath());
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
String targetInstantForCommandBlock =
@@ -196,34 +196,34 @@
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
log.info("Rolling back the last corrupted log block read in " + logFile.getPath());
LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (lastBlock.getBlockType() != CORRUPT_BLOCK
&& targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
// rollback last data block or delete block
log.info("Rolling back the last log block read in " + logFile.getPath());
LOG.info("Rolling back the last log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
} else if (!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
// invalid or extra rollback block
log.warn("TargetInstantTime " + targetInstantForCommandBlock
LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
+ " invalid or extra rollback command block in " + logFile.getPath());
break;
} else {
// this should not happen ideally
log.warn("Unable to apply rollback command block in " + logFile.getPath());
LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
log.info("Number of applied rollback blocks " + numBlocksRolledBack);
LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
default:
throw new UnsupportedOperationException("Command type not yet supported.");
}
break;
case CORRUPT_BLOCK:
log.info("Found a corrupt block in " + logFile.getPath());
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
currentInstantLogBlocks.push(r);
@@ -234,13 +234,13 @@ public void scan() {
}
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final data blocks");
LOG.info("Merging the final data blocks");
processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
}
// Done
progress = 1.0f;
} catch (Exception e) {
log.error("Got exception when reading log file", e);
LOG.error("Got exception when reading log file", e);
throw new HoodieIOException("IOException when reading log file ");
} finally {
try {
@@ -249,7 +249,7 @@ public void scan() {
}
} catch (IOException ioe) {
// Eat exception as we do not want to mask the original exception that can happen
log.error("Unable to close log format reader", ioe);
LOG.error("Unable to close log format reader", ioe);
}
}
}
@@ -297,7 +297,7 @@ private void processAvroDataBlock(HoodieAvroDataBlock dataBlock) throws Exceptio
*/
private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
while (!lastBlocks.isEmpty()) {
log.info("Number of remaining logblocks to merge " + lastBlocks.size());
LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) {
@@ -308,7 +308,7 @@ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
break;
case CORRUPT_BLOCK:
log.warn("Found a corrupt block which was not rolled back");
LOG.warn("Found a corrupt block which was not rolled back");
break;
default:
break;
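
All of the hunks above apply the same mechanical rename: private static final fields (loggers, the Hive JDBC driver class name, the shared ObjectMapper, the field-processor map) move from camelCase names such as logger, log, driverName, mapper, and fieldTypeProcessors to the UPPER_SNAKE_CASE constant style (LOG, DRIVER_NAME, OBJECT_MAPPER, FIELD_TYPE_PROCESSORS). A minimal sketch of the resulting declaration style follows; the class ExampleService and its doWork method are illustrative only and do not appear in the patch, and the log4j 1.x imports are assumed from the LogManager.getLogger calls in the surrounding code (HoodieClientTestHarness uses slf4j's LoggerFactory instead, with the same naming).

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

// Illustrative class, not part of the patch: shows the constant-naming style this change enforces.
public class ExampleService {

  // static final fields are named in UPPER_SNAKE_CASE.
  private static final Logger LOG = LogManager.getLogger(ExampleService.class);
  private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";

  static {
    try {
      // Load the Hive JDBC driver once when the class is initialized, as HiveUtil does.
      Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
    }
  }

  public void doWork() {
    // Instance code refers to the constants by their upper-case names.
    LOG.info("Hive JDBC driver " + DRIVER_NAME + " is registered");
  }
}

The rename is purely cosmetic: only the field identifiers change, so initialization order and runtime behavior are unaffected.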