If I understand correctly, all the classes under containerlog.parser are utility classes used by the debug.logs.container classes. It doesn't make sense to keep containerlog.parser in a separate package unrelated to debug.logs.container (each package here seems to correspond to a parent command).

I suggest moving it under the utils package in tools (as mentioned in a comment below for DBConsts). Alternatively, we could keep it under debug.logs.container and call it a utils package instead of containerlog.parser.
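
For illustration, a minimal sketch of what the suggested relocation might look like; the exact target package names are assumptions, not something settled in this review:

// Hypothetical: keep the parser utilities next to the command classes they serve
package org.apache.hadoop.ozone.debug.logs.container.utils;

// ...or move them to the shared tools utils package instead:
// package org.apache.hadoop.ozone.utils;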

@@ -17,58 +17,37 @@

package org.apache.hadoop.ozone.containerlog.parser;

import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sqlite.SQLiteConfig;

/**
* Handles creation and interaction with the database.
* Provides methods for table creation, log data insertion, and index setup.
*/
public class ContainerDatanodeDatabase {

private static Map<String, String> queries;

static {
loadProperties();
}

private static final Logger LOG =
LoggerFactory.getLogger(ContainerDatanodeDatabase.class);

private static void loadProperties() {
Properties props = new Properties();
try (InputStream inputStream = ContainerDatanodeDatabase.class.getClassLoader()
.getResourceAsStream(DBConsts.PROPS_FILE)) {

if (inputStream != null) {
props.load(inputStream);
queries = props.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey().toString(),
e -> e.getValue().toString()
));
} else {
throw new FileNotFoundException("Property file '" + DBConsts.PROPS_FILE + "' not found.");
}
} catch (Exception e) {
LOG.error(e.getMessage());

private static String databasePath;

public static void setDatabasePath(String dbPath) {
if (databasePath == null) {
databasePath = dbPath;
}
}

private static Connection getConnection() throws Exception {
if (databasePath == null) {
throw new IllegalStateException("Database path not set");
}

Class.forName(DBConsts.DRIVER);

SQLiteConfig config = new SQLiteConfig();
@@ -79,38 +58,38 @@ private static Connection getConnection() throws Exception {
config.setSynchronous(SQLiteConfig.SynchronousMode.OFF);
config.setTempStore(SQLiteConfig.TempStore.MEMORY);

return DriverManager.getConnection(DBConsts.CONNECTION_PREFIX + DBConsts.DATABASE_NAME, config.toProperties());
return DriverManager.getConnection(DBConsts.CONNECTION_PREFIX + databasePath, config.toProperties());
}

public void createDatanodeContainerLogTable() throws SQLException {
String createTableSQL = queries.get("CREATE_DATANODE_CONTAINER_LOG_TABLE");
String createTableSQL = DBConsts.CREATE_DATANODE_CONTAINER_LOG_TABLE;
try (Connection connection = getConnection();
Statement dropStmt = connection.createStatement();
Statement createStmt = connection.createStatement()) {
dropTable(DBConsts.DATANODE_CONTAINER_LOG_TABLE_NAME, dropStmt);
createStmt.execute(createTableSQL);
createDatanodeContainerIndex(createStmt);
} catch (SQLException e) {
LOG.error("Error while creating the table: {}", e.getMessage());
System.err.println("Error while creating the table: " + e.getMessage());
throw e;
} catch (Exception e) {
LOG.error(e.getMessage());
System.err.println("Unexpected error: " + e.getMessage());
throw new RuntimeException(e);
}
}

private void createContainerLogTable() throws SQLException {
String createTableSQL = queries.get("CREATE_CONTAINER_LOG_TABLE");
String createTableSQL = DBConsts.CREATE_CONTAINER_LOG_TABLE;
try (Connection connection = getConnection();
Statement dropStmt = connection.createStatement();
Statement createStmt = connection.createStatement()) {
dropTable(DBConsts.CONTAINER_LOG_TABLE_NAME, dropStmt);
createStmt.execute(createTableSQL);
} catch (SQLException e) {
LOG.error("Error while creating the table: {}", e.getMessage());
System.err.println("Error while creating the table: " + e.getMessage());
throw e;
} catch (Exception e) {
LOG.error(e.getMessage());
System.err.println("Unexpected error: " + e.getMessage());
throw new RuntimeException(e);
}
}
@@ -123,7 +102,7 @@ private void createContainerLogTable() throws SQLException {

public synchronized void insertContainerDatanodeData(List<DatanodeContainerInfo> transitionList) throws SQLException {

String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG");
String insertSQL = DBConsts.INSERT_DATANODE_CONTAINER_LOG;

long containerId = 0;
String datanodeId = null;
@@ -159,16 +138,16 @@ public synchronized void insertContainerDatanodeData(List<DatanodeContainerInfo>
preparedStatement.executeBatch();
}
} catch (SQLException e) {
LOG.error("Failed to insert container log for container {} on datanode {}", containerId, datanodeId, e);
System.err.println("Failed to insert container log for container " + containerId + " on datanode " + datanodeId);
throw e;
} catch (Exception e) {
LOG.error(e.getMessage());
System.err.println("Unexpected error: " + e.getMessage());
throw new RuntimeException(e);
}
}

private void createDatanodeContainerIndex(Statement stmt) throws SQLException {
String createIndexSQL = queries.get("CREATE_DATANODE_CONTAINER_INDEX");
String createIndexSQL = DBConsts.CREATE_DATANODE_CONTAINER_INDEX;
stmt.execute(createIndexSQL);
}

@@ -179,8 +158,8 @@ private void createDatanodeContainerIndex(Statement stmt) throws SQLException {

public void insertLatestContainerLogData() throws SQLException {
createContainerLogTable();
String selectSQL = queries.get("SELECT_LATEST_CONTAINER_LOG");
String insertSQL = queries.get("INSERT_CONTAINER_LOG");
String selectSQL = DBConsts.SELECT_LATEST_CONTAINER_LOG;
String insertSQL = DBConsts.INSERT_CONTAINER_LOG;

try (Connection connection = getConnection();
PreparedStatement selectStmt = connection.prepareStatement(selectSQL);
@@ -208,8 +187,8 @@ public void insertLatestContainerLogData() throws SQLException {
count = 0;
}
} catch (SQLException e) {
LOG.error("Failed to insert container log entry for container {} on datanode {} ",
containerId, datanodeId, e);
System.err.println("Failed to insert container log entry for container " + containerId + " on datanode "
+ datanodeId);
throw e;
}
}
@@ -218,18 +197,95 @@ public void insertLatestContainerLogData() throws SQLException {
insertStmt.executeBatch();
}
} catch (SQLException e) {
LOG.error("Failed to insert container log entry: {}", e.getMessage());
System.err.println("Failed to insert container log entry: " + e.getMessage());
throw e;
} catch (Exception e) {
LOG.error(e.getMessage());
System.err.println("Unexpected error: " + e.getMessage());
throw new RuntimeException(e);
}
}

private void dropTable(String tableName, Statement stmt) throws SQLException {
String dropTableSQL = queries.get("DROP_TABLE").replace("{table_name}", tableName);
String dropTableSQL = DBConsts.DROP_TABLE.replace("{table_name}", tableName);
stmt.executeUpdate(dropTableSQL);
}

private void createContainerLogIndex(Statement stmt) throws SQLException {
String createIndexSQL = DBConsts.CREATE_INDEX_LATEST_STATE;
stmt.execute(createIndexSQL);
}

/**
* Lists containers filtered by the specified state and writes their details to stdout
* unless redirected to a file explicitly.
* The output includes timestamp, datanode ID, container ID, BCSID, error message, and index value,
* written in a human-readable table format to a file or console.
* Behavior depends on the {@code limit} parameter:
* If {@code limit} is provided, only up to the specified number of rows are printed.
* If the number of matching containers exceeds the {@code limit},
* a note is printed indicating more containers exist.
*
* @param state the container state to filter by (e.g., "OPEN", "CLOSED", "QUASI_CLOSED")
* @param limit the maximum number of rows to display; use {@link Integer#MAX_VALUE} to fetch all rows
*/

public void listContainersByState(String state, Integer limit) throws SQLException {
int count = 0;

boolean limitProvided = limit != Integer.MAX_VALUE;

String baseQuery = DBConsts.SELECT_LATEST_CONTAINER_LOGS_BY_STATE;
String finalQuery = limitProvided ? baseQuery + " LIMIT ?" : baseQuery;

try (Connection connection = getConnection();
Statement stmt = connection.createStatement()) {

createContainerLogIndex(stmt);

try (PreparedStatement pstmt = connection.prepareStatement(finalQuery)) {
pstmt.setString(1, state);
if (limitProvided) {
pstmt.setInt(2, limit + 1);
}

try (ResultSet rs = pstmt.executeQuery();
PrintWriter writer = new PrintWriter(new OutputStreamWriter(System.out,
StandardCharsets.UTF_8), true)) {

writer.printf("%-25s | %-35s | %-15s | %-15s | %-40s | %-12s%n",
"Timestamp", "Datanode ID", "Container ID", "BCSID", "Message", "Index Value");
writer.println("-------------------------------------------------------------------------------------" +
"---------------------------------------------------------------------------------------");

while (rs.next()) {
if (limitProvided && count >= limit) {
writer.println("Note: There might be more containers. Use -all option to list all entries");
break;
}
String timestamp = rs.getString("timestamp");
String datanodeId = rs.getString("datanode_id");
long containerId = rs.getLong("container_id");
long latestBcsid = rs.getLong("latest_bcsid");
String errorMessage = rs.getString("error_message");
int indexValue = rs.getInt("index_value");
count++;

writer.printf("%-25s | %-35s | %-15d | %-15d | %-40s | %-12d%n",
timestamp, datanodeId, containerId, latestBcsid, errorMessage, indexValue);
}

if (count == 0) {
writer.printf("No containers found for state: %s%n", state);
} else {
writer.printf("Number of containers listed: %d%n", count);
}
}
}
} catch (SQLException e) {
throw new SQLException("Error while retrieving containers with state " + state);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
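
For context (not part of this diff), a minimal usage sketch of how the reworked class might be driven end to end; the no-arg constructor, the database path, and the omitted exception handling are assumptions for illustration:

// Hypothetical driver snippet; all methods called here appear in this diff.
ContainerDatanodeDatabase.setDatabasePath("/tmp/container_datanode.db");
ContainerDatanodeDatabase db = new ContainerDatanodeDatabase();
db.createDatanodeContainerLogTable();           // drops and recreates DatanodeContainerLogTable
// ... parse datanode logs and call insertContainerDatanodeData(batch) per batch ...
db.insertLatestContainerLogData();              // derives ContainerLogTable from the latest log rows
db.listContainersByState("QUASI_CLOSED", 100);  // prints up to 100 matching rows to stdout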

@@ -62,7 +62,7 @@ public class ContainerLogFileParser {
*/

public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount)
throws SQLException {
throws SQLException, IOException, InterruptedException {
try (Stream<Path> paths = Files.walk(Paths.get(logDirectoryPath))) {

List<Path> files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
@@ -116,10 +116,6 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase
throw new SQLException("Log file processing failed.");
}

} catch (IOException | InterruptedException e) {
e.printStackTrace();
} catch (NumberFormatException e) {
System.err.println("Invalid datanode ID");
}
}

@@ -133,7 +129,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase
*/

private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String datanodeId)
throws SQLException {
throws SQLException, IOException {
List<DatanodeContainerInfo> batchList = new ArrayList<>(MAX_OBJ_IN_LIST + 100);

try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) {
@@ -199,11 +195,7 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore,
batchList.clear();
}
} catch (SQLException e) {
throw new SQLException(e.getMessage());
} catch (Exception e) {
System.err.println(
"Error processing the batch for container: " + id + " at datanode: " + datanodeId);
e.printStackTrace();
throw e;
}
} else {
System.err.println("Log line does not have all required fields: " + line);
@@ -214,8 +206,6 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore,
batchList.clear();
}

} catch (IOException e) {
e.printStackTrace();
}
}
}

It would be better to rename this to SQLDBConstants to avoid confusing it with RocksDB constants.
Also, moving this to hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/utils would make more sense.
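
A minimal sketch of the suggested rename and move; the package path and class name simply follow the comment above, and call sites would be updated accordingly:

// Hypothetical: renamed SQL constants holder under the tools utils package.
package org.apache.hadoop.ozone.utils;

/**
 * SQL constants used for ContainerDatanodeDatabase (SQLite-backed).
 */
public final class SQLDBConstants {
  public static final String DRIVER = "org.sqlite.JDBC";
  public static final String CONNECTION_PREFIX = "jdbc:sqlite:";
  // ... remaining table/query constants carried over unchanged ...

  private SQLDBConstants() {
    // Never constructed
  }
}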

@@ -21,16 +21,49 @@
* Constants used for ContainerDatanodeDatabase.
*/
public final class DBConsts {


public static final String DEFAULT_DB_FILENAME = "container_datanode.db";
public static final String DRIVER = "org.sqlite.JDBC";
public static final String CONNECTION_PREFIX = "jdbc:sqlite:";
public static final String DATABASE_NAME = "container_datanode.db";
public static final String PROPS_FILE = "container-log-db-queries.properties";
public static final int CACHE_SIZE = 1000000;
public static final int BATCH_SIZE = 2500;
public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable";
public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable";

public static final String CREATE_DATANODE_CONTAINER_LOG_TABLE =
"CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id TEXT NOT NULL, " +
"container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, " +
"error_message TEXT, log_level TEXT NOT NULL," +
" index_value INTEGER);";
public static final String CREATE_CONTAINER_LOG_TABLE =
"CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id TEXT NOT NULL, container_id INTEGER NOT NULL," +
" latest_state TEXT, latest_bcsid INTEGER, PRIMARY KEY (datanode_id, container_id));";
public static final String CREATE_DATANODE_CONTAINER_INDEX =
"CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id," +
" container_id, timestamp);";
public static final String INSERT_DATANODE_CONTAINER_LOG =
"INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid," +
" error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?);";
public static final String INSERT_CONTAINER_LOG =
"INSERT OR REPLACE INTO ContainerLogTable (datanode_id, container_id, latest_state," +
" latest_bcsid) VALUES (?, ?, ?, ?);";
public static final String SELECT_LATEST_CONTAINER_LOG =
"SELECT a.datanode_id, a.container_id, a.container_state, a.bcsid, a.timestamp FROM DatanodeContainerLogTable" +
" AS a JOIN (SELECT datanode_id, container_id, MAX(timestamp) as timestamp FROM DatanodeContainerLogTable" +
" GROUP BY datanode_id, container_id) as b ON a.datanode_id = b.datanode_id AND " +
"a.container_id = b.container_id AND a.timestamp=b.timestamp;";
public static final String DROP_TABLE = "DROP TABLE IF EXISTS {table_name};";
public static final String CREATE_INDEX_LATEST_STATE =
"CREATE INDEX IF NOT EXISTS idx_container_log_state ON ContainerLogTable(latest_state);";
public static final String SELECT_LATEST_CONTAINER_LOGS_BY_STATE =
"SELECT cl.datanode_id, cl.container_id, cl.latest_state, cl.latest_bcsid, dcl.error_message, dcl.index_value," +
" dcl.timestamp FROM ContainerLogTable cl LEFT JOIN DatanodeContainerLogTable dcl ON" +
" cl.datanode_id = dcl.datanode_id AND cl.container_id = dcl.container_id AND cl.latest_bcsid = dcl.bcsid " +
"AND cl.latest_state = dcl.container_state WHERE cl.latest_state = ? " +
"AND dcl.timestamp = (SELECT MAX(timestamp) FROM DatanodeContainerLogTable sub_dcl " +
"WHERE sub_dcl.datanode_id = cl.datanode_id AND" +
" sub_dcl.container_id = cl.container_id AND sub_dcl.bcsid = cl.latest_bcsid" +
" AND sub_dcl.container_state = cl.latest_state)";

private DBConsts() {
//Never constructed
}