From 98cf096e870e4cd09ef8b905e714ef4ced213ab8 Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Wed, 18 Mar 2026 19:34:18 -0700 Subject: [PATCH 1/2] feat: Add analysis support for CREATE VECTOR INDEX (#27036) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: ## High level design The process for executing a CREATE VECTOR INDEX SQL statement is as follows: 1. SQL Input & Parsing: SQL: CREATE VECTOR INDEX my_index ON my_table(id, embedding) WITH (...) UPDATING FOR ... The Parser (SqlBase.g4) generates a CreateVectorIndex Abstract Syntax Tree (AST) node. 2. Statement Analysis: **StatementAnalyzer.visitCreateVectorIndex() validates the source/target tables and extracts index properties.** **This results in a structured CreateVectorIndexAnalysis object.** 3. Logical Planning & Query Generation: • LogicalPlanner.createVectorIndexPlan() builds the core execution query: CREATE index_table AS SELECT create_vector_index(embedding, id) FROM my_table WHERE ds BETWEEN ... • The resulting plan tree includes: TableFinishNode(target = CreateVectorIndexReference) └── TableWriterNode(target = CreateVectorIndexReference) └── query plan 4. Connector Plan Optimization (Rewriting): PRISM: The CreateVectorIndexRewriteOptimizer detects the CreateVectorIndexReference and rewrites the plan for optimization. ICEBERG/OTHER: Other connector-specific optimizers may fire during this phase. 5. Execution and Metadata Handling (For connectors that don't rewrite): TableWriteInfo Routing: The CreateVectorIndexReference triggers metadata.beginCreateVectorIndex(). Local Execution & Commit: The finisher and committer use the CreateVectorIndexHandle to call metadata.finishCreateVectorIndex() and metadata.commitPageSinkAsync(). 6. ConnectorMetadata SPI: Default: The standard implementation throws NOT_SUPPORTED. Iceberg Override: The Iceberg connector implements this SPI to create the underlying table via the begin/finish calls. ## Release Notes ``` == NO RELEASE NOTE == ``` Differential Revision: D91524358 Pulled By: skyelves --- .../presto/sql/analyzer/Analysis.java | 62 +++++++++++++++++ .../sql/analyzer/utils/StatementUtils.java | 2 + .../sql/analyzer/StatementAnalyzer.java | 66 +++++++++++++++++++ .../presto/sql/analyzer/TestAnalyzer.java | 45 +++++++++++++ 4 files changed, 175 insertions(+) diff --git a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java index c3c1f43f8e142..974a49d9d9f34 100644 --- a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java +++ b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java @@ -186,6 +186,9 @@ public class Analysis private Optional callTarget = Optional.empty(); private Optional targetQuery = Optional.empty(); + // for create vector index + private Optional createVectorIndexAnalysis = Optional.empty(); + // for create table private Optional createTableDestination = Optional.empty(); private Map createTableProperties = ImmutableMap.of(); @@ -700,6 +703,16 @@ public Optional getCreateTableDestination() return createTableDestination; } + public void setCreateVectorIndexAnalysis(CreateVectorIndexAnalysis analysis) + { + this.createVectorIndexAnalysis = Optional.of(analysis); + } + + public Optional getCreateVectorIndexAnalysis() + { + return createVectorIndexAnalysis; + } + public Optional getProcedureName() { return procedureName; @@ -1937,4 +1950,53 @@ public Scope getTargetTableScope() return targetTableScope; } } + + @Immutable + public static final class CreateVectorIndexAnalysis + { + private final QualifiedObjectName sourceTableName; + private final QualifiedObjectName targetTableName; + private final List columns; + private final Map properties; + private final Optional updatingFor; + + public CreateVectorIndexAnalysis( + QualifiedObjectName sourceTableName, + QualifiedObjectName targetTableName, + List columns, + Map properties, + Optional updatingFor) + { + this.sourceTableName = requireNonNull(sourceTableName, "sourceTableName is null"); + this.targetTableName = requireNonNull(targetTableName, "targetTableName is null"); + this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); + this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null")); + this.updatingFor = requireNonNull(updatingFor, "updatingFor is null"); + } + + public QualifiedObjectName getSourceTableName() + { + return sourceTableName; + } + + public QualifiedObjectName getTargetTableName() + { + return targetTableName; + } + + public List getColumns() + { + return columns; + } + + public Map getProperties() + { + return properties; + } + + public Optional getUpdatingFor() + { + return updatingFor; + } + } } diff --git a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/utils/StatementUtils.java b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/utils/StatementUtils.java index 1be20437e8a1c..dfeb0b7899e37 100644 --- a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/utils/StatementUtils.java +++ b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/utils/StatementUtils.java @@ -30,6 +30,7 @@ import com.facebook.presto.sql.tree.CreateTableAsSelect; import com.facebook.presto.sql.tree.CreateTag; import com.facebook.presto.sql.tree.CreateType; +import com.facebook.presto.sql.tree.CreateVectorIndex; import com.facebook.presto.sql.tree.CreateView; import com.facebook.presto.sql.tree.Deallocate; import com.facebook.presto.sql.tree.Delete; @@ -107,6 +108,7 @@ private StatementUtils() {} builder.put(CreateTableAsSelect.class, QueryType.INSERT); builder.put(Insert.class, QueryType.INSERT); builder.put(RefreshMaterializedView.class, QueryType.INSERT); + builder.put(CreateVectorIndex.class, QueryType.INSERT); builder.put(Delete.class, QueryType.DELETE); builder.put(Update.class, QueryType.UPDATE); diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java index 8b1f7c0809fba..d22a1d64310a1 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java @@ -110,6 +110,7 @@ import com.facebook.presto.sql.tree.CreateSchema; import com.facebook.presto.sql.tree.CreateTable; import com.facebook.presto.sql.tree.CreateTableAsSelect; +import com.facebook.presto.sql.tree.CreateVectorIndex; import com.facebook.presto.sql.tree.CreateView; import com.facebook.presto.sql.tree.Cube; import com.facebook.presto.sql.tree.Deallocate; @@ -1144,6 +1145,71 @@ protected Scope visitCreateTable(CreateTable node, Optional scope) return createAndAssignScope(node, scope); } + @Override + protected Scope visitCreateVectorIndex(CreateVectorIndex node, Optional scope) + { + QualifiedObjectName sourceTableName = createQualifiedObjectName(session, node, node.getTableName(), metadata); + if (!metadataResolver.tableExists(sourceTableName)) { + throw new SemanticException(MISSING_TABLE, node, "Source table '%s' does not exist", sourceTableName); + } + + QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getIndexName(), metadata); + if (metadataResolver.tableExists(targetTable)) { + throw new SemanticException(TABLE_ALREADY_EXISTS, node, "Destination table '%s' already exists", targetTable); + } + + // Analyze the source table to build a proper scope with typed columns + // Use AllowAllAccessControl since we check permissions separately below + StatementAnalyzer analyzer = new StatementAnalyzer( + analysis, + metadata, + sqlParser, + new AllowAllAccessControl(), + session, + warningCollector); + + Table sourceTable = new Table(node.getTableName()); + Scope tableScope = analyzer.analyze(sourceTable, scope); + + // Validate that specified columns exist in the source table + TableHandle sourceTableHandle = metadataResolver.getTableHandle(sourceTableName).get(); + Map sourceColumns = metadataResolver.getColumnHandles(sourceTableHandle); + for (Identifier column : node.getColumns()) { + if (!sourceColumns.containsKey(column.getValue())) { + throw new SemanticException(MISSING_COLUMN, column, "Column '%s' does not exist in source table '%s'", column.getValue(), sourceTableName); + } + } + + // Analyze UPDATING FOR predicate (validates column references, types, etc.) + node.getUpdatingFor().ifPresent(where -> analyzeWhere(node, tableScope, where)); + + validateProperties(node.getProperties(), scope); + + Map allProperties = mapFromProperties(node.getProperties()); + + // user must have read permission on the source table to create a vector index + Multimap tableColumnMap = ImmutableMultimap.builder() + .putAll(sourceTableName, sourceColumns.keySet().stream() + .map(column -> new Subfield(column, ImmutableList.of())) + .collect(toImmutableSet())) + .build(); + analysis.addTableColumnAndSubfieldReferences(accessControl, session.getIdentity(), + session.getTransactionId(), session.getAccessControlContext(), tableColumnMap, tableColumnMap); + + analysis.addAccessControlCheckForTable(TABLE_CREATE, + new AccessControlInfoForTable(accessControl, session.getIdentity(), + session.getTransactionId(), session.getAccessControlContext(), targetTable)); + + analysis.setCreateVectorIndexAnalysis(new Analysis.CreateVectorIndexAnalysis( + sourceTableName, + targetTable, + node.getColumns(), + allProperties, + node.getUpdatingFor())); + + return createAndAssignScope(node, scope, Field.newUnqualified(node.getLocation(), "result", VARCHAR)); + } + @Override protected Scope visitProperty(Property node, Optional scope) { diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java index f6bfcc5d75e58..21ea729936869 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java @@ -82,6 +82,7 @@ import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SAMPLE_PERCENTAGE_OUT_OF_RANGE; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.STANDALONE_LAMBDA; +import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_ALREADY_EXISTS; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_COLUMN_NOT_FOUND; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_DUPLICATE_RANGE_VARIABLE; import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_IMPLEMENTATION_ERROR; @@ -2399,4 +2400,48 @@ public void testInvalidMerge() assertFails(NOT_SUPPORTED, "line 1:1: Merging into materialized views is not supported", "MERGE INTO mv1 USING t1 ON mv1.a = t1.a WHEN MATCHED THEN UPDATE SET id = bar.id + 1"); } + + @Test + public void testCreateVectorIndex() + { + // basic success cases + analyze("CREATE VECTOR INDEX test_index ON t1(a, b)"); + analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1')"); + analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1', p2 = 'val2')"); + + // with UPDATING FOR clause + analyze("CREATE VECTOR INDEX test_index ON t1(a, b) UPDATING FOR a > 10"); + analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1') UPDATING FOR a BETWEEN 1 AND 100"); + + // single column + analyze("CREATE VECTOR INDEX test_index ON t1(a)"); + + // source table does not exist + assertFails(MISSING_TABLE, ".*Source table '.*' does not exist", + "CREATE VECTOR INDEX test_index ON nonexistent_table(a, b)"); + + // destination table already exists (using an existing table name as the index name) + assertFails(TABLE_ALREADY_EXISTS, ".*already exists", + "CREATE VECTOR INDEX t1 ON t2(a, b)"); + + // column does not exist in source table + assertFails(MISSING_COLUMN, ".*Column 'unknown' does not exist in source table '.*'", + "CREATE VECTOR INDEX test_index ON t1(a, unknown)"); + assertFails(MISSING_COLUMN, ".*Column 'nonexistent' does not exist in source table '.*'", + "CREATE VECTOR INDEX test_index ON t1(nonexistent)"); + + // duplicate properties + assertFails(DUPLICATE_PROPERTY, ".* Duplicate property: p1", + "CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'v1', p2 = 'v2', p1 = 'v3')"); + assertFails(DUPLICATE_PROPERTY, ".* Duplicate property: p1", + "CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'v1', \"p1\" = 'v2')"); + + // unresolved property value + assertFails(MISSING_ATTRIBUTE, ".*'y' cannot be resolved", + "CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = y)"); + + // UPDATING FOR with invalid column reference + assertFails(MISSING_ATTRIBUTE, ".*", + "CREATE VECTOR INDEX test_index ON t1(a, b) UPDATING FOR nonexistent_col > 10"); + } } From 37b2e82146a79b4d4015909c213e676f2acaedbf Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Wed, 18 Mar 2026 19:34:18 -0700 Subject: [PATCH 2/2] feat: Add CreateVectorIndexReference WriterTarget and SPI methods (#27261) Summary: Add dedicated WriterTarget subclass and ConnectorMetadata SPI for CREATE VECTOR INDEX, enabling each connector to implement vector index creation independently. - CreateVectorIndexReference: plan-time target carrying index metadata and source table reference - beginCreateVectorIndex/finishCreateVectorIndex: SPI defaults to NOT_SUPPORTED so connectors must opt in - ClassLoaderSafeConnectorMetadata: delegation wrappers ## Release Notes ``` == NO RELEASE NOTE == ``` Differential Revision: D95325176 --- .../spi/connector/ConnectorMetadata.java | 16 +++++ .../ClassLoaderSafeConnectorMetadata.java | 16 +++++ .../presto/spi/plan/TableWriterNode.java | 63 +++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java index 5540734c7ed47..8e7be0e171b5c 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorMetadata.java @@ -502,6 +502,22 @@ default Optional finishCreateTable(ConnectorSession ses throw new PrestoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginCreateTable() is implemented without finishCreateTable()"); } + /** + * Begin the atomic creation of a vector index with data. + */ + default ConnectorOutputTableHandle beginCreateVectorIndex(ConnectorSession session, ConnectorTableMetadata indexMetadata, Optional layout, SchemaTableName sourceTableName) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating vector indexes"); + } + + /** + * Finish a vector index creation with data after the data is written. + */ + default Optional finishCreateVectorIndex(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating vector indexes"); + } + /** * Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes. */ diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java index 63e0f082b788d..7e7c6c99bf4e7 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/classloader/ClassLoaderSafeConnectorMetadata.java @@ -450,6 +450,22 @@ public Optional finishCreateTable(ConnectorSession sess } } + @Override + public ConnectorOutputTableHandle beginCreateVectorIndex(ConnectorSession session, ConnectorTableMetadata indexMetadata, Optional layout, SchemaTableName sourceTableName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.beginCreateVectorIndex(session, indexMetadata, layout, sourceTableName); + } + } + + @Override + public Optional finishCreateVectorIndex(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection fragments, Collection computedStatistics) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return delegate.finishCreateVectorIndex(session, tableHandle, fragments, computedStatistics); + } + } + @Override public void beginQuery(ConnectorSession session) { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java index 3fcf43a488c8e..d11a28383bf51 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java @@ -698,4 +698,67 @@ public String toString() return procedureName.toString(); } } + + public static class CreateVectorIndexReference + extends WriterTarget + { + private final ConnectorId connectorId; + private final ConnectorTableMetadata tableMetadata; + private final Optional layout; + private final Optional> columns; + private final SchemaTableName sourceTableName; + + public CreateVectorIndexReference( + ConnectorId connectorId, + ConnectorTableMetadata tableMetadata, + Optional layout, + Optional> columns, + SchemaTableName sourceTableName) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.tableMetadata = requireNonNull(tableMetadata, "tableMetadata is null"); + this.layout = requireNonNull(layout, "layout is null"); + this.columns = requireNonNull(columns, "columns is null"); + this.sourceTableName = requireNonNull(sourceTableName, "sourceTableName is null"); + } + + @Override + public ConnectorId getConnectorId() + { + return connectorId; + } + + public ConnectorTableMetadata getTableMetadata() + { + return tableMetadata; + } + + public Optional getLayout() + { + return layout; + } + + @Override + public SchemaTableName getSchemaTableName() + { + return tableMetadata.getTable(); + } + + @Override + public String toString() + { + return connectorId + "." + tableMetadata.getTable(); + } + + @Override + public Optional> getOutputColumns() + { + return columns; + } + + public SchemaTableName getSourceTableName() + { + return sourceTableName; + } + } }