1 change: 1 addition & 0 deletions build.gradle
@@ -377,6 +377,7 @@ project(':iceberg-core') {
implementation libs.jackson.databind
implementation libs.caffeine
implementation libs.roaringbitmap
implementation libs.failsafe
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
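The new libs.failsafe dependency is the Failsafe retry library (dev.failsafe). The call site is not shown in this diff; presumably it backs retrying/polling of plan requests. A minimal sketch of the library's 3.x API under that assumption, with fetchPlanStatus as a hypothetical stand-in:

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;

public class FailsafeSketch {
  public static void main(String[] args) {
    // Retry up to 3 times with exponential backoff between 100 ms and 5 s.
    RetryPolicy<String> retryPolicy =
        RetryPolicy.<String>builder()
            .handle(RuntimeException.class)
            .withBackoff(Duration.ofMillis(100), Duration.ofSeconds(5))
            .withMaxRetries(3)
            .build();

    // fetchPlanStatus() is a hypothetical stand-in for the real call site.
    System.out.println(Failsafe.with(retryPolicy).get(FailsafeSketch::fetchPlanStatus));
  }

  private static String fetchPlanStatus() {
    return "completed";
  }
}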
core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -35,7 +35,7 @@

/** Context object with optional arguments for a TableScan. */
@Value.Immutable
- abstract class TableScanContext {
+ public abstract class TableScanContext {

@Nullable
public abstract Long snapshotId();
50 changes: 50 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/ErrorHandlers.java
@@ -26,6 +26,8 @@
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchPlanIdException;
import org.apache.iceberg.exceptions.NoSuchPlanTaskException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotAuthorizedException;
@@ -73,6 +75,14 @@ public static Consumer<ErrorResponse> tableCommitHandler() {
return CommitErrorHandler.INSTANCE;
}

public static Consumer<ErrorResponse> planErrorHandler() {
return PlanErrorHandler.INSTANCE;
}

public static Consumer<ErrorResponse> planTaskHandler() {
return PlanTaskErrorHandler.INSTANCE;
}

public static Consumer<ErrorResponse> defaultErrorHandler() {
return DefaultErrorHandler.INSTANCE;
}
@@ -125,6 +135,46 @@ public void accept(ErrorResponse error) {
}
}

/** Plan level error handler. */
private static class PlanErrorHandler extends DefaultErrorHandler {
private static final ErrorHandler INSTANCE = new PlanErrorHandler();

@Override
public void accept(ErrorResponse error) {
if (error.code() == 404) {
if (NoSuchNamespaceException.class.getSimpleName().equals(error.type())) {
throw new NoSuchNamespaceException("%s", error.message());
} else if (NoSuchTableException.class.getSimpleName().equals(error.type())) {
throw new NoSuchTableException("%s", error.message());
} else {
throw new NoSuchPlanIdException("%s", error.message());
}
}

super.accept(error);
}
}

/** PlanTask level error handler. */
private static class PlanTaskErrorHandler extends DefaultErrorHandler {
private static final ErrorHandler INSTANCE = new PlanTaskErrorHandler();

@Override
public void accept(ErrorResponse error) {
if (error.code() == 404) {
if (NoSuchNamespaceException.class.getSimpleName().equals(error.type())) {
throw new NoSuchNamespaceException("%s", error.message());
} else if (NoSuchTableException.class.getSimpleName().equals(error.type())) {
throw new NoSuchTableException("%s", error.message());
} else {
throw new NoSuchPlanTaskException("%s", error.message());
}
}

super.accept(error);
}
}

/** View commit error handler. */
private static class ViewCommitErrorHandler extends DefaultErrorHandler {
private static final ErrorHandler INSTANCE = new ViewCommitErrorHandler();
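For illustration, a minimal sketch (not in this diff) of the new handler's behavior: a 404 whose type matches neither the namespace nor the table exception falls through to NoSuchPlanIdException. ErrorResponse and its builder are the existing types in org.apache.iceberg.rest.responses; the plan id is a placeholder, and same-package access to ErrorHandlers is assumed:

import org.apache.iceberg.rest.responses.ErrorResponse;

// A 404 whose type is not a namespace/table error; PlanErrorHandler
// rethrows it as NoSuchPlanIdException ("plan-123" is a placeholder).
ErrorResponse error =
    ErrorResponse.builder()
        .responseCode(404)
        .withType("NoSuchPlanIdException")
        .withMessage("Plan id plan-123 not found")
        .build();

ErrorHandlers.planErrorHandler().accept(error); // throws NoSuchPlanIdException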
core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java
@@ -37,6 +37,10 @@ private RESTCatalogProperties() {}

public static final String NAMESPACE_SEPARATOR = "namespace-separator";

// Enable planning on the REST server side
public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled";
public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false;

public enum SnapshotMode {
ALL,
REFS
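A hypothetical usage sketch (not part of this diff) showing a client enabling the new property when initializing a RESTCatalog; the catalog name and URI are placeholders, and the key string matches the constant added above:

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTCatalog;

public class EnableRestScanPlanning {
  public static void main(String[] args) {
    Map<String, String> props =
        Map.of(
            CatalogProperties.URI, "https://rest-catalog.example.com", // placeholder
            "rest-scan-planning-enabled", "true"); // REST_SCAN_PLANNING_ENABLED

    RESTCatalog catalog = new RESTCatalog();
    catalog.initialize("prod", props);
    // With this flag set, loadTable(...) returns a RESTTable when the server
    // also advertises the V1_SUBMIT_TABLE_SCAN_PLAN endpoint.
  }
}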
41 changes: 41 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -41,6 +41,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.BaseViewSessionCatalog;
@@ -158,6 +159,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private Integer pageSize = null;
private boolean restScanPlanningEnabled;
private CloseableGroup closeables = null;
private Set<Endpoint> endpoints;
private Supplier<Map<String, String>> mutationHeaders = Map::of;
@@ -270,6 +272,11 @@ public void initialize(String name, Map<String, String> unresolved) {
RESTCatalogProperties.NAMESPACE_SEPARATOR,
RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8);

this.restScanPlanningEnabled =
PropertyUtil.propertyAsBoolean(
mergedProps,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED,
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT);
super.initialize(name, mergedProps);
}

@@ -476,6 +483,13 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {

trackFileIO(ops);

RESTTable restTable = restTableForScanPlanning(ops, finalIdentifier, tableClient);
// A RESTTable should only be returned for non-metadata tables: the client may
// not have access to metadata files such as manifests, since all it needs is the catalog.
if (restTable != null) {
return restTable;
}

BaseTable table =
new BaseTable(
ops,
@@ -488,6 +502,23 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
return table;
}

private RESTTable restTableForScanPlanning(
TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) {
// Plan remotely only if the server supports the planning endpoint and server-side planning is enabled
if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) {
return new RESTTable(
ops,
fullTableName(finalIdentifier),
metricsReporter(paths.metrics(finalIdentifier), restClient),
restClient,
Map::of,
finalIdentifier,
paths,
endpoints);
}
return null;
}

private void trackFileIO(RESTTableOperations ops) {
if (io != ops.io()) {
fileIOTracker.track(ops);
@@ -556,6 +587,11 @@ public Table registerTable(

trackFileIO(ops);

RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient);
if (restTable != null) {
return restTable;
}

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), tableClient));
}
@@ -820,6 +856,11 @@ public Table create() {

trackFileIO(ops);

RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient);
if (restTable != null) {
return restTable;
}

return new BaseTable(
ops, fullTableName(ident), metricsReporter(paths.metrics(ident), tableClient));
}
70 changes: 70 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/RESTTable.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest;

import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ImmutableTableScanContext;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReporter;

class RESTTable extends BaseTable {
Review thread on this line:

[Contributor] @singhpk234 The one comment I had about this is that we probably want to override all of the mutating operations and disallow/throw, because a user may not know that they can't perform an update or start a transaction. This would likely surface as a file system access error, which could be confusing/misleading.

[Contributor Author] Thank you Dan, I agree, will publish a change right away.

[Contributor Author (singhpk234), Dec 10, 2025] One thing I realized: blocking at commit time is probably pretty late, because the table API (which writes Iceberg metadata) is only invoked to commit at the end. In Spark, for example, the executors will already have attempted to write data files, and only if those writes succeed (in non-fine-grained cases) do they report back to the driver, whose responsibility is then to commit to the table, i.e. generate the metadata, which will not be the case here.

Essentially, the main question is whether we need to block writes upfront, given that we don't know whether the table is protected; at least at this point the server can reject the writes with a 403 if it is indeed protected.

@amogh-jahagirdar and I discussed this from the Spark point of view: we could write an analyzer rule via the Iceberg Spark extensions that detects that the query/DataFrame wants to do a write and fails in the analysis phase, saying the write is not supported. WDYT? (A sketch of the override approach follows this file listing.)

private final RESTClient client;
private final Supplier<Map<String, String>> headers;
private final MetricsReporter reporter;
private final ResourcePaths resourcePaths;
private final TableIdentifier tableIdentifier;
private final Set<Endpoint> supportedEndpoints;

RESTTable(
TableOperations ops,
String name,
MetricsReporter reporter,
RESTClient client,
Supplier<Map<String, String>> headers,
TableIdentifier tableIdentifier,
ResourcePaths resourcePaths,
Set<Endpoint> supportedEndpoints) {
super(ops, name, reporter);
this.reporter = reporter;
this.client = client;
this.headers = headers;
this.tableIdentifier = tableIdentifier;
this.resourcePaths = resourcePaths;
this.supportedEndpoints = supportedEndpoints;
}

@Override
public TableScan newScan() {
return new RESTTableScan(
this,
schema(),
ImmutableTableScanContext.builder().metricsReporter(reporter).build(),
client,
headers.get(),
operations(),
tableIdentifier,
resourcePaths,
supportedEndpoints);
}
}
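Following the review thread above, a minimal sketch of what the suggested overrides inside RESTTable might look like. This is not code from the diff; AppendFiles and Transaction are the standard org.apache.iceberg types, and the same pattern would extend to the other mutating methods on Table:

  @Override
  public AppendFiles newAppend() {
    // Fail fast instead of surfacing a confusing file-system access error later.
    throw new UnsupportedOperationException(
        "Writes are not supported on a REST scan-planning table");
  }

  @Override
  public Transaction newTransaction() {
    throw new UnsupportedOperationException(
        "Transactions are not supported on a REST scan-planning table");
  }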