Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@


/**
* Datanode container Database.
* Handles creation and interaction with the database.
* Provides methods for table creation, log data insertion, and index setup.
*/

public class ContainerDatanodeDatabase {

private static Map<String, String> queries;
public static final String CONTAINER_KEY_DELIMITER = "#";

static {
loadProperties();
Expand Down Expand Up @@ -117,25 +117,29 @@ private void createContainerLogTable() throws SQLException {
}
}

public void insertContainerDatanodeData(String key, List<DatanodeContainerInfo> transitionList) throws SQLException {
String[] parts = key.split(CONTAINER_KEY_DELIMITER);
if (parts.length != 2) {
System.err.println("Invalid key format: " + key);
return;
}

long containerId = Long.parseLong(parts[0]);
long datanodeId = Long.parseLong(parts[1]);
/**
* Inserts a list of container log entries into the DatanodeContainerLogTable.
*
* @param transitionList List of container log entries to insert into the table.
*/

public synchronized void insertContainerDatanodeData(List<DatanodeContainerInfo> transitionList) throws SQLException {

String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG");

long containerId = 0;
String datanodeId = null;

try (Connection connection = getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {

int count = 0;

for (DatanodeContainerInfo info : transitionList) {
preparedStatement.setLong(1, datanodeId);
datanodeId = info.getDatanodeId();
containerId = info.getContainerId();

preparedStatement.setString(1, datanodeId);
preparedStatement.setLong(2, containerId);
preparedStatement.setString(3, info.getTimestamp());
preparedStatement.setString(4, info.getState());
Expand Down Expand Up @@ -170,6 +174,11 @@ private void createDatanodeContainerIndex(Statement stmt) throws SQLException {
stmt.execute(createIndexSQL);
}

/**
* Extracts the latest container log data from the DatanodeContainerLogTable
* and inserts it into ContainerLogTable.
*/

public void insertLatestContainerLogData() throws SQLException {
createContainerLogTable();
String selectSQL = queries.get("SELECT_LATEST_CONTAINER_LOG");
Expand All @@ -183,12 +192,12 @@ public void insertLatestContainerLogData() throws SQLException {
int count = 0;

while (resultSet.next()) {
long datanodeId = resultSet.getLong("datanode_id");
String datanodeId = resultSet.getString("datanode_id");
long containerId = resultSet.getLong("container_id");
String containerState = resultSet.getString("container_state");
long bcsid = resultSet.getLong("bcsid");
try {
insertStmt.setLong(1, datanodeId);
insertStmt.setString(1, datanodeId);
insertStmt.setLong(2, containerId);
insertStmt.setString(3, containerState);
insertStmt.setLong(4, bcsid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.ozone.containerlog.parser;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Parses container log files and stores container details into a database.
* Uses multithreading to process multiple log files concurrently.
*/

public class ContainerLogFileParser {

private ExecutorService executorService;
private static final int MAX_OBJ_IN_LIST = 5000;

private static final String LOG_FILE_MARKER = ".log.";
private static final String LOG_LINE_SPLIT_REGEX = " \\| ";
private static final String KEY_VALUE_SPLIT_REGEX = "=";
private static final String KEY_ID = "ID";
private static final String KEY_BCSID = "BCSID";
private static final String KEY_STATE = "State";
private static final String KEY_INDEX = "Index";
private final AtomicBoolean hasErrorOccurred = new AtomicBoolean(false);

/**
* Scans the specified log directory, processes each file in a separate thread.
* Expects each log filename to follow the format: dn-container-<roll over number>.log.<datanodeId>
*
* @param logDirectoryPath Path to the directory containing container log files.
* @param dbstore Database object used to persist parsed container data.
* @param threadCount Number of threads to use for parallel processing.
*/

public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount)
throws SQLException {
try (Stream<Path> paths = Files.walk(Paths.get(logDirectoryPath))) {

List<Path> files = paths.filter(Files::isRegularFile).collect(Collectors.toList());

executorService = Executors.newFixedThreadPool(threadCount);

CountDownLatch latch = new CountDownLatch(files.size());
for (Path file : files) {
Path fileNamePath = file.getFileName();
String fileName = (fileNamePath != null) ? fileNamePath.toString() : "";

int pos = fileName.indexOf(LOG_FILE_MARKER);
if (pos == -1) {
System.out.println("Filename format is incorrect (missing .log.): " + fileName);
continue;
}

String datanodeId = fileName.substring(pos + 5);

if (datanodeId.isEmpty()) {
System.out.println("Filename format is incorrect, datanodeId is missing or empty: " + fileName);
continue;
}

executorService.submit(() -> {

String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName + " is starting to process file: " + file.toString());
processFile(file.toString(), dbstore, datanodeId);
} catch (Exception e) {
System.err.println("Thread " + threadName + " is stopping to process the file: " + file.toString() +
" due to SQLException: " + e.getMessage());
hasErrorOccurred.set(true);
} finally {
try {
latch.countDown();
System.out.println(threadName + " finished processing file: " + file.toString() +
", Latch count after countdown: " + latch.getCount());
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
latch.await();

executorService.shutdown();

if (hasErrorOccurred.get()) {
throw new SQLException("Log file processing failed.");
}

} catch (IOException | InterruptedException e) {
e.printStackTrace();
} catch (NumberFormatException e) {
System.err.println("Invalid datanode ID");
}
}

/**
* Processes a single container log file and extracts container details.
* Parses, batches, and writes valid container log entries into the database.
*
* @param logFilePath Path to the log file.
* @param dbstore Database object used to persist parsed container data.
* @param datanodeId Datanode ID derived from the log filename.
*/

private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String datanodeId)
throws SQLException {
List<DatanodeContainerInfo> batchList = new ArrayList<>(MAX_OBJ_IN_LIST + 100);

try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) {
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(LOG_LINE_SPLIT_REGEX);
String timestamp = parts[0].trim();
String logLevel = parts[1].trim();
String id = null, index = null;
String errorMessage = "No error";

DatanodeContainerInfo.Builder builder = new DatanodeContainerInfo.Builder()
.setDatanodeId(datanodeId)
.setTimestamp(timestamp)
.setLogLevel(logLevel);

for (int i = 2; i < parts.length; i++) {
String part = parts[i].trim();

if (part.contains(KEY_VALUE_SPLIT_REGEX)) {
String[] keyValue = part.split(KEY_VALUE_SPLIT_REGEX, 2);
if (keyValue.length == 2) {
String key = keyValue[0].trim();
String value = keyValue[1].trim();

switch (key) {
case KEY_ID:
id = value;
builder.setContainerId(Long.parseLong(value));
break;
case KEY_BCSID:
builder.setBcsid(Long.parseLong(value));
break;
case KEY_STATE:
builder.setState(value.replace("|", "").trim());
break;
case KEY_INDEX:
index = value;
builder.setIndexValue(Integer.parseInt(value));
break;
default:
break;
}
}
} else {
if (!part.isEmpty()) {
errorMessage = part.replace("|", "").trim();
}
}
}
builder.setErrorMessage(errorMessage);

if (index == null || !index.equals("0")) {
continue; //Currently only ratis replicated containers are considered.
}

if (id != null) {
try {
batchList.add(builder.build());

if (batchList.size() >= MAX_OBJ_IN_LIST) {
dbstore.insertContainerDatanodeData(batchList);
batchList.clear();
}
} catch (SQLException e) {
throw new SQLException(e.getMessage());
} catch (Exception e) {
System.err.println(
"Error processing the batch for container: " + id + " at datanode: " + datanodeId);
e.printStackTrace();
}
} else {
System.err.println("Log line does not have all required fields: " + line);
}
}
if (!batchList.isEmpty()) {
dbstore.insertContainerDatanodeData(batchList);
batchList.clear();
}

} catch (IOException e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private DBConsts() {
public static final String DATABASE_NAME = "container_datanode.db";
public static final String PROPS_FILE = "container-log-db-queries.properties";
public static final int CACHE_SIZE = 1000000;
public static final int BATCH_SIZE = 1000;
public static final int BATCH_SIZE = 2500;
public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable";
public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable";

Expand Down
Loading