Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@
*/
package org.apache.hadoop.ozone.audit.parser.common;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.audit.parser.model.AuditEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.sql.*;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.audit.parser.model.AuditEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Database helper for ozone audit parser tool.
Expand All @@ -43,24 +57,20 @@ private DatabaseHelper() {
private static Map<String, String> properties;

public static boolean setup(String dbName, String logs) {
//loadProperties();
if(createAuditTable(dbName)) {
if (createAuditTable(dbName)) {
return insertAudits(dbName, logs);
} else {
return false;
}
}

private static Connection getConnection(String dbName) {

Connection connection = null;
try{
Class.forName(ParserConsts.DRIVER);
connection = DriverManager.getConnection(
ParserConsts.CONNECTION_PREFIX + dbName);
} catch (ClassNotFoundException e) {
LOG.error(e.getMessage());
} catch (SQLException e) {
} catch (ClassNotFoundException | SQLException e) {
LOG.error(e.getMessage());
}
return connection;
Expand Down Expand Up @@ -90,50 +100,54 @@ private static void loadProperties() {
}

private static boolean createAuditTable(String dbName) {

try(Connection connection = getConnection(dbName);
Statement st = connection.createStatement()) {

st.executeUpdate(properties.get(ParserConsts.CREATE_AUDIT_TABLE));
try (Connection connection = getConnection(dbName)) {
if (connection != null) {
try (Statement st = connection.createStatement()) {
st.executeUpdate(properties.get(ParserConsts.CREATE_AUDIT_TABLE));
}
}
} catch (SQLException e) {
LOG.error(e.getMessage());
return false;
}
return true;
}

@SuppressFBWarnings("REC_CATCH_EXCEPTION")
private static boolean insertAudits(String dbName, String logs) {

try(Connection connection = getConnection(dbName);
PreparedStatement preparedStatement = connection.prepareStatement(
try (Connection connection = getConnection(dbName)) {
if (connection != null) {
try (PreparedStatement preparedStatement = connection.prepareStatement(
properties.get(ParserConsts.INSERT_AUDITS))) {

ArrayList<AuditEntry> auditEntries = parseAuditLogs(logs);
ArrayList<AuditEntry> auditEntries = parseAuditLogs(logs);

final int batchSize = 1000;
int count = 0;
final int batchSize = 1000;
int count = 0;

//Insert list to db
for(AuditEntry audit : auditEntries) {
preparedStatement.setString(1, audit.getTimestamp());
preparedStatement.setString(2, audit.getLevel());
preparedStatement.setString(3, audit.getLogger());
preparedStatement.setString(4, audit.getUser());
preparedStatement.setString(5, audit.getIp());
preparedStatement.setString(6, audit.getOp());
preparedStatement.setString(7, audit.getParams());
preparedStatement.setString(8, audit.getResult());
preparedStatement.setString(9, audit.getException());
//Insert list to db
for (AuditEntry audit : auditEntries) {
preparedStatement.setString(1, audit.getTimestamp());
preparedStatement.setString(2, audit.getLevel());
preparedStatement.setString(3, audit.getLogger());
preparedStatement.setString(4, audit.getUser());
preparedStatement.setString(5, audit.getIp());
preparedStatement.setString(6, audit.getOp());
preparedStatement.setString(7, audit.getParams());
preparedStatement.setString(8, audit.getResult());
preparedStatement.setString(9, audit.getException());

preparedStatement.addBatch();
preparedStatement.addBatch();

if(++count % batchSize == 0) {
preparedStatement.executeBatch();
if (++count % batchSize == 0) {
preparedStatement.executeBatch();
}
}
if (!auditEntries.isEmpty()) {
preparedStatement.executeBatch(); // insert remaining records
}
}
}
if(auditEntries.size() > 0) {
preparedStatement.executeBatch(); // insert remaining records
}
} catch (Exception e) {
LOG.error(e.getMessage());
return false;
Expand All @@ -142,27 +156,27 @@ private static boolean insertAudits(String dbName, String logs) {
}

private static ArrayList<AuditEntry> parseAuditLogs(String filePath)
throws Exception {
ArrayList<AuditEntry> listResult = new ArrayList<AuditEntry>();
try(FileInputStream fis = new FileInputStream(filePath);
InputStreamReader isr = new InputStreamReader(fis, "UTF-8");
throws IOException {
ArrayList<AuditEntry> listResult = new ArrayList<>();
try (FileInputStream fis = new FileInputStream(filePath);
InputStreamReader isr = new InputStreamReader(fis, UTF_8);
BufferedReader bReader = new BufferedReader(isr)) {
String currentLine = null;
String[] entry = null;
String currentLine;
String[] entry;
AuditEntry tempEntry = null;
String nextLine = null;
String nextLine;
currentLine = bReader.readLine();
nextLine = bReader.readLine();

while(true) {
if(tempEntry == null){
while (true) {
if (tempEntry == null){
tempEntry = new AuditEntry();
}

if(currentLine == null) {
if (currentLine == null) {
break;
} else {
if(!currentLine.matches(ParserConsts.DATE_REGEX)){
if (!currentLine.matches(ParserConsts.DATE_REGEX)){
tempEntry.appendException(currentLine);
} else {
entry = StringUtils.stripAll(currentLine.split("\\|"));
Expand All @@ -178,22 +192,18 @@ private static ArrayList<AuditEntry> parseAuditLogs(String filePath)
.setParams(ops[1])
.setResult(entry[6].substring(entry[6].indexOf('=') + 1))
.build();
if(entry.length == 8){
if (entry.length == 8){
tempEntry.setException(entry[7]);
}
}
if(nextLine == null || nextLine.matches(ParserConsts.DATE_REGEX)){
if (nextLine == null || nextLine.matches(ParserConsts.DATE_REGEX)){
listResult.add(tempEntry);
tempEntry = null;
}
currentLine = nextLine;
nextLine = bReader.readLine();
}
}
} catch (RuntimeException rx) {
throw rx;
} catch (Exception ex) {
throw ex;
}

return listResult;
Expand All @@ -213,27 +223,24 @@ public static String executeTemplate(String dbName, String template)
private static String executeStatement(String dbName, String sql)
throws SQLException {
StringBuilder result = new StringBuilder();
ResultSet rs = null;
Statement st = null;
ResultSetMetaData rsm = null;
try(Connection connection = getConnection(dbName)) {
//loadProperties();

if(connection != null){
st = connection.createStatement();
rs = st.executeQuery(sql);
if(rs != null) {
rsm = rs.getMetaData();
int cols = rsm.getColumnCount();
while(rs.next()){
for(int index =1; index<=cols; index++){
result.append(rs.getObject(index) + "\t");
ResultSetMetaData rsm;
try (Connection connection = getConnection(dbName)) {
if (connection != null){
try (Statement st = connection.createStatement()) {
try (ResultSet rs = st.executeQuery(sql)) {
if (rs != null) {
rsm = rs.getMetaData();
int cols = rsm.getColumnCount();
while (rs.next()){
for (int index = 1; index <= cols; index++){
result.append(rs.getObject(index));
result.append("\t");
}
result.append("\n");
}
}
result.append("\n");
}
}
st.close();
rs.close();
}
}
return result.toString();
Expand Down