Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ http-client.env.json
/doctest/sql-cli/
/doctest/opensearch-job-scheduler/
.factorypath

# Claude Code files
.claude/
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.legacy;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_PEOPLE;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;

/** Integration tests for JOIN_TIME_OUT hint functionality. */
public class JoinTimeoutHintIT extends SQLIntegTestCase {

@Override
protected void init() throws Exception {
loadIndex(Index.DOG);
loadIndex(Index.PEOPLE);
loadIndex(Index.GAME_OF_THRONES);
}

/**
* Core integration test: Verify that JOIN_TIME_OUT hint prevents PIT expiration errors by setting
* PIT keepalive to match the hint value instead of default 60 seconds.
*
* <p>This is the primary test for the GitHub issue fix.
*/
@Test
public void testJoinTimeoutHintPreventsPointInTimeExpiration() throws IOException {
String query =
String.format(
Locale.ROOT,
"SELECT /*! JOIN_TIME_OUT(120) */ a.firstname, a.lastname, d.dog_name "
+ "FROM %s a JOIN %s d ON d.holdersName = a.firstname "
+ "WHERE a.age > 25 LIMIT 10",
TEST_INDEX_PEOPLE,
TEST_INDEX_DOG);

// The main test: this should execute without throwing "Point In Time id doesn't exist" error
JSONObject result = executeQuerySafely(query);
assertNotNull(
"Query with JOIN_TIME_OUT hint should execute without PIT expiration error", result);

int resultsCount = getResultsCount(result);
assertTrue("Should execute successfully", resultsCount >= 0);
assertTrue("Should respect LIMIT", resultsCount <= 10);
}

/**
* Regression test: Verify queries without JOIN_TIME_OUT hint still work to ensure implementation
* doesn't break existing functionality
*/
@Test
public void testJoinWithoutTimeoutHintStillWorks() throws IOException {
String query =
String.format(
Locale.ROOT,
"SELECT a.firstname, d.dog_name "
+ "FROM %s a JOIN %s d ON d.holdersName = a.firstname "
+ "WHERE a.age > 25 LIMIT 5",
TEST_INDEX_PEOPLE,
TEST_INDEX_DOG);

JSONObject result = executeQuerySafely(query);
assertNotNull("Query without JOIN_TIME_OUT hint should still work", result);

int resultsCount = getResultsCount(result);
assertTrue("Should work without timeout hint", resultsCount >= 0);
assertTrue("Should respect LIMIT", resultsCount <= 5);
}

private JSONObject executeQuerySafely(String query) throws IOException {
JSONObject result = executeQuery(query);

if (result.has("error")) {
throw new RuntimeException("Query failed: " + result.getJSONObject("error"));
}

return result;
}

private int getResultsCount(JSONObject result) {
// Check for SQL response format first
if (result.has("total")) {
return result.getInt("total");
}

// Check for traditional OpenSearch hits format
if (result.has("hits")) {
JSONArray hits = getHits(result);
return hits.length();
}

// No results found
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -20,13 +21,15 @@
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.transport.client.Client;

/** Handler for Point In Time */
public class PointInTimeHandlerImpl implements PointInTimeHandler {
private final Client client;
private String[] indices;
private final Optional<TimeValue> customKeepAlive;
@Getter @Setter private String pitId;
private static final Logger LOG = LogManager.getLogger();

Expand All @@ -39,6 +42,20 @@ public class PointInTimeHandlerImpl implements PointInTimeHandler {
public PointInTimeHandlerImpl(Client client, String[] indices) {
this.client = client;
this.indices = indices;
this.customKeepAlive = Optional.empty();
}

/**
* Constructor with custom keepalive timeout from JOIN_TIME_OUT hint
*
* @param client OpenSearch client
* @param indices list of indices
* @param customKeepAlive Custom keepalive timeout (from JOIN_TIME_OUT hint)
*/
public PointInTimeHandlerImpl(Client client, String[] indices, TimeValue customKeepAlive) {
this.client = client;
this.indices = indices;
this.customKeepAlive = Optional.ofNullable(customKeepAlive);
}

/**
Expand All @@ -50,20 +67,23 @@ public PointInTimeHandlerImpl(Client client, String[] indices) {
public PointInTimeHandlerImpl(Client client, String pitId) {
this.client = client;
this.pitId = pitId;
this.customKeepAlive = Optional.empty();
}

/** Create PIT for given indices */
@Override
public void create() {
CreatePitRequest createPitRequest =
new CreatePitRequest(
LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices);
TimeValue keepAlive = getEffectiveKeepAlive();

LOG.info("Creating PIT with keepalive: {} ({}ms)", keepAlive, keepAlive.getMillis());

CreatePitRequest createPitRequest = new CreatePitRequest(keepAlive, false, indices);
ActionFuture<CreatePitResponse> execute =
client.execute(CreatePitAction.INSTANCE, createPitRequest);
try {
CreatePitResponse pitResponse = execute.get();
pitId = pitResponse.getId();
LOG.info("Created Point In Time {} successfully.", pitId);
LOG.info("Created Point In Time {} with keepalive {} successfully.", pitId, keepAlive);
} catch (OpenSearchSecurityException e) {
throw e;
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -86,4 +106,25 @@ public void delete() {
throw new RuntimeException("Error occurred while deleting PIT.", e);
}
}

/**
* Get effective keepalive value by checking custom timeout first, then falling back to default
*
* @return TimeValue for PIT keepalive
*/
private TimeValue getEffectiveKeepAlive() {
if (customKeepAlive.isPresent()) {
LOG.info(
"Using custom PIT keepalive from JOIN_TIME_OUT hint: {} ({}ms)",
customKeepAlive.get(),
customKeepAlive.get().getMillis());
return customKeepAlive.get();
}

// Fallback: use default
TimeValue defaultKeepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
LOG.info(
"Using default PIT keepalive: {} ({}ms)", defaultKeepAlive, defaultKeepAlive.getMillis());
return defaultKeepAlive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.List;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.legacy.domain.Field;
import org.opensearch.sql.legacy.domain.Select;

Expand All @@ -17,6 +18,7 @@ public class TableInJoinRequestBuilder {
private List<Field> returnedFields;
private Select originalSelect;
private Integer hintLimit;
private TimeValue hintJoinTimeout;

public TableInJoinRequestBuilder() {}

Expand Down Expand Up @@ -59,4 +61,12 @@ public Integer getHintLimit() {
public void setHintLimit(Integer hintLimit) {
this.hintLimit = hintLimit;
}

public TimeValue getHintJoinTimeout() {
return hintJoinTimeout;
}

public void setHintJoinTimeout(TimeValue hintJoinTimeout) {
this.hintJoinTimeout = hintJoinTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.sql.legacy.query.planner;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.legacy.query.join.HashJoinElasticRequestBuilder;
import org.opensearch.sql.legacy.query.planner.core.Config;
import org.opensearch.sql.legacy.query.planner.core.QueryParams;
Expand All @@ -18,6 +21,7 @@
* how it is assembled.
*/
public class HashJoinQueryPlanRequestBuilder extends HashJoinElasticRequestBuilder {
private static final Logger LOG = LogManager.getLogger();

/** Client connection to OpenSearch cluster */
private final Client client;
Expand Down Expand Up @@ -49,6 +53,15 @@ public QueryPlanner plan() {
getTotalLimit(), getFirstTable().getHintLimit(), getSecondTable().getHintLimit());
config.configureTermsFilterOptimization(isUseTermFiltersOptimization());

if (config.timeout() != Config.DEFAULT_TIME_OUT) {
TimeValue joinTimeout = TimeValue.timeValueSeconds(config.timeout());
LOG.info(
"HashJoinQueryPlanRequestBuilder: Using JOIN_TIME_OUT hint: {} seconds",
config.timeout());
getFirstTable().setHintJoinTimeout(joinTimeout);
getSecondTable().setHintJoinTimeout(joinTimeout);
}

return new QueryPlanner(
client,
config,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime;

import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
Expand All @@ -13,6 +19,7 @@

/** OpenSearch Search API with Point in time as physical implementation of TableScan */
public class PointInTime extends Paginate {
private static final Logger LOG = LogManager.getLogger();

private String pitId;
private PointInTimeHandlerImpl pit;
Expand All @@ -35,8 +42,20 @@ public void close() {

@Override
protected void loadFirstBatch() {
// Create PIT and set to request object
pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr());
// Check if this table has JOIN_TIME_OUT hint configured
if (request.getHintJoinTimeout() != null) {
TimeValue customTimeout = request.getHintJoinTimeout();
LOG.info(
"PointInTime: Creating PIT with JOIN_TIME_OUT hint: {} seconds",
customTimeout.getSeconds());
pit =
new PointInTimeHandlerImpl(
client, request.getOriginalSelect().getIndexArr(), customTimeout);
} else {
LOG.info("PointInTime: Creating PIT with default timeout value: {}");
pit = new PointInTimeHandlerImpl(client, request.getOriginalSelect().getIndexArr());
}

pit.create();
pitId = pit.getPitId();

Expand Down
Loading
Loading