@@ -111,10 +111,10 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
                     .collect(toImmutableList());
         }
 
-        String insertSql = jdbcClient.buildInsertSql(handle, columnWriters);
+        String sinkSql = getSinkSql(jdbcClient, handle, columnWriters);
         try {
-            insertSql = remoteQueryModifier.apply(session, insertSql);
-            statement = connection.prepareStatement(insertSql);
+            sinkSql = remoteQueryModifier.apply(session, sinkSql);
+            statement = connection.prepareStatement(sinkSql);
         }
         catch (TrinoException e) {
             throw closeAllSuppress(e, connection);
@@ -128,6 +128,11 @@ public JdbcPageSink(ConnectorSession session, JdbcOutputTableHandle handle, Jdbc
         this.maxBatchSize = getWriteBatchSize(session);
     }
 
+    protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
+    {
+        return jdbcClient.buildInsertSql(outputTableHandle, columnWriters);
+    }
+
     @Override
     public CompletableFuture<?> appendPage(Page page)
     {
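The two `JdbcPageSink` hunks above turn the hard-coded INSERT into an overridable `getSinkSql` hook, so a dialect-specific sink can substitute its own write statement while reusing the batching and cleanup logic. A minimal sketch of an override, with the subclass boilerplate elided (Phoenix's actual sink is not shown in this diff, and the UPSERT rewrite below is purely illustrative):

```java
// Sketch only: Phoenix writes with UPSERT VALUES rather than INSERT INTO.
// A real client would build the statement itself instead of string-patching.
@Override
protected String getSinkSql(JdbcClient jdbcClient, JdbcOutputTableHandle outputTableHandle, List<WriteFunction> columnWriters)
{
    String insertSql = jdbcClient.buildInsertSql(outputTableHandle, columnWriters);
    return insertSql.replaceFirst("^INSERT INTO", "UPSERT INTO");
}
```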
@@ -93,6 +93,7 @@
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_EQUALITY;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_JOIN_PUSHDOWN_WITH_VARCHAR_INEQUALITY;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_LIMIT_PUSHDOWN;
+import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_MERGE;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_ARITHMETIC_EXPRESSION_PUSHDOWN;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE;
 import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN;
@@ -1545,6 +1546,10 @@ public void testDeleteWithVarcharGreaterAndLowerPredicate()
     public void testDeleteWithComplexPredicate()
     {
         skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
+        if (hasBehavior(SUPPORTS_MERGE)) {
+            super.testDeleteWithComplexPredicate();
+            return;
+        }
         assertThatThrownBy(super::testDeleteWithComplexPredicate)
                 .hasStackTraceContaining("TrinoException: " + MODIFYING_ROWS_MESSAGE);
     }
@@ -1553,6 +1558,10 @@ public void testDeleteWithComplexPredicate()
     public void testDeleteWithSubquery()
     {
         skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
+        if (hasBehavior(SUPPORTS_MERGE)) {
+            super.testDeleteWithSubquery();
+            return;
+        }
         assertThatThrownBy(super::testDeleteWithSubquery)
                 .hasStackTraceContaining("TrinoException: " + MODIFYING_ROWS_MESSAGE);
     }
@@ -1561,6 +1570,10 @@ public void testDeleteWithSubquery()
     public void testExplainAnalyzeWithDeleteWithSubquery()
     {
         skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
+        if (hasBehavior(SUPPORTS_MERGE)) {
+            super.testExplainAnalyzeWithDeleteWithSubquery();
+            return;
+        }
         assertThatThrownBy(super::testExplainAnalyzeWithDeleteWithSubquery)
                 .hasStackTraceContaining("TrinoException: " + MODIFYING_ROWS_MESSAGE);
     }
@@ -1569,6 +1582,10 @@ public void testExplainAnalyzeWithDeleteWithSubquery()
     public void testDeleteWithSemiJoin()
     {
         skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_ROW_LEVEL_DELETE));
+        if (hasBehavior(SUPPORTS_MERGE)) {
+            super.testDeleteWithSemiJoin();
+            return;
+        }
         assertThatThrownBy(super::testDeleteWithSemiJoin)
                 .hasStackTraceContaining("TrinoException: " + MODIFYING_ROWS_MESSAGE);
     }
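The four delete tests above now run the inherited positive test whenever the connector reports `SUPPORTS_MERGE`, because complex row-level deletes can be executed through the merge machinery; connectors without it keep asserting the `MODIFYING_ROWS_MESSAGE` failure. A connector's test class typically opts in by overriding `hasBehavior`, roughly like this sketch (the enclosing test class is assumed):

```java
@Override
protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
{
    switch (connectorBehavior) {
        case SUPPORTS_MERGE:
            return true;
        default:
            return super.hasBehavior(connectorBehavior);
    }
}
```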
@@ -46,6 +46,7 @@
 import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ColumnMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.SchemaNotFoundException;
 import io.trino.spi.connector.SchemaTableName;
@@ -55,6 +56,7 @@
 import io.trino.spi.type.CharType;
 import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Decimals;
+import io.trino.spi.type.RowType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VarbinaryType;
 import io.trino.spi.type.VarcharType;
@@ -122,6 +124,9 @@
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static com.google.common.collect.Iterators.tryFind;
 import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW;
 import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale;
 import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding;
@@ -206,7 +211,12 @@
 public class PhoenixClient
         extends BaseJdbcClient
 {
-    private static final String ROWKEY = "ROWKEY";
+    public static final String MERGE_ROW_ID_COLUMN_NAME = "$merge_row_id";
+    public static final String ROWKEY = "ROWKEY";
+    public static final JdbcColumnHandle ROWKEY_COLUMN_HANDLE = new JdbcColumnHandle(
+            ROWKEY,
+            new JdbcTypeHandle(Types.BIGINT, Optional.of("BIGINT"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()),
+            BIGINT);
 
     private static final String DATE_FORMAT = "y-MM-dd G";
     private static final DateTimeFormatter LOCAL_DATE_FORMATTER = DateTimeFormatter.ofPattern(DATE_FORMAT);
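`MERGE_ROW_ID_COLUMN_NAME` names the synthetic column the engine requests to identify rows during MERGE, and `ROWKEY_COLUMN_HANDLE` covers Phoenix tables whose only key is the generated BIGINT `ROWKEY`. For tables with explicit primary keys, the merge row id is a `RowType` with one field per key column; one way such a handle could be assembled (an illustrative sketch, not code from this PR):

```java
// Illustrative: build a $merge_row_id handle whose RowType carries one
// field per primary-key column of the target table.
List<JdbcColumnHandle> primaryKeys = client.getPrimaryKeyColumnHandles(session, tableHandle);
RowType rowIdType = RowType.from(primaryKeys.stream()
        .map(column -> RowType.field(column.getColumnName(), column.getColumnType()))
        .collect(toImmutableList()));
JdbcColumnHandle mergeRowId = new JdbcColumnHandle(
        MERGE_ROW_ID_COLUMN_NAME,
        new JdbcTypeHandle(Types.ROWID, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()),
        rowIdType);
```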
@@ -904,4 +914,85 @@ private static ResultSet getResultSet(PhoenixInputSplit split, QueryPlan queryPl
             throw new TrinoException(PhoenixErrorCode.PHOENIX_INTERNAL_ERROR, "Error while copying scan", e);
         }
     }
+
+    public JdbcTableHandle buildPlainTable(JdbcTableHandle handle)
+    {
+        checkArgument(handle.isNamedRelation(), "Can only build a plain table from a named relation table");
+
+        SchemaTableName schemaTableName = handle.getRequiredNamedRelation().getSchemaTableName();
+        RemoteTableName remoteTableName = handle.getRequiredNamedRelation().getRemoteTableName();
+        return new JdbcTableHandle(schemaTableName, remoteTableName, Optional.empty());
+    }
+
+    public JdbcTableHandle updatedScanColumnTable(ConnectorSession session, ConnectorTableHandle table, Optional<List<JdbcColumnHandle>> originalColumns, JdbcColumnHandle mergeRowIdColumnHandle)
+    {
+        JdbcTableHandle tableHandle = (JdbcTableHandle) table;
+        if (originalColumns.isEmpty()) {
+            return tableHandle;
+        }
+        List<JdbcColumnHandle> scanColumnHandles = originalColumns.get();
+        checkArgument(!scanColumnHandles.isEmpty(), "Scan columns should not be empty");
+        checkArgument(tryFind(scanColumnHandles.iterator(), column -> MERGE_ROW_ID_COLUMN_NAME.equalsIgnoreCase(column.getColumnName())).isPresent(), "Merge row id column must exist in original columns");
+
+        return new JdbcTableHandle(
+                tableHandle.getRelationHandle(),
+                tableHandle.getConstraint(),
+                tableHandle.getConstraintExpressions(),
+                tableHandle.getSortOrder(),
+                tableHandle.getLimit(),
+                Optional.of(getUpdatedScanColumnHandles(session, tableHandle, scanColumnHandles, mergeRowIdColumnHandle)),
+                tableHandle.getOtherReferencedTables(),
+                tableHandle.getNextSyntheticColumnId(),
+                tableHandle.getAuthorization());
+    }
+
+    private List<JdbcColumnHandle> getUpdatedScanColumnHandles(ConnectorSession session, JdbcTableHandle tableHandle, List<JdbcColumnHandle> scanColumnHandles, JdbcColumnHandle mergeRowIdColumnHandle)
+    {
+        RowType columnType = (RowType) mergeRowIdColumnHandle.getColumnType();
+        List<JdbcColumnHandle> primaryKeyColumnHandles = getPrimaryKeyColumnHandles(session, tableHandle);
+        Set<String> mergeRowIdFieldNames = columnType.getFields().stream()
+                .map(RowType.Field::getName)
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(toImmutableSet());
+        Set<String> primaryKeyColumnNames = primaryKeyColumnHandles.stream()
+                .map(JdbcColumnHandle::getColumnName)
+                .collect(toImmutableSet());
+        checkArgument(mergeRowIdFieldNames.containsAll(primaryKeyColumnNames), "Merge row id fields should contain all primary keys");
+
+        ImmutableList.Builder<JdbcColumnHandle> columnHandleBuilder = ImmutableList.builder();
+        scanColumnHandles.stream()
+                .filter(jdbcColumnHandle -> !MERGE_ROW_ID_COLUMN_NAME.equalsIgnoreCase(jdbcColumnHandle.getColumnName()))
+                .forEach(columnHandleBuilder::add);
+        // Add the primary-key columns that back the merge row id
+        for (JdbcColumnHandle columnHandle : primaryKeyColumnHandles) {
+            String columnName = columnHandle.getColumnName();
+            if (ROWKEY.equalsIgnoreCase(columnName)) {
+                checkArgument(primaryKeyColumnHandles.size() == 1, "ROWKEY must be the only primary key");
+                columnHandleBuilder.add(ROWKEY_COLUMN_HANDLE);
+                break;
+            }
+
+            if (!tryFind(scanColumnHandles.iterator(), column -> column.getColumnName().equalsIgnoreCase(columnName)).isPresent()) {
+                columnHandleBuilder.add(columnHandle);
+            }
+        }
+
+        return columnHandleBuilder.build();
+    }
+
+    public List<JdbcColumnHandle> getPrimaryKeyColumnHandles(ConnectorSession session, JdbcTableHandle tableHandle)
+    {
+        if (tableHandle.getColumns().isPresent()) {
+            tableHandle = buildPlainTable(tableHandle);
+        }
+
+        Map<String, Object> tableProperties = getTableProperties(session, tableHandle);
+        List<JdbcColumnHandle> primaryKeyColumnHandles = getColumns(session, tableHandle)
+                .stream()
+                .filter(columnHandle -> PhoenixColumnProperties.isPrimaryKey(columnHandle.getColumnMetadata(), tableProperties))
+                .collect(toImmutableList());
+        verify(!primaryKeyColumnHandles.isEmpty(), "Phoenix primary key is empty");
+        return primaryKeyColumnHandles;
+    }
 }
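Together these helpers let the merge path rewrite a scan so the synthetic `$merge_row_id` column is replaced by the physical key columns that back it: `getPrimaryKeyColumnHandles` resolves the keys (via `buildPlainTable` when the handle already carries a column list), and `updatedScanColumnTable` splices them into the scan. A sketch of the expected call sequence during merge planning (the surrounding metadata code is assumed, not part of this hunk):

```java
// Illustrative call sequence: swap $merge_row_id for the backing key
// columns before the scan is pushed down to Phoenix.
JdbcTableHandle handle = (JdbcTableHandle) table;
JdbcTableHandle rewritten = phoenixClient.updatedScanColumnTable(
        session,
        handle,
        handle.getColumns(),          // original scan columns, if any
        mergeRowIdColumnHandle);      // RowType handle built as sketched above
```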
@@ -21,7 +21,7 @@
 import io.airlift.configuration.AbstractConfigurationAwareModule;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider;
-import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorRecordSetProvider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
 import io.trino.plugin.base.classloader.ForClassLoaderSafe;
 import io.trino.plugin.jdbc.ConfiguringConnectionFactory;
@@ -41,8 +41,6 @@
 import io.trino.plugin.jdbc.JdbcDynamicFilteringSplitManager;
 import io.trino.plugin.jdbc.JdbcMetadataConfig;
 import io.trino.plugin.jdbc.JdbcMetadataSessionProperties;
-import io.trino.plugin.jdbc.JdbcPageSinkProvider;
-import io.trino.plugin.jdbc.JdbcRecordSetProvider;
 import io.trino.plugin.jdbc.JdbcWriteConfig;
 import io.trino.plugin.jdbc.JdbcWriteSessionProperties;
 import io.trino.plugin.jdbc.LazyConnectionFactory;
@@ -58,7 +56,7 @@
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorPageSinkProvider;
-import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSplitManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -102,10 +100,11 @@ protected void setup(Binder binder)
         binder.bind(ConnectorSplitManager.class).annotatedWith(ForJdbcDynamicFiltering.class).to(PhoenixSplitManager.class).in(Scopes.SINGLETON);
         binder.bind(ConnectorSplitManager.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcDynamicFilteringSplitManager.class).in(Scopes.SINGLETON);
         binder.bind(ConnectorSplitManager.class).to(ClassLoaderSafeConnectorSplitManager.class).in(Scopes.SINGLETON);
-        binder.bind(ConnectorRecordSetProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);
-        binder.bind(ConnectorRecordSetProvider.class).to(ClassLoaderSafeConnectorRecordSetProvider.class).in(Scopes.SINGLETON);
-        binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcPageSinkProvider.class).in(Scopes.SINGLETON);
+        binder.bind(ConnectorPageSinkProvider.class).annotatedWith(ForClassLoaderSafe.class).to(PhoenixPageSinkProvider.class).in(Scopes.SINGLETON);
         binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON);
+        binder.bind(ConnectorPageSourceProvider.class).annotatedWith(ForClassLoaderSafe.class).to(PhoenixPageSourceProvider.class).in(Scopes.SINGLETON);
+        binder.bind(ConnectorPageSourceProvider.class).to(ClassLoaderSafeConnectorPageSourceProvider.class).in(Scopes.SINGLETON);
+
         binder.bind(QueryBuilder.class).to(DefaultQueryBuilder.class).in(Scopes.SINGLETON);
         newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class));
         configBinder(binder).bindConfigDefaults(JdbcMetadataConfig.class, config -> config.setDomainCompactionThreshold(DEFAULT_DOMAIN_COMPACTION_THRESHOLD));
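The module now binds a Phoenix-specific page sink and page source pair where it previously bound the generic JDBC record-set provider. A page source provider can adapt a record-set implementation through the SPI's `RecordPageSource`; a minimal sketch under that assumption (the class name is an example, not this PR's `PhoenixPageSourceProvider`, and `io.trino.spi.connector` imports are elided):

```java
// Example adapter: expose a RecordSet-based reader through the page-source SPI.
public class ExamplePageSourceProvider
        implements ConnectorPageSourceProvider
{
    private final ConnectorRecordSetProvider recordSetProvider;

    public ExamplePageSourceProvider(ConnectorRecordSetProvider recordSetProvider)
    {
        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
    }

    @Override
    public ConnectorPageSource createPageSource(
            ConnectorTransactionHandle transaction,
            ConnectorSession session,
            ConnectorSplit split,
            ConnectorTableHandle table,
            List<ColumnHandle> columns,
            DynamicFilter dynamicFilter)
    {
        // RecordPageSource converts cursor-at-a-time reads into Pages.
        return new RecordPageSource(recordSetProvider.getRecordSet(transaction, session, split, table, columns));
    }
}
```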
@@ -21,7 +21,7 @@
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorPageSinkProvider;
-import io.trino.spi.connector.ConnectorRecordSetProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorTransactionHandle;
@@ -40,8 +40,8 @@ public class PhoenixConnector
     private final LifeCycleManager lifeCycleManager;
     private final ConnectorMetadata metadata;
     private final ConnectorSplitManager splitManager;
-    private final ConnectorRecordSetProvider recordSetProvider;
     private final ConnectorPageSinkProvider pageSinkProvider;
+    private final ConnectorPageSourceProvider pageSourceProvider;
     private final List<PropertyMetadata<?>> tableProperties;
     private final PhoenixColumnProperties columnProperties;
     private final List<PropertyMetadata<?>> sessionProperties;
@@ -51,17 +51,17 @@ public PhoenixConnector(
             LifeCycleManager lifeCycleManager,
             ConnectorMetadata metadata,
             ConnectorSplitManager splitManager,
-            ConnectorRecordSetProvider recordSetProvider,
             ConnectorPageSinkProvider pageSinkProvider,
+            ConnectorPageSourceProvider pageSourceProvider,
             Set<TablePropertiesProvider> tableProperties,
             PhoenixColumnProperties columnProperties,
             Set<SessionPropertiesProvider> sessionProperties)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.metadata = requireNonNull(metadata, "metadata is null");
         this.splitManager = requireNonNull(splitManager, "splitManager is null");
-        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
         this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
+        this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
         this.tableProperties = tableProperties.stream()
                 .flatMap(tablePropertiesProvider -> tablePropertiesProvider.getTableProperties().stream())
                 .collect(toImmutableList());
@@ -90,15 +90,15 @@ public ConnectorSplitManager getSplitManager()
     }
 
     @Override
-    public ConnectorRecordSetProvider getRecordSetProvider()
+    public ConnectorPageSinkProvider getPageSinkProvider()
     {
-        return recordSetProvider;
+        return pageSinkProvider;
     }
 
     @Override
-    public ConnectorPageSinkProvider getPageSinkProvider()
+    public ConnectorPageSourceProvider getPageSourceProvider()
     {
-        return pageSinkProvider;
+        return pageSourceProvider;
     }
 
     @Override
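With the swap above, `PhoenixConnector` exposes `getPageSourceProvider()` instead of `getRecordSetProvider()`. Both accessors have throwing defaults on the `Connector` SPI, so a connector only overrides the read path it actually supports; paraphrased (not verbatim) from the SPI:

```java
// Paraphrased Connector SPI defaults: overriding exactly one read-path
// provider is sufficient.
default ConnectorRecordSetProvider getRecordSetProvider()
{
    throw new UnsupportedOperationException();
}

default ConnectorPageSourceProvider getPageSourceProvider()
{
    throw new UnsupportedOperationException();
}
```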