diff --git a/jdbc/pom.xml b/jdbc/pom.xml index c4c35388ca0..56467b4d074 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -46,7 +46,24 @@ - + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20210307</version> + </dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + <version>1.4.1</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.9</version> + </dependency> + <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>${postgresql.version}</version> diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index f5302dc48ae..930e42e0a27 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -37,6 +37,8 @@ import org.apache.zeppelin.jdbc.hive.HiveUtils; import org.apache.zeppelin.tabledata.TableDataUtils; import org.apache.zeppelin.util.PropertiesUtil; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +77,12 @@ import org.apache.zeppelin.user.UserCredentials; import org.apache.zeppelin.user.UsernamePassword; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; + /** * JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ, * GreenplumDB, MariaDB, MySQL, Postgres and Redshift. 
@@ -143,6 +151,7 @@ public class JDBCInterpreter extends KerberosInterpreter { "zeppelin.jdbc.concurrent.max_connection"; private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows"; + private static final String FAIL_FAST_VALIDATE_URL = "http://spark-event-listener.prd.meesho.int/api/validate"; private static final Set PRESTO_PROPERTIES = new HashSet<>(Arrays.asList( "user", "password", @@ -350,6 +359,51 @@ public void close() { } } + public static ValidationResponse sendValidationRequest(ValidationRequest request) throws Exception { + HttpURLConnection connection = createConnection(); + sendRequest(connection, request); + return readResponse(connection); + } + + private static HttpURLConnection createConnection() throws Exception { + URL url = new URL(FAIL_FAST_VALIDATE_URL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); // Enable sending request body + return connection; + } + + private static void sendRequest(HttpURLConnection connection, ValidationRequest request) throws Exception { + try (OutputStream os = connection.getOutputStream()) { + String jsonRequest = request.toJson(); + byte[] input = jsonRequest.getBytes("utf-8"); + os.write(input, 0, input.length); + } + } + + private static ValidationResponse readResponse(HttpURLConnection connection) throws Exception { + int statusCode = connection.getResponseCode(); + BufferedReader reader; + + if (statusCode == HttpURLConnection.HTTP_OK) { + reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); + } else { + reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), "utf-8")); + } + + StringBuilder responseBuilder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + 
responseBuilder.append(line.trim()); + } + + reader.close(); + connection.disconnect(); + + return ValidationResponse.fromJson(responseBuilder.toString()); + } + /* Get user of this sql. * 1. If shiro is enabled, use the login user * 2. Otherwise try to get it from interpreter setting, e.g. default.user @@ -785,6 +839,12 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); + String interpreterName = getInterpreterGroup().getId(); + + if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { + statement.setQueryTimeout(10800); // 10800 seconds = 3 hours + } + // fetch n+1 rows in order to indicate there's more rows available (for large selects) statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult())); statement.setMaxRows(context.getIntLocalProperty("limit", maxRows)); @@ -809,6 +869,65 @@ private InterpreterResult executeSql(String dbPrefix, String sql, HiveUtils.startHiveMonitorThread(statement, context, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } + + String userName = getUser(context); + String sqlToValidate = sqlToExecute + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + ValidationRequest request = new ValidationRequest(sqlToValidate, userName); + try { + ValidationResponse response = sendValidationRequest(request); + if (response.isPreSubmitFail()) { + String outputMessage = response.getMessage(); + StringBuilder finalOutput = new StringBuilder(); + + if (response.isFailFast()) { + context.out.write("Query Error: Partition Filters Missing\n" + + "Your query failed because some tables are missing partition filters. 
To avoid this, please ensure partition filters are applied to improve performance.\n"); + JSONObject jsonObject = new JSONObject(outputMessage); + finalOutput.append("The following table(s) are missing partition filters:\n"); + + JSONArray tableNames = jsonObject.names(); + if (tableNames != null) { + for (int i = 0; i < tableNames.length(); i++) { + String table = tableNames.getString(i); + JSONArray partitions = jsonObject.getJSONArray(table); + finalOutput.append("Table: ").append(table).append(", Partition filters: "); + + for (int j = 0; j < partitions.length(); j++) { + finalOutput.append(partitions.getString(j)); + if (j < partitions.length() - 1) { + finalOutput.append(", "); + } + } + finalOutput.append("\n"); + } + } + } else if (response.isFailedByDeprecatedTable()) { + context.out.write("Query Error: Restricted Table Used\n"); + JSONObject jsonObject = new JSONObject(outputMessage); + finalOutput.append("It seems you're trying to use a restricted table:\n"); + + JSONArray tableNames = jsonObject.names(); + if (tableNames != null) { + for (int i = 0; i < tableNames.length(); i++) { + String table = tableNames.getString(i); + finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); + } + } + } + context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); + cancel(context); + return new InterpreterResult(Code.ERROR); + } + } catch (Exception e) { + String error = "Error occurred while sending request " + e.getMessage(); + String mess = e.getLocalizedMessage() == null ? "" : e.getLocalizedMessage(); + context.out.write(error); + context.out.write(mess); + } + boolean isResultSetAvailable = statement.execute(sqlToExecute); getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix); if (isResultSetAvailable) { @@ -939,8 +1058,14 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex LOGGER.debug("Run SQL command '{}'", cmd); String dbPrefix = getDBPrefix(context); 
LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd); + String interpreterName = getInterpreterGroup().getId(); + if (interpreterName!=null && interpreterName.startsWith("spark_rca_")) { + cmd = "set STATEMENT_TIMEOUT=10800;\n"+cmd; + } + LOGGER.debug("InterpreterName: {}, SQL command: '{}'", interpreterName, cmd); + String finalCmd = cmd; if (!isRefreshMode(context)) { - return executeSql(dbPrefix, cmd, context); + return executeSql(dbPrefix, finalCmd, context); } else { int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval")); paragraphCancelMap.put(context.getParagraphId(), false); @@ -951,7 +1076,7 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false); try { - InterpreterResult result = executeSql(dbPrefix, cmd, context); + InterpreterResult result = executeSql(dbPrefix, finalCmd, context); context.out.flush(); interpreterResultRef.set(result); if (result.code() != Code.SUCCESS) { diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java new file mode 100644 index 00000000000..71d8ad17418 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -0,0 +1,16 @@ +package org.apache.zeppelin.jdbc; + +public class ValidationRequest { + private String queryText; + private String user; + + public ValidationRequest(String queryText, String user) { + this.queryText = queryText; + this.user = user; + } + + public String toJson() { + return "{\"query_text\":\"" + queryText.replace("\\", "\\\\").replace("\"", "\\\"") + "\",\"user\":\"" + user.replace("\\", "\\\\").replace("\"", "\\\"") + "\"}"; + } +} + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java new file mode 100644 index 00000000000..05716cc2edb --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -0,0 
+1,75 @@ +package org.apache.zeppelin.jdbc; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +public class ValidationResponse { + private boolean preSubmitFail; + private boolean failFast; + private boolean failedByDeprecatedTable; + private String message; + + // Getters and Setters + public boolean isPreSubmitFail() { + return preSubmitFail; + } + + public void setPreSubmitFail(boolean preSubmitFail) { + this.preSubmitFail = preSubmitFail; + } + + public boolean isFailFast() { + return failFast; + } + + public void setFailFast(boolean failFast) { + this.failFast = failFast; + } + + public boolean isFailedByDeprecatedTable() { + return failedByDeprecatedTable; + } + + public void setFailedByDeprecatedTable(boolean failedByDeprecatedTable) { + this.failedByDeprecatedTable = failedByDeprecatedTable; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public static ValidationResponse fromJson(String jsonResponse) { + Gson gson = new Gson(); + ValidationResponse response = new ValidationResponse(); + + JsonElement jsonElement = gson.fromJson(jsonResponse, JsonElement.class); + + if (jsonElement != null && jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + + if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + } + if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + } + if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + } + if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { + 
response.setMessage(jsonObject.get("message").getAsString()); + } + } else { + response.setPreSubmitFail(false); + response.setFailFast(false); + response.setFailedByDeprecatedTable(false); + response.setMessage(""); // Default message + } + return response; + } +}