diff --git a/build.gradle b/build.gradle
index 3220f8ba8d98..336af942fbc0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -377,6 +377,7 @@ project(':iceberg-core') {
     implementation libs.jackson.databind
     implementation libs.caffeine
     implementation libs.roaringbitmap
+    implementation libs.failsafe
     compileOnly(libs.hadoop3.client) {
       exclude group: 'org.apache.avro', module: 'avro'
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index faa1c264d5f1..e40899556e11 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -35,7 +35,7 @@
 /** Context object with optional arguments for a TableScan. */
 @Value.Immutable
-abstract class TableScanContext {
+public abstract class TableScanContext {
 
   @Nullable
   public abstract Long snapshotId();
diff --git a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
index 0c21fed4de54..b1575035fcc0 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
@@ -26,6 +26,8 @@
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchPlanIdException;
+import org.apache.iceberg.exceptions.NoSuchPlanTaskException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NoSuchViewException;
 import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -73,6 +75,14 @@ public static Consumer<ErrorResponse> tableCommitHandler() {
     return CommitErrorHandler.INSTANCE;
   }
 
+  public static Consumer<ErrorResponse> planErrorHandler() {
+    return PlanErrorHandler.INSTANCE;
+  }
+
+  public static Consumer<ErrorResponse> planTaskHandler() {
+    return PlanTaskErrorHandler.INSTANCE;
+  }
+
   public static Consumer<ErrorResponse> defaultErrorHandler() {
     return DefaultErrorHandler.INSTANCE;
   }
@@ -125,6 +135,46 @@ public void accept(ErrorResponse error) {
     }
   }
 
+  /** Plan level error handler. */
+  private static class PlanErrorHandler extends DefaultErrorHandler {
+    private static final ErrorHandler INSTANCE = new PlanErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse error) {
+      if (error.code() == 404) {
+        if (NoSuchNamespaceException.class.getSimpleName().equals(error.type())) {
+          throw new NoSuchNamespaceException("%s", error.message());
+        } else if (NoSuchTableException.class.getSimpleName().equals(error.type())) {
+          throw new NoSuchTableException("%s", error.message());
+        } else {
+          throw new NoSuchPlanIdException("%s", error.message());
+        }
+      }
+
+      super.accept(error);
+    }
+  }
+
+  /** PlanTask level error handler. */
+  private static class PlanTaskErrorHandler extends DefaultErrorHandler {
+    private static final ErrorHandler INSTANCE = new PlanTaskErrorHandler();
+
+    @Override
+    public void accept(ErrorResponse error) {
+      if (error.code() == 404) {
+        if (NoSuchNamespaceException.class.getSimpleName().equals(error.type())) {
+          throw new NoSuchNamespaceException("%s", error.message());
+        } else if (NoSuchTableException.class.getSimpleName().equals(error.type())) {
+          throw new NoSuchTableException("%s", error.message());
+        } else {
+          throw new NoSuchPlanTaskException("%s", error.message());
+        }
+      }
+
+      super.accept(error);
+    }
+  }
+
   /** View commit error handler. */
   private static class ViewCommitErrorHandler extends DefaultErrorHandler {
     private static final ErrorHandler INSTANCE = new ViewCommitErrorHandler();
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
index e71610622bac..79617b2982ff 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
@@ -37,6 +37,10 @@ private RESTCatalogProperties() {}
 
   public static final String NAMESPACE_SEPARATOR = "namespace-separator";
 
+  // Enable planning on the REST server side
+  public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled";
+  public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false;
+
   public enum SnapshotMode {
     ALL,
     REFS
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 814ed978c4ad..3f3626589566 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.Transactions;
 import org.apache.iceberg.catalog.BaseViewSessionCatalog;
@@ -158,6 +159,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
   private MetricsReporter reporter = null;
   private boolean reportingViaRestEnabled;
   private Integer pageSize = null;
+  private boolean restScanPlanningEnabled;
   private CloseableGroup closeables = null;
   private Set<Endpoint> endpoints;
   private Supplier<Map<String, String>> mutationHeaders = Map::of;
@@ -270,6 +272,11 @@ public void initialize(String name, Map<String, String> unresolved) {
             RESTCatalogProperties.NAMESPACE_SEPARATOR,
             RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8);
 
+    this.restScanPlanningEnabled =
+        PropertyUtil.propertyAsBoolean(
+            mergedProps,
+            RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
+            RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT);
     super.initialize(name, mergedProps);
   }
 
@@ -476,6 +483,13 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
 
     trackFileIO(ops);
 
+    RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient);
+    // A RESTTable should only be returned for non-metadata tables: metadata tables require
+    // the client to read metadata files (for example, manifests) that it may not have
+    // access to, since a REST planning client only needs the catalog.
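+    // (For instance, the `files` and `entries` metadata tables scan manifests directly,
+    // which a catalog-only client may not be able to read.)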
+    if (restTable != null) {
+      return restTable;
+    }
+
     BaseTable table =
         new BaseTable(
             ops,
@@ -488,6 +502,23 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
     return table;
   }
 
+  private RESTTable restTableForScanPlanning(
+      TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) {
+    // the server advertises the remote planning endpoint and scan planning is enabled on the client
+    if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) {
+      return new RESTTable(
+          ops,
+          fullTableName(finalIdentifier),
+          metricsReporter(paths.metrics(finalIdentifier), restClient),
+          restClient,
+          Map::of,
+          finalIdentifier,
+          paths,
+          endpoints);
+    }
+    return null;
+  }
+
   private void trackFileIO(RESTTableOperations ops) {
     if (io != ops.io()) {
       fileIOTracker.track(ops);
@@ -556,6 +587,11 @@ public Table registerTable(
 
     trackFileIO(ops);
 
+    RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient);
+    if (restTable != null) {
+      return restTable;
+    }
+
     return new BaseTable(
         ops, fullTableName(ident), metricsReporter(paths.metrics(ident), tableClient));
   }
@@ -820,6 +856,11 @@ public Table create() {
 
       trackFileIO(ops);
 
+      RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient);
+      if (restTable != null) {
+        return restTable;
+      }
+
       return new BaseTable(
           ops, fullTableName(ident), metricsReporter(paths.metrics(ident), tableClient));
     }
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
new file mode 100644
index 000000000000..5565122aaa6f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.ImmutableTableScanContext;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.metrics.MetricsReporter;
+
+class RESTTable extends BaseTable {
+  private final RESTClient client;
+  private final Supplier<Map<String, String>> headers;
+  private final MetricsReporter reporter;
+  private final ResourcePaths resourcePaths;
+  private final TableIdentifier tableIdentifier;
+  private final Set<Endpoint> supportedEndpoints;
+
+  RESTTable(
+      TableOperations ops,
+      String name,
+      MetricsReporter reporter,
+      RESTClient client,
+      Supplier<Map<String, String>> headers,
+      TableIdentifier tableIdentifier,
+      ResourcePaths resourcePaths,
+      Set<Endpoint> supportedEndpoints) {
+    super(ops, name, reporter);
+    this.reporter = reporter;
+    this.client = client;
+    this.headers = headers;
+    this.tableIdentifier = tableIdentifier;
+    this.resourcePaths = resourcePaths;
+    this.supportedEndpoints = supportedEndpoints;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new RESTTableScan(
+        this,
+        schema(),
+        ImmutableTableScanContext.builder().metricsReporter(reporter).build(),
+        client,
+        headers.get(),
+        operations(),
+        tableIdentifier,
+        resourcePaths,
+        supportedEndpoints);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
new file mode 100644
index 000000000000..741f6e549779
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataTableScan;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.TableScanContext;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RESTTableScan extends DataTableScan {
+  private static final Logger LOG = LoggerFactory.getLogger(RESTTableScan.class);
+  private static final long MIN_SLEEP_MS = 1000; // Initial delay
+  private static final long MAX_SLEEP_MS = 60 * 1000; // Max backoff delay (1 minute)
+  private static final int MAX_ATTEMPTS = 10; // Max number of poll checks
+  private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum duration (5 minutes)
+  private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
+
+  private final RESTClient client;
+  private final Map<String, String> headers;
+  private final TableOperations operations;
+  private final Table table;
+  private final ResourcePaths resourcePaths;
+  private final TableIdentifier tableIdentifier;
+  private final Set<Endpoint> supportedEndpoints;
+  private final ParserContext parserContext;
+  private String currentPlanId = null;
+
+  RESTTableScan(
+      Table table,
+      Schema schema,
+      TableScanContext context,
+      RESTClient client,
+      Map<String, String> headers,
+      TableOperations operations,
+      TableIdentifier tableIdentifier,
+      ResourcePaths resourcePaths,
+      Set<Endpoint> supportedEndpoints) {
+    super(table, schema, context);
+    this.table = table;
+    this.client = client;
+    this.headers = headers;
+    this.operations = operations;
+    this.tableIdentifier = tableIdentifier;
+    this.resourcePaths = resourcePaths;
+    this.supportedEndpoints = supportedEndpoints;
+    this.parserContext =
+        ParserContext.builder()
+            .add("specsById", table.specs())
+            .add("caseSensitive", context().caseSensitive())
+            .build();
+  }
+
+  @Override
+  protected TableScan newRefinedScan(
+      Table refinedTable, Schema refinedSchema, TableScanContext refinedContext) {
+    return new RESTTableScan(
+        refinedTable,
+        refinedSchema,
+        refinedContext,
+        client,
+        headers,
+        operations,
+        tableIdentifier,
+        resourcePaths,
+        supportedEndpoints);
+  }
+
+  @Override
+  public CloseableIterable<FileScanTask> planFiles() {
+    Long startSnapshotId = context().fromSnapshotId();
+    Long endSnapshotId = context().toSnapshotId();
+    Long snapshotId = snapshotId();
+    List<String> selectedColumns =
+        schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
+
+    List<String> statsFields = null;
+    if (columnsToKeepStats() != null) {
+      statsFields =
+          columnsToKeepStats().stream()
+              .map(columnId -> schema().findColumnName(columnId))
+              .collect(Collectors.toList());
+    }
+
+    PlanTableScanRequest.Builder builder =
+        PlanTableScanRequest.builder()
+            .withSelect(selectedColumns)
+            .withFilter(filter())
+            .withCaseSensitive(isCaseSensitive())
+            .withStatsFields(statsFields);
+
+    if (startSnapshotId != null && endSnapshotId != null) {
+      builder
+          .withStartSnapshotId(startSnapshotId)
+          .withEndSnapshotId(endSnapshotId)
+          .withUseSnapshotSchema(true);
+    } else if (snapshotId != null) {
+      boolean useSnapshotSchema = snapshotId != table.currentSnapshot().snapshotId();
+      builder.withSnapshotId(snapshotId).withUseSnapshotSchema(useSnapshotSchema);
+    }
+
+    return planTableScan(builder.build());
+  }
+
+  private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planTableScanRequest) {
+    PlanTableScanResponse response =
+        client.post(
+            resourcePaths.planTableScan(tableIdentifier),
+            planTableScanRequest,
+            PlanTableScanResponse.class,
+            headers,
+            ErrorHandlers.tableErrorHandler(),
+            stringStringMap -> {},
+            parserContext);
+
+    PlanStatus planStatus = response.planStatus();
+    switch (planStatus) {
+      case COMPLETED:
+        currentPlanId = response.planId();
+        return scanTasksIterable(response.planTasks(), response.fileScanTasks());
+      case SUBMITTED:
+        Endpoint.check(supportedEndpoints, Endpoint.V1_FETCH_TABLE_SCAN_PLAN);
+        return fetchPlanningResult(response.planId());
+      case FAILED:
+        throw new IllegalStateException(
+            String.format(
+                "Received status: %s for planId: %s", PlanStatus.FAILED, response.planId()));
+      case CANCELLED:
+        throw new IllegalStateException(
+            String.format(
+                "Received status: %s for planId: %s", PlanStatus.CANCELLED, response.planId()));
+      default:
+        throw new IllegalStateException(
+            String.format("Invalid planStatus: %s for planId: %s", planStatus, response.planId()));
+    }
+  }
+
+  private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
+    currentPlanId = planId;
+
+    RetryPolicy<FetchPlanningResultResponse> retryPolicy =
+        RetryPolicy.<FetchPlanningResultResponse>builder()
+            .handleResultIf(response -> response.planStatus() == PlanStatus.SUBMITTED)
+            .withBackoff(
+                Duration.ofMillis(MIN_SLEEP_MS), Duration.ofMillis(MAX_SLEEP_MS), SCALE_FACTOR)
+            .withJitter(0.1) // Add jitter up to 10% of the calculated delay
+            .withMaxAttempts(MAX_ATTEMPTS)
+            .withMaxDuration(Duration.ofMillis(MAX_WAIT_TIME_MS))
+            .onFailedAttempt(
+                e -> {
+                  // Log when a retry occurs
+                  LOG.debug(
+                      "Plan {} still SUBMITTED (Attempt {}/{}). Previous attempt took {} ms.",
+                      planId,
+                      e.getAttemptCount(),
+                      MAX_ATTEMPTS,
+                      e.getElapsedAttemptTime().toMillis());
+                })
+            .onFailure(
+                e -> {
+                  LOG.warn(
+                      "Polling for plan {} failed due to: {}",
+                      planId,
+                      e.getException().getMessage());
+                  cancelPlan();
+                })
+            .build();
+
+    try {
+      FetchPlanningResultResponse response =
+          Failsafe.with(retryPolicy)
+              .get(
+                  () ->
+                      client.get(
+                          resourcePaths.plan(tableIdentifier, planId),
+                          headers,
+                          FetchPlanningResultResponse.class,
+                          headers,
+                          ErrorHandlers.planErrorHandler(),
+                          parserContext));
+      Preconditions.checkState(
+          response.planStatus() == PlanStatus.COMPLETED,
+          "Plan finished with unexpected status %s for planId: %s",
+          response.planStatus(),
+          planId);
+
+      return scanTasksIterable(response.planTasks(), response.fileScanTasks());
+    } catch (FailsafeException e) {
+      // FailsafeException is thrown when retries are exhausted (Max Attempts/Duration)
+      // Cleanup is handled by the .onFailure() hook, so we just wrap and rethrow.
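+      // (With the constants above, nominal delays are roughly 1s, 2s, 4s, ... capped at 60s
+      // per attempt, with at most 10 attempts and at most 5 minutes in total.)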
+      throw new IllegalStateException(
+          String.format("Polling timed out or exceeded max attempts for planId: %s.", planId), e);
+    } catch (Exception e) {
+      // Catch any immediate non-retryable exceptions (e.g., I/O errors, auth errors)
+      try {
+        cancelPlan();
+      } catch (Exception cancelException) {
+        // Ignore cancellation failures during exception handling
+        e.addSuppressed(cancelException);
+      }
+      throw e;
+    }
+  }
+
+  private CloseableIterable<FileScanTask> scanTasksIterable(
+      List<String> planTasks, List<FileScanTask> fileScanTasks) {
+    if (planTasks != null && !planTasks.isEmpty()) {
+      Endpoint.check(supportedEndpoints, Endpoint.V1_FETCH_TABLE_SCAN_PLAN_TASKS);
+    }
+
+    return CloseableIterable.whenComplete(
+        new ScanTaskIterable(
+            planTasks,
+            fileScanTasks == null ? List.of() : fileScanTasks,
+            client,
+            resourcePaths,
+            tableIdentifier,
+            headers,
+            planExecutor(),
+            parserContext),
+        this::cancelPlan);
+  }
+
+  @VisibleForTesting
+  @SuppressWarnings("checkstyle:RegexpMultiline")
+  public boolean cancelPlan() {
+    String planId = currentPlanId;
+    if (planId == null || !supportedEndpoints.contains(Endpoint.V1_CANCEL_TABLE_SCAN_PLAN)) {
+      return false;
+    }
+
+    try {
+      client.delete(
+          resourcePaths.plan(tableIdentifier, planId),
+          Map.of(),
+          null,
+          headers,
+          ErrorHandlers.planErrorHandler());
+      currentPlanId = null;
+      return true;
+    } catch (Exception e) {
+      // Plan might have already completed or failed, which is acceptable
+      return false;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
index d85b00d02ebe..231a966f8062 100644
--- a/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
+++ b/core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java
@@ -151,6 +151,40 @@ public String renameView() {
     return SLASH.join("v1", prefix, "views", "rename");
   }
 
+  public String planTableScan(TableIdentifier ident) {
+    return SLASH.join(
+        "v1",
+        prefix,
+        "namespaces",
+        pathEncode(ident.namespace()),
+        "tables",
+        RESTUtil.encodeString(ident.name()),
+        "plan");
+  }
+
+  public String plan(TableIdentifier ident, String planId) {
+    return SLASH.join(
+        "v1",
+        prefix,
+        "namespaces",
+        pathEncode(ident.namespace()),
+        "tables",
+        RESTUtil.encodeString(ident.name()),
+        "plan",
+        RESTUtil.encodeString(planId));
+  }
+
+  public String fetchScanTasks(TableIdentifier ident) {
+    return SLASH.join(
+        "v1",
+        prefix,
+        "namespaces",
+        pathEncode(ident.namespace()),
+        "tables",
+        RESTUtil.encodeString(ident.name()),
+        "tasks");
+  }
+
   private String pathEncode(Namespace ns) {
     return RESTUtil.encodeNamespace(ns, namespaceSeparator);
   }
diff --git a/core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java b/core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java
new file mode 100644
index 000000000000..a03be784da93
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
+import org.apache.iceberg.rest.responses.FetchScanTasksResponse;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ScanTaskIterable implements CloseableIterable<FileScanTask> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanTaskIterable.class);
+  private static final int DEFAULT_TASK_QUEUE_CAPACITY = 1000;
+  private static final long QUEUE_POLL_TIMEOUT_MS = 100;
+  private static final int WORKER_POOL_SIZE = Math.max(1, ThreadPools.WORKER_THREAD_POOL_SIZE / 4);
+  private final BlockingQueue<FileScanTask> taskQueue;
+  private final ConcurrentLinkedQueue<FileScanTask> initialFileScanTasks;
+  private final ConcurrentLinkedQueue<String> planTasks;
+  private final AtomicInteger activeWorkers = new AtomicInteger(0);
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+  private final ExecutorService executorService;
+  private final RESTClient client;
+  private final ResourcePaths resourcePaths;
+  private final TableIdentifier tableIdentifier;
+  private final Map<String, String> headers;
+  private final ParserContext parserContext;
+
+  ScanTaskIterable(
+      List<String> initialPlanTasks,
+      List<FileScanTask> initialFileScanTasks,
+      RESTClient client,
+      ResourcePaths resourcePaths,
+      TableIdentifier tableIdentifier,
+      Map<String, String> headers,
+      ExecutorService executorService,
+      ParserContext parserContext) {
+
+    this.taskQueue = new LinkedBlockingQueue<>(DEFAULT_TASK_QUEUE_CAPACITY);
+    this.planTasks = new ConcurrentLinkedQueue<>();
+    // Initialize the initial file scan task queue so that multiple workers can poll from it.
+    this.initialFileScanTasks = new ConcurrentLinkedQueue<>(initialFileScanTasks);
+
+    this.client = client;
+    this.resourcePaths = resourcePaths;
+    this.tableIdentifier = tableIdentifier;
+    this.headers = headers;
+    this.executorService = executorService;
+    this.parserContext = parserContext;
+
+    if (initialPlanTasks != null && !initialPlanTasks.isEmpty()) {
+      planTasks.addAll(initialPlanTasks);
+    } else if (initialFileScanTasks.isEmpty()) {
+      // nothing to do, no need to spawn workers
+      return;
+    }
+
+    submitFixedWorkers();
+  }
+
+  private void submitFixedWorkers() {
+    if (planTasks.isEmpty() && initialFileScanTasks.isEmpty()) {
+      // nothing to do
+      return;
+    }
+
+    // need to spawn at least one worker to enqueue initial file scan tasks
+    int numWorkers = Math.min(WORKER_POOL_SIZE, Math.max(planTasks.size(), 1));
+
+    for (int i = 0; i < numWorkers; i++) {
+      executorService.execute(new PlanTaskWorker());
+    }
+  }
+
+  @Override
+  public CloseableIterator<FileScanTask> iterator() {
+    return new ScanTasksIterator();
+  }
+
+  @Override
+  public void close() throws IOException {} // shutdown is handled per-iterator in ScanTasksIterator#close()
+
+  private class PlanTaskWorker implements Runnable {
+
+    @Override
+    public void run() {
+      activeWorkers.incrementAndGet();
+
+      try {
+        while (!shutdown.get()) {
+          String planTask = planTasks.poll();
+          if (planTask == null) {
+            // if there are no more plan tasks, see if we can just add any remaining initial
+            // file scan tasks before exiting.
+            while (!initialFileScanTasks.isEmpty()) {
+              FileScanTask initialFileScanTask = initialFileScanTasks.poll();
+              if (initialFileScanTask != null) {
+                taskQueue.put(initialFileScanTask);
+              }
+            }
+            return;
+          }
+
+          processPlanTask(planTask);
+        }
+
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        throw new RuntimeException("Worker failed processing planTask", e);
+      } finally {
+        int remaining = activeWorkers.decrementAndGet();
+
+        if (remaining == 0
+            && !planTasks.isEmpty()
+            && !shutdown.get()
+            && !initialFileScanTasks.isEmpty()) {
+          executorService.execute(new PlanTaskWorker());
+        }
+      }
+    }
+
+    private void processPlanTask(String planTask) throws InterruptedException {
+      FetchScanTasksResponse response = fetchScanTasks(planTask);
+      // immediately add any new plan tasks to the queue so the idle workers can pick them up
+      if (response.planTasks() != null) {
+        planTasks.addAll(response.planTasks());
+      }
+
+      // before blocking on the task queue, check for shutdown again
+      if (shutdown.get()) {
+        return;
+      }
+
+      // Now since the network IO is done, first add any initial file scan tasks
+      while (!initialFileScanTasks.isEmpty()) {
+        FileScanTask initialFileScanTask = initialFileScanTasks.poll();
+        if (initialFileScanTask != null) {
+          taskQueue.put(initialFileScanTask);
+        }
+      }
+
+      if (response.fileScanTasks() != null) {
+        for (FileScanTask task : response.fileScanTasks()) {
+          taskQueue.put(task);
+        }
+      }
+    }
+
+    private FetchScanTasksResponse fetchScanTasks(String planTask) {
+      FetchScanTasksRequest request = new FetchScanTasksRequest(planTask);
+
+      return client.post(
+          resourcePaths.fetchScanTasks(tableIdentifier),
+          request,
+          FetchScanTasksResponse.class,
+          headers,
+          ErrorHandlers.planTaskHandler(),
+          stringStringMap -> {},
+          parserContext);
+    }
+  }
+
+  private class ScanTasksIterator implements CloseableIterator<FileScanTask> {
+    private FileScanTask nextTask = null;
+
+    @Override
+    public boolean hasNext() {
+      if (nextTask != null) {
+        return true;
+      }
+
+      while (true) {
+        if (isDone()) {
+          return false;
+        }
+
+        try {
+          nextTask = taskQueue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+          if (nextTask != null) {
+            return true;
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public FileScanTask next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more scan tasks available");
+      }
+      FileScanTask result = nextTask;
+      nextTask = null;
+      return result;
+    }
+
+    @Override
+    public void close() {
+      shutdown.set(true);
+      LOG.info(
+          "ScanTasksIterator is closing. Clearing {} queued tasks and {} plan tasks.",
+          taskQueue.size(),
+          planTasks.size());
+      taskQueue.clear();
+      planTasks.clear();
+    }
+
+    private boolean isDone() {
+      return taskQueue.isEmpty()
+          && planTasks.isEmpty()
+          && activeWorkers.get() == 0
+          && initialFileScanTasks.isEmpty();
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index 0929c1bd37e6..ddac3709443d 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -91,6 +91,15 @@ public class TestBase {
           .withPartitionPath("data_bucket=0") // easy way to set partition data for now
           .withRecordCount(1)
           .build();
+  public static final DeleteFile FILE_A_EQUALITY_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes(1) // delete on column 1 (id column)
+          .withPath("/path/to/data-a-equality-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
   static final DeleteFile FILE_A_DV =
       FileMetadata.deleteFileBuilder(SPEC)
           .ofPositionDeletes()
@@ -127,6 +136,14 @@ public class TestBase {
           .withPartitionPath("data_bucket=1") // easy way to set partition data for now
           .withRecordCount(1)
           .build();
+  public static final DeleteFile FILE_B_EQUALITY_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes(1) // delete on column 1 (id column)
+          .withPath("/path/to/data-b-equality-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // same partition as FILE_B
+          .withRecordCount(1)
+          .build();
   static final DeleteFile FILE_B_DV =
       FileMetadata.deleteFileBuilder(SPEC)
           .ofPositionDeletes()
@@ -154,6 +171,14 @@ public class TestBase {
           .withPartitionPath("data_bucket=2") // easy way to set partition data for now
           .withRecordCount(1)
          .build();
+  public static final DeleteFile FILE_C_EQUALITY_DELETES =
+      FileMetadata.deleteFileBuilder(SPEC)
+          .ofEqualityDeletes(1) // delete on column 1 (id column)
+          .withPath("/path/to/data-c-equality-deletes.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // same partition as FILE_C
+          .withRecordCount(1)
+          .build();
   static final DataFile FILE_D =
       DataFiles.builder(SPEC)
           .withPath("/path/to/data-d.parquet")
@@ -170,7 +195,7 @@ public class TestBase {
           .withPartitionPath("data_bucket=3") // easy way to set partition data for now
           .withRecordCount(1)
           .build();
-  static final DataFile FILE_WITH_STATS =
+  public static final DataFile FILE_WITH_STATS =
       DataFiles.builder(SPEC)
           .withPath("/path/to/data-with-stats.parquet")
           .withMetrics(
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 586da0d10782..8d59ee0393fa 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -112,7 +112,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
   private final ViewCatalog asViewCatalog;
 
   private AuthSession authSession = AuthSession.EMPTY;
-  private final PlanningBehavior planningBehavior = planningBehavior();
+  private PlanningBehavior planningBehavior;
 
   public RESTCatalogAdapter(Catalog catalog) {
     this.catalog = catalog;
@@ -321,8 +321,8 @@ public <T extends RESTResponse> T handleRequest(
                 catalog,
                 ident,
                 request,
-                planningBehavior::shouldPlanTableScanAsync,
-                scan -> planningBehavior.numberFileScanTasksPerPlanTask()));
+                planningBehavior()::shouldPlanTableScanAsync,
+                scan -> planningBehavior().numberFileScanTasksPerPlanTask()));
       }
 
       case FETCH_PLANNING_RESULT:
@@ -603,7 +603,11 @@ default boolean shouldPlanTableScanAsync(Scan scan) {
   }
 
   protected PlanningBehavior planningBehavior() {
-    return new PlanningBehavior() {};
+    return this.planningBehavior == null ? new PlanningBehavior() {} : planningBehavior;
+  }
+
+  protected void setPlanningBehavior(PlanningBehavior behavior) {
+    this.planningBehavior = behavior;
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
new file mode 100644
index 000000000000..ba13d1e3c1c4
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -0,0 +1,1003 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.FILE_A_DELETES;
+import static org.apache.iceberg.TestBase.FILE_A_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.FILE_B;
+import static org.apache.iceberg.TestBase.FILE_B_DELETES;
+import static org.apache.iceberg.TestBase.FILE_B_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.FILE_C;
+import static org.apache.iceberg.TestBase.FILE_C_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+public class TestRESTScanPlanning {
+  private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+  private static final Namespace NS = Namespace.of("ns");
+
+  private InMemoryCatalog backendCatalog;
+  private Server httpServer;
+  private RESTCatalogAdapter adapterForRESTServer;
+  private ParserContext parserContext;
+  @TempDir private Path temp;
+  private RESTCatalog restCatalogWithScanPlanning;
+
+  @BeforeEach
+  public void setupCatalogs() throws Exception {
+    File warehouse = temp.toFile();
+    this.backendCatalog = new InMemoryCatalog();
+    this.backendCatalog.initialize(
+        "in-memory",
+        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath()));
+
+    adapterForRESTServer =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                if (ResourcePaths.config().equals(request.path())) {
+                  return castResponse(
+                      responseType,
+                      ConfigResponse.builder()
+                          .withEndpoints(
+                              Arrays.stream(Route.values())
+                                  .map(r -> Endpoint.create(r.method().name(), r.resourcePath()))
+                                  .collect(Collectors.toList()))
+                          .withOverrides(
+                              ImmutableMap.of(
+                                  RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true"))
+                          .build());
+                }
+                Object body = roundTripSerialize(request.body(), "request");
+                HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build();
+                T response = super.execute(req, responseType, errorHandler, responseHeaders);
+                return roundTripSerialize(response, "response");
+              }
+            });
+
+    ServletContextHandler servletContext =
+        new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+    servletContext.addServlet(
+        new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*");
+    servletContext.setHandler(new GzipHandler());
+
+    this.httpServer = new Server(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+    httpServer.setHandler(servletContext);
+    httpServer.start();
+
+    // Initialize catalog with scan planning enabled
+    this.restCatalogWithScanPlanning = initCatalog("prod-with-scan-planning", ImmutableMap.of());
+  }
+
+  @AfterEach
+  public void teardownCatalogs() throws Exception {
+    if (restCatalogWithScanPlanning != null) {
+      restCatalogWithScanPlanning.close();
+    }
+
+    if (backendCatalog != null) {
+      backendCatalog.close();
+    }
+
+    if (httpServer != null) {
+      httpServer.stop();
+      httpServer.join();
+    }
+  }
+
+  // ==================== Helper Methods ====================
+
+  private RESTCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
+    RESTCatalog catalog =
+        new RESTCatalog(
+            SessionCatalog.SessionContext.createEmpty(),
+            (config) ->
+                HTTPClient.builder(config)
+                    .uri(config.get(CatalogProperties.URI))
+                    .withHeaders(RESTUtil.configHeaders(config))
+                    .build());
+    catalog.setConf(new Configuration());
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            httpServer.getURI().toString(),
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO");
+    catalog.initialize(
+        catalogName,
+        ImmutableMap.<String, String>builder()
+            .putAll(properties)
+            .putAll(additionalProperties)
+            .build());
+    return catalog;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> T roundTripSerialize(T payload, String description) {
+    if (payload == null) {
+      return null;
+    }
+
+    try {
+      if (payload instanceof RESTMessage) {
+        RESTMessage message = (RESTMessage) payload;
+        ObjectReader reader = MAPPER.readerFor(message.getClass());
+        if (parserContext != null && !parserContext.isEmpty()) {
+          reader = reader.with(parserContext.toInjectableValues());
+        }
+        return reader.readValue(MAPPER.writeValueAsString(message));
+      } else {
+        // use Map so that Jackson doesn't try to instantiate ImmutableMap from payload.getClass()
+        return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), Map.class);
+      }
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(
+          String.format("Failed to serialize and deserialize %s: %s", description, payload), e);
+    }
+  }
+
+  private void setParserContext(Table table) {
+    parserContext =
+        ParserContext.builder().add("specsById", table.specs()).add("caseSensitive", false).build();
+  }
+
+  private RESTCatalog scanPlanningCatalog() {
+    return restCatalogWithScanPlanning;
+  }
+
+  private void configurePlanningBehavior(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurator) {
+    TestPlanningBehavior.Builder builder = TestPlanningBehavior.builder();
+    adapterForRESTServer.setPlanningBehavior(configurator.apply(builder).build());
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, String tableName) {
+    return createTableWithScanPlanning(catalog, TableIdentifier.of(NS, tableName));
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, TableIdentifier identifier) {
+    catalog.createNamespace(identifier.namespace());
+    return catalog.buildTable(identifier, SCHEMA).withPartitionSpec(SPEC).create();
+  }
+
+  private RESTTable restTableFor(RESTCatalog catalog, String tableName) {
+    Table table = createTableWithScanPlanning(catalog, tableName);
+    table.newAppend().appendFile(FILE_A).commit();
+    assertThat(table).isInstanceOf(RESTTable.class);
+    return (RESTTable) table;
+  }
+
+  private RESTTableScan restTableScanFor(Table table) {
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    return (RESTTableScan) scan;
+  }
+
+  // ==================== Test Planning Behavior ====================
+
+  /** Enum for parameterized tests covering both synchronous and asynchronous planning modes. */
+  private enum PlanningMode
+      implements Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> {
+    SYNCHRONOUS(TestPlanningBehavior.Builder::synchronous),
+    ASYNCHRONOUS(TestPlanningBehavior.Builder::asynchronous);
+
+    private final Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurer;
+
+    PlanningMode(
+        Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurer) {
+      this.configurer = configurer;
+    }
+
+    @Override
+    public TestPlanningBehavior.Builder apply(TestPlanningBehavior.Builder builder) {
+      return this.configurer.apply(builder);
+    }
+  }
+
+  private static class TestPlanningBehavior implements RESTCatalogAdapter.PlanningBehavior {
+    private final boolean asyncPlanning;
+    private final int tasksPerPage;
+
+    private TestPlanningBehavior(boolean asyncPlanning, int tasksPerPage) {
+      this.asyncPlanning = asyncPlanning;
+      this.tasksPerPage = tasksPerPage;
+    }
+
+    private static Builder builder() {
+      return new Builder();
+    }
+
+    @Override
+    public boolean shouldPlanTableScanAsync(Scan scan) {
+      return asyncPlanning;
+    }
+
+    @Override
+    public int numberFileScanTasksPerPlanTask() {
+      return tasksPerPage;
+    }
+
+    protected static class Builder {
+      private boolean asyncPlanning;
+      private int tasksPerPage;
+
+      Builder asyncPlanning(boolean async) {
+        asyncPlanning = async;
+        return this;
+      }
+
+      Builder tasksPerPage(int tasks) {
+        tasksPerPage = tasks;
+        return this;
+      }
+
+      // Convenience methods for common test scenarios
+      Builder synchronous() {
+        return asyncPlanning(false).tasksPerPage(100);
+      }
+
+      Builder synchronousWithPagination() {
+        return asyncPlanning(false).tasksPerPage(1);
+      }
+
+      Builder asynchronous() {
+        return asyncPlanning(true).tasksPerPage(100);
+      }
+
+      TestPlanningBehavior build() {
+        return new TestPlanningBehavior(asyncPlanning, tasksPerPage);
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void scanPlanningWithAllTasksInSingleResponse(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "all_tasks_table");
+    setParserContext(table);
+
+    // Verify actual data file is returned with correct count
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      assertThat(tasks).hasSize(1);
+      assertThat(tasks.get(0).file().location()).isEqualTo(FILE_A.location());
+      assertThat(tasks.get(0).deletes()).isEmpty();
+    }
+  }
+
+  @Test
+  public void nestedPlanTaskPagination() throws IOException {
+    // Configure: synchronous planning with very small pages (creates nested plan task structure)
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+
+    Table table = restTableFor(scanPlanningCatalog(), "nested_plan_task_table");
+    // add one more file so that pagination actually kicks in
+    table.newFastAppend().appendFile(FILE_B).commit();
+    setParserContext(table);
+
+    // Verify actual data files are returned via nested plan task fetching with correct count
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+      assertThat(tasks).hasSize(2);
+      assertThat(tasks)
+          .anySatisfy(task -> assertThat(task.file().location()).isEqualTo(FILE_A.location()));
+      assertThat(tasks)
+          .anySatisfy(task -> assertThat(task.file().location()).isEqualTo(FILE_B.location()));
+      assertThat(tasks.get(0).deletes()).isEmpty();
+      assertThat(tasks.get(1).deletes()).isEmpty();
+    }
+  }
+
+  @Test
+  public void cancelPlanMethodAvailability() {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTable table = restTableFor(scanPlanningCatalog(), "cancel_method_table");
"cancel_method_table"); + RESTTableScan restTableScan = restTableScanFor(table); + + // Test that cancelPlan method is available and callable + // When no plan is active, it should return false + assertThat(restTableScan.cancelPlan()).isFalse(); + + // Verify the method doesn't throw exceptions when called multiple times + assertThat(restTableScan.cancelPlan()).isFalse(); + } + + @Test + public void iterableCloseTriggersCancel() throws IOException { + configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous); + RESTTable restTable = restTableFor(scanPlanningCatalog(), "iterable_close_test"); + setParserContext(restTable); + + TableScan scan = restTable.newScan(); + assertThat(scan).isInstanceOf(RESTTableScan.class); + RESTTableScan restTableScan = (RESTTableScan) scan; + + // Get the iterable + CloseableIterable iterable = restTableScan.planFiles(); + + // call cancelPlan before closing the iterable + boolean cancelled = restTableScan.cancelPlan(); + assertThat(cancelled).isTrue(); + + // Verify we can close the iterable without exceptions + // This tests that cancellation callbacks are properly wired through + iterable.close(); + } + + @ParameterizedTest + @EnumSource(MetadataTableType.class) + public void metadataTablesWithRemotePlanning(MetadataTableType type) throws IOException { + assumeThat(type) + .as("POSITION_DELETES table does not implement newScan() method") + .isNotEqualTo(MetadataTableType.POSITION_DELETES); + + configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous); + RESTTable table = restTableFor(scanPlanningCatalog(), "metadata_tables_test"); + table.newAppend().appendFile(FILE_B).commit(); + table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_EQUALITY_DELETES).commit(); + setParserContext(table); + // RESTTable should be only be returned for non-metadata tables, because client would + // not have access to metadata files for example manifests, since all it needs is file scan + // tasks, this test just verifies that metadata tables can be scanned with RESTTable. 
+    Table metadataTableInstance = MetadataTableUtils.createMetadataTableInstance(table, type);
+    assertThat(metadataTableInstance).isNotNull();
+    assertThat(metadataTableInstance.newScan().planFiles()).isNotEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithEmptyTable(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+    configurePlanningBehavior(planMode);
+    Table table = createTableWithScanPlanning(scanPlanningCatalog(), "empty_table_test");
+    setParserContext(table);
+    assertThat(table.newScan().planFiles()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  @Disabled("Pruning files based on columns is not yet supported in REST scan planning")
+  void remoteScanPlanningWithNonExistentColumn(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "non-existent_column");
+    setParserContext(table);
+    assertThat(table.newScan().select("non-existent-column").planFiles()).isEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void incrementalScan(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "incremental_scan");
+    setParserContext(table);
+
+    // Add second file to the table
+    table.newAppend().appendFile(FILE_B).commit();
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+    // Add third file to the table
+    table.newAppend().appendFile(FILE_C).commit();
+    long endSnapshotId = table.currentSnapshot().snapshotId();
+    assertThat(
+            table
+                .newIncrementalAppendScan()
+                .fromSnapshotInclusive(startSnapshotId)
+                .toSnapshot(endSnapshotId)
+                .planFiles())
+        .hasSize(2)
+        .extracting(task -> task.file().location())
+        .contains(FILE_C.location(), FILE_B.location());
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithPositionDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "position_deletes_test");
+    setParserContext(table);
+
+    // Add position deletes that correspond to FILE_A (which was added in table creation)
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle position deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (specific count depends on implementation)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and delete file associations
+      FileScanTask taskWithDeletes =
+          assertThat(tasks)
+              .filteredOn(task -> !task.deletes().isEmpty())
+              .first()
+              .as("Expected at least one task with delete files")
+              .actual();
+
+      assertThat(taskWithDeletes.file().location()).isEqualTo(FILE_A.location());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: FILE_A_DELETES
+      assertThat(taskWithDeletes.deletes().get(0).location()).isEqualTo(FILE_A_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithEqualityDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "equality_deletes_test");
+    setParserContext(table);
+
+    // Add equality deletes that correspond to FILE_A
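+    // (FILE_A_EQUALITY_DELETES deletes on column 1, the `id` column, and shares FILE_A's
+    // partition, data_bucket=0, so planning should attach it to FILE_A's scan task.)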
+    table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Execute scan planning - should handle equality deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify the task count and file paths
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and equality delete file associations
+      FileScanTask taskWithDeletes =
+          assertThat(tasks)
+              .filteredOn(task -> !task.deletes().isEmpty())
+              .first()
+              .as("Expected at least one task with delete files")
+              .actual();
+
+      assertThat(taskWithDeletes.file().location()).isEqualTo(FILE_A.location());
+      assertThat(taskWithDeletes.deletes()).hasSize(1);
+      assertThat(taskWithDeletes.deletes().get(0).location())
+          .isEqualTo(FILE_A_EQUALITY_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithMixedDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "mixed_deletes_test");
+    setParserContext(table);
+
+    // Add both position and equality deletes in separate commits
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+    table
+        .newRowDelta()
+        .addDeletes(FILE_B_EQUALITY_DELETES)
+        .commit(); // Equality deletes for different partition
+
+    // Execute scan planning - should handle mixed delete types correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify task count: FILE_A only (FILE_B_EQUALITY_DELETES is in different partition)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify FILE_A with position deletes (FILE_B_EQUALITY_DELETES not associated since no
+      // FILE_B)
+      FileScanTask fileATask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_A.location()))
+              .first()
+              .as("Expected FILE_A in scan tasks")
+              .actual();
+
+      assertThat(fileATask.deletes()).hasSize(1);
+      assertThat(fileATask.deletes().get(0).location()).isEqualTo(FILE_A_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithMultipleDeleteFiles(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "multiple_deletes_test");
+    setParserContext(table);
+
+    // Add FILE_B and FILE_C to the table (FILE_A is already added during table creation)
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // Add multiple delete files corresponding to FILE_A, FILE_B, FILE_C
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES)
+        .addDeletes(FILE_B_DELETES)
+        .addDeletes(FILE_C_EQUALITY_DELETES)
+        .commit();
+
+    // Execute scan planning with multiple delete files
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (should have 3 data files: FILE_A, FILE_B, FILE_C)
+      assertThat(tasks).hasSize(3); // 3 data files
+
+      // Verify FILE_A with position deletes
+      FileScanTask fileATask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_A.location()))
+              .first()
+              .as("Expected FILE_A in scan tasks")
+              .actual();
+      assertThat(fileATask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileATask.deletes().stream().map(DeleteFile::location))
+          .contains(FILE_A_DELETES.location()); // FILE_A_DELETES is present
+
+      // Verify FILE_B with position deletes
+      FileScanTask fileBTask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_B.location()))
+              .first()
+              .as("Expected FILE_B in scan tasks")
+              .actual();
+      assertThat(fileBTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileBTask.deletes().stream().map(DeleteFile::location))
+          .contains(FILE_B_DELETES.location()); // FILE_B_DELETES is present
+
+      // Verify FILE_C with equality deletes
+      FileScanTask fileCTask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_C.location()))
+              .first()
+              .as("Expected FILE_C in scan tasks")
+              .actual();
+      assertThat(fileCTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileCTask.deletes().stream().map(DeleteFile::location))
+          .contains(FILE_C_EQUALITY_DELETES.location()); // FILE_C_EQUALITY_DELETES is present
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithDeletesAndFiltering(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "deletes_filtering_test");
+    setParserContext(table);
+
+    // Add FILE_B to have more data for filtering
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Add equality delete for FILE_B
+    table.newRowDelta().addDeletes(FILE_B_EQUALITY_DELETES).commit();
+
+    // Create a filtered scan and execute scan planning with filtering and deletes
+    try (CloseableIterable<FileScanTask> iterable =
+        table.newScan().filter(Expressions.lessThan("id", 4)).planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify scan planning works with both filtering and deletes
+      assertThat(tasks).hasSize(2); // 2 data files: FILE_A, FILE_B
+
+      // FILE_A should have no delete files
+      FileScanTask fileATask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_A.location()))
+              .first()
+              .as("Expected FILE_A in scan tasks")
+              .actual();
+      assertThat(fileATask.deletes()).isEmpty(); // 0 delete files for FILE_A
+
+      // FILE_B should have FILE_B_EQUALITY_DELETES
+      FileScanTask fileBTask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_B.location()))
+              .first()
+              .as("Expected FILE_B in scan tasks")
+              .actual();
+      assertThat(fileBTask.deletes()).hasSize(1); // 1 delete file: FILE_B_EQUALITY_DELETES
+      assertThat(fileBTask.deletes().get(0).location())
+          .isEqualTo(FILE_B_EQUALITY_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningDeletesCancellation(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "deletes_cancellation_test");
+    setParserContext(table);
+
+    // Add deletes to make the scenario more complex
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Get the iterable (which may involve async planning with deletes)
+    try (CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+        CloseableIterator<FileScanTask> iterator = iterable.iterator()) {
+      // Test cancellation works with delete files present
+      // Resources will be closed automatically
+    }
+
+    // Verify cancellation method is still accessible
+    assertThat(restTableScan.cancelPlan()).isFalse(); // No active plan at this point
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithTimeTravel(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode) {
+    // Test server-side scan planning with time travel (snapshot-based queries)
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithTimeTravel(
+      Function planMode) {
+    // Test server-side scan planning with time travel (snapshot-based queries)
+    // Verify that snapshot IDs are correctly passed through the REST API
+    // and that historical scans return the correct files and deletes
+    configurePlanningBehavior(planMode);
+
+    // Create table and add FILE_A (snapshot 1)
+    Table table = restTableFor(scanPlanningCatalog(), "snapshot_scan_test");
+    setParserContext(table);
+    table.refresh();
+    long snapshot1Id = table.currentSnapshot().snapshotId();
+
+    // Add FILE_B (snapshot 2)
+    table.newAppend().appendFile(FILE_B).commit();
+    table.refresh();
+    long snapshot2Id = table.currentSnapshot().snapshotId();
+    assertThat(snapshot2Id).isNotEqualTo(snapshot1Id);
+
+    // Add FILE_C and deletes (snapshots 3 and 4)
+    table.newAppend().appendFile(FILE_C).commit();
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+    table.refresh();
+    long snapshot4Id = table.currentSnapshot().snapshotId();
+    assertThat(snapshot4Id).isNotEqualTo(snapshot2Id);
+
+    // Test 1: Scan at snapshot 1 (should only see FILE_A, no deletes)
+    TableScan scan1 = table.newScan().useSnapshot(snapshot1Id);
+    CloseableIterable<FileScanTask> iterable1 = scan1.planFiles();
+    List<FileScanTask> tasks1 = Lists.newArrayList(iterable1);
+
+    assertThat(tasks1).hasSize(1); // Only FILE_A exists at snapshot 1
+    assertThat(tasks1.get(0).file().location()).isEqualTo(FILE_A.location());
+    assertThat(tasks1.get(0).deletes()).isEmpty(); // No deletes at snapshot 1
+
+    // Test 2: Scan at snapshot 2 (should see FILE_A and FILE_B, no deletes)
+    TableScan scan2 = table.newScan().useSnapshot(snapshot2Id);
+    CloseableIterable<FileScanTask> iterable2 = scan2.planFiles();
+    List<FileScanTask> tasks2 = Lists.newArrayList(iterable2);
+
+    assertThat(tasks2).hasSize(2); // FILE_A and FILE_B exist at snapshot 2
+    assertThat(tasks2)
+        .map(task -> task.file().location())
+        .containsExactlyInAnyOrder(FILE_A.location(), FILE_B.location());
+    assertThat(tasks2).allMatch(task -> task.deletes().isEmpty()); // No deletes at snapshot 2
+
+    // Test 3: Scan at the current snapshot (should see FILE_A, FILE_B, FILE_C, and FILE_A has
+    // deletes)
+    TableScan scan3 = table.newScan().useSnapshot(snapshot4Id);
+    CloseableIterable<FileScanTask> iterable3 = scan3.planFiles();
+    List<FileScanTask> tasks3 = Lists.newArrayList(iterable3);
+
+    assertThat(tasks3).hasSize(3); // All 3 data files exist at snapshot 4
+    assertThat(tasks3)
+        .map(task -> task.file().location())
+        .containsExactlyInAnyOrder(FILE_A.location(), FILE_B.location(), FILE_C.location());
+
+    // Verify FILE_A has deletes at snapshot 4
+    FileScanTask fileATask =
+        assertThat(tasks3)
+            .filteredOn(task -> task.file().location().equals(FILE_A.location()))
+            .first()
+            .as("Expected FILE_A in scan tasks")
+            .actual();
+    assertThat(fileATask.deletes()).hasSize(1); // FILE_A_DELETES present at snapshot 4
+    assertThat(fileATask.deletes().get(0).location()).isEqualTo(FILE_A_DELETES.location());
+
+    // Verify FILE_B and FILE_C have no deletes at snapshot 4
+    assertThat(tasks3)
+        .filteredOn(
+            task ->
+                task.file().location().equals(FILE_B.location())
+                    || task.file().location().equals(FILE_C.location()))
+        .allMatch(task -> task.deletes().isEmpty());
+  }
+
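+  // Background for the next test: evolving a partition spec does not rewrite existing data
+  // files; each file keeps the spec id it was written with, and scan tasks expose it via
+  // task.spec(). The REST planning round trip therefore has to serialize files against
+  // multiple specs within a single scan.
+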
+  @Test
+  public void scanPlanningWithMultiplePartitionSpecs() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous);
+
+    RESTTable table = restTableFor(scanPlanningCatalog(), "multiple_partition_specs");
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    // Evolve the partition spec to bucket the data column into 8 buckets instead of 16
+    table.updateSpec().removeField("data_bucket").addField(Expressions.bucket("data", 8)).commit();
+
+    // Create a data file with the new partition spec (spec-id=1)
+    PartitionSpec newSpec = table.spec();
+    assertThat(newSpec.specId()).isEqualTo(1);
+
+    DataFile fileWithNewSpec =
+        DataFiles.builder(newSpec)
+            .withPath("/path/to/data-new-spec.parquet")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket_8=3") // 8-bucket partition
+            .withRecordCount(2)
+            .build();
+
+    table.newFastAppend().appendFile(fileWithNewSpec).commit();
+    setParserContext(table);
+
+    // Scan the table - should return all 3 files despite different partition specs
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify all 3 files are present
+      assertThat(tasks).hasSize(3);
+      assertThat(tasks)
+          .map(task -> task.file().location())
+          .containsExactlyInAnyOrder(
+              FILE_A.location(), FILE_B.location(), fileWithNewSpec.location());
+
+      // Verify the files have the correct partition spec IDs
+      assertThat(tasks)
+          .filteredOn(
+              task ->
+                  task.file().location().equals(FILE_A.location())
+                      || task.file().location().equals(FILE_B.location()))
+          .allMatch(task -> task.spec().specId() == 0);
+      assertThat(tasks)
+          .filteredOn(task -> task.file().location().equals(fileWithNewSpec.location()))
+          .allMatch(task -> task.spec().specId() == 1);
+    }
+  }
+
+  // ==================== Endpoint Support Tests ====================
+
+  /** Helper class to hold catalog and adapter for endpoint support tests. */
+  private static class CatalogWithAdapter {
+    final RESTCatalog catalog;
+    final RESTCatalogAdapter adapter;
+
+    CatalogWithAdapter(RESTCatalog catalog, RESTCatalogAdapter adapter) {
+      this.catalog = catalog;
+      this.adapter = adapter;
+    }
+  }
+
+  // Helper: Create base catalog endpoints (namespace and table operations)
+  private List<Endpoint> baseCatalogEndpoints() {
+    return ImmutableList.of(
+        Endpoint.V1_CREATE_NAMESPACE,
+        Endpoint.V1_LOAD_NAMESPACE,
+        Endpoint.V1_LIST_TABLES,
+        Endpoint.V1_CREATE_TABLE,
+        Endpoint.V1_LOAD_TABLE,
+        Endpoint.V1_UPDATE_TABLE);
+  }
+
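+  // Endpoint support is negotiated via the /v1/config response: the server advertises which
+  // endpoints it implements, and the client guards each planning call with an Endpoint.check()
+  // against that list. Omitting an endpoint here is how the tests below simulate a server
+  // lacking that capability.
+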
+  // Helper: Create endpoint list with base + specified planning endpoints
+  private List<Endpoint> endpointsWithPlanning(Endpoint... planningEndpoints) {
+    return ImmutableList.<Endpoint>builder()
+        .addAll(baseCatalogEndpoints())
+        .add(planningEndpoints)
+        .build();
+  }
+
+  // Helper: Create catalog with custom endpoint support and optional planning behavior
+  private CatalogWithAdapter catalogWithEndpoints(
+      List<Endpoint> endpoints, TestPlanningBehavior planningBehavior) {
+    RESTCatalogAdapter adapter =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                if (ResourcePaths.config().equals(request.path())) {
+                  return castResponse(
+                      responseType, ConfigResponse.builder().withEndpoints(endpoints).build());
+                }
+                return super.execute(request, responseType, errorHandler, responseHeaders);
+              }
+            });
+
+    if (planningBehavior != null) {
+      adapter.setPlanningBehavior(planningBehavior);
+    }
+
+    RESTCatalog catalog =
+        new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
+    catalog.initialize(
+        "test",
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO",
+            RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
+            "true"));
+    return new CatalogWithAdapter(catalog, adapter);
+  }
+
+  @Test
+  public void serverDoesNotSupportPlanningEndpoint() throws IOException {
+    // Server doesn't support scan planning at all - should fall back to client-side planning
+    CatalogWithAdapter catalogWithAdapter = catalogWithEndpoints(baseCatalogEndpoints(), null);
+    RESTCatalog catalog = catalogWithAdapter.catalog;
+    Table table = createTableWithScanPlanning(catalog, "no_planning_support");
+    assertThat(table).isNotInstanceOf(RESTTable.class);
+    table.newAppend().appendFile(FILE_A).commit();
+
+    // Should fall back to client-side planning when endpoint is not supported
+    assertThat(table.newScan().planFiles())
+        .hasSize(1)
+        .first()
+        .extracting(ContentScanTask::file)
+        .extracting(ContentFile::location)
+        .isEqualTo(FILE_A.location());
+  }
+
+  @Test
+  public void serverSupportsPlanningSyncOnlyNotAsync() {
+    // Server supports submit (sync) but not fetch (async polling)
+    // Use ASYNC planning to trigger SUBMITTED status, which will hit the Endpoint.check()
+    CatalogWithAdapter catalogWithAdapter =
+        catalogWithEndpoints(
+            endpointsWithPlanning(
+                Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN, Endpoint.V1_FETCH_TABLE_SCAN_PLAN_TASKS),
+            TestPlanningBehavior.builder().asynchronous().build());
+
+    RESTCatalog catalog = catalogWithAdapter.catalog;
+    RESTTable table = restTableFor(catalog, "async_not_supported");
+    setParserContext(table);
+
+    // Should fail with UnsupportedOperationException when trying to fetch the async plan result
+    // because the V1_FETCH_TABLE_SCAN_PLAN endpoint is not supported
+    assertThatThrownBy(restTableScanFor(table)::planFiles)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage("Server does not support endpoint: %s", Endpoint.V1_FETCH_TABLE_SCAN_PLAN);
+  }
+
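+  // The async flow being probed above and below: submitting a plan (V1_SUBMIT_TABLE_SCAN_PLAN)
+  // may return a "submitted" status plus a plan id; the client then polls
+  // V1_FETCH_TABLE_SCAN_PLAN until the plan completes and fetches any paginated task groups via
+  // V1_FETCH_TABLE_SCAN_PLAN_TASKS. Each step is gated on the corresponding endpoint being
+  // advertised by the server.
+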
+  @Test
+  public void serverSupportsPlanningButNotPagination() {
+    // Server supports planning but not task pagination endpoint
+    // Use synchronousWithPagination (tasksPerPage=1) to trigger pagination, which will hit
+    // Endpoint.check()
+    CatalogWithAdapter catalogWithAdapter =
+        catalogWithEndpoints(
+            endpointsWithPlanning(
+                Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN,
+                Endpoint.V1_FETCH_TABLE_SCAN_PLAN,
+                Endpoint.V1_CANCEL_TABLE_SCAN_PLAN),
+            TestPlanningBehavior.builder().synchronousWithPagination().build());
+
+    RESTCatalog catalog = catalogWithAdapter.catalog;
+    RESTTable table = restTableFor(catalog, "pagination_not_supported");
+    table.newAppend().appendFile(FILE_B).commit();
+    setParserContext(table);
+    RESTTableScan scan = restTableScanFor(table);
+
+    // Should fail with UnsupportedOperationException when trying to fetch paginated tasks
+    // because V1_FETCH_TABLE_SCAN_PLAN_TASKS endpoint is not supported
+    assertThatThrownBy(scan::planFiles)
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(
+            "Server does not support endpoint: %s", Endpoint.V1_FETCH_TABLE_SCAN_PLAN_TASKS);
+  }
+
+  @Test
+  public void serverSupportsPlanningButNotCancellation() throws IOException {
+    // Server supports planning but not the cancel endpoint
+    CatalogWithAdapter catalogWithAdapter =
+        catalogWithEndpoints(
+            endpointsWithPlanning(
+                Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN,
+                Endpoint.V1_FETCH_TABLE_SCAN_PLAN,
+                Endpoint.V1_FETCH_TABLE_SCAN_PLAN_TASKS),
+            TestPlanningBehavior.builder().asynchronous().build());
+
+    RESTCatalog catalog = catalogWithAdapter.catalog;
+    RESTTable table = restTableFor(catalog, "cancellation_not_supported");
+    setParserContext(table);
+    RESTTableScan scan = restTableScanFor(table);
+
+    // Get the iterable - this starts async planning
+    CloseableIterable<FileScanTask> iterable = scan.planFiles();
+
+    // Cancellation should not fail even though the server doesn't support it
+    // The client should handle this gracefully by returning false
+    boolean cancelled = scan.cancelPlan();
+    iterable.close();
+
+    // Verify no exception was thrown - cancelPlan returns false when endpoint not supported
+    assertThat(cancelled).isFalse();
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestResourcePaths.java b/core/src/test/java/org/apache/iceberg/rest/TestResourcePaths.java
index ebcced1b3766..fcf97e3abc26 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestResourcePaths.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestResourcePaths.java
@@ -225,4 +225,67 @@ public void viewWithMultipartNamespace() {
     assertThat(withPrefix.view(ident)).isEqualTo("v1/ws/catalog/namespaces/n%1Fs/views/view-name");
     assertThat(withoutPrefix.view(ident)).isEqualTo("v1/namespaces/n%1Fs/views/view-name");
   }
+
+  @Test
+  public void planEndpointPath() {
+    TableIdentifier tableId = TableIdentifier.of("test_namespace", "test_table");
+
+    assertThat(withPrefix.planTableScan(tableId))
+        .isEqualTo("v1/ws/catalog/namespaces/test_namespace/tables/test_table/plan");
+    assertThat(withoutPrefix.planTableScan(tableId))
+        .isEqualTo("v1/namespaces/test_namespace/tables/test_table/plan");
+
+    // Test with different identifiers
+    TableIdentifier complexId = TableIdentifier.of(Namespace.of("db", "schema"), "my_table");
+    assertThat(withPrefix.planTableScan(complexId))
+        .isEqualTo("v1/ws/catalog/namespaces/db%1Fschema/tables/my_table/plan");
+    assertThat(withoutPrefix.planTableScan(complexId))
+        .isEqualTo("v1/namespaces/db%1Fschema/tables/my_table/plan");
+  }
+
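+  // As with the table/view path tests above, multipart namespace levels are joined with the
+  // 0x1F unit separator and URL-encoded, which is why "db.schema" appears as "db%1Fschema".
+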
.isEqualTo("v1/ws/catalog/namespaces/db%1Fschema/tables/my_table/tasks"); + assertThat(withoutPrefix.fetchScanTasks(complexId)) + .isEqualTo("v1/namespaces/db%1Fschema/tables/my_table/tasks"); + } + + @Test + public void cancelPlanEndpointPath() { + TableIdentifier tableId = TableIdentifier.of("test_namespace", "test_table"); + String planId = "plan-abc-123"; + + assertThat(withPrefix.plan(tableId, planId)) + .isEqualTo("v1/ws/catalog/namespaces/test_namespace/tables/test_table/plan/plan-abc-123"); + assertThat(withoutPrefix.plan(tableId, planId)) + .isEqualTo("v1/namespaces/test_namespace/tables/test_table/plan/plan-abc-123"); + + // The planId contains a space which needs to be encoded + String spaceSeperatedPlanId = "plan with spaces"; + // The expected encoded version of the planId + String encodedPlanId = "plan+with+spaces"; + + assertThat(withPrefix.plan(tableId, spaceSeperatedPlanId)) + .isEqualTo( + "v1/ws/catalog/namespaces/test_namespace/tables/test_table/plan/" + encodedPlanId); + assertThat(withoutPrefix.plan(tableId, spaceSeperatedPlanId)) + .isEqualTo("v1/namespaces/test_namespace/tables/test_table/plan/" + encodedPlanId); + + // Test with different identifiers + TableIdentifier complexId = TableIdentifier.of(Namespace.of("db", "schema"), "my_table"); + assertThat(withPrefix.plan(complexId, "plan-xyz-789")) + .isEqualTo("v1/ws/catalog/namespaces/db%1Fschema/tables/my_table/plan/plan-xyz-789"); + assertThat(withoutPrefix.plan(complexId, "plan-xyz-789")) + .isEqualTo("v1/namespaces/db%1Fschema/tables/my_table/plan/plan-xyz-789"); + } }