Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ public class Analysis
private Optional<TableHandle> callTarget = Optional.empty();
private Optional<QuerySpecification> targetQuery = Optional.empty();

// for create vector index
private Optional<CreateVectorIndexAnalysis> createVectorIndexAnalysis = Optional.empty();

// for create table
private Optional<QualifiedObjectName> createTableDestination = Optional.empty();
private Map<String, Expression> createTableProperties = ImmutableMap.of();
Expand Down Expand Up @@ -700,6 +703,16 @@ public Optional<QualifiedObjectName> getCreateTableDestination()
return createTableDestination;
}

public void setCreateVectorIndexAnalysis(CreateVectorIndexAnalysis analysis)
{
this.createVectorIndexAnalysis = Optional.of(analysis);
}

public Optional<CreateVectorIndexAnalysis> getCreateVectorIndexAnalysis()
{
return createVectorIndexAnalysis;
}

public Optional<QualifiedObjectName> getProcedureName()
{
return procedureName;
Expand Down Expand Up @@ -1937,4 +1950,53 @@ public Scope getTargetTableScope()
return targetTableScope;
}
}

@Immutable
public static final class CreateVectorIndexAnalysis
{
private final QualifiedObjectName sourceTableName;
private final QualifiedObjectName targetTableName;
private final List<Identifier> columns;
private final Map<String, Expression> properties;
private final Optional<Expression> updatingFor;

public CreateVectorIndexAnalysis(
QualifiedObjectName sourceTableName,
QualifiedObjectName targetTableName,
List<Identifier> columns,
Map<String, Expression> properties,
Optional<Expression> updatingFor)
{
this.sourceTableName = requireNonNull(sourceTableName, "sourceTableName is null");
this.targetTableName = requireNonNull(targetTableName, "targetTableName is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null"));
this.updatingFor = requireNonNull(updatingFor, "updatingFor is null");
}

public QualifiedObjectName getSourceTableName()
{
return sourceTableName;
}

public QualifiedObjectName getTargetTableName()
{
return targetTableName;
}

public List<Identifier> getColumns()
{
return columns;
}

public Map<String, Expression> getProperties()
{
return properties;
}

public Optional<Expression> getUpdatingFor()
{
return updatingFor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.sql.tree.CreateTableAsSelect;
import com.facebook.presto.sql.tree.CreateTag;
import com.facebook.presto.sql.tree.CreateType;
import com.facebook.presto.sql.tree.CreateVectorIndex;
import com.facebook.presto.sql.tree.CreateView;
import com.facebook.presto.sql.tree.Deallocate;
import com.facebook.presto.sql.tree.Delete;
Expand Down Expand Up @@ -107,6 +108,7 @@ private StatementUtils() {}
builder.put(CreateTableAsSelect.class, QueryType.INSERT);
builder.put(Insert.class, QueryType.INSERT);
builder.put(RefreshMaterializedView.class, QueryType.INSERT);
builder.put(CreateVectorIndex.class, QueryType.INSERT);

builder.put(Delete.class, QueryType.DELETE);
builder.put(Update.class, QueryType.UPDATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.facebook.presto.operator.aggregation.ClassificationThresholdsAggregation;
import com.facebook.presto.operator.aggregation.CountAggregation;
import com.facebook.presto.operator.aggregation.CountIfAggregation;
import com.facebook.presto.operator.aggregation.CreateVectorIndexAggregation;
import com.facebook.presto.operator.aggregation.DefaultApproximateCountDistinctAggregation;
import com.facebook.presto.operator.aggregation.DoubleCorrelationAggregation;
import com.facebook.presto.operator.aggregation.DoubleCovarianceAggregation;
Expand Down Expand Up @@ -710,6 +711,7 @@ private List<? extends SqlFunction> getBuiltInFunctions(FunctionsConfig function
.aggregate(GeometryUnionAgg.class)
.aggregate(SpatialPartitioningAggregateFunction.class)
.aggregate(SpatialPartitioningInternalAggregateFunction.class)
.aggregates(CreateVectorIndexAggregation.class)
.aggregates(CountAggregation.class)
.aggregates(VarianceAggregation.class)
.aggregates(CentralMomentsAggregation.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.aggregation;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.operator.aggregation.state.SliceState;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.function.TypeParameter;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import static com.facebook.presto.common.type.VarcharType.VARCHAR;

/**
* Dummy aggregate function for CREATE VECTOR INDEX planning.
* This function is never executed — the connector optimizer replaces
* the plan tree before execution.
*/
@AggregationFunction("create_vector_index")
public final class CreateVectorIndexAggregation
{
private CreateVectorIndexAggregation() {}

// 1-arg overloads: embedding only (no id)

@InputFunction
public static void inputRealArray(
@AggregationState SliceState state,
@SqlType("array(real)") Block embedding)
{
}

@InputFunction
public static void inputDoubleArray(
@AggregationState SliceState state,
@SqlType("array(double)") Block embedding)
{
}

// 2-arg overloads: id + embedding (matches SQL syntax: ON table(id, embedding))

@InputFunction
@TypeParameter("T")
public static void inputRealArrayWithLongId(
@AggregationState SliceState state,
@SqlType("T") long id,
@SqlType("array(real)") Block embedding)
{
}

@InputFunction
@TypeParameter("T")
public static void inputRealArrayWithDoubleId(
@AggregationState SliceState state,
@SqlType("T") double id,
@SqlType("array(real)") Block embedding)
{
}

@InputFunction
@TypeParameter("T")
public static void inputRealArrayWithSliceId(
@AggregationState SliceState state,
@SqlType("T") Slice id,
@SqlType("array(real)") Block embedding)
{
}

@InputFunction
@TypeParameter("T")
public static void inputDoubleArrayWithLongId(
@AggregationState SliceState state,
@SqlType("T") long id,
@SqlType("array(double)") Block embedding)
{
}

@InputFunction
@TypeParameter("T")
public static void inputDoubleArrayWithDoubleId(
@AggregationState SliceState state,
@SqlType("T") double id,
@SqlType("array(double)") Block embedding)
{
}

@InputFunction
@TypeParameter("T")
public static void inputDoubleArrayWithSliceId(
@AggregationState SliceState state,
@SqlType("T") Slice id,
@SqlType("array(double)") Block embedding)
{
}

@CombineFunction
public static void combine(
@AggregationState SliceState state,
@AggregationState SliceState otherState)
{
}

@OutputFunction(StandardTypes.VARCHAR)
public static void output(@AggregationState SliceState state, BlockBuilder out)
{
VARCHAR.writeSlice(out, Slices.utf8Slice(""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.facebook.presto.sql.tree.CreateSchema;
import com.facebook.presto.sql.tree.CreateTable;
import com.facebook.presto.sql.tree.CreateTableAsSelect;
import com.facebook.presto.sql.tree.CreateVectorIndex;
import com.facebook.presto.sql.tree.CreateView;
import com.facebook.presto.sql.tree.Cube;
import com.facebook.presto.sql.tree.Deallocate;
Expand Down Expand Up @@ -1144,6 +1145,71 @@ protected Scope visitCreateTable(CreateTable node, Optional<Scope> scope)
return createAndAssignScope(node, scope);
}

@Override
protected Scope visitCreateVectorIndex(CreateVectorIndex node, Optional<Scope> scope)
{
QualifiedObjectName sourceTableName = createQualifiedObjectName(session, node, node.getTableName(), metadata);
if (!metadataResolver.tableExists(sourceTableName)) {
throw new SemanticException(MISSING_TABLE, node, "Source table '%s' does not exist", sourceTableName);
}

QualifiedObjectName targetTable = createQualifiedObjectName(session, node, node.getIndexName(), metadata);
if (metadataResolver.tableExists(targetTable)) {
throw new SemanticException(TABLE_ALREADY_EXISTS, node, "Destination table '%s' already exists", targetTable);
}

// Analyze the source table to build a proper scope with typed columns
// Use AllowAllAccessControl since we check permissions separately below
StatementAnalyzer analyzer = new StatementAnalyzer(
analysis,
metadata,
sqlParser,
new AllowAllAccessControl(),
session,
warningCollector);

Table sourceTable = new Table(node.getTableName());
Scope tableScope = analyzer.analyze(sourceTable, scope);

// Validate that specified columns exist in the source table
TableHandle sourceTableHandle = metadataResolver.getTableHandle(sourceTableName).get();
Map<String, ColumnHandle> sourceColumns = metadataResolver.getColumnHandles(sourceTableHandle);
for (Identifier column : node.getColumns()) {
if (!sourceColumns.containsKey(column.getValue())) {
throw new SemanticException(MISSING_COLUMN, column, "Column '%s' does not exist in source table '%s'", column.getValue(), sourceTableName);
}
}

// Analyze UPDATING FOR predicate (validates column references, types, etc.)
node.getUpdatingFor().ifPresent(where -> analyzeWhere(node, tableScope, where));

validateProperties(node.getProperties(), scope);

Map<String, Expression> allProperties = mapFromProperties(node.getProperties());

// user must have read permission on the source table to create a vector index
Multimap<QualifiedObjectName, Subfield> tableColumnMap = ImmutableMultimap.<QualifiedObjectName, Subfield>builder()
.putAll(sourceTableName, sourceColumns.keySet().stream()
.map(column -> new Subfield(column, ImmutableList.of()))
.collect(toImmutableSet()))
.build();
analysis.addTableColumnAndSubfieldReferences(accessControl, session.getIdentity(),
session.getTransactionId(), session.getAccessControlContext(), tableColumnMap, tableColumnMap);

analysis.addAccessControlCheckForTable(TABLE_CREATE,
new AccessControlInfoForTable(accessControl, session.getIdentity(),
session.getTransactionId(), session.getAccessControlContext(), targetTable));

analysis.setCreateVectorIndexAnalysis(new Analysis.CreateVectorIndexAnalysis(
sourceTableName,
targetTable,
node.getColumns(),
allProperties,
node.getUpdatingFor()));

return createAndAssignScope(node, scope, Field.newUnqualified(node.getLocation(), "result", VARCHAR));
}

@Override
protected Scope visitProperty(Property node, Optional<Scope> scope)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SAMPLE_PERCENTAGE_OUT_OF_RANGE;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.SCHEMA_NOT_SPECIFIED;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.STANDALONE_LAMBDA;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_ALREADY_EXISTS;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_COLUMN_NOT_FOUND;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_DUPLICATE_RANGE_VARIABLE;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.TABLE_FUNCTION_IMPLEMENTATION_ERROR;
Expand Down Expand Up @@ -2399,4 +2400,48 @@ public void testInvalidMerge()
assertFails(NOT_SUPPORTED, "line 1:1: Merging into materialized views is not supported",
"MERGE INTO mv1 USING t1 ON mv1.a = t1.a WHEN MATCHED THEN UPDATE SET id = bar.id + 1");
}

@Test
public void testCreateVectorIndex()
{
// basic success cases
analyze("CREATE VECTOR INDEX test_index ON t1(a, b)");
analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1')");
analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1', p2 = 'val2')");

// with UPDATING FOR clause
analyze("CREATE VECTOR INDEX test_index ON t1(a, b) UPDATING FOR a > 10");
analyze("CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'val1') UPDATING FOR a BETWEEN 1 AND 100");

// single column
analyze("CREATE VECTOR INDEX test_index ON t1(a)");

// source table does not exist
assertFails(MISSING_TABLE, ".*Source table '.*' does not exist",
"CREATE VECTOR INDEX test_index ON nonexistent_table(a, b)");

// destination table already exists (using an existing table name as the index name)
assertFails(TABLE_ALREADY_EXISTS, ".*already exists",
"CREATE VECTOR INDEX t1 ON t2(a, b)");

// column does not exist in source table
assertFails(MISSING_COLUMN, ".*Column 'unknown' does not exist in source table '.*'",
"CREATE VECTOR INDEX test_index ON t1(a, unknown)");
assertFails(MISSING_COLUMN, ".*Column 'nonexistent' does not exist in source table '.*'",
"CREATE VECTOR INDEX test_index ON t1(nonexistent)");

// duplicate properties
assertFails(DUPLICATE_PROPERTY, ".* Duplicate property: p1",
"CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'v1', p2 = 'v2', p1 = 'v3')");
assertFails(DUPLICATE_PROPERTY, ".* Duplicate property: p1",
"CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = 'v1', \"p1\" = 'v2')");

// unresolved property value
assertFails(MISSING_ATTRIBUTE, ".*'y' cannot be resolved",
"CREATE VECTOR INDEX test_index ON t1(a, b) WITH (p1 = y)");

// UPDATING FOR with invalid column reference
assertFails(MISSING_ATTRIBUTE, ".*",
"CREATE VECTOR INDEX test_index ON t1(a, b) UPDATING FOR nonexistent_col > 10");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,22 @@ default Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession ses
throw new PrestoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginCreateTable() is implemented without finishCreateTable()");
}

/**
* Begin the atomic creation of a vector index with data.
*/
default ConnectorOutputTableHandle beginCreateVectorIndex(ConnectorSession session, ConnectorTableMetadata indexMetadata, Optional<ConnectorNewTableLayout> layout, SchemaTableName sourceTableName)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating vector indexes");
}

/**
* Finish a vector index creation with data after the data is written.
*/
default Optional<ConnectorOutputMetadata> finishCreateVectorIndex(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support creating vector indexes");
}

/**
* Start a SELECT/UPDATE/INSERT/DELETE query. This notification is triggered after the planning phase completes.
*/
Expand Down
Loading
Loading