diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 5a17310cca3be..329853fc2df96 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -44,6 +44,7 @@
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaTranslator;
+import org.apache.flink.table.catalog.TableWritePrivilege;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
@@ -60,11 +61,14 @@
 import org.apache.flink.table.operations.utils.OperationExpressionsUtils.CategorizedExpressions;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -428,8 +432,9 @@ public TablePipeline insertInto(String tablePath, boolean overwrite) {
                 tableEnvironment.getParser().parseIdentifier(tablePath);
         ObjectIdentifier objectIdentifier =
                 tableEnvironment.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+        Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.INSERT);
         ContextResolvedTable contextResolvedTable =
-                tableEnvironment.getCatalogManager().getTableOrError(objectIdentifier);
+                tableEnvironment.getCatalogManager().getTableOrError(objectIdentifier, privileges);
         return insertInto(contextResolvedTable, overwrite);
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index bc8708578073c..e81d0a8546877 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -687,7 +687,19 @@ public Optional<ContextResolvedTable> getTable(ObjectIdentifier objectIdentifier
                     resolveCatalogBaseTable(temporaryTable);
             return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
         } else {
-            return getPermanentTable(objectIdentifier, null);
+            return getPermanentTable(objectIdentifier, null, null);
+        }
+    }
+
+    public Optional<ContextResolvedTable> getTable(
+            ObjectIdentifier objectIdentifier, Set<TableWritePrivilege> privileges) {
+        CatalogBaseTable temporaryTable = temporaryTables.get(objectIdentifier);
+        if (temporaryTable != null) {
+            final ResolvedCatalogBaseTable<?> resolvedTable =
+                    resolveCatalogBaseTable(temporaryTable);
+            return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
+        } else {
+            return getPermanentTable(objectIdentifier, null, privileges);
         }
     }
 
@@ -708,7 +720,7 @@ public Optional<ContextResolvedTable> getTable(
                     resolveCatalogBaseTable(temporaryTable);
             return Optional.of(ContextResolvedTable.temporary(objectIdentifier, resolvedTable));
         } else {
-            return getPermanentTable(objectIdentifier, timestamp);
+            return getPermanentTable(objectIdentifier, timestamp, null);
         }
     }
 
@@ -753,6 +765,17 @@ public ContextResolvedTable getTableOrError(ObjectIdentifier objectIdentifier) {
                                         objectIdentifier, listCatalogs())));
     }
 
+    public ContextResolvedTable getTableOrError(
+            ObjectIdentifier objectIdentifier, Set<TableWritePrivilege> privileges) {
+        return getTable(objectIdentifier, privileges)
+                .orElseThrow(
+                        () ->
+                                new ValidationException(
+                                        String.format(
+                                                "Cannot find table '%s' in any of the catalogs %s, nor as a temporary table.",
+                                                objectIdentifier, listCatalogs())));
+    }
+
     /**
      * Retrieves a partition with a fully qualified table path and partition spec. If the path is
      * not yet fully qualified use{@link #qualifyIdentifier(UnresolvedIdentifier)} first.
@@ -777,7 +800,9 @@ public Optional<CatalogPartition> getPartition(
     }
 
     private Optional<ContextResolvedTable> getPermanentTable(
-            ObjectIdentifier objectIdentifier, @Nullable Long timestamp) {
+            ObjectIdentifier objectIdentifier,
+            @Nullable Long timestamp,
+            @Nullable Set<TableWritePrivilege> privileges) {
         Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
         ObjectPath objectPath = objectIdentifier.toObjectPath();
         if (catalogOptional.isPresent()) {
@@ -792,6 +817,8 @@ private Optional<ContextResolvedTable> getPermanentTable(
                                     "%s is a view, but time travel is not supported for view.",
                                     objectIdentifier.asSummaryString()));
                 }
+            } else if (privileges != null) {
+                table = currentCatalog.getTable(objectPath, privileges);
             } else {
                 table = currentCatalog.getTable(objectPath);
             }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index e230b616cdbfd..e4d4c71260871 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -51,6 +51,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -282,6 +283,23 @@ default CatalogBaseTable getTable(ObjectPath tablePath, long timestamp)
                         "getTable(ObjectPath, long) is not implemented for %s.", this.getClass()));
     }
 
+    /**
+     * Returns a {@link CatalogTable} or {@link CatalogView} with specific write privileges
+     * identified by the given {@link ObjectPath}. The framework will resolve the metadata objects
+     * when necessary.
+     *
+     * @param tablePath Path of the table or view
+     * @param writePrivileges specific write privileges for the table
+     * @return The requested table or view
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    default CatalogBaseTable getTable(
+            ObjectPath tablePath, Set<TableWritePrivilege> writePrivileges)
+            throws TableNotExistException, CatalogException {
+        return getTable(tablePath);
+    }
+
     /**
      * Check if a table or view exists in this catalog.
      *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableWritePrivilege.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableWritePrivilege.java
new file mode 100644
index 0000000000000..6e56982fa55c1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableWritePrivilege.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** The table writing privileges that will be provided when loading a table. */
+@PublicEvolving
+public enum TableWritePrivilege {
+    /** The privilege of adding rows to the table. */
+    INSERT,
+
+    /** The privilege for changing existing rows in the table. */
+    UPDATE,
+
+    /** The privilege for deleting rows from the table. */
+    DELETE
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index 4fcc3d7340283..edc51be5771f1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -63,6 +63,7 @@
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableWritePrivilege;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -115,6 +116,8 @@
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
 
+import org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
+
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.hint.HintStrategyTable;
 import org.apache.calcite.rel.hint.RelHint;
@@ -134,6 +137,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -323,9 +327,10 @@ private Operation convertSqlInsert(RichSqlInsert insert) {
         UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath);
         ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
+        Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.INSERT);
         // If it is materialized table, convert it to catalog table for query optimize
         ContextResolvedTable contextResolvedTable =
-                catalogManager.getTableOrError(identifier).toCatalogTable();
+                catalogManager.getTableOrError(identifier, privileges).toCatalogTable();
 
         PlannerQueryOperation query =
                 (PlannerQueryOperation)
@@ -655,9 +660,10 @@ private Operation convertDelete(SqlDelete sqlDelete) {
         LogicalTableModify tableModify = (LogicalTableModify) deleteRelational.rel;
         UnresolvedIdentifier unresolvedTableIdentifier =
                 UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.DELETE);
         ContextResolvedTable contextResolvedTable =
                 catalogManager.getTableOrError(
-                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier), privileges);
         // try push down delete
         Optional<DynamicTableSink> optionalDynamicTableSink =
                 DeletePushDownUtils.getDynamicTableSink(contextResolvedTable, tableModify);
@@ -700,9 +706,10 @@ private Operation convertUpdate(SqlUpdate sqlUpdate) {
         LogicalTableModify tableModify = (LogicalTableModify) updateRelational.rel;
         UnresolvedIdentifier unresolvedTableIdentifier =
                 UnresolvedIdentifier.of(tableModify.getTable().getQualifiedName());
+        Set<TableWritePrivilege> privileges = Sets.newHashSet(TableWritePrivilege.UPDATE);
         ContextResolvedTable contextResolvedTable =
                 catalogManager.getTableOrError(
-                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier));
+                        catalogManager.qualifyIdentifier(unresolvedTableIdentifier), privileges);
         // get query
         PlannerQueryOperation queryOperation =
                 new PlannerQueryOperation(
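
Usage note: the patch threads an optional Set<TableWritePrivilege> from the DML entry points (INSERT via TableImpl#insertInto and convertSqlInsert, UPDATE via convertUpdate, DELETE via convertDelete) through CatalogManager down to the new Catalog#getTable(ObjectPath, Set<TableWritePrivilege>) default method. Because the default simply delegates to getTable(ObjectPath), existing catalogs keep working unchanged, while a catalog backed by an authorization system can override it to reject the table lookup before any planning happens. Below is a minimal sketch of such an override, assuming a catalog derived from GenericInMemoryCatalog; MyAuthorizingCatalog and isAllowed are hypothetical names, not part of this patch.

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableWritePrivilege;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

import java.util.Set;

/** Hypothetical catalog that enforces write privileges at table lookup time. */
public class MyAuthorizingCatalog extends GenericInMemoryCatalog {

    public MyAuthorizingCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    @Override
    public CatalogBaseTable getTable(
            ObjectPath tablePath, Set<TableWritePrivilege> writePrivileges)
            throws TableNotExistException, CatalogException {
        // Fail the lookup before planning if any requested privilege is missing;
        // catalogs that do not override this method inherit the default, which
        // ignores the privileges and delegates to getTable(tablePath).
        for (TableWritePrivilege privilege : writePrivileges) {
            if (!isAllowed(tablePath, privilege)) {
                throw new CatalogException(
                        String.format(
                                "Missing %s privilege on table %s.",
                                privilege, tablePath.getFullName()));
            }
        }
        return getTable(tablePath);
    }

    // Hypothetical hook; a real implementation would consult an external
    // authorization service or ACL store.
    private boolean isAllowed(ObjectPath tablePath, TableWritePrivilege privilege) {
        return true;
    }
}

Passing the privileges as a hint at lookup time, instead of adding a separate authorize() call, keeps the check atomic with metadata resolution and costs nothing for catalogs that ignore the new parameter.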