Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.search.TotalHits.Relation;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -25,6 +26,9 @@
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.domain.hints.Hint;
import org.opensearch.sql.legacy.domain.hints.HintType;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.executor.ElasticHitsExecutor;
import org.opensearch.sql.legacy.metrics.MetricName;
Expand All @@ -47,6 +51,7 @@ public abstract class ElasticJoinExecutor extends ElasticHitsExecutor {
private final Set<String> aliasesOnReturn;
private final boolean allFieldsReturn;
protected final String[] indices;
protected final JoinRequestBuilder requestBuilder; // Added to store request builder

protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder) {
metaResults = new MetaSearchResult();
Expand All @@ -58,6 +63,7 @@ protected ElasticJoinExecutor(Client client, JoinRequestBuilder requestBuilder)
&& (secondTableReturnedField == null || secondTableReturnedField.size() == 0);
indices = getIndices(requestBuilder);
this.client = client;
this.requestBuilder = requestBuilder; // Store request builder for hint access
}

public void sendResponse(RestChannel channel) throws IOException {
Expand Down Expand Up @@ -87,7 +93,23 @@ public void sendResponse(RestChannel channel) throws IOException {
public void run() throws IOException, SqlParseException {
try {
long timeBefore = System.currentTimeMillis();
pit = new PointInTimeHandlerImpl(client, indices);

LOG.info("🔍 Starting join execution, checking for JOIN_TIME_OUT hints...");

// ✅ Extract JOIN_TIME_OUT hint and create PIT with custom keepalive
TimeValue customKeepAlive = extractJoinTimeoutFromHints();

if (customKeepAlive != null) {
LOG.info(
"✅ Using custom PIT keepalive from JOIN_TIME_OUT hint: {} seconds ({}ms)",
customKeepAlive.getSeconds(),
customKeepAlive.getMillis());
pit = new PointInTimeHandlerImpl(client, indices, customKeepAlive);
} else {
LOG.info("⚠️ No JOIN_TIME_OUT hint found, using default PIT keepalive");
pit = new PointInTimeHandlerImpl(client, indices);
}

pit.create();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
Expand All @@ -105,6 +127,158 @@ public void run() throws IOException, SqlParseException {
}
}

/**
* Extract JOIN_TIME_OUT hint value from the request builder
*
* @return TimeValue for custom keepalive, or null if no hint found
*/
protected TimeValue extractJoinTimeoutFromHints() {
try {
LOG.info("🔍 DEBUG: Starting hint extraction");
LOG.info(
"🔍 DEBUG: requestBuilder = {}",
requestBuilder != null ? requestBuilder.getClass().getSimpleName() : "null");

// Debug first table
TableInJoinRequestBuilder firstTable = requestBuilder.getFirstTable();
LOG.info("🔍 DEBUG: firstTable = {}", firstTable != null ? "exists" : "null");

if (firstTable != null) {
Select firstSelect = firstTable.getOriginalSelect();
LOG.info(
"🔍 DEBUG: firstTable.getOriginalSelect() = {}",
firstSelect != null ? "exists" : "null");

if (firstSelect != null) {
List<Hint> firstHints = firstSelect.getHints();
LOG.info(
"🔍 DEBUG: firstTable hints count = {}",
firstHints != null ? firstHints.size() : "null");

if (firstHints != null && !firstHints.isEmpty()) {
LOG.info("🔍 DEBUG: First table hints:");
for (int i = 0; i < firstHints.size(); i++) {
Hint hint = firstHints.get(i);
LOG.info(
"🔍 DEBUG: Hint[{}]: type={}, params={}",
i,
hint.getType(),
hint.getParams() != null ? java.util.Arrays.toString(hint.getParams()) : "null");
}
}
}
}

// Debug second table
TableInJoinRequestBuilder secondTable = requestBuilder.getSecondTable();
LOG.info("🔍 DEBUG: secondTable = {}", secondTable != null ? "exists" : "null");

if (secondTable != null) {
Select secondSelect = secondTable.getOriginalSelect();
LOG.info(
"🔍 DEBUG: secondTable.getOriginalSelect() = {}",
secondSelect != null ? "exists" : "null");

if (secondSelect != null) {
List<Hint> secondHints = secondSelect.getHints();
LOG.info(
"🔍 DEBUG: secondTable hints count = {}",
secondHints != null ? secondHints.size() : "null");

if (secondHints != null && !secondHints.isEmpty()) {
LOG.info("🔍 DEBUG: Second table hints:");
for (int i = 0; i < secondHints.size(); i++) {
Hint hint = secondHints.get(i);
LOG.info(
"🔍 DEBUG: Hint[{}]: type={}, params={}",
i,
hint.getType(),
hint.getParams() != null ? java.util.Arrays.toString(hint.getParams()) : "null");
}
}
}
}

// Continue with original logic
TimeValue timeout = getJoinTimeoutFromTable(requestBuilder.getFirstTable());
if (timeout != null) {
return timeout;
}

timeout = getJoinTimeoutFromTable(requestBuilder.getSecondTable());
if (timeout != null) {
return timeout;
}

LOG.info("🔍 DEBUG: No JOIN_TIME_OUT hint found after checking both tables");
return null;

} catch (Exception e) {
LOG.error("🔍 DEBUG: Exception during hint extraction", e);
return null;
}
}

/** Extract JOIN_TIME_OUT hint from a specific table */
private TimeValue getJoinTimeoutFromTable(TableInJoinRequestBuilder table) {
if (table == null) {
LOG.debug("Table is null, no hints to extract");
return null;
}

Select originalSelect = table.getOriginalSelect();
if (originalSelect == null) {
LOG.debug("Original select is null, no hints to extract");
return null;
}

// Get hints from the Select object
List<Hint> hints = originalSelect.getHints();
LOG.debug("Found {} hints in select statement", hints != null ? hints.size() : 0);

if (hints != null && !hints.isEmpty()) {
for (Hint hint : hints) {
LOG.debug("Processing hint type: {}", hint.getType());
}
}

return findJoinTimeoutInHints(hints);
}

/** Find JOIN_TIME_OUT hint in a list of hints */
private TimeValue findJoinTimeoutInHints(List<Hint> hints) {
if (hints == null) {
LOG.debug("Hints list is null");
return null;
}

LOG.debug("Searching for JOIN_TIME_OUT in {} hints", hints.size());

for (Hint hint : hints) {
LOG.debug(
"Checking hint: type={}, params={}",
hint.getType(),
hint.getParams() != null ? java.util.Arrays.toString(hint.getParams()) : "null");

if (hint.getType() == HintType.JOIN_TIME_OUT) {
Object[] params = hint.getParams();
if (params != null && params.length > 0) {
Integer timeoutSeconds = (Integer) params[0];
LOG.info(
"✅ FOUND JOIN_TIME_OUT hint: {} seconds, converting to TimeValue", timeoutSeconds);
TimeValue result = TimeValue.timeValueSeconds(timeoutSeconds);
LOG.info("✅ Converted to TimeValue: {} ({}ms)", result, result.getMillis());
return result;
} else {
LOG.warn("JOIN_TIME_OUT hint found but has no parameters");
}
}
}

LOG.debug("No JOIN_TIME_OUT hint found in hints list");
return null;
}

protected abstract List<SearchHit> innerRun() throws IOException, SqlParseException;

public SearchHits getHits() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@

package org.opensearch.sql.legacy.executor.join;

import java.io.IOException;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHit;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.query.planner.HashJoinQueryPlanRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.QueryPlanner;
import org.opensearch.sql.legacy.request.SqlRequest;
import org.opensearch.transport.client.Client;

/**
Expand All @@ -19,10 +28,128 @@
class QueryPlanElasticExecutor extends ElasticJoinExecutor {

private final QueryPlanner queryPlanner;
private final HashJoinQueryPlanRequestBuilder planRequestBuilder;

QueryPlanElasticExecutor(Client client, HashJoinQueryPlanRequestBuilder request) {
super(client, request);
this.queryPlanner = request.plan();
this.planRequestBuilder = request;
TimeValue customKeepAlive = extractJoinTimeoutFromSqlRequest();
if (customKeepAlive != null) {
LOG.info(
"✅ QueryPlanElasticExecutor: Passing custom timeout to QueryPlanner: {} seconds",
customKeepAlive.getSeconds());
this.queryPlanner = request.plan(customKeepAlive);
} else {
LOG.info("⚠️ QueryPlanElasticExecutor: No custom timeout, using default QueryPlanner");
this.queryPlanner = request.plan();
}
}

@Override
public void run() throws IOException, SqlParseException {
try {
long timeBefore = System.currentTimeMillis();

LOG.info(
"🔍 QueryPlanElasticExecutor: Starting execution, checking for JOIN_TIME_OUT hints...");

// ✅ Extract JOIN_TIME_OUT hint from the original SQL request
TimeValue customKeepAlive = extractJoinTimeoutFromSqlRequest();

if (customKeepAlive != null) {
LOG.info(
"✅ QueryPlanElasticExecutor: Using custom PIT keepalive from JOIN_TIME_OUT hint: {}"
+ " seconds ({}ms)",
customKeepAlive.getSeconds(),
customKeepAlive.getMillis());
pit = new PointInTimeHandlerImpl(client, indices, customKeepAlive);
} else {
LOG.info(
"⚠️ QueryPlanElasticExecutor: No JOIN_TIME_OUT hint found, using default PIT"
+ " keepalive");
pit = new PointInTimeHandlerImpl(client, indices);
}

pit.create();
results = innerRun();
long joinTimeInMilli = System.currentTimeMillis() - timeBefore;
this.metaResults.setTookImMilli(joinTimeInMilli);
} catch (Exception e) {
LOG.error("Failed during QueryPlan join query run.", e);
throw new IllegalStateException("Error occurred during QueryPlan join query run", e);
} finally {
try {
if (pit != null) {
pit.delete();
}
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error deleting point in time {} ", pit);
}
}
}

/** Extract JOIN_TIME_OUT hint directly from the original SQL request */
private TimeValue extractJoinTimeoutFromSqlRequest() {
try {
LOG.info("🔍 Extracting hints from original SQL request...");

// Access the private 'request' field from HashJoinQueryPlanRequestBuilder
java.lang.reflect.Field requestField =
planRequestBuilder.getClass().getDeclaredField("request");
requestField.setAccessible(true);
SqlRequest sqlRequest = (SqlRequest) requestField.get(planRequestBuilder);

if (sqlRequest != null) {
String originalSql = sqlRequest.getSql();
LOG.info("🔍 Original SQL: {}", originalSql);

// Parse JOIN_TIME_OUT hint from the SQL string
TimeValue timeout = parseJoinTimeoutFromSql(originalSql);
if (timeout != null) {
LOG.info(
"✅ Successfully extracted JOIN_TIME_OUT from SQL: {} seconds", timeout.getSeconds());
return timeout;
} else {
LOG.info("⚠️ No JOIN_TIME_OUT hint found in SQL string");
}
} else {
LOG.warn("⚠️ SqlRequest is null");
}

} catch (Exception e) {
LOG.warn("⚠️ Error accessing original SQL request", e);
}

return null;
}

/** Parse JOIN_TIME_OUT(number) hint from SQL string using regex */
private TimeValue parseJoinTimeoutFromSql(String sql) {
if (sql == null) {
return null;
}

try {
// Regex pattern to match /*! JOIN_TIME_OUT(number) */
Pattern pattern =
Pattern.compile(
"/\\*!\\s*JOIN_TIME_OUT\\s*\\(\\s*(\\d+)\\s*\\)\\s*\\*/", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(sql);

if (matcher.find()) {
String timeoutStr = matcher.group(1);
int timeoutSeconds = Integer.parseInt(timeoutStr);

LOG.info("✅ Parsed JOIN_TIME_OUT hint: {} seconds", timeoutSeconds);
return TimeValue.timeValueSeconds(timeoutSeconds);
}

} catch (Exception e) {
LOG.warn("Error parsing JOIN_TIME_OUT from SQL: {}", sql, e);
}

return null;
}

@Override
Expand Down
Loading
Loading