From a441fa19922f54a468d6f6557e25c36eb70a72f1 Mon Sep 17 00:00:00 2001 From: RaghwendraSingh Date: Sun, 8 Sep 2024 17:58:15 +0530 Subject: [PATCH 01/20] added interceptor for Fail Fast in jdbc interpreter DI-2332 --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index f5302dc48ae..a65fe2fd0d0 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -809,6 +809,13 @@ private InterpreterResult executeSql(String dbPrefix, String sql, HiveUtils.startHiveMonitorThread(statement, context, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } + // TODO: add async query optimizer checks + // adding test code + if (sqlToExecute.contains("fail_fast_kill")) { + cancel(context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error")); + } + + boolean isResultSetAvailable = statement.execute(sqlToExecute); getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix); if (isResultSetAvailable) { From abf00ed31799a58b2244b91088496c8bdfbdb07f Mon Sep 17 00:00:00 2001 From: RaghwendraSingh Date: Sun, 8 Sep 2024 18:09:41 +0530 Subject: [PATCH 02/20] fixing compilation error DI-2332 --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a65fe2fd0d0..29fe9583331 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -812,7 +812,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, // TODO: add async query optimizer checks // adding test code if (sqlToExecute.contains("fail_fast_kill")) { - cancel(context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error")); + context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error") + cancel(context); } From fb049764af3a9a9215c15d15cf0d8917cf416f71 Mon Sep 17 00:00:00 2001 From: RaghwendraSingh Date: Sun, 8 Sep 2024 18:13:59 +0530 Subject: [PATCH 03/20] fixing compilation error added ; DI-2332 --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 29fe9583331..eeadf17f453 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -812,7 +812,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, // TODO: add async query optimizer checks // adding test code if (sqlToExecute.contains("fail_fast_kill")) { - context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error") + context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); cancel(context); } From 00bd7998d751bfa493649b233e240459634f8b7e Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 11 Sep 2024 19:11:49 +0530 Subject: [PATCH 04/20] added pre fail check --- jdbc/pom.xml | 6 + .../apache/zeppelin/jdbc/JDBCInterpreter.java | 110 +++++++++++++++++- .../zeppelin/jdbc/ValidationRequest.java | 24 ++++ .../zeppelin/jdbc/ValidationResponse.java | 57 +++++++++ 4 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java create mode 100644 jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java diff --git a/jdbc/pom.xml b/jdbc/pom.xml index c4c35388ca0..0a9b9ccc4a1 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -46,6 +46,12 @@ + + org.json + json + 20210307 + + org.postgresql postgresql diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index eeadf17f453..d59136d8b23 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -37,6 +37,8 @@ import org.apache.zeppelin.jdbc.hive.HiveUtils; import org.apache.zeppelin.tabledata.TableDataUtils; import org.apache.zeppelin.util.PropertiesUtil; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +77,12 @@ import org.apache.zeppelin.user.UserCredentials; import org.apache.zeppelin.user.UsernamePassword; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; + /** * JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ, * GreenplumDB, MariaDB, MySQL, Postgres and Redshift. @@ -143,6 +151,7 @@ public class JDBCInterpreter extends KerberosInterpreter { "zeppelin.jdbc.concurrent.max_connection"; private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows"; + private static final String FAIL_FAST_VALIDATE_URL = "http://localhost:8080/api/validate"; private static final Set PRESTO_PROPERTIES = new HashSet<>(Arrays.asList( "user", "password", @@ -350,6 +359,52 @@ public void close() { } } + public static ValidationResponse sendValidationRequest(ValidationRequest request) throws Exception { + HttpURLConnection connection = createConnection(); + sendRequest(connection, request); + return readResponse(connection); + } + + private static HttpURLConnection createConnection() throws Exception { + URL url = new URL(FAIL_FAST_VALIDATE_URL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setDoOutput(true); // Enable sending request body + return connection; + } + + private static void sendRequest(HttpURLConnection connection, ValidationRequest request) throws Exception { + try (OutputStream os = connection.getOutputStream()) { + // Manually convert the request object to a JSON string + String jsonRequest = request.toJson(); + byte[] input = jsonRequest.getBytes("utf-8"); + os.write(input, 0, input.length); + } + } + + private static ValidationResponse readResponse(HttpURLConnection connection) throws Exception { + int statusCode = connection.getResponseCode(); + BufferedReader reader; + + if (statusCode == HttpURLConnection.HTTP_OK) { + reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8")); + } else { + reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), "utf-8")); + } + + StringBuilder responseBuilder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + responseBuilder.append(line.trim()); + } + + reader.close(); + connection.disconnect(); + + return ValidationResponse.fromJson(responseBuilder.toString()); + } + /* Get user of this sql. * 1. If shiro is enabled, use the login user * 2. Otherwise try to get it from interpreter setting, e.g. default.user @@ -809,8 +864,59 @@ private InterpreterResult executeSql(String dbPrefix, String sql, HiveUtils.startHiveMonitorThread(statement, context, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - // TODO: add async query optimizer checks - // adding test code + + ValidationRequest request = new ValidationRequest(sqlToExecute); + try { + ValidationResponse response = sendValidationRequest(request); + if (response.isPreSubmitFail()) { + String outputMessage = response.getMessage(); + String userName = getUser(context); + System.out.println(userName); + StringBuilder finalOutput = new StringBuilder(); + + if (response.isFailFast()) { + JSONObject jsonObject = new JSONObject(outputMessage); + finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); + + JSONArray tableNames = jsonObject.names(); + if (tableNames != null) { + for (int i = 0; i < tableNames.length(); i++) { + String table = tableNames.getString(i); + JSONArray partitions = jsonObject.getJSONArray(table); + finalOutput.append(table).append(" -> "); + + for (int j = 0; j < partitions.length(); j++) { + finalOutput.append(partitions.getString(j)); + if (j < partitions.length() - 1) { + finalOutput.append(", "); + } + } + finalOutput.append("\n"); + } + } + } else if (response.isFailedByDeprecatedTable()) { + JSONObject jsonObject = new JSONObject(outputMessage); + finalOutput.append("The following TABLE(s) used in the query are restricted:\n"); + + JSONArray tableNames = jsonObject.names(); + if (tableNames != null) { + for (int i = 0; i < tableNames.length(); i++) { + String table = tableNames.getString(i); + finalOutput.append(table).append(" -> ").append(jsonObject.getString(table)).append("\n"); + } + } + } + finalOutput.append(userName); + context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); + cancel(context); + return new InterpreterResult(Code.ERROR, finalOutput.toString()); + } + + } catch (Exception e) { + System.err.println("Error occurred while sending request: " + e.getMessage()); + e.printStackTrace(); + } + if (sqlToExecute.contains("fail_fast_kill")) { context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); cancel(context); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java new file mode 100644 index 00000000000..7dea7f564b0 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -0,0 +1,24 @@ +package org.apache.zeppelin.jdbc; + +public class ValidationRequest { + private String queryText; + + // Constructor + public ValidationRequest(String queryText) { + this.queryText = queryText; + } + + // Getter and Setter + public String getQueryText() { + return queryText; + } + + public void setQueryText(String queryText) { + this.queryText = queryText; + } + + public String toJson() { + return "{\"query_text\":\"" + queryText + "\"}"; + } +} + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java new file mode 100644 index 00000000000..801a5aa82c8 --- /dev/null +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -0,0 +1,57 @@ +package org.apache.zeppelin.jdbc; + +public class ValidationResponse { + private boolean preSubmitFail; + private boolean failFast; + private boolean failedByDeprecatedTable; + private String message; + + // Getters and Setters + public boolean isPreSubmitFail() { + return preSubmitFail; + } + + public void setPreSubmitFail(boolean preSubmitFail) { + this.preSubmitFail = preSubmitFail; + } + + public boolean isFailFast() { + return failFast; + } + + public void setFailFast(boolean failFast) { + this.failFast = failFast; + } + + public boolean isFailedByDeprecatedTable() { + return failedByDeprecatedTable; + } + + public void setFailedByDeprecatedTable(boolean failedByDeprecatedTable) { + this.failedByDeprecatedTable = failedByDeprecatedTable; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public static ValidationResponse fromJson(String jsonResponse) { + ValidationResponse response = new ValidationResponse(); + // Use simple JSON parsing (can replace with a library like Jackson or Gson) + response.setPreSubmitFail(jsonResponse.contains("\"pre_submit_fail\":true")); + response.setFailFast(jsonResponse.contains("\"fail_fast\":true")); + response.setFailedByDeprecatedTable(jsonResponse.contains("\"failed_by_deprecated_table\":true")); + + int messageIndex = jsonResponse.indexOf("\"message\":\""); + if (messageIndex != -1) { + int messageEnd = jsonResponse.indexOf("\"", messageIndex + 10); + String message = jsonResponse.substring(messageIndex + 10, messageEnd); + response.setMessage(message); + } + return response; + } +} From 73f30b0cbb91ebeee2b729e867398f70a1e96c8f Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 11 Sep 2024 22:21:52 +0530 Subject: [PATCH 05/20] added dependency --- jdbc/pom.xml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 0a9b9ccc4a1..5eeb554719e 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -51,8 +51,14 @@ json 20210307 + + org.codehaus.jettison + jettison + 1.4.1 + - + + org.postgresql postgresql ${postgresql.version} From 357f481615f3fae553097ee608b8f4b30e55fa2b Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 11 Sep 2024 23:33:03 +0530 Subject: [PATCH 06/20] added out --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index d59136d8b23..350e72b35be 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -871,10 +871,11 @@ private InterpreterResult executeSql(String dbPrefix, String sql, if (response.isPreSubmitFail()) { String outputMessage = response.getMessage(); String userName = getUser(context); - System.out.println(userName); + context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { + context.out.write("Fail Fast custom error"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); @@ -907,20 +908,22 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } finalOutput.append(userName); + context.out.write(finalOutput.toString()); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); } } catch (Exception e) { + context.out.write("Error occurred while sending request"); System.err.println("Error occurred while sending request: " + e.getMessage()); e.printStackTrace(); } - if (sqlToExecute.contains("fail_fast_kill")) { - context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); - cancel(context); - } +// if (sqlToExecute.contains("fail_fast_kill")) { +// context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); +// cancel(context); +// } boolean isResultSetAvailable = statement.execute(sqlToExecute); From 98aa97afdb58f1ba37cb96f10985768db256d907 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 15:47:12 +0530 Subject: [PATCH 07/20] updated json string to parse --- jdbc/pom.xml | 5 ++++ .../apache/zeppelin/jdbc/JDBCInterpreter.java | 9 ------- .../zeppelin/jdbc/ValidationRequest.java | 2 -- .../zeppelin/jdbc/ValidationResponse.java | 25 +++++++++++-------- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 5eeb554719e..bfa0a600cc9 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -56,6 +56,11 @@ jettison 1.4.1 + + com.google.code.gson + gson + 2.8.9 + diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 350e72b35be..b67e92c68b3 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -908,7 +908,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } finalOutput.append(userName); - context.out.write(finalOutput.toString()); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); @@ -916,16 +915,8 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } catch (Exception e) { context.out.write("Error occurred while sending request"); - System.err.println("Error occurred while sending request: " + e.getMessage()); - e.printStackTrace(); } -// if (sqlToExecute.contains("fail_fast_kill")) { -// context.getLocalProperties().put(CANCEL_REASON, "Fail Fast custom error"); -// cancel(context); -// } - - boolean isResultSetAvailable = statement.execute(sqlToExecute); getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix); if (isResultSetAvailable) { diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 7dea7f564b0..94de605e3d3 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -3,12 +3,10 @@ public class ValidationRequest { private String queryText; - // Constructor public ValidationRequest(String queryText) { this.queryText = queryText; } - // Getter and Setter public String getQueryText() { return queryText; } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 801a5aa82c8..027f91b7300 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -1,5 +1,8 @@ package org.apache.zeppelin.jdbc; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + public class ValidationResponse { private boolean preSubmitFail; private boolean failFast; @@ -40,18 +43,20 @@ public void setMessage(String message) { } public static ValidationResponse fromJson(String jsonResponse) { + Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); - // Use simple JSON parsing (can replace with a library like Jackson or Gson) - response.setPreSubmitFail(jsonResponse.contains("\"pre_submit_fail\":true")); - response.setFailFast(jsonResponse.contains("\"fail_fast\":true")); - response.setFailedByDeprecatedTable(jsonResponse.contains("\"failed_by_deprecated_table\":true")); - - int messageIndex = jsonResponse.indexOf("\"message\":\""); - if (messageIndex != -1) { - int messageEnd = jsonResponse.indexOf("\"", messageIndex + 10); - String message = jsonResponse.substring(messageIndex + 10, messageEnd); - response.setMessage(message); + + JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); + + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + + // Extract the "message" field + if (jsonObject.has("message")) { + response.setMessage(jsonObject.get("message").getAsString()); } + return response; } } From 39f0a7bab0dd5732c0f1fcd6e3b017ecbb2f837e Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 16:58:58 +0530 Subject: [PATCH 08/20] added loggers --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 9 ++++++--- .../org/apache/zeppelin/jdbc/ValidationResponse.java | 2 -- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index b67e92c68b3..a3eaa99ddc2 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -867,11 +867,14 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ValidationRequest request = new ValidationRequest(sqlToExecute); try { + context.out.write("Sending request for validation"); ValidationResponse response = sendValidationRequest(request); + context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { + context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); - String userName = getUser(context); - context.out.write(userName); +// String userName = getUser(context); +// context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { @@ -907,7 +910,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } - finalOutput.append(userName); +// finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 027f91b7300..0f0a30cedf4 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -51,8 +51,6 @@ public static ValidationResponse fromJson(String jsonResponse) { response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - - // Extract the "message" field if (jsonObject.has("message")) { response.setMessage(jsonObject.get("message").getAsString()); } From b6225df40b710667e997fd4afde71fe07d1bddbf Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 18:00:15 +0530 Subject: [PATCH 09/20] added stract trace --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index a3eaa99ddc2..6e7c90afdc1 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -867,14 +867,14 @@ private InterpreterResult executeSql(String dbPrefix, String sql, ValidationRequest request = new ValidationRequest(sqlToExecute); try { - context.out.write("Sending request for validation"); + context.out.write("Sending request for validation\n"); ValidationResponse response = sendValidationRequest(request); context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); -// String userName = getUser(context); -// context.out.write(userName); + String userName = getUser(context); + context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { @@ -910,14 +910,19 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } -// finalOutput.append(userName); + finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); return new InterpreterResult(Code.ERROR, finalOutput.toString()); } } catch (Exception e) { - context.out.write("Error occurred while sending request"); + String error = "Error occurred while sending request" + e.getMessage(); + String stackTrace = e.getStackTrace().toString(); + String mess = e.getLocalizedMessage(); + context.out.write(error); + context.out.write(stackTrace); + context.out.write(mess); } boolean isResultSetAvailable = statement.execute(sqlToExecute); From b06c1619f786226ee67d45a43e570a05fd8c694a Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 20:13:45 +0530 Subject: [PATCH 10/20] removed test loggers --- jdbc/pom.xml | 2 +- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 14 +++----------- .../apache/zeppelin/jdbc/ValidationRequest.java | 14 ++++---------- 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/jdbc/pom.xml b/jdbc/pom.xml index bfa0a600cc9..56467b4d074 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -59,7 +59,7 @@ com.google.code.gson gson - 2.8.9 + 2.8.9 diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 6e7c90afdc1..21c29b21121 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -865,20 +865,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); } - ValidationRequest request = new ValidationRequest(sqlToExecute); + String userName = getUser(context); + ValidationRequest request = new ValidationRequest(sqlToExecute, userName); try { - context.out.write("Sending request for validation\n"); ValidationResponse response = sendValidationRequest(request); - context.out.write("Response received for validation"); if (response.isPreSubmitFail()) { - context.out.write("Pre Submit custom error check"); String outputMessage = response.getMessage(); - String userName = getUser(context); - context.out.write(userName); StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { - context.out.write("Fail Fast custom error"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); @@ -910,18 +905,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } - finalOutput.append(userName); context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); - return new InterpreterResult(Code.ERROR, finalOutput.toString()); + return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } } catch (Exception e) { String error = "Error occurred while sending request" + e.getMessage(); - String stackTrace = e.getStackTrace().toString(); String mess = e.getLocalizedMessage(); context.out.write(error); - context.out.write(stackTrace); context.out.write(mess); } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 94de605e3d3..94e0a6edf40 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -2,21 +2,15 @@ public class ValidationRequest { private String queryText; + private String user; - public ValidationRequest(String queryText) { - this.queryText = queryText; - } - - public String getQueryText() { - return queryText; - } - - public void setQueryText(String queryText) { + public ValidationRequest(String queryText, String user) { this.queryText = queryText; + this.user = user; } public String toJson() { - return "{\"query_text\":\"" + queryText + "\"}"; + return "{\"queryText\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; } } From 08cf063f0223123fe1188aae4cb7438a28599289 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 17 Sep 2024 22:26:12 +0530 Subject: [PATCH 11/20] updated request payload --- .../src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 2 -- .../main/java/org/apache/zeppelin/jdbc/ValidationRequest.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 21c29b21121..0058f9f45dd 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -376,7 +376,6 @@ private static HttpURLConnection createConnection() throws Exception { private static void sendRequest(HttpURLConnection connection, ValidationRequest request) throws Exception { try (OutputStream os = connection.getOutputStream()) { - // Manually convert the request object to a JSON string String jsonRequest = request.toJson(); byte[] input = jsonRequest.getBytes("utf-8"); os.write(input, 0, input.length); @@ -909,7 +908,6 @@ private InterpreterResult executeSql(String dbPrefix, String sql, cancel(context); return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } - } catch (Exception e) { String error = "Error occurred while sending request" + e.getMessage(); String mess = e.getLocalizedMessage(); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java index 94e0a6edf40..71d8ad17418 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationRequest.java @@ -10,7 +10,7 @@ public ValidationRequest(String queryText, String user) { } public String toJson() { - return "{\"queryText\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; + return "{\"query_text\":\"" + queryText + "\",\"user\":\"" + user + "\"}"; } } From 682c3dcb11bf62b6aaea7cfe4c412a9a1e0547cd Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 13:06:42 +0530 Subject: [PATCH 12/20] updated sql --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 ++++++++-- .../apache/zeppelin/jdbc/ValidationResponse.java | 15 ++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 0058f9f45dd..099083b1c8d 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -865,7 +865,13 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } String userName = getUser(context); - ValidationRequest request = new ValidationRequest(sqlToExecute, userName); + String sqlToValidate = sqlToExecute + .replace("\n", "\\n") // Newlines + .replace("\r", "\\r") // Carriage return + .replace("\t", "\\t") // Tabs + .replace("\"", "\\\"") // Double quotes + .replace("\\", "\\\\"); // Backslashes + ValidationRequest request = new ValidationRequest(sqlToValidate, userName); try { ValidationResponse response = sendValidationRequest(request); if (response.isPreSubmitFail()) { @@ -909,7 +915,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); } } catch (Exception e) { - String error = "Error occurred while sending request" + e.getMessage(); + String error = "Error occurred while sending request " + e.getMessage(); String mess = e.getLocalizedMessage(); context.out.write(error); context.out.write(mess); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 0f0a30cedf4..2128dfb86a1 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -48,13 +48,18 @@ public static ValidationResponse fromJson(String jsonResponse) { JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); - response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); - response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); - response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - if (jsonObject.has("message")) { + if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + } + if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + } + if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + } + if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { response.setMessage(jsonObject.get("message").getAsString()); } - return response; } } From 0e70d81bf62f2aabca3b141abd89683aabfc4936 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 15:48:19 +0530 Subject: [PATCH 13/20] updated escapae character --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 099083b1c8d..8c1f6057012 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -866,11 +866,9 @@ private InterpreterResult executeSql(String dbPrefix, String sql, String userName = getUser(context); String sqlToValidate = sqlToExecute - .replace("\n", "\\n") // Newlines - .replace("\r", "\\r") // Carriage return - .replace("\t", "\\t") // Tabs - .replace("\"", "\\\"") // Double quotes - .replace("\\", "\\\\"); // Backslashes + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); ValidationRequest request = new ValidationRequest(sqlToValidate, userName); try { ValidationResponse response = sendValidationRequest(request); From e0a936dab8417559ba227ad7242cd07ba03e2dd1 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 18 Sep 2024 17:28:16 +0530 Subject: [PATCH 14/20] updated url --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 8c1f6057012..37414f95dee 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -151,7 +151,7 @@ public class JDBCInterpreter extends KerberosInterpreter { "zeppelin.jdbc.concurrent.max_connection"; private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:"; private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows"; - private static final String FAIL_FAST_VALIDATE_URL = "http://localhost:8080/api/validate"; + private static final String FAIL_FAST_VALIDATE_URL = "http://spark-event-listener.prd.meesho.int/api/validate"; private static final Set PRESTO_PROPERTIES = new HashSet<>(Arrays.asList( "user", "password", @@ -877,6 +877,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { + context.out.write("Query failed because partitions were not used in the query. Please ensure that partition filters are applied.\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); @@ -897,6 +898,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } else if (response.isFailedByDeprecatedTable()) { + context.out.write("Query failed as Restricted table(s) are used\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following TABLE(s) used in the query are restricted:\n"); @@ -910,7 +912,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } context.getLocalProperties().put(CANCEL_REASON, finalOutput.toString()); cancel(context); - return new InterpreterResult(Code.ERROR, "Failed by Fail Fast"); + return new InterpreterResult(Code.ERROR); } } catch (Exception e) { String error = "Error occurred while sending request " + e.getMessage(); From 76db3df10248abcf6914e856ec9b28188159ac12 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 1 Oct 2024 16:48:46 +0530 Subject: [PATCH 15/20] added timeout for rca cluster --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 37414f95dee..f9fe9dd4f87 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -839,6 +839,12 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); + String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + + if (interpreterName != null && interpreterName.startsWith("spark_")) { + statement.setQueryTimeout(60); // 10800 seconds = 3 hours + } + // fetch n+1 rows in order to indicate there's more rows available (for large selects) statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult())); statement.setMaxRows(context.getIntLocalProperty("limit", maxRows)); From f0d6248c44250271bb5ed3060f06470fc2ccf05a Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Wed, 2 Oct 2024 20:30:30 +0530 Subject: [PATCH 16/20] added logging --- .../main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index f9fe9dd4f87..3b8dd8b6c05 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -840,9 +840,11 @@ private InterpreterResult executeSql(String dbPrefix, String sql, statement = connection.createStatement(); String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + context.out.write("Interpreter Name: " + interpreterName); if (interpreterName != null && interpreterName.startsWith("spark_")) { - statement.setQueryTimeout(60); // 10800 seconds = 3 hours + statement.setQueryTimeout(5); // 10800 seconds = 3 hours + context.out.write("Query Timeout: 5 seconds"); } // fetch n+1 rows in order to indicate there's more rows available (for large selects) From 761455984b258b1e629ac51d8b9bcc23e096d348 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 8 Oct 2024 14:41:40 +0530 Subject: [PATCH 17/20] updated get interpreterName --- .../apache/zeppelin/jdbc/JDBCInterpreter.java | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 3b8dd8b6c05..b34862723be 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -839,9 +839,13 @@ private InterpreterResult executeSql(String dbPrefix, String sql, LOGGER.info("Execute sql: " + sqlToExecute); statement = connection.createStatement(); - String interpreterName = getProperty("zeppelin.jdbc.interpreter.name"); + String interpreterName = getInterpreterGroup().getId(); context.out.write("Interpreter Name: " + interpreterName); + String className = getClassName(); + context.out.write("Class Name: " + className); + + if (interpreterName != null && interpreterName.startsWith("spark_")) { statement.setQueryTimeout(5); // 10800 seconds = 3 hours context.out.write("Query Timeout: 5 seconds"); @@ -885,16 +889,17 @@ private InterpreterResult executeSql(String dbPrefix, String sql, StringBuilder finalOutput = new StringBuilder(); if (response.isFailFast()) { - context.out.write("Query failed because partitions were not used in the query. Please ensure that partition filters are applied.\n"); + context.out.write("Query Error: Partition Filters Missing\n" + + "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance."); JSONObject jsonObject = new JSONObject(outputMessage); - finalOutput.append("The following TABLE(s) used in the query are not using partition filter:\n"); + finalOutput.append("The following table(s) are missing partition filters:\n"); JSONArray tableNames = jsonObject.names(); if (tableNames != null) { for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); JSONArray partitions = jsonObject.getJSONArray(table); - finalOutput.append(table).append(" -> "); + finalOutput.append("Table: ").append(table).append(" Partition filters: "); for (int j = 0; j < partitions.length(); j++) { finalOutput.append(partitions.getString(j)); @@ -906,15 +911,15 @@ private InterpreterResult executeSql(String dbPrefix, String sql, } } } else if (response.isFailedByDeprecatedTable()) { - context.out.write("Query failed as Restricted table(s) are used\n"); + context.out.write("Query Error: Restricted Table Used\n"); JSONObject jsonObject = new JSONObject(outputMessage); - finalOutput.append("The following TABLE(s) used in the query are restricted:\n"); + finalOutput.append("It seems you're trying to use a restricted table:\n"); JSONArray tableNames = jsonObject.names(); if (tableNames != null) { for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); - finalOutput.append(table).append(" -> ").append(jsonObject.getString(table)).append("\n"); + finalOutput.append("Use: ").append(jsonObject.getString(table)).append(" in place of ").append(table).append("\n"); } } } From b4e1d0d12a27a787465dce904d1f74757056c6e1 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Tue, 8 Oct 2024 17:16:52 +0530 Subject: [PATCH 18/20] removed logger --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index b34862723be..48592b21914 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -840,15 +840,9 @@ private InterpreterResult executeSql(String dbPrefix, String sql, statement = connection.createStatement(); String interpreterName = getInterpreterGroup().getId(); - context.out.write("Interpreter Name: " + interpreterName); - String className = getClassName(); - context.out.write("Class Name: " + className); - - - if (interpreterName != null && interpreterName.startsWith("spark_")) { - statement.setQueryTimeout(5); // 10800 seconds = 3 hours - context.out.write("Query Timeout: 5 seconds"); + if (interpreterName != null && interpreterName.startsWith("spark_rca_")) { + statement.setQueryTimeout(10800); // 10800 seconds = 3 hours } // fetch n+1 rows in order to indicate there's more rows available (for large selects) @@ -890,7 +884,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, if (response.isFailFast()) { context.out.write("Query Error: Partition Filters Missing\n" + - "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance."); + "Your query failed because some tables are missing partition filters. To avoid this, please ensure partition filters are applied to improve performance.\n"); JSONObject jsonObject = new JSONObject(outputMessage); finalOutput.append("The following table(s) are missing partition filters:\n"); @@ -899,7 +893,7 @@ private InterpreterResult executeSql(String dbPrefix, String sql, for (int i = 0; i < tableNames.length(); i++) { String table = tableNames.getString(i); JSONArray partitions = jsonObject.getJSONArray(table); - finalOutput.append("Table: ").append(table).append(" Partition filters: "); + finalOutput.append("Table: ").append(table).append(", Partition filter's: "); for (int j = 0; j < partitions.length(); j++) { finalOutput.append(partitions.getString(j)); From 12dfddcfc9c1f2d2088405535828fc0df0537c43 Mon Sep 17 00:00:00 2001 From: Aman Singh Chauhan Date: Thu, 10 Oct 2024 13:53:11 +0530 Subject: [PATCH 19/20] updated Gson object --- .../zeppelin/jdbc/ValidationResponse.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java index 2128dfb86a1..05716cc2edb 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/ValidationResponse.java @@ -1,6 +1,7 @@ package org.apache.zeppelin.jdbc; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; public class ValidationResponse { @@ -46,19 +47,28 @@ public static ValidationResponse fromJson(String jsonResponse) { Gson gson = new Gson(); ValidationResponse response = new ValidationResponse(); - JsonObject jsonObject = gson.fromJson(jsonResponse, JsonObject.class); + JsonElement jsonElement = gson.fromJson(jsonResponse, JsonElement.class); - if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { - response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); - } - if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { - response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); - } - if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { - response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); - } - if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { - response.setMessage(jsonObject.get("message").getAsString()); + if (jsonElement.isJsonObject()) { + JsonObject jsonObject = jsonElement.getAsJsonObject(); + + if (jsonObject.has("pre_submit_fail") && !jsonObject.get("pre_submit_fail").isJsonNull()) { + response.setPreSubmitFail(jsonObject.get("pre_submit_fail").getAsBoolean()); + } + if (jsonObject.has("fail_fast") && !jsonObject.get("fail_fast").isJsonNull()) { + response.setFailFast(jsonObject.get("fail_fast").getAsBoolean()); + } + if (jsonObject.has("failed_by_deprecated_table") && !jsonObject.get("failed_by_deprecated_table").isJsonNull()) { + response.setFailedByDeprecatedTable(jsonObject.get("failed_by_deprecated_table").getAsBoolean()); + } + if (jsonObject.has("message") && !jsonObject.get("message").isJsonNull()) { + response.setMessage(jsonObject.get("message").getAsString()); + } + } else { + response.setPreSubmitFail(false); + response.setFailFast(false); + response.setFailedByDeprecatedTable(false); + response.setMessage(""); // Default message } return response; } From e54907b5e3ea0520048b47f59b7eea1bae883e9f Mon Sep 17 00:00:00 2001 From: shagil-meesho Date: Thu, 17 Oct 2024 23:53:53 +0530 Subject: [PATCH 20/20] feat: suffixing the STATEMENT_TIMEOUT = 10800 before rca interpreter fired queries --- .../java/org/apache/zeppelin/jdbc/JDBCInterpreter.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 48592b21914..930e42e0a27 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -1058,8 +1058,14 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex LOGGER.debug("Run SQL command '{}'", cmd); String dbPrefix = getDBPrefix(context); LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd); + String interpreterName = getInterpreterGroup().getId(); + if (interpreterName!=null && interpreterName.startsWith("spark_rca_")) { + cmd = "set STATEMENT_TIMEOUT=10800;\n"+cmd; + } + LOGGER.debug("InterpreterName: {}, SQL command: '{}'", interpreterName, cmd); + String finalCmd = cmd; if (!isRefreshMode(context)) { - return executeSql(dbPrefix, cmd, context); + return executeSql(dbPrefix, finalCmd, context); } else { int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval")); paragraphCancelMap.put(context.getParagraphId(), false); @@ -1070,7 +1076,7 @@ public InterpreterResult internalInterpret(String cmd, InterpreterContext contex refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false); try { - InterpreterResult result = executeSql(dbPrefix, cmd, context); + InterpreterResult result = executeSql(dbPrefix, finalCmd, context); context.out.flush(); interpreterResultRef.set(result); if (result.code() != Code.SUCCESS) {