diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index d00d31c66d15..de3ab8439979 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -35,13 +35,13 @@ /** - * Datanode container Database. + * Handles creation and interaction with the database. + * Provides methods for table creation, log data insertion, and index setup. */ public class ContainerDatanodeDatabase { private static Map queries; - public static final String CONTAINER_KEY_DELIMITER = "#"; static { loadProperties(); @@ -117,25 +117,29 @@ private void createContainerLogTable() throws SQLException { } } - public void insertContainerDatanodeData(String key, List transitionList) throws SQLException { - String[] parts = key.split(CONTAINER_KEY_DELIMITER); - if (parts.length != 2) { - System.err.println("Invalid key format: " + key); - return; - } - - long containerId = Long.parseLong(parts[0]); - long datanodeId = Long.parseLong(parts[1]); + /** + * Inserts a list of container log entries into the DatanodeContainerLogTable. + * + * @param transitionList List of container log entries to insert into the table. 
+ */ + + public synchronized void insertContainerDatanodeData(List transitionList) throws SQLException { String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); + long containerId = 0; + String datanodeId = null; + try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) { int count = 0; for (DatanodeContainerInfo info : transitionList) { - preparedStatement.setLong(1, datanodeId); + datanodeId = info.getDatanodeId(); + containerId = info.getContainerId(); + + preparedStatement.setString(1, datanodeId); preparedStatement.setLong(2, containerId); preparedStatement.setString(3, info.getTimestamp()); preparedStatement.setString(4, info.getState()); @@ -170,6 +174,11 @@ private void createDatanodeContainerIndex(Statement stmt) throws SQLException { stmt.execute(createIndexSQL); } + /** + * Extracts the latest container log data from the DatanodeContainerLogTable + * and inserts it into ContainerLogTable. + */ + public void insertLatestContainerLogData() throws SQLException { createContainerLogTable(); String selectSQL = queries.get("SELECT_LATEST_CONTAINER_LOG"); @@ -183,12 +192,12 @@ public void insertLatestContainerLogData() throws SQLException { int count = 0; while (resultSet.next()) { - long datanodeId = resultSet.getLong("datanode_id"); + String datanodeId = resultSet.getString("datanode_id"); long containerId = resultSet.getLong("container_id"); String containerState = resultSet.getString("container_state"); long bcsid = resultSet.getLong("bcsid"); try { - insertStmt.setLong(1, datanodeId); + insertStmt.setString(1, datanodeId); insertStmt.setLong(2, containerId); insertStmt.setString(3, containerState); insertStmt.setLong(4, bcsid); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java new file mode 100644 index 
000000000000..bafce798e5a7 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.containerlog.parser; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Parses container log files and stores container details into a database. + * Uses multithreading to process multiple log files concurrently. 
+ */ + +public class ContainerLogFileParser { + + private ExecutorService executorService; + private static final int MAX_OBJ_IN_LIST = 5000; + + private static final String LOG_FILE_MARKER = ".log."; + private static final String LOG_LINE_SPLIT_REGEX = " \\| "; + private static final String KEY_VALUE_SPLIT_REGEX = "="; + private static final String KEY_ID = "ID"; + private static final String KEY_BCSID = "BCSID"; + private static final String KEY_STATE = "State"; + private static final String KEY_INDEX = "Index"; + private final AtomicBoolean hasErrorOccurred = new AtomicBoolean(false); + + /** + * Scans the specified log directory, processes each file in a separate thread. + * Expects each log filename to follow the format: dn-container-.log. + * + * @param logDirectoryPath Path to the directory containing container log files. + * @param dbstore Database object used to persist parsed container data. + * @param threadCount Number of threads to use for parallel processing. + */ + + public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount) + throws SQLException { + try (Stream paths = Files.walk(Paths.get(logDirectoryPath))) { + + List files = paths.filter(Files::isRegularFile).collect(Collectors.toList()); + + executorService = Executors.newFixedThreadPool(threadCount); + + CountDownLatch latch = new CountDownLatch(files.size()); + for (Path file : files) { + Path fileNamePath = file.getFileName(); + String fileName = (fileNamePath != null) ? 
fileNamePath.toString() : ""; + + int pos = fileName.indexOf(LOG_FILE_MARKER); + if (pos == -1) { + System.out.println("Filename format is incorrect (missing .log.): " + fileName); + latch.countDown(); continue; + } + + String datanodeId = fileName.substring(pos + LOG_FILE_MARKER.length()); + + if (datanodeId.isEmpty()) { + System.out.println("Filename format is incorrect, datanodeId is missing or empty: " + fileName); + latch.countDown(); continue; + } + + executorService.submit(() -> { + + String threadName = Thread.currentThread().getName(); + try { + System.out.println(threadName + " is starting to process file: " + file.toString()); + processFile(file.toString(), dbstore, datanodeId); + } catch (Exception e) { + System.err.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + + " due to SQLException: " + e.getMessage()); + hasErrorOccurred.set(true); + } finally { + try { + latch.countDown(); + System.out.println(threadName + " finished processing file: " + file.toString() + + ", Latch count after countdown: " + latch.getCount()); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + latch.await(); + + executorService.shutdown(); + + if (hasErrorOccurred.get()) { + throw new SQLException("Log file processing failed."); + } + + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } catch (NumberFormatException e) { + System.err.println("Invalid datanode ID"); + } + } + + /** + * Processes a single container log file and extracts container details. + * Parses, batches, and writes valid container log entries into the database. + * + * @param logFilePath Path to the log file. + * @param dbstore Database object used to persist parsed container data. + * @param datanodeId Datanode ID derived from the log filename. 
+ */ + + private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String datanodeId) + throws SQLException { + List batchList = new ArrayList<>(MAX_OBJ_IN_LIST + 100); + + try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { + String line; + while ((line = reader.readLine()) != null) { + String[] parts = line.split(LOG_LINE_SPLIT_REGEX); + String timestamp = parts[0].trim(); + String logLevel = parts[1].trim(); + String id = null, index = null; + String errorMessage = "No error"; + + DatanodeContainerInfo.Builder builder = new DatanodeContainerInfo.Builder() + .setDatanodeId(datanodeId) + .setTimestamp(timestamp) + .setLogLevel(logLevel); + + for (int i = 2; i < parts.length; i++) { + String part = parts[i].trim(); + + if (part.contains(KEY_VALUE_SPLIT_REGEX)) { + String[] keyValue = part.split(KEY_VALUE_SPLIT_REGEX, 2); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + + switch (key) { + case KEY_ID: + id = value; + builder.setContainerId(Long.parseLong(value)); + break; + case KEY_BCSID: + builder.setBcsid(Long.parseLong(value)); + break; + case KEY_STATE: + builder.setState(value.replace("|", "").trim()); + break; + case KEY_INDEX: + index = value; + builder.setIndexValue(Integer.parseInt(value)); + break; + default: + break; + } + } + } else { + if (!part.isEmpty()) { + errorMessage = part.replace("|", "").trim(); + } + } + } + builder.setErrorMessage(errorMessage); + + if (index == null || !index.equals("0")) { + continue; //Currently only ratis replicated containers are considered. 
+ } + + if (id != null) { + try { + batchList.add(builder.build()); + + if (batchList.size() >= MAX_OBJ_IN_LIST) { + dbstore.insertContainerDatanodeData(batchList); + batchList.clear(); + } + } catch (SQLException e) { + throw e; + } catch (Exception e) { + System.err.println( + "Error processing the batch for container: " + id + " at datanode: " + datanodeId); + e.printStackTrace(); + } + } else { + System.err.println("Log line does not have all required fields: " + line); + } + } + if (!batchList.isEmpty()) { + dbstore.insertContainerDatanodeData(batchList); + batchList.clear(); + } + + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java index b00f77a9efba..7f8bb903ae70 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java @@ -31,7 +31,7 @@ private DBConsts() { public static final String DATABASE_NAME = "container_datanode.db"; public static final String PROPS_FILE = "container-log-db-queries.properties"; public static final int CACHE_SIZE = 1000000; - public static final int BATCH_SIZE = 1000; + public static final int BATCH_SIZE = 2500; public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable"; public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable"; diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java index 18a1825f5994..a4134c0b3a8d 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java +++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java @@ -21,73 +21,117 @@ *Holds information about a container. */ -public class DatanodeContainerInfo { +public final class DatanodeContainerInfo { + + private final long containerId; + private final String datanodeId; + private final String timestamp; + private final String state; + private final long bcsid; + private final String errorMessage; + private final String logLevel; + private final int indexValue; + + private DatanodeContainerInfo(Builder builder) { + this.containerId = builder.containerId; + this.datanodeId = builder.datanodeId; + this.timestamp = builder.timestamp; + this.state = builder.state; + this.bcsid = builder.bcsid; + this.errorMessage = builder.errorMessage; + this.logLevel = builder.logLevel; + this.indexValue = builder.indexValue; + } - private String timestamp; - private String state; - private long bcsid; - private String errorMessage; - private String logLevel; - private int indexValue; + /** + * Builder for DatanodeContainerInfo. 
+ */ + + public static class Builder { + private long containerId; + private String datanodeId; + private String timestamp; + private String state; + private long bcsid; + private String errorMessage; + private String logLevel; + private int indexValue; + + public Builder setContainerId(long containerId) { + this.containerId = containerId; + return this; + } + + public Builder setDatanodeId(String datanodeId) { + this.datanodeId = datanodeId; + return this; + } + + public Builder setTimestamp(String timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder setState(String state) { + this.state = state; + return this; + } + + public Builder setBcsid(long bcsid) { + this.bcsid = bcsid; + return this; + } + + public Builder setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } + + public Builder setLogLevel(String logLevel) { + this.logLevel = logLevel; + return this; + } + + public Builder setIndexValue(int indexValue) { + this.indexValue = indexValue; + return this; + } + + public DatanodeContainerInfo build() { + return new DatanodeContainerInfo(this); + } + } - public DatanodeContainerInfo() { + public long getContainerId() { + return containerId; } - public DatanodeContainerInfo(String timestamp, String state, long bcsid, String errorMessage, - String logLevel, int indexValue) { - this.timestamp = timestamp; - this.state = state; - this.bcsid = bcsid; - this.errorMessage = errorMessage; - this.logLevel = logLevel; - this.indexValue = indexValue; + + public String getDatanodeId() { + return datanodeId; } public String getTimestamp() { return timestamp; } - public void setTimestamp(String timestamp) { - this.timestamp = timestamp; - } - public String getState() { return state; } - public void setState(String state) { - this.state = state; - } - public long getBcsid() { return bcsid; } - public void setBcsid(long bcsid) { - this.bcsid = bcsid; - } - public String getErrorMessage() { return errorMessage; 
} - public void setErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; - } - public String getLogLevel() { return logLevel; } - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; - } - public int getIndexValue() { return indexValue; } - - public void setIndexValue(int indexValue) { - this.indexValue = indexValue; - } - } + diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java new file mode 100644 index 000000000000..1043d8914704 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.debug.container; + +import org.apache.hadoop.hdds.cli.DebugSubcommand; +import org.kohsuke.MetaInfServices; +import picocli.CommandLine; + +/** + * A controller for managing container log operations like parsing and listing containers. 
+ */ + +@CommandLine.Command( + name = "container", + subcommands = { + ContainerLogParser.class + }, + description = "Parse, Store, Retrieve" +) +@MetaInfServices(DebugSubcommand.class) +public class ContainerLogController implements DebugSubcommand { + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java new file mode 100644 index 000000000000..c9ef86d5dd0f --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.debug.container; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.util.concurrent.Callable; +import org.apache.hadoop.ozone.containerlog.parser.ContainerDatanodeDatabase; +import org.apache.hadoop.ozone.containerlog.parser.ContainerLogFileParser; +import picocli.CommandLine; + +/** + * Parses container logs, processes them, and updates the database accordingly. 
+ */ + +@CommandLine.Command( + name = "container_log_parse", + description = "parse the container logs" +) +public class ContainerLogParser implements Callable { + private static final int DEFAULT_THREAD_COUNT = 10; + + @CommandLine.Option(names = {"--parse"}, + description = "path to the dir which contains log files") + private String path; + + @CommandLine.Option(names = {"--thread-count"}, + description = "Thread count for concurrent processing.", + defaultValue = "10") + private int threadCount; + + @CommandLine.ParentCommand + private ContainerLogController parent; + + @Override + public Void call() throws Exception { + if (threadCount <= 0) { + System.out.println("Invalid threadCount value provided (" + threadCount + "). Using default value: " + + DEFAULT_THREAD_COUNT); + threadCount = DEFAULT_THREAD_COUNT; + } + + if (path != null) { + Path logPath = Paths.get(path); + if (!Files.exists(logPath) || !Files.isDirectory(logPath)) { + System.err.println("Invalid path provided: " + path); + return null; + } + + ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(); + ContainerLogFileParser parser = new ContainerLogFileParser(); + + try { + + cdd.createDatanodeContainerLogTable(); + + parser.processLogEntries(path, cdd, threadCount); + + cdd.insertLatestContainerLogData(); + System.out.println("Successfully parsed the log files and updated the respective tables"); + + } catch (SQLException e) { + System.err.println("Error occurred while processing logs or inserting data into the database: " + + e.getMessage()); + } catch (Exception e) { + System.err.println("An unexpected error occurred: " + e.getMessage()); + } + + } else { + System.out.println("path to logs folder not provided"); + } + + return null; + } + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java new file mode 100644 index 
000000000000..9354dfe28b84 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides functionality for managing container log operations, including parsing, processing, and storing log data. + */ + +package org.apache.hadoop.ozone.debug.container; diff --git a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties index c47cb6fcaaf7..01a405556708 100644 --- a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties +++ b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties @@ -15,8 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT NOT NULL, bcsid INTEGER NOT NULL, error_message TEXT NOT NUL, log_level TEXT NOT NULL, index_value INTEGER NOT NULL); -CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT NOT NULL, latest_bcsid INTEGER NOT NULL, PRIMARY KEY (datanode_id, container_id)); +CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id TEXT NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, error_message TEXT, log_level TEXT NOT NULL, index_value INTEGER); +CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id TEXT NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT, latest_bcsid INTEGER, PRIMARY KEY (datanode_id, container_id)); CREATE_DATANODE_CONTAINER_INDEX=CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id, container_id, timestamp); INSERT_DATANODE_CONTAINER_LOG=INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid, error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?); INSERT_CONTAINER_LOG=INSERT OR REPLACE INTO ContainerLogTable (datanode_id, container_id, latest_state, latest_bcsid) VALUES (?, ?, ?, ?);