Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ private record FunctionKey(ResolvedFunction resolvedFunction, InvocationConventi
public static FunctionManager createTestingFunctionManager()
{
TypeOperators typeOperators = new TypeOperators();
GlobalFunctionCatalog functionCatalog = new GlobalFunctionCatalog();
GlobalFunctionCatalog functionCatalog = new GlobalFunctionCatalog(
() -> { throw new UnsupportedOperationException(); },
() -> { throw new UnsupportedOperationException(); },
() -> { throw new UnsupportedOperationException(); });
functionCatalog.addFunctions(SystemFunctionBundle.create(new FeaturesConfig(), typeOperators, new BlockTypeOperators(typeOperators), UNKNOWN));
functionCatalog.addFunctions(new InternalFunctionBundle(new LiteralFunction(new InternalBlockEncodingSerde(new BlockEncodingManager(), TESTING_TYPE_MANAGER))));
return new FunctionManager(CatalogServiceProvider.fail(), functionCatalog, LanguageFunctionProvider.DISABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.operator.table.ExcludeColumns.ExcludeColumnsFunctionHandle;
import io.trino.operator.table.Sequence.SequenceFunctionHandle;
import io.trino.operator.table.json.JsonTable.JsonTableFunctionHandle;
import io.trino.spi.function.AggregationFunctionMetadata;
import io.trino.spi.function.AggregationImplementation;
import io.trino.spi.function.BoundSignature;
Expand All @@ -37,6 +40,7 @@
import io.trino.spi.function.WindowFunctionSupplier;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.function.table.TableFunctionProcessorProvider;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;

import java.util.Collection;
Expand All @@ -53,19 +57,33 @@
import static io.trino.metadata.OperatorNameUtil.unmangleOperator;
import static io.trino.operator.table.ExcludeColumns.getExcludeColumnsFunctionProcessorProvider;
import static io.trino.operator.table.Sequence.getSequenceFunctionProcessorProvider;
import static io.trino.operator.table.json.JsonTable.getJsonTableFunctionProcessorProvider;
import static io.trino.spi.function.FunctionKind.AGGREGATE;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.IntegerType.INTEGER;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

@ThreadSafe
public class GlobalFunctionCatalog
implements FunctionProvider
{
public static final String BUILTIN_SCHEMA = "builtin";

private final Provider<Metadata> metadata;
private final Provider<TypeManager> typeManager;
private final Provider<FunctionManager> functionManager;
private volatile FunctionMap functions = new FunctionMap();

@Inject
public GlobalFunctionCatalog(Provider<Metadata> metadata, Provider<TypeManager> typeManager, Provider<FunctionManager> functionManager)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.functionManager = requireNonNull(functionManager, "functionManager is null");
}

public final synchronized void addFunctions(FunctionBundle functionBundle)
{
for (FunctionMetadata functionMetadata : functionBundle.getFunctions()) {
Expand Down Expand Up @@ -187,6 +205,9 @@ public TableFunctionProcessorProvider getTableFunctionProcessorProvider(Connecto
if (functionHandle instanceof SequenceFunctionHandle) {
return getSequenceFunctionProcessorProvider();
}
if (functionHandle instanceof JsonTableFunctionHandle) {
return getJsonTableFunctionProcessorProvider(metadata.get(), typeManager.get(), functionManager.get());
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2761,7 +2761,10 @@ public MetadataManager build()

GlobalFunctionCatalog globalFunctionCatalog = this.globalFunctionCatalog;
if (globalFunctionCatalog == null) {
globalFunctionCatalog = new GlobalFunctionCatalog();
globalFunctionCatalog = new GlobalFunctionCatalog(
() -> { throw new UnsupportedOperationException(); },
() -> { throw new UnsupportedOperationException(); },
() -> { throw new UnsupportedOperationException(); });
TypeOperators typeOperators = new TypeOperators();
globalFunctionCatalog.addFunctions(SystemFunctionBundle.create(new FeaturesConfig(), typeOperators, new BlockTypeOperators(typeOperators), UNKNOWN));
globalFunctionCatalog.addFunctions(new InternalFunctionBundle(new LiteralFunction(new InternalBlockEncodingSerde(new BlockEncodingManager(), typeManager))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public TableFunctionOperatorFactory(
{
requireNonNull(planNodeId, "planNodeId is null");
requireNonNull(tableFunctionProvider, "tableFunctionProvider is null");
requireNonNull(catalogHandle, "catalogHandle is null");
requireNonNull(functionHandle, "functionHandle is null");
requireNonNull(requiredChannels, "requiredChannels is null");
requireNonNull(markerChannels, "markerChannels is null");
Expand Down Expand Up @@ -272,6 +271,7 @@ public TableFunctionOperator(

this.operatorContext = operatorContext;
this.session = operatorContext.getSession().toConnectorSession(catalogHandle);

this.processEmptyInput = !pruneWhenEmpty;

PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.table.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import io.trino.metadata.FunctionManager;
import io.trino.metadata.Metadata;
import io.trino.operator.table.json.execution.JsonTableProcessingFragment;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.SqlRow;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.function.table.TableFunctionDataProcessor;
import io.trino.spi.function.table.TableFunctionProcessorProvider;
import io.trino.spi.function.table.TableFunctionProcessorState;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.operator.scalar.json.ParameterUtil.getParametersArray;
import static io.trino.operator.table.json.execution.ExecutionPlanner.getExecutionPlan;
import static io.trino.spi.function.table.TableFunctionProcessorState.Finished.FINISHED;
import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.produced;
import static io.trino.spi.function.table.TableFunctionProcessorState.Processed.usedInput;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.TypeUtils.readNativeValue;
import static io.trino.spi.type.TypeUtils.writeNativeValue;
import static io.trino.type.Json2016Type.JSON_2016;
import static java.util.Objects.requireNonNull;

/**
* Implements feature ISO/IEC 9075-2:2023(E) 7.11 'JSON table'
* including features T824, T827, T838
*/
public class JsonTable
{
private JsonTable() {}

/**
* This class comprises all information necessary to execute the json_table function:
*
* @param processingPlan the root of the processing plan tree
* @param outer the parent-child relationship between the input relation and the processingPlan result
* @param errorOnError the error behavior: true for ERROR ON ERROR, false for EMPTY ON ERROR
* @param parametersType type of the row containing JSON path parameters for the root JSON path. The function expects the parameters row in the channel 1.
* Other channels in the input page correspond to JSON context item (channel 0), and default values for the value columns. Each value column in the processingPlan
* knows the indexes of its default channels.
* @param outputTypes types of the proper columns produced by the function
*/
public record JsonTableFunctionHandle(JsonTablePlanNode processingPlan, boolean outer, boolean errorOnError, RowType parametersType, Type[] outputTypes)
implements ConnectorTableFunctionHandle
{
public JsonTableFunctionHandle
{
requireNonNull(processingPlan, "processingPlan is null");
requireNonNull(parametersType, "parametersType is null");
requireNonNull(outputTypes, "outputTypes is null");
}
}

public static TableFunctionProcessorProvider getJsonTableFunctionProcessorProvider(Metadata metadata, TypeManager typeManager, FunctionManager functionManager)
{
return new TableFunctionProcessorProvider()
{
@Override
public TableFunctionDataProcessor getDataProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
JsonTableFunctionHandle jsonTableFunctionHandle = (JsonTableFunctionHandle) handle;
Object[] newRow = new Object[jsonTableFunctionHandle.outputTypes().length];
JsonTableProcessingFragment executionPlan = getExecutionPlan(
jsonTableFunctionHandle.processingPlan(),
newRow,
jsonTableFunctionHandle.errorOnError(),
jsonTableFunctionHandle.outputTypes(),
session,
metadata,
typeManager,
functionManager);
return new JsonTableFunctionProcessor(executionPlan, newRow, jsonTableFunctionHandle.outputTypes(), jsonTableFunctionHandle.parametersType(), jsonTableFunctionHandle.outer());
}
};
}

public static class JsonTableFunctionProcessor
implements TableFunctionDataProcessor
{
private final PageBuilder pageBuilder;
private final int properColumnsCount;
private final JsonTableProcessingFragment executionPlan;
private final Object[] newRow;
private final RowType parametersType;
private final boolean outer;

private long totalPositionsProcessed;
private int currentPosition = -1;
private boolean currentPositionAlreadyProduced;

public JsonTableFunctionProcessor(JsonTableProcessingFragment executionPlan, Object[] newRow, Type[] outputTypes, RowType parametersType, boolean outer)
{
this.pageBuilder = new PageBuilder(ImmutableList.<Type>builder()
.add(outputTypes)
.add(BIGINT) // add additional position for pass-through index
.build());
this.properColumnsCount = outputTypes.length;
this.executionPlan = requireNonNull(executionPlan, "executionPlan is null");
this.newRow = requireNonNull(newRow, "newRow is null");
this.parametersType = requireNonNull(parametersType, "parametersType is null");
this.outer = outer;
}

@Override
public TableFunctionProcessorState process(List<Optional<Page>> input)
{
// no more input pages
if (input == null) {
if (pageBuilder.isEmpty()) {
return FINISHED;
}
return flushPageBuilder();
}

Page inputPage = getOnlyElement(input).orElseThrow();
while (!pageBuilder.isFull()) {
// new input page
if (currentPosition == -1) {
if (inputPage.getPositionCount() == 0) {
return usedInput();
}
else {
currentPosition = 0;
currentPositionAlreadyProduced = false;
totalPositionsProcessed++;
SqlRow parametersRow = (SqlRow) readNativeValue(parametersType, inputPage.getBlock(1), currentPosition);
executionPlan.resetRoot(
(JsonNode) readNativeValue(JSON_2016, inputPage.getBlock(0), currentPosition),
inputPage,
currentPosition,
getParametersArray(parametersType, parametersRow));
}
}

// try to get output row for the current position (one position can produce multiple rows)
boolean gotNewRow = executionPlan.getRow();
if (gotNewRow) {
currentPositionAlreadyProduced = true;
addOutputRow();
}
else {
if (outer && !currentPositionAlreadyProduced) {
addNullPaddedRow();
}
// go to next position in the input page
currentPosition++;
if (currentPosition < inputPage.getPositionCount()) {
currentPositionAlreadyProduced = false;
totalPositionsProcessed++;
SqlRow parametersRow = (SqlRow) readNativeValue(parametersType, inputPage.getBlock(1), currentPosition);
executionPlan.resetRoot(
(JsonNode) readNativeValue(JSON_2016, inputPage.getBlock(0), currentPosition),
inputPage,
currentPosition,
getParametersArray(parametersType, parametersRow));
}
else {
currentPosition = -1;
return usedInput();
}
}
}

return flushPageBuilder();
}

private TableFunctionProcessorState flushPageBuilder()
{
TableFunctionProcessorState result = produced(pageBuilder.build());
pageBuilder.reset();
return result;
}

private void addOutputRow()
{
pageBuilder.declarePosition();
for (int channel = 0; channel < properColumnsCount; channel++) {
writeNativeValue(pageBuilder.getType(channel), pageBuilder.getBlockBuilder(channel), newRow[channel]);
}
// pass-through index from partition start
BIGINT.writeLong(pageBuilder.getBlockBuilder(properColumnsCount), totalPositionsProcessed - 1);
}

private void addNullPaddedRow()
{
Arrays.fill(newRow, null);
addOutputRow();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.table.json;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "@type")
@JsonSubTypes({
@JsonSubTypes.Type(value = JsonTableOrdinalityColumn.class, name = "ordinality"),
@JsonSubTypes.Type(value = JsonTableQueryColumn.class, name = "query"),
@JsonSubTypes.Type(value = JsonTableValueColumn.class, name = "value"),
})

public sealed interface JsonTableColumn
permits JsonTableOrdinalityColumn, JsonTableQueryColumn, JsonTableValueColumn
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.table.json;

public record JsonTableOrdinalityColumn(int outputIndex)
implements JsonTableColumn
{
}
Loading