diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java new file mode 100644 index 000000000000..d00d31c66d15 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/ContainerDatanodeDatabase.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.containerlog.parser; + +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sqlite.SQLiteConfig; + + +/** + * Datanode container Database. + */ + +public class ContainerDatanodeDatabase { + + private static Map queries; + public static final String CONTAINER_KEY_DELIMITER = "#"; + + static { + loadProperties(); + } + + private static final Logger LOG = + LoggerFactory.getLogger(ContainerDatanodeDatabase.class); + + private static void loadProperties() { + Properties props = new Properties(); + try (InputStream inputStream = ContainerDatanodeDatabase.class.getClassLoader() + .getResourceAsStream(DBConsts.PROPS_FILE)) { + + if (inputStream != null) { + props.load(inputStream); + queries = props.entrySet().stream() + .collect(Collectors.toMap( + e -> e.getKey().toString(), + e -> e.getValue().toString() + )); + } else { + throw new FileNotFoundException("Property file '" + DBConsts.PROPS_FILE + "' not found."); + } + } catch (Exception e) { + LOG.error(e.getMessage()); + } + } + + private static Connection getConnection() throws Exception { + Class.forName(DBConsts.DRIVER); + + SQLiteConfig config = new SQLiteConfig(); + + config.setJournalMode(SQLiteConfig.JournalMode.OFF); + config.setCacheSize(DBConsts.CACHE_SIZE); + config.setLockingMode(SQLiteConfig.LockingMode.EXCLUSIVE); + config.setSynchronous(SQLiteConfig.SynchronousMode.OFF); + config.setTempStore(SQLiteConfig.TempStore.MEMORY); + + return DriverManager.getConnection(DBConsts.CONNECTION_PREFIX + DBConsts.DATABASE_NAME, config.toProperties()); + } + + public void createDatanodeContainerLogTable() throws SQLException { + String createTableSQL = queries.get("CREATE_DATANODE_CONTAINER_LOG_TABLE"); + try (Connection connection = getConnection(); + Statement dropStmt = connection.createStatement(); + Statement createStmt = connection.createStatement()) { + dropTable(DBConsts.DATANODE_CONTAINER_LOG_TABLE_NAME, dropStmt); + createStmt.execute(createTableSQL); + createDatanodeContainerIndex(createStmt); + } catch (SQLException e) { + LOG.error("Error while creating the table: {}", e.getMessage()); + throw e; + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + private void createContainerLogTable() throws SQLException { + String createTableSQL = queries.get("CREATE_CONTAINER_LOG_TABLE"); + try (Connection connection = getConnection(); + Statement dropStmt = connection.createStatement(); + Statement createStmt = connection.createStatement()) { + dropTable(DBConsts.CONTAINER_LOG_TABLE_NAME, dropStmt); + createStmt.execute(createTableSQL); + } catch (SQLException e) { + LOG.error("Error while creating the table: {}", e.getMessage()); + throw e; + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + public void insertContainerDatanodeData(String key, List transitionList) throws SQLException { + String[] parts = key.split(CONTAINER_KEY_DELIMITER); + if (parts.length != 2) { + System.err.println("Invalid key format: " + key); + return; + } + + long containerId = Long.parseLong(parts[0]); + long datanodeId = Long.parseLong(parts[1]); + + String insertSQL = queries.get("INSERT_DATANODE_CONTAINER_LOG"); + + try (Connection connection = getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) { + + int count = 0; + + for (DatanodeContainerInfo info : transitionList) { + preparedStatement.setLong(1, datanodeId); + preparedStatement.setLong(2, containerId); + preparedStatement.setString(3, info.getTimestamp()); + preparedStatement.setString(4, info.getState()); + preparedStatement.setLong(5, info.getBcsid()); + preparedStatement.setString(6, info.getErrorMessage()); + preparedStatement.setString(7, info.getLogLevel()); + preparedStatement.setInt(8, info.getIndexValue()); + preparedStatement.addBatch(); + + count++; + + if (count % DBConsts.BATCH_SIZE == 0) { + preparedStatement.executeBatch(); + count = 0; + } + } + + if (count != 0) { + preparedStatement.executeBatch(); + } + } catch (SQLException e) { + LOG.error("Failed to insert container log for container {} on datanode {}", containerId, datanodeId, e); + throw e; + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + private void createDatanodeContainerIndex(Statement stmt) throws SQLException { + String createIndexSQL = queries.get("CREATE_DATANODE_CONTAINER_INDEX"); + stmt.execute(createIndexSQL); + } + + public void insertLatestContainerLogData() throws SQLException { + createContainerLogTable(); + String selectSQL = queries.get("SELECT_LATEST_CONTAINER_LOG"); + String insertSQL = queries.get("INSERT_CONTAINER_LOG"); + + try (Connection connection = getConnection(); + PreparedStatement selectStmt = connection.prepareStatement(selectSQL); + ResultSet resultSet = selectStmt.executeQuery(); + PreparedStatement insertStmt = connection.prepareStatement(insertSQL)) { + + int count = 0; + + while (resultSet.next()) { + long datanodeId = resultSet.getLong("datanode_id"); + long containerId = resultSet.getLong("container_id"); + String containerState = resultSet.getString("container_state"); + long bcsid = resultSet.getLong("bcsid"); + try { + insertStmt.setLong(1, datanodeId); + insertStmt.setLong(2, containerId); + insertStmt.setString(3, containerState); + insertStmt.setLong(4, bcsid); + insertStmt.addBatch(); + + count++; + + if (count % DBConsts.BATCH_SIZE == 0) { + insertStmt.executeBatch(); + count = 0; + } + } catch (SQLException e) { + LOG.error("Failed to insert container log entry for container {} on datanode {} ", + containerId, datanodeId, e); + throw e; + } + } + + if (count != 0) { + insertStmt.executeBatch(); + } + } catch (SQLException e) { + LOG.error("Failed to insert container log entry: {}", e.getMessage()); + throw e; + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new RuntimeException(e); + } + } + + private void dropTable(String tableName, Statement stmt) throws SQLException { + String dropTableSQL = queries.get("DROP_TABLE").replace("{table_name}", tableName); + stmt.executeUpdate(dropTableSQL); + } + +} + diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java new file mode 100644 index 000000000000..b00f77a9efba --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DBConsts.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.containerlog.parser; + +/** + * Constants used for ContainerDatanodeDatabase. + */ +public final class DBConsts { + + private DBConsts() { + //Never constructed + } + + public static final String DRIVER = "org.sqlite.JDBC"; + public static final String CONNECTION_PREFIX = "jdbc:sqlite:"; + public static final String DATABASE_NAME = "container_datanode.db"; + public static final String PROPS_FILE = "container-log-db-queries.properties"; + public static final int CACHE_SIZE = 1000000; + public static final int BATCH_SIZE = 1000; + public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable"; + public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable"; + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java new file mode 100644 index 000000000000..18a1825f5994 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/DatanodeContainerInfo.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.containerlog.parser; + +/** + *Holds information about a container. + */ + +public class DatanodeContainerInfo { + + private String timestamp; + private String state; + private long bcsid; + private String errorMessage; + private String logLevel; + private int indexValue; + + public DatanodeContainerInfo() { + } + public DatanodeContainerInfo(String timestamp, String state, long bcsid, String errorMessage, + String logLevel, int indexValue) { + this.timestamp = timestamp; + this.state = state; + this.bcsid = bcsid; + this.errorMessage = errorMessage; + this.logLevel = logLevel; + this.indexValue = indexValue; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public long getBcsid() { + return bcsid; + } + + public void setBcsid(long bcsid) { + this.bcsid = bcsid; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public String getLogLevel() { + return logLevel; + } + + public void setLogLevel(String logLevel) { + this.logLevel = logLevel; + } + + public int getIndexValue() { + return indexValue; + } + + public void setIndexValue(int indexValue) { + this.indexValue = indexValue; + } + +} diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/package-info.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/package-info.java new file mode 100644 index 000000000000..9a22345a0627 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/containerlog/parser/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Classes used for Ozone Container Log parser tool. + */ + +package org.apache.hadoop.ozone.containerlog.parser; diff --git a/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties new file mode 100644 index 000000000000..c47cb6fcaaf7 --- /dev/null +++ b/hadoop-ozone/tools/src/main/resources/container-log-db-queries.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +CREATE_DATANODE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT NOT NULL, bcsid INTEGER NOT NULL, error_message TEXT NOT NUL, log_level TEXT NOT NULL, index_value INTEGER NOT NULL); +CREATE_CONTAINER_LOG_TABLE=CREATE TABLE IF NOT EXISTS ContainerLogTable (datanode_id INTEGER NOT NULL, container_id INTEGER NOT NULL, latest_state TEXT NOT NULL, latest_bcsid INTEGER NOT NULL, PRIMARY KEY (datanode_id, container_id)); +CREATE_DATANODE_CONTAINER_INDEX=CREATE INDEX IF NOT EXISTS idx_datanode_container ON DatanodeContainerLogTable (datanode_id, container_id, timestamp); +INSERT_DATANODE_CONTAINER_LOG=INSERT INTO DatanodeContainerLogTable (datanode_id, container_id, timestamp, container_state, bcsid, error_message, log_level, index_value) VALUES (?, ?, ?, ?, ?, ?, ?, ?); +INSERT_CONTAINER_LOG=INSERT OR REPLACE INTO ContainerLogTable (datanode_id, container_id, latest_state, latest_bcsid) VALUES (?, ?, ?, ?); +SELECT_LATEST_CONTAINER_LOG=SELECT a.datanode_id, a.container_id, a.container_state, a.bcsid, a.timestamp FROM DatanodeContainerLogTable AS a JOIN (SELECT datanode_id, container_id, MAX(timestamp) as timestamp FROM DatanodeContainerLogTable GROUP BY datanode_id, container_id) as b ON a.datanode_id = b.datanode_id AND a.container_id = b.container_id AND a.timestamp=b.timestamp; +DROP_TABLE=DROP TABLE IF EXISTS {table_name};