From 06f7559bcffadecad971016fb8a05d35341a2139 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Wed, 9 Apr 2025 12:24:43 +0530 Subject: [PATCH 01/13] HDDS-12581. Multi-threaded Log File Parsing with Batch Updates to DB --- .../parser/ContainerLogFileParser.java | 282 ++++++++++++++++++ .../container/ContainerLogController.java | 38 +++ .../debug/container/ContainerLogParser.java | 102 +++++++ .../ozone/debug/container/package-info.java | 22 ++ .../container-log-db-queries.properties | 2 +- 5 files changed, 445 insertions(+), 1 deletion(-) create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java new file mode 100644 index 000000000000..3c98baf89de7 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.containerlog.parser; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Container log file Parsing logic. + */ + +public class ContainerLogFileParser { + + private ExecutorService executorService; + private final Map> batchMap = new HashMap<>(500); + private final Map locks = new HashMap<>(500); + private final ReentrantLock globalLock = new ReentrantLock(); + private static final int MAX_KEYS_IN_MAP = 400; + private final AtomicBoolean isGlobalLockHeld = new AtomicBoolean(false); + private final Object globalLockNotifier = new Object(); + + private static final String FILENAME_PARTS_REGEX = "-"; + private static final String DATANODE_ID_REGEX = "\\."; + private static final String LOG_LINE_SPLIT_REGEX = " \\| "; + private static final String KEY_VALUE_SPLIT_REGEX = "="; + + public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount) + throws SQLException { + try (Stream paths = Files.walk(Paths.get(logDirectoryPath))) { + + List files = paths.filter(Files::isRegularFile).collect(Collectors.toList()); + + executorService = Executors.newFixedThreadPool(threadCount); + + CountDownLatch latch = new CountDownLatch(files.size()); + //int count = 1; + for (Path file : files) { + String fileName = file.getFileName().toString(); + String[] parts = fileName.split(FILENAME_PARTS_REGEX); + + if (parts.length < 3) { + System.out.println("Filename format is incorrect (not enough parts): " + fileName); + continue; + } + + long datanodeId = 0; + String datanodeIdStr = parts[2]; + if (datanodeIdStr.contains(".log")) { + datanodeIdStr = datanodeIdStr.split(DATANODE_ID_REGEX)[0]; + } + + try { + datanodeId = Long.parseLong(datanodeIdStr); + System.out.println("Parsed datanodeId: " + datanodeId); + + } catch (NumberFormatException e) { + System.out.println("Invalid datanode ID in filename: " + fileName); + continue; + } + + long finalDatanodeId = datanodeId; + //count++; + //int finalCount = count; + executorService.submit(() -> { + + String threadName = Thread.currentThread().getName(); + try { + System.out.println(threadName + " is starting to process file: " + file.toString()); + processFile(file.toString(), dbstore, finalDatanodeId); + } catch (Exception e) { + System.out.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + + " due to SQLException: " + e.getMessage()); + executorService.shutdown(); + } finally { + try { + latch.countDown(); + System.out.println(threadName + " finished processing file: " + file.toString() + + ", Latch count after countdown: " + latch.getCount()); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + latch.await(); + + + if (!batchMap.isEmpty()) { + processAndClearAllBatches(dbstore); + } + + executorService.shutdown(); + + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } catch (SQLException e) { + System.out.println("Thread " + Thread.currentThread().getName() + + " is stopping due to SQLException: " + e.getMessage()); + executorService.shutdown(); + throw e; + } + } + + + private synchronized void processAndClearAllBatches(ContainerDatanodeDatabase dbstore) throws SQLException { + globalLock.lock(); + try { + isGlobalLockHeld.set(true); + for (Map.Entry> entry : batchMap.entrySet()) { + String key = entry.getKey(); + List transitionList = entry.getValue(); + + dbstore.insertContainerDatanodeData(key, transitionList); + transitionList.clear(); + } + batchMap.clear(); + locks.clear(); + + } finally { + globalLock.unlock(); + isGlobalLockHeld.set(false); + synchronized (globalLockNotifier) { + notifyAll(); + } + } + } + + public void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFilePath), + StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + String[] parts = line.split(LOG_LINE_SPLIT_REGEX); + String timestamp = parts[0]; + String logLevel = parts[1]; + String id = null, bcsid = null, state = null, index = null; + + for (String part : parts) { + part = part.trim(); + + if (part.contains(KEY_VALUE_SPLIT_REGEX)) { + String[] keyValue = part.split(KEY_VALUE_SPLIT_REGEX); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + + switch (key) { + case "ID": + id = value; + break; + case "BCSID": + bcsid = value; + break; + case "State": + state = value.replace("|", "").trim(); + break; + case "Index": + index = value; + break; + default: + System.out.println("Unexpected key: " + key); + break; + } + } + } + + } + + if (index == null || !index.equals("0")) { + continue; + } + + String errorMessage = "No error"; + if (line.contains("ERROR") || line.contains("WARN")) { + errorMessage = null; + for (int i = parts.length - 1; i >= 0; i--) { + String part = parts[i].trim(); + + if (part.isEmpty()) { + continue; + } + + if (part.startsWith("State=") || part.startsWith("ID=") || part.startsWith("BCSID=") || + part.startsWith("Index=") || part.equals("ERROR") || part.equals("INFO") + || part.equals("WARN") || part.equals(timestamp)) { + continue; + } + if (part.endsWith("|")) { + part = part.replace("|", "").trim(); + } + errorMessage = part; + + break; + } + } + + if (id != null && bcsid != null && state != null) { + try { + long containerId = Long.parseLong(id); + long bcsidValue = Long.parseLong(bcsid); + + String key = containerId + "#" + datanodeId; + + try { + synchronized (globalLockNotifier) { + while (isGlobalLockHeld.get()) { + try { + globalLockNotifier.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + if (!isGlobalLockHeld.get()) { + ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock()); + lock.lock(); + + try { + List currentBatch = batchMap.computeIfAbsent(key, k -> new ArrayList<>()); + + currentBatch.add(new DatanodeContainerInfo(timestamp, state, bcsidValue, errorMessage, + logLevel, Integer.parseInt(index))); + + if (batchMap.size() >= MAX_KEYS_IN_MAP) { + processAndClearAllBatches(dbstore); + } + } finally { + lock.unlock(); + } + } + } + } catch (SQLException e) { + throw new SQLException(e.getMessage()); + } catch (Exception e) { + System.out.println("Error processing the batch for key: " + key); + e.printStackTrace(); + } + } catch (NumberFormatException e) { + System.out.println("Error parsing ID or BCSID as Long: " + line); + } + } else { + System.out.println("Log line does not have all required fields: " + line); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java new file mode 100644 index 000000000000..1043d8914704 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogController.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.debug.container; + +import org.apache.hadoop.hdds.cli.DebugSubcommand; +import org.kohsuke.MetaInfServices; +import picocli.CommandLine; + +/** + * A controller for managing container log operations like parsing and listing containers. + */ + +@CommandLine.Command( + name = "container", + subcommands = { + ContainerLogParser.class + }, + description = "Parse, Store, Retrieve" +) +@MetaInfServices(DebugSubcommand.class) +public class ContainerLogController implements DebugSubcommand { + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java new file mode 100644 index 000000000000..7e935fddeebd --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.debug.container; + +import java.sql.SQLException; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.ozone.containerlog.parser.ContainerDatanodeDatabase; +import org.apache.hadoop.ozone.containerlog.parser.ContainerLogFileParser; +import picocli.CommandLine; + +/** + * Parses container logs, processes them, and updates the database accordingly. + */ + +@CommandLine.Command( + name = "container_log_parse", + description = "parse the container logs" +) +public class ContainerLogParser implements Callable { + @CommandLine.Option(names = {"--parse"}, + description = "path to the dir which contains log files") + private String path; + + @CommandLine.Option(names = {"--thread-count"}, + description = "Thread count for concurrent processing.", + defaultValue = "10") + private int threadCount; + + @CommandLine.ParentCommand + private ContainerLogController parent; + + @Override + public Void call() throws Exception { + if (path != null) { + + ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(); + ContainerLogFileParser parser = new ContainerLogFileParser(); + AtomicBoolean isProcessingSuccessful = new AtomicBoolean(true); + + try { + // Try creating the table, if this fails, mark the process as unsuccessful + try { + cdd.createDatanodeContainerLogTable(); + } catch (SQLException e) { + isProcessingSuccessful.set(false); + System.err.println("Error occurred while creating the Datanode Container Log Table: " + e.getMessage()); + } + + // If the table creation was successful, proceed with processing the log entries + if (isProcessingSuccessful.get()) { + try { + parser.processLogEntries(path, cdd, threadCount); + } catch (Exception e) { + isProcessingSuccessful.set(false); + System.err.println("Error occurred during processing logs: " + e.getMessage()); + } + + // Insert the latest log data, if processing was successful + if (isProcessingSuccessful.get()) { + try { + cdd.insertLatestContainerLogData(); + } catch (SQLException e) { + isProcessingSuccessful.set(false); + System.err.println("Error occurred while inserting container log data: " + e.getMessage()); + } + } + + if (isProcessingSuccessful.get()) { + System.out.println("Successfully parsed the log files and updated the respective tables"); + } else { + System.err.println("Log processing failed."); + } + } + } catch (Exception e) { + isProcessingSuccessful.set(false); + System.err.println("An unexpected error occurred: " + e.getMessage()); + } + + } else { + System.out.println("path to logs folder not provided"); + } + + return null; + } + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java new file mode 100644 index 000000000000..9354dfe28b84 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Provides functionality for managing container log operations, including parsing, processing, and storing log data. + */ + +package org.apache.hadoop.ozone.debug.container; diff --git a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties index c47cb6fcaaf7..39bcefea1df6 100644 --- a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties +++ b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT NOT NULL, bcsid INTEGER NOT NULL, error_message TEXT NOT NUL, log_level TEXT NOT NULL, index_value INTEGER NOT NULL); +CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT NOT NULL, bcsid INTEGER NOT NULL, error_message TEXT NOT NULL, log_level TEXT NOT NULL, index_value INTEGER NOT NULL); CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT NOT NULL, latest_bcsid INTEGER NOT NULL, PRIMARY KEY (datanode_id, container_id)); CREATE_DATANODE_CONTAINER_INDEX=CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id, container_id, timestamp); INSERT_DATANODE_CONTAINER_LOG=INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid, error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?); From 534c5d484d4d9922c24cd47a757bf789b4e286d6 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Wed, 9 Apr 2025 13:07:44 +0530 Subject: [PATCH 02/13] fixed notifyall --- .../containerlog/parser/ContainerLogFileParser.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index 3c98baf89de7..bab5096ed48e 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -18,9 +18,7 @@ package org.apache.hadoop.ozone.containerlog.parser; import java.io.BufferedReader; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -68,7 +66,8 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase CountDownLatch latch = new CountDownLatch(files.size()); //int count = 1; for (Path file : files) { - String fileName = file.getFileName().toString(); + Path fileNamePath = file.getFileName(); + String fileName = (fileNamePath != null) ? fileNamePath.toString() : ""; String[] parts = fileName.split(FILENAME_PARTS_REGEX); if (parts.length < 3) { @@ -153,14 +152,13 @@ private synchronized void processAndClearAllBatches(ContainerDatanodeDatabase d globalLock.unlock(); isGlobalLockHeld.set(false); synchronized (globalLockNotifier) { - notifyAll(); + globalLockNotifier.notifyAll(); } } } public void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFilePath), - StandardCharsets.UTF_8))) { + try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { String line; while ((line = reader.readLine()) != null) { String[] parts = line.split(LOG_LINE_SPLIT_REGEX); From 195de5322d2ee3497b848813f04c3b0fd9dad59a Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Wed, 9 Apr 2025 14:27:24 +0530 Subject: [PATCH 03/13] removed unnecessary comments and print statements --- .../ozone/containerlog/parser/ContainerLogFileParser.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index bab5096ed48e..ac82b65d8c73 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -64,7 +64,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase executorService = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(files.size()); - //int count = 1; + for (Path file : files) { Path fileNamePath = file.getFileName(); String fileName = (fileNamePath != null) ? fileNamePath.toString() : ""; @@ -83,7 +83,6 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase try { datanodeId = Long.parseLong(datanodeIdStr); - System.out.println("Parsed datanodeId: " + datanodeId); } catch (NumberFormatException e) { System.out.println("Invalid datanode ID in filename: " + fileName); @@ -91,8 +90,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } long finalDatanodeId = datanodeId; - //count++; - //int finalCount = count; + executorService.submit(() -> { String threadName = Thread.currentThread().getName(); From 1b83e3b53a88313576b86744099dbd83c367e268 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Thu, 10 Apr 2025 14:44:14 +0530 Subject: [PATCH 04/13] Updated locking and using batch list --- .../parser/ContainerDatanodeDatabase.java | 16 +- .../parser/ContainerLogFileParser.java | 190 +++++++----------- .../parser/DatanodeContainerInfo.java | 23 ++- .../debug/container/ContainerLogParser.java | 46 +---- 4 files changed, 109 insertions(+), 166 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index d00d31c66d15..98a1d33ad1ba 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -117,15 +117,7 @@ private void createContainerLogTable() throws SQLException { } } - public void insertContainerDatanodeData(String key, List transitionList) throws SQLException { - String[] parts = key.split(CONTAINER_KEY_DELIMITER); - if (parts.length != 2) { - System.err.println("Invalid key format: " + key); - return; - } - - long containerId = Long.parseLong(parts[0]); - long datanodeId = Long.parseLong(parts[1]); + public void insertContainerDatanodeData(List transitionList) throws SQLException { String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); @@ -135,8 +127,8 @@ public void insertContainerDatanodeData(String key, List int count = 0; for (DatanodeContainerInfo info : transitionList) { - preparedStatement.setLong(1, datanodeId); - preparedStatement.setLong(2, containerId); + preparedStatement.setLong(1, info.getDatanodeId()); + preparedStatement.setLong(2, info.getContainerId()); preparedStatement.setString(3, info.getTimestamp()); preparedStatement.setString(4, info.getState()); preparedStatement.setLong(5, info.getBcsid()); @@ -157,7 +149,7 @@ public void insertContainerDatanodeData(String key, List preparedStatement.executeBatch(); } } catch (SQLException e) { - LOG.error("Failed to insert container log for container {} on datanode {}", containerId, datanodeId, e); + LOG.error("Failed to insert container log", e); throw e; } catch (Exception e) { LOG.error(e.getMessage()); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index ac82b65d8c73..f0bb8cf0d586 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -25,9 +25,7 @@ import java.nio.file.Paths; import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,9 +40,8 @@ public class ContainerLogFileParser { - private ExecutorService executorService; - private final Map> batchMap = new HashMap<>(500); - private final Map locks = new HashMap<>(500); + private ExecutorService executorService; + private final List batchList = new ArrayList<>(500); private final ReentrantLock globalLock = new ReentrantLock(); private static final int MAX_KEYS_IN_MAP = 400; private final AtomicBoolean isGlobalLockHeld = new AtomicBoolean(false); @@ -75,30 +72,21 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase continue; } - long datanodeId = 0; String datanodeIdStr = parts[2]; if (datanodeIdStr.contains(".log")) { datanodeIdStr = datanodeIdStr.split(DATANODE_ID_REGEX)[0]; } - try { - datanodeId = Long.parseLong(datanodeIdStr); - - } catch (NumberFormatException e) { - System.out.println("Invalid datanode ID in filename: " + fileName); - continue; - } - - long finalDatanodeId = datanodeId; + long datanodeId = Long.parseLong(datanodeIdStr); executorService.submit(() -> { String threadName = Thread.currentThread().getName(); try { System.out.println(threadName + " is starting to process file: " + file.toString()); - processFile(file.toString(), dbstore, finalDatanodeId); + processFile(file.toString(), dbstore, datanodeId); } catch (Exception e) { - System.out.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + + System.err.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + " due to SQLException: " + e.getMessage()); executorService.shutdown(); } finally { @@ -114,8 +102,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } latch.await(); - - if (!batchMap.isEmpty()) { + if (!batchList.isEmpty()) { processAndClearAllBatches(dbstore); } @@ -123,156 +110,127 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } catch (IOException | InterruptedException e) { e.printStackTrace(); + } catch (NumberFormatException e) { + System.err.println("Invalid datanode ID"); } catch (SQLException e) { - System.out.println("Thread " + Thread.currentThread().getName() + + System.err.println("Thread " + Thread.currentThread().getName() + " is stopping due to SQLException: " + e.getMessage()); executorService.shutdown(); throw e; } } + private synchronized void processAndClearAllBatches(ContainerDatanodeDatabase dbstore) throws SQLException { + List localBatchList = null; - private synchronized void processAndClearAllBatches(ContainerDatanodeDatabase dbstore) throws SQLException { globalLock.lock(); try { isGlobalLockHeld.set(true); - for (Map.Entry> entry : batchMap.entrySet()) { - String key = entry.getKey(); - List transitionList = entry.getValue(); - dbstore.insertContainerDatanodeData(key, transitionList); - transitionList.clear(); - } - batchMap.clear(); - locks.clear(); + localBatchList = new ArrayList<>(batchList); + batchList.clear(); - } finally { + } finally { globalLock.unlock(); isGlobalLockHeld.set(false); synchronized (globalLockNotifier) { globalLockNotifier.notifyAll(); } } + + if (localBatchList != null && !localBatchList.isEmpty()) { + dbstore.insertContainerDatanodeData(localBatchList); + localBatchList.clear(); + } } - public void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { + + private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { String line; while ((line = reader.readLine()) != null) { String[] parts = line.split(LOG_LINE_SPLIT_REGEX); - String timestamp = parts[0]; - String logLevel = parts[1]; + String timestamp = parts[0].trim(); + String logLevel = parts[1].trim(); String id = null, bcsid = null, state = null, index = null; + String errorMessage = "No error"; - for (String part : parts) { - part = part.trim(); + for (int i = 2; i < parts.length; i++) { + String part = parts[i].trim(); if (part.contains(KEY_VALUE_SPLIT_REGEX)) { - String[] keyValue = part.split(KEY_VALUE_SPLIT_REGEX); + String[] keyValue = part.split(KEY_VALUE_SPLIT_REGEX, 2); if (keyValue.length == 2) { String key = keyValue[0].trim(); String value = keyValue[1].trim(); switch (key) { - case "ID": - id = value; - break; - case "BCSID": - bcsid = value; - break; - case "State": - state = value.replace("|", "").trim(); - break; - case "Index": - index = value; - break; - default: - System.out.println("Unexpected key: " + key); - break; + case "ID": + id = value; + break; + case "BCSID": + bcsid = value; + break; + case "State": + state = value.replace("|", "").trim(); + break; + case "Index": + index = value; + break; + default: + break; } } - } - - } - - if (index == null || !index.equals("0")) { - continue; - } - - String errorMessage = "No error"; - if (line.contains("ERROR") || line.contains("WARN")) { - errorMessage = null; - for (int i = parts.length - 1; i >= 0; i--) { - String part = parts[i].trim(); - - if (part.isEmpty()) { - continue; - } - - if (part.startsWith("State=") || part.startsWith("ID=") || part.startsWith("BCSID=") || - part.startsWith("Index=") || part.equals("ERROR") || part.equals("INFO") - || part.equals("WARN") || part.equals(timestamp)) { - continue; - } - if (part.endsWith("|")) { - part = part.replace("|", "").trim(); + } else { + if (!part.isEmpty()) { + errorMessage = part.replace("|", "").trim(); } - errorMessage = part; - - break; } } - if (id != null && bcsid != null && state != null) { - try { - long containerId = Long.parseLong(id); - long bcsidValue = Long.parseLong(bcsid); - - String key = containerId + "#" + datanodeId; + if (index == null || !index.equals("0")) { + continue; //Currently only ratis replicated containers are considered. + } + if (id != null && bcsid != null && state != null) { try { - synchronized (globalLockNotifier) { - while (isGlobalLockHeld.get()) { - try { - globalLockNotifier.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + long containerId = Long.parseLong(id); + long bcsidValue = Long.parseLong(bcsid); + + try { + synchronized (globalLockNotifier) { + while (isGlobalLockHeld.get()) { + try { + globalLockNotifier.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - } - if (!isGlobalLockHeld.get()) { - ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock()); - lock.lock(); + if (!isGlobalLockHeld.get()) { - try { - List currentBatch = batchMap.computeIfAbsent(key, k -> new ArrayList<>()); + batchList.add(new DatanodeContainerInfo(containerId, datanodeId, timestamp, state, bcsidValue, errorMessage, logLevel, Integer.parseInt(index))); - currentBatch.add(new DatanodeContainerInfo(timestamp, state, bcsidValue, errorMessage, - logLevel, Integer.parseInt(index))); - - if (batchMap.size() >= MAX_KEYS_IN_MAP) { + if (batchList.size() >= MAX_KEYS_IN_MAP) { processAndClearAllBatches(dbstore); } - } finally { - lock.unlock(); } } + } catch (SQLException e) { + throw new SQLException(e.getMessage()); + } catch (Exception e) { + System.err.println("Error processing the batch for container: " + containerId + " at datanode: " + datanodeId); + e.printStackTrace(); } - } catch (SQLException e) { - throw new SQLException(e.getMessage()); - } catch (Exception e) { - System.out.println("Error processing the batch for key: " + key); - e.printStackTrace(); + } catch (NumberFormatException e) { + System.err.println("Error parsing ID or BCSID as Long: " + line); } - } catch (NumberFormatException e) { - System.out.println("Error parsing ID or BCSID as Long: " + line); + } else { + System.err.println("Log line does not have all required fields: " + line); } - } else { - System.out.println("Log line does not have all required fields: " + line); } - } - } catch (IOException e) { - e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); } } -} + } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java index 18a1825f5994..c515c615cbf8 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java @@ -23,6 +23,8 @@ public class DatanodeContainerInfo { + private long containerId; + private long datanodeId; private String timestamp; private String state; private long bcsid; @@ -32,8 +34,9 @@ public class DatanodeContainerInfo { public DatanodeContainerInfo() { } - public DatanodeContainerInfo(String timestamp, String state, long bcsid, String errorMessage, - String logLevel, int indexValue) { + public DatanodeContainerInfo(long containerId, long datanodeId, String timestamp, String state, long bcsid, String errorMessage, String logLevel, int indexValue) { + this.containerId=containerId; + this.datanodeId=datanodeId; this.timestamp = timestamp; this.state = state; this.bcsid = bcsid; @@ -42,6 +45,22 @@ public DatanodeContainerInfo(String timestamp, String state, long bcsid, String this.indexValue = indexValue; } + public long getContainerId() { + return containerId; + } + + public void setContainerId(long containerId) { + this.containerId = containerId; + } + + public long getDatanodeId() { + return datanodeId; + } + + public void setDatanodeId(long datanodeId) { + this.datanodeId = datanodeId; + } + public String getTimestamp() { return timestamp; } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java index 7e935fddeebd..4b69c1209bbb 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java @@ -19,7 +19,6 @@ import java.sql.SQLException; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.ozone.containerlog.parser.ContainerDatanodeDatabase; import org.apache.hadoop.ozone.containerlog.parser.ContainerLogFileParser; import picocli.CommandLine; @@ -51,44 +50,19 @@ public Void call() throws Exception { ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(); ContainerLogFileParser parser = new ContainerLogFileParser(); - AtomicBoolean isProcessingSuccessful = new AtomicBoolean(true); try { - // Try creating the table, if this fails, mark the process as unsuccessful - try { - cdd.createDatanodeContainerLogTable(); - } catch (SQLException e) { - isProcessingSuccessful.set(false); - System.err.println("Error occurred while creating the Datanode Container Log Table: " + e.getMessage()); - } - - // If the table creation was successful, proceed with processing the log entries - if (isProcessingSuccessful.get()) { - try { - parser.processLogEntries(path, cdd, threadCount); - } catch (Exception e) { - isProcessingSuccessful.set(false); - System.err.println("Error occurred during processing logs: " + e.getMessage()); - } - - // Insert the latest log data, if processing was successful - if (isProcessingSuccessful.get()) { - try { - cdd.insertLatestContainerLogData(); - } catch (SQLException e) { - isProcessingSuccessful.set(false); - System.err.println("Error occurred while inserting container log data: " + e.getMessage()); - } - } - - if (isProcessingSuccessful.get()) { - System.out.println("Successfully parsed the log files and updated the respective tables"); - } else { - System.err.println("Log processing failed."); - } - } + + cdd.createDatanodeContainerLogTable(); + + parser.processLogEntries(path, cdd, threadCount); + + cdd.insertLatestContainerLogData(); + System.out.println("Successfully parsed the log files and updated the respective tables"); + + } catch (SQLException e) { + System.err.println("Error occurred while processing logs or inserting data into the database: " + e.getMessage()); } catch (Exception e) { - isProcessingSuccessful.set(false); System.err.println("An unexpected error occurred: " + e.getMessage()); } From 00cdec293272c260354a40b0f2c59c9ffb3f795b Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Thu, 10 Apr 2025 17:12:13 +0530 Subject: [PATCH 05/13] Removed locking and using Builder design pattern --- .../parser/ContainerDatanodeDatabase.java | 7 +- .../parser/ContainerLogFileParser.java | 154 +++++++----------- .../ozone/containerlog/parser/DBConsts.java | 2 +- .../parser/DatanodeContainerInfo.java | 135 ++++++++------- .../debug/container/ContainerLogParser.java | 3 +- 5 files changed, 146 insertions(+), 155 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index 98a1d33ad1ba..7cf0d5864d22 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -41,7 +41,6 @@ public class ContainerDatanodeDatabase { private static Map queries; - public static final String CONTAINER_KEY_DELIMITER = "#"; static { loadProperties(); @@ -117,7 +116,7 @@ private void createContainerLogTable() throws SQLException { } } - public void insertContainerDatanodeData(List transitionList) throws SQLException { + public synchronized void insertContainerDatanodeData(List transitionList) throws SQLException { String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); @@ -171,9 +170,9 @@ public void insertLatestContainerLogData() throws SQLException { PreparedStatement selectStmt = connection.prepareStatement(selectSQL); ResultSet resultSet = selectStmt.executeQuery(); PreparedStatement insertStmt = connection.prepareStatement(insertSQL)) { - + int count = 0; - + while (resultSet.next()) { long datanodeId = resultSet.getLong("datanode_id"); long containerId = resultSet.getLong("container_id"); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index f0bb8cf0d586..0b8c4f65759a 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -29,8 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -41,11 +39,7 @@ public class ContainerLogFileParser { private ExecutorService executorService; - private final List batchList = new ArrayList<>(500); - private final ReentrantLock globalLock = new ReentrantLock(); - private static final int MAX_KEYS_IN_MAP = 400; - private final AtomicBoolean isGlobalLockHeld = new AtomicBoolean(false); - private final Object globalLockNotifier = new Object(); + private static final int MAX_OBJ_IN_LIST = 5000; private static final String FILENAME_PARTS_REGEX = "-"; private static final String DATANODE_ID_REGEX = "\\."; @@ -61,7 +55,6 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase executorService = Executors.newFixedThreadPool(threadCount); CountDownLatch latch = new CountDownLatch(files.size()); - for (Path file : files) { Path fileNamePath = file.getFileName(); String fileName = (fileNamePath != null) ? fileNamePath.toString() : ""; @@ -102,50 +95,18 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } latch.await(); - if (!batchList.isEmpty()) { - processAndClearAllBatches(dbstore); - } - executorService.shutdown(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } catch (NumberFormatException e) { System.err.println("Invalid datanode ID"); - } catch (SQLException e) { - System.err.println("Thread " + Thread.currentThread().getName() + - " is stopping due to SQLException: " + e.getMessage()); - executorService.shutdown(); - throw e; } } - private synchronized void processAndClearAllBatches(ContainerDatanodeDatabase dbstore) throws SQLException { - List localBatchList = null; - - globalLock.lock(); - try { - isGlobalLockHeld.set(true); - - localBatchList = new ArrayList<>(batchList); - batchList.clear(); - - } finally { - globalLock.unlock(); - isGlobalLockHeld.set(false); - synchronized (globalLockNotifier) { - globalLockNotifier.notifyAll(); - } - } - - if (localBatchList != null && !localBatchList.isEmpty()) { - dbstore.insertContainerDatanodeData(localBatchList); - localBatchList.clear(); - } - } - - private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { + List batchList = new ArrayList<>(5100); + try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { String line; while ((line = reader.readLine()) != null) { @@ -165,20 +126,20 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String value = keyValue[1].trim(); switch (key) { - case "ID": - id = value; - break; - case "BCSID": - bcsid = value; - break; - case "State": - state = value.replace("|", "").trim(); - break; - case "Index": - index = value; - break; - default: - break; + case "ID": + id = value; + break; + case "BCSID": + bcsid = value; + break; + case "State": + state = value.replace("|", "").trim(); + break; + case "Index": + index = value; + break; + default: + break; } } } else { @@ -188,49 +149,54 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, } } - if (index == null || !index.equals("0")) { - continue; //Currently only ratis replicated containers are considered. - } + if (index == null || !index.equals("0")) { + continue; //Currently only ratis replicated containers are considered. + } + + if (id != null && bcsid != null && state != null) { + try { + long containerId = Long.parseLong(id); + long bcsidValue = Long.parseLong(bcsid); - if (id != null && bcsid != null && state != null) { try { - long containerId = Long.parseLong(id); - long bcsidValue = Long.parseLong(bcsid); - - try { - synchronized (globalLockNotifier) { - while (isGlobalLockHeld.get()) { - try { - globalLockNotifier.wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - if (!isGlobalLockHeld.get()) { - - batchList.add(new DatanodeContainerInfo(containerId, datanodeId, timestamp, state, bcsidValue, errorMessage, logLevel, Integer.parseInt(index))); - - if (batchList.size() >= MAX_KEYS_IN_MAP) { - processAndClearAllBatches(dbstore); - } - } - } - } catch (SQLException e) { - throw new SQLException(e.getMessage()); - } catch (Exception e) { - System.err.println("Error processing the batch for container: " + containerId + " at datanode: " + datanodeId); - e.printStackTrace(); + batchList.add( + new DatanodeContainerInfo.Builder() + .setContainerId(containerId) + .setDatanodeId(datanodeId) + .setTimestamp(timestamp) + .setState(state) + .setBcsid(bcsidValue) + .setErrorMessage(errorMessage) + .setLogLevel(logLevel) + .setIndexValue(Integer.parseInt(index)) + .build() + ); + + if (batchList.size() >= MAX_OBJ_IN_LIST) { + dbstore.insertContainerDatanodeData(batchList); + batchList.clear(); } - } catch (NumberFormatException e) { - System.err.println("Error parsing ID or BCSID as Long: " + line); + } catch (SQLException e) { + throw new SQLException(e.getMessage()); + } catch (Exception e) { + System.err.println( + "Error processing the batch for container: " + containerId + " at datanode: " + datanodeId); + e.printStackTrace(); } - } else { - System.err.println("Log line does not have all required fields: " + line); + } catch (NumberFormatException e) { + System.err.println("Error parsing ID or BCSID as Long: " + line); } + } else { + System.err.println("Log line does not have all required fields: " + line); } - } catch (IOException e) { - e.printStackTrace(); + } + if (!batchList.isEmpty()) { + dbstore.insertContainerDatanodeData(batchList); + batchList.clear(); + } + + } catch (IOException e) { + e.printStackTrace(); } } - } +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java index b00f77a9efba..7f8bb903ae70 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java @@ -31,7 +31,7 @@ private DBConsts() { public static final String DATABASE_NAME = "container_datanode.db"; public static final String PROPS_FILE = "container-log-db-queries.properties"; public static final int CACHE_SIZE = 1000000; - public static final int BATCH_SIZE = 1000; + public static final int BATCH_SIZE = 2500; public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable"; public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable"; diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java index c515c615cbf8..82e068e50106 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java @@ -21,92 +21,117 @@ *Holds information about a container. */ -public class DatanodeContainerInfo { - - private long containerId; - private long datanodeId; - private String timestamp; - private String state; - private long bcsid; - private String errorMessage; - private String logLevel; - private int indexValue; - - public DatanodeContainerInfo() { - } - public DatanodeContainerInfo(long containerId, long datanodeId, String timestamp, String state, long bcsid, String errorMessage, String logLevel, int indexValue) { - this.containerId=containerId; - this.datanodeId=datanodeId; - this.timestamp = timestamp; - this.state = state; - this.bcsid = bcsid; - this.errorMessage = errorMessage; - this.logLevel = logLevel; - this.indexValue = indexValue; +public final class DatanodeContainerInfo { + + private final long containerId; + private final long datanodeId; + private final String timestamp; + private final String state; + private final long bcsid; + private final String errorMessage; + private final String logLevel; + private final int indexValue; + + private DatanodeContainerInfo(Builder builder) { + this.containerId = builder.containerId; + this.datanodeId = builder.datanodeId; + this.timestamp = builder.timestamp; + this.state = builder.state; + this.bcsid = builder.bcsid; + this.errorMessage = builder.errorMessage; + this.logLevel = builder.logLevel; + this.indexValue = builder.indexValue; + } + + /** + * Builder for DatanodeContainerInfo.. + */ + + public static class Builder { + private long containerId; + private long datanodeId; + private String timestamp; + private String state; + private long bcsid; + private String errorMessage; + private String logLevel; + private int indexValue; + + public Builder setContainerId(long containerId) { + this.containerId = containerId; + return this; + } + + public Builder setDatanodeId(long datanodeId) { + this.datanodeId = datanodeId; + return this; + } + + public Builder setTimestamp(String timestamp) { + this.timestamp = timestamp; + return this; + } + + public Builder setState(String state) { + this.state = state; + return this; + } + + public Builder setBcsid(long bcsid) { + this.bcsid = bcsid; + return this; + } + + public Builder setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + return this; + } + + public Builder setLogLevel(String logLevel) { + this.logLevel = logLevel; + return this; + } + + public Builder setIndexValue(int indexValue) { + this.indexValue = indexValue; + return this; + } + + public DatanodeContainerInfo build() { + return new DatanodeContainerInfo(this); + } } public long getContainerId() { return containerId; } - public void setContainerId(long containerId) { - this.containerId = containerId; - } - public long getDatanodeId() { return datanodeId; } - public void setDatanodeId(long datanodeId) { - this.datanodeId = datanodeId; - } - public String getTimestamp() { return timestamp; } - public void setTimestamp(String timestamp) { - this.timestamp = timestamp; - } - public String getState() { return state; } - public void setState(String state) { - this.state = state; - } - public long getBcsid() { return bcsid; } - public void setBcsid(long bcsid) { - this.bcsid = bcsid; - } - public String getErrorMessage() { return errorMessage; } - public void setErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; - } - public String getLogLevel() { return logLevel; } - public void setLogLevel(String logLevel) { - this.logLevel = logLevel; - } - public int getIndexValue() { return indexValue; } - - public void setIndexValue(int indexValue) { - this.indexValue = indexValue; - } - } + diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java index 4b69c1209bbb..3e11fd630507 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java @@ -61,7 +61,8 @@ public Void call() throws Exception { System.out.println("Successfully parsed the log files and updated the respective tables"); } catch (SQLException e) { - System.err.println("Error occurred while processing logs or inserting data into the database: " + e.getMessage()); + System.err.println("Error occurred while processing logs or inserting data into the database: " + + e.getMessage()); } catch (Exception e) { System.err.println("An unexpected error occurred: " + e.getMessage()); } From ad568bef7941b3a381ab32669f117f3f1fa314a6 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Thu, 10 Apr 2025 19:06:40 +0530 Subject: [PATCH 06/13] defined constants and simplified builder usage inside switch case --- .../parser/ContainerLogFileParser.java | 70 +++++++++---------- .../container-log-db-queries.properties | 4 +- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index 0b8c4f65759a..89f5e18ad3e9 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -45,6 +45,10 @@ public class ContainerLogFileParser { private static final String DATANODE_ID_REGEX = "\\."; private static final String LOG_LINE_SPLIT_REGEX = " \\| "; private static final String KEY_VALUE_SPLIT_REGEX = "="; + private static final String KEY_ID = "ID"; + private static final String KEY_BCSID = "BCSID"; + private static final String KEY_STATE = "State"; + private static final String KEY_INDEX = "Index"; public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount) throws SQLException { @@ -113,8 +117,12 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String[] parts = line.split(LOG_LINE_SPLIT_REGEX); String timestamp = parts[0].trim(); String logLevel = parts[1].trim(); - String id = null, bcsid = null, state = null, index = null; - String errorMessage = "No error"; + String id = null, index = null; + + DatanodeContainerInfo.Builder builder = new DatanodeContainerInfo.Builder() + .setDatanodeId(datanodeId) + .setTimestamp(timestamp) + .setLogLevel(logLevel); for (int i = 2; i < parts.length; i++) { String part = parts[i].trim(); @@ -126,17 +134,19 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String value = keyValue[1].trim(); switch (key) { - case "ID": + case KEY_ID: id = value; + builder.setContainerId(Long.parseLong(value)); break; - case "BCSID": - bcsid = value; + case KEY_BCSID: + builder.setBcsid(Long.parseLong(value)); break; - case "State": - state = value.replace("|", "").trim(); + case KEY_STATE: + builder.setState(value.replace("|", "").trim()); break; - case "Index": + case KEY_INDEX: index = value; + builder.setIndexValue(Integer.parseInt(value)); break; default: break; @@ -144,7 +154,9 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, } } else { if (!part.isEmpty()) { - errorMessage = part.replace("|", "").trim(); + builder.setErrorMessage(part.replace("|", "").trim()); + } else { + builder.setErrorMessage("No error"); } } } @@ -153,38 +165,20 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, continue; //Currently only ratis replicated containers are considered. } - if (id != null && bcsid != null && state != null) { + if (id != null) { try { - long containerId = Long.parseLong(id); - long bcsidValue = Long.parseLong(bcsid); + batchList.add(builder.build()); - try { - batchList.add( - new DatanodeContainerInfo.Builder() - .setContainerId(containerId) - .setDatanodeId(datanodeId) - .setTimestamp(timestamp) - .setState(state) - .setBcsid(bcsidValue) - .setErrorMessage(errorMessage) - .setLogLevel(logLevel) - .setIndexValue(Integer.parseInt(index)) - .build() - ); - - if (batchList.size() >= MAX_OBJ_IN_LIST) { - dbstore.insertContainerDatanodeData(batchList); - batchList.clear(); - } - } catch (SQLException e) { - throw new SQLException(e.getMessage()); - } catch (Exception e) { - System.err.println( - "Error processing the batch for container: " + containerId + " at datanode: " + datanodeId); - e.printStackTrace(); + if (batchList.size() >= MAX_OBJ_IN_LIST) { + dbstore.insertContainerDatanodeData(batchList); + batchList.clear(); } - } catch (NumberFormatException e) { - System.err.println("Error parsing ID or BCSID as Long: " + line); + } catch (SQLException e) { + throw new SQLException(e.getMessage()); + } catch (Exception e) { + System.err.println( + "Error processing the batch for container: " + id + " at datanode: " + datanodeId); + e.printStackTrace(); } } else { System.err.println("Log line does not have all required fields: " + line); diff --git a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties index 39bcefea1df6..2decc3f9a1a4 100644 --- a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties +++ b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties @@ -15,8 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT NOT NULL, bcsid INTEGER NOT NULL, error_message TEXT NOT NULL, log_level TEXT NOT NULL, index_value INTEGER NOT NULL); -CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT NOT NULL, latest_bcsid INTEGER NOT NULL, PRIMARY KEY (datanode_id, container_id)); +CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, error_message TEXT, log_level TEXT NOT NULL, index_value INTEGER); +CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT, latest_bcsid INTEGER, PRIMARY KEY (datanode_id, container_id)); CREATE_DATANODE_CONTAINER_INDEX=CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id, container_id, timestamp); INSERT_DATANODE_CONTAINER_LOG=INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid, error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?); INSERT_CONTAINER_LOG=INSERT OR REPLACE INTO ContainerLogTable (datanode_id, container_id, latest_state, latest_bcsid) VALUES (?, ?, ?, ?); From 79dae79c86c86744da31f3c570418950df57895c Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 09:53:28 +0530 Subject: [PATCH 07/13] Removed Unwanted chnages --- .../ozone/containerlog/parser/ContainerDatanodeDatabase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index 7cf0d5864d22..7cdfc1739852 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -170,9 +170,9 @@ public void insertLatestContainerLogData() throws SQLException { PreparedStatement selectStmt = connection.prepareStatement(selectSQL); ResultSet resultSet = selectStmt.executeQuery(); PreparedStatement insertStmt = connection.prepareStatement(insertSQL)) { - + int count = 0; - + while (resultSet.next()) { long datanodeId = resultSet.getLong("datanode_id"); long containerId = resultSet.getLong("container_id"); From 65e38eb16faa177b3d6e2ed5b8f2e5a2c083198f Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 11:28:56 +0530 Subject: [PATCH 08/13] Added javadoc --- .../parser/ContainerDatanodeDatabase.java | 14 ++++++++++++- .../parser/ContainerLogFileParser.java | 21 ++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index 7cdfc1739852..d41298b602dd 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -35,7 +35,8 @@ /** - * Datanode container Database. + * Handles creation and interaction with the database. + * Provides methods for table creation, log data insertion, and index setup. */ public class ContainerDatanodeDatabase { @@ -116,6 +117,12 @@ private void createContainerLogTable() throws SQLException { } } + /** + * Inserts a list of container log entries into the DatanodeContainerLogTable. + * + * @param transitionList List of container log entries to insert into the table. + */ + public synchronized void insertContainerDatanodeData(List transitionList) throws SQLException { String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); @@ -161,6 +168,11 @@ private void createDatanodeContainerIndex(Statement stmt) throws SQLException { stmt.execute(createIndexSQL); } + /** + * Extracts the latest container log data from the DatanodeContainerLogTable + * and inserts it into ContainerLogTable. + */ + public void insertLatestContainerLogData() throws SQLException { createContainerLogTable(); String selectSQL = queries.get("SELECT_LATEST_CONTAINER_LOG"); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index 89f5e18ad3e9..2408f36e4717 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -33,7 +33,8 @@ import java.util.stream.Stream; /** - * Container log file Parsing logic. + * Parses container log files and stores container details into a database. + * Uses multithreading to process multiple log files concurrently. */ public class ContainerLogFileParser { @@ -50,6 +51,15 @@ public class ContainerLogFileParser { private static final String KEY_STATE = "State"; private static final String KEY_INDEX = "Index"; + /** + * Scans the specified log directory, processes each file in a separate thread. + * Expects each log filename to follow the format: dn-container-datanodeId.log + * + * @param logDirectoryPath Path to the directory containing container log files. + * @param dbstore Database object used to persist parsed container data. + * @param threadCount Number of threads to use for parallel processing. + */ + public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase dbstore, int threadCount) throws SQLException { try (Stream paths = Files.walk(Paths.get(logDirectoryPath))) { @@ -108,6 +118,15 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } } + /** + * Processes a single container log file and extracts container details. + * Parses, batches, and writes valid container log entries into the database. + * + * @param logFilePath Path to the log file. + * @param dbstore Database object used to persist parsed container data. + * @param datanodeId Datanode ID derived from the log filename. + */ + private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { List batchList = new ArrayList<>(5100); From acaa5f0c72acdc668df30474752a0b11318b3185 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 14:40:21 +0530 Subject: [PATCH 09/13] Updated datanode ID parsing logic --- .../parser/ContainerDatanodeDatabase.java | 6 ++-- .../parser/ContainerLogFileParser.java | 30 +++++++++---------- .../parser/DatanodeContainerInfo.java | 10 +++---- .../container-log-db-queries.properties | 4 +-- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index d41298b602dd..d65bbcae7bed 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -133,7 +133,7 @@ public synchronized void insertContainerDatanodeData(List int count = 0; for (DatanodeContainerInfo info : transitionList) { - preparedStatement.setLong(1, info.getDatanodeId()); + preparedStatement.setString(1, info.getDatanodeId()); preparedStatement.setLong(2, info.getContainerId()); preparedStatement.setString(3, info.getTimestamp()); preparedStatement.setString(4, info.getState()); @@ -186,12 +186,12 @@ public void insertLatestContainerLogData() throws SQLException { int count = 0; while (resultSet.next()) { - long datanodeId = resultSet.getLong("datanode_id"); + String datanodeId = resultSet.getString("datanode_id"); long containerId = resultSet.getLong("container_id"); String containerState = resultSet.getString("container_state"); long bcsid = resultSet.getLong("bcsid"); try { - insertStmt.setLong(1, datanodeId); + insertStmt.setString(1, datanodeId); insertStmt.setLong(2, containerId); insertStmt.setString(3, containerState); insertStmt.setLong(4, bcsid); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index 2408f36e4717..b5e56be84f62 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -42,8 +42,7 @@ public class ContainerLogFileParser { private ExecutorService executorService; private static final int MAX_OBJ_IN_LIST = 5000; - private static final String FILENAME_PARTS_REGEX = "-"; - private static final String DATANODE_ID_REGEX = "\\."; + private static final String LOG_FILE_MARKER = ".log."; private static final String LOG_LINE_SPLIT_REGEX = " \\| "; private static final String KEY_VALUE_SPLIT_REGEX = "="; private static final String KEY_ID = "ID"; @@ -53,7 +52,7 @@ public class ContainerLogFileParser { /** * Scans the specified log directory, processes each file in a separate thread. - * Expects each log filename to follow the format: dn-container-datanodeId.log + * Expects each log filename to follow the format: dn-container-.log. * * @param logDirectoryPath Path to the directory containing container log files. * @param dbstore Database object used to persist parsed container data. @@ -72,20 +71,20 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase for (Path file : files) { Path fileNamePath = file.getFileName(); String fileName = (fileNamePath != null) ? fileNamePath.toString() : ""; - String[] parts = fileName.split(FILENAME_PARTS_REGEX); - - if (parts.length < 3) { - System.out.println("Filename format is incorrect (not enough parts): " + fileName); + + int pos = fileName.indexOf(LOG_FILE_MARKER); + if (pos == -1) { + System.out.println("Filename format is incorrect (missing .log.): " + fileName); continue; } - - String datanodeIdStr = parts[2]; - if (datanodeIdStr.contains(".log")) { - datanodeIdStr = datanodeIdStr.split(DATANODE_ID_REGEX)[0]; + + String datanodeId = fileName.substring(pos + 5); + + if (datanodeId.trim().isEmpty()) { + System.out.println("Filename format is incorrect, datanodeId is missing or empty: " + fileName); + continue; } - - long datanodeId = Long.parseLong(datanodeIdStr); - + executorService.submit(() -> { String threadName = Thread.currentThread().getName(); @@ -127,7 +126,8 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase * @param datanodeId Datanode ID derived from the log filename. */ - private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, long datanodeId) throws SQLException { + private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String datanodeId) + throws SQLException { List batchList = new ArrayList<>(5100); try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java index 82e068e50106..a4134c0b3a8d 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java @@ -24,7 +24,7 @@ public final class DatanodeContainerInfo { private final long containerId; - private final long datanodeId; + private final String datanodeId; private final String timestamp; private final String state; private final long bcsid; @@ -44,12 +44,12 @@ private DatanodeContainerInfo(Builder builder) { } /** - * Builder for DatanodeContainerInfo.. + * Builder for DatanodeContainerInfo. */ public static class Builder { private long containerId; - private long datanodeId; + private String datanodeId; private String timestamp; private String state; private long bcsid; @@ -62,7 +62,7 @@ public Builder setContainerId(long containerId) { return this; } - public Builder setDatanodeId(long datanodeId) { + public Builder setDatanodeId(String datanodeId) { this.datanodeId = datanodeId; return this; } @@ -106,7 +106,7 @@ public long getContainerId() { return containerId; } - public long getDatanodeId() { + public String getDatanodeId() { return datanodeId; } diff --git a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties index 2decc3f9a1a4..01a405556708 100644 --- a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties +++ b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties @@ -15,8 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # -CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, error_message TEXT, log_level TEXT NOT NULL, index_value INTEGER); -CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT, latest_bcsid INTEGER, PRIMARY KEY (datanode_id, container_id)); +CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id TEXT NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, error_message TEXT, log_level TEXT NOT NULL, index_value INTEGER); +CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id TEXT NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT, latest_bcsid INTEGER, PRIMARY KEY (datanode_id, container_id)); CREATE_DATANODE_CONTAINER_INDEX=CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id, container_id, timestamp); INSERT_DATANODE_CONTAINER_LOG=INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid, error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?); INSERT_CONTAINER_LOG=INSERT OR REPLACE INTO ContainerLogTable (datanode_id, container_id, latest_state, latest_bcsid) VALUES (?, ?, ?, ?); From 51f56a662bbc2f61868625fcaed1ac27c7657e32 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 15:13:20 +0530 Subject: [PATCH 10/13] fixed pmd issue --- .../ozone/containerlog/parser/ContainerLogFileParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index b5e56be84f62..ce1a3cfd046a 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -80,7 +80,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase String datanodeId = fileName.substring(pos + 5); - if (datanodeId.trim().isEmpty()) { + if (datanodeId.isEmpty()) { System.out.println("Filename format is incorrect, datanodeId is missing or empty: " + fileName); continue; } From 74f745fdd51c5dc90f750ff740384e17dc992227 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 16:06:35 +0530 Subject: [PATCH 11/13] Updated sql exception handling --- .../containerlog/parser/ContainerLogFileParser.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index ce1a3cfd046a..59fbb3155bf3 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -49,7 +50,8 @@ public class ContainerLogFileParser { private static final String KEY_BCSID = "BCSID"; private static final String KEY_STATE = "State"; private static final String KEY_INDEX = "Index"; - + AtomicBoolean hasErrorOccurred = new AtomicBoolean(false); + /** * Scans the specified log directory, processes each file in a separate thread. * Expects each log filename to follow the format: dn-container-.log. @@ -94,6 +96,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase } catch (Exception e) { System.err.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + " due to SQLException: " + e.getMessage()); + hasErrorOccurred.set(true); executorService.shutdown(); } finally { try { @@ -109,6 +112,10 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase latch.await(); executorService.shutdown(); + + if (hasErrorOccurred.get()) { + throw new SQLException("Log file processing failed."); + } } catch (IOException | InterruptedException e) { e.printStackTrace(); From 296a776108fefd8dc3aedf16eacbf5a7a7e75f44 Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Fri, 11 Apr 2025 16:30:07 +0530 Subject: [PATCH 12/13] Fixed checkstyle --- .../ozone/containerlog/parser/ContainerLogFileParser.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index 59fbb3155bf3..eeec97c8f9aa 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -50,7 +50,7 @@ public class ContainerLogFileParser { private static final String KEY_BCSID = "BCSID"; private static final String KEY_STATE = "State"; private static final String KEY_INDEX = "Index"; - AtomicBoolean hasErrorOccurred = new AtomicBoolean(false); + private final AtomicBoolean hasErrorOccurred = new AtomicBoolean(false); /** * Scans the specified log directory, processes each file in a separate thread. From dbf51abbdcd00ceeb5d7dd398f2e22ed3cafc30e Mon Sep 17 00:00:00 2001 From: Sreeja Chintalapati Date: Sat, 12 Apr 2025 11:54:03 +0530 Subject: [PATCH 13/13] Fixed threadCount validation, path checking and removed premature executor shutdown --- .../parser/ContainerDatanodeDatabase.java | 12 +++++++++--- .../parser/ContainerLogFileParser.java | 9 ++++----- .../debug/container/ContainerLogParser.java | 16 ++++++++++++++++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java index d65bbcae7bed..de3ab8439979 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -127,14 +127,20 @@ public synchronized void insertContainerDatanodeData(List String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); + long containerId = 0; + String datanodeId = null; + try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) { int count = 0; for (DatanodeContainerInfo info : transitionList) { - preparedStatement.setString(1, info.getDatanodeId()); - preparedStatement.setLong(2, info.getContainerId()); + datanodeId = info.getDatanodeId(); + containerId = info.getContainerId(); + + preparedStatement.setString(1, datanodeId); + preparedStatement.setLong(2, containerId); preparedStatement.setString(3, info.getTimestamp()); preparedStatement.setString(4, info.getState()); preparedStatement.setLong(5, info.getBcsid()); @@ -155,7 +161,7 @@ public synchronized void insertContainerDatanodeData(List preparedStatement.executeBatch(); } } catch (SQLException e) { - LOG.error("Failed to insert container log", e); + LOG.error("Failed to insert container log for container {} on datanode {}", containerId, datanodeId, e); throw e; } catch (Exception e) { LOG.error(e.getMessage()); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java index eeec97c8f9aa..bafce798e5a7 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerLogFileParser.java @@ -97,7 +97,6 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase System.err.println("Thread " + threadName + " is stopping to process the file: " + file.toString() + " due to SQLException: " + e.getMessage()); hasErrorOccurred.set(true); - executorService.shutdown(); } finally { try { latch.countDown(); @@ -135,7 +134,7 @@ public void processLogEntries(String logDirectoryPath, ContainerDatanodeDatabase private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String datanodeId) throws SQLException { - List batchList = new ArrayList<>(5100); + List batchList = new ArrayList<>(MAX_OBJ_IN_LIST + 100); try (BufferedReader reader = Files.newBufferedReader(Paths.get(logFilePath), StandardCharsets.UTF_8)) { String line; @@ -144,6 +143,7 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, String timestamp = parts[0].trim(); String logLevel = parts[1].trim(); String id = null, index = null; + String errorMessage = "No error"; DatanodeContainerInfo.Builder builder = new DatanodeContainerInfo.Builder() .setDatanodeId(datanodeId) @@ -180,12 +180,11 @@ private void processFile(String logFilePath, ContainerDatanodeDatabase dbstore, } } else { if (!part.isEmpty()) { - builder.setErrorMessage(part.replace("|", "").trim()); - } else { - builder.setErrorMessage("No error"); + errorMessage = part.replace("|", "").trim(); } } } + builder.setErrorMessage(errorMessage); if (index == null || !index.equals("0")) { continue; //Currently only ratis replicated containers are considered. diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java index 3e11fd630507..c9ef86d5dd0f 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/container/ContainerLogParser.java @@ -17,6 +17,9 @@ package org.apache.hadoop.ozone.debug.container; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.SQLException; import java.util.concurrent.Callable; import org.apache.hadoop.ozone.containerlog.parser.ContainerDatanodeDatabase; @@ -32,6 +35,8 @@ description = "parse the container logs" ) public class ContainerLogParser implements Callable { + private static final int DEFAULT_THREAD_COUNT = 10; + @CommandLine.Option(names = {"--parse"}, description = "path to the dir which contains log files") private String path; @@ -46,7 +51,18 @@ public class ContainerLogParser implements Callable { @Override public Void call() throws Exception { + if (threadCount <= 0) { + System.out.println("Invalid threadCount value provided (" + threadCount + "). Using default value: " + + DEFAULT_THREAD_COUNT); + threadCount = DEFAULT_THREAD_COUNT; + } + if (path != null) { + Path logPath = Paths.get(path); + if (!Files.exists(logPath) || !Files.isDirectory(logPath)) { + System.err.println("Invalid path provided: " + path); + return null; + } ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(); ContainerLogFileParser parser = new ContainerLogFileParser();