Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
createTable(session, tableMetadata, SaveMode.FAIL);
return TestingHandle.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,15 +612,6 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
}
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
Span span = startSpan("beginCreateTable", tableMetadata.getTable());
try (var _ = scopedSpan(span)) {
return delegate.beginCreateTable(session, tableMetadata, layout, retryMode);
}
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ public Optional<ConnectorTableLayout> getLayoutForTableExecute(ConnectorSession
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
return new MockConnectorOutputTableHandle(tableMetadata.getTable());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,23 +728,6 @@ default void finishStatisticsCollection(ConnectorSession session, ConnectorTable
throw new TrinoException(GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginStatisticsCollection() is implemented without finishStatisticsCollection()");
}

/**
* Begin the atomic creation of a table with data.
* <p>
* If connector does not support execution with retries, the method should throw:
* <pre>
* new TrinoException(NOT_SUPPORTED, "This connector does not support query retries")
* </pre>
* unless {@code retryMode} is set to {@code NO_RETRIES}.
*
* @deprecated use {@link #beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode, boolean replace)}
*/
@Deprecated
default ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
}

/**
* Begin the atomic creation of a table with data.
* If connector does not support execution with retries, the method should throw:
Expand All @@ -756,10 +739,10 @@ default ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Co
default ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
// Redirect to deprecated SPI to not break existing connectors
if (!replace) {
return beginCreateTable(session, tableMetadata, layout, retryMode);
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with data");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,14 +547,6 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
}
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
return delegate.beginCreateTable(session, tableMetadata, layout, retryMode);
}
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ public void dropSchema(ConnectorSession session, String schemaName, boolean casc
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
if (tableMetadata.getComment().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1148,9 +1148,12 @@ public Optional<Type> getSupportedType(ConnectorSession session, Map<String, Obj
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
verifyRetryMode(session, retryMode);
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
JdbcOutputTableHandle handle = jdbcClient.beginCreateTable(session, tableMetadata);
setRollback(() -> jdbcClient.rollbackCreateTable(session, handle));
return handle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testSupportsRetriesValidation()
ImmutableSet.of());
ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(new SchemaTableName("example", "numbers"), ImmutableList.of());

assertThatThrownBy(() -> metadata.beginCreateTable(SESSION, tableMetadata, Optional.empty(), RetryMode.RETRIES_ENABLED))
assertThatThrownBy(() -> metadata.beginCreateTable(SESSION, tableMetadata, Optional.empty(), RetryMode.RETRIES_ENABLED, false))
.hasMessageContaining("This connector does not support query or task retries");

assertThatThrownBy(() -> metadata.beginInsert(SESSION, tableHandle, ImmutableList.of(), RetryMode.RETRIES_ENABLED))
Expand All @@ -117,7 +117,7 @@ public void testNonTransactionalInsertValidation()
PropertyMetadata.booleanProperty(JdbcWriteSessionProperties.NON_TRANSACTIONAL_INSERT, "description", true, false)))
.build();

assertThatThrownBy(() -> metadata.beginCreateTable(session, tableMetadata, Optional.empty(), RetryMode.RETRIES_ENABLED))
assertThatThrownBy(() -> metadata.beginCreateTable(session, tableMetadata, Optional.empty(), RetryMode.RETRIES_ENABLED, false))
.hasMessageContaining("Query and task retries are incompatible with non-transactional inserts");

assertThatThrownBy(() -> metadata.beginInsert(session, tableHandle, ImmutableList.of(), RetryMode.RETRIES_ENABLED))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
return igniteClient.beginCreateTable(session, tableMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,16 @@ public synchronized void createTable(ConnectorSession session, ConnectorTableMet
if (saveMode == REPLACE) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty(), NO_RETRIES);
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata, Optional.empty(), NO_RETRIES, false);
finishCreateTable(session, outputTableHandle, ImmutableList.of(), ImmutableList.of());
}

@Override
public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
checkSchemaExists(tableMetadata.getTable().getSchemaName());
checkTableNotExists(tableMetadata.getTable());
long tableId = nextTableId.getAndIncrement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void tableIsCreatedAfterCommits()
SESSION,
new ConnectorTableMetadata(schemaTableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty(),
NO_RETRIES);
NO_RETRIES,
false);

metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of());

Expand Down Expand Up @@ -132,7 +133,8 @@ public void testReadTableBeforeCreationCompleted()
SESSION,
new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty(),
NO_RETRIES);
NO_RETRIES,
false);

List<SchemaTableName> tableNames = metadata.listTables(SESSION, Optional.empty());
assertThat(tableNames.size())
Expand Down Expand Up @@ -287,7 +289,8 @@ public void testCreateTableAndViewInNotExistSchema()
SESSION,
new ConnectorTableMetadata(table1, ImmutableList.of(), ImmutableMap.of()),
Optional.empty(),
NO_RETRIES))
NO_RETRIES,
false))
.hasErrorCode(NOT_FOUND)
.hasMessage("Schema test1 not found");
assertThat(metadata.getTableHandle(SESSION, table1, Optional.empty(), Optional.empty())).isNull();
Expand Down Expand Up @@ -317,7 +320,8 @@ public void testRenameTable()
SESSION,
new ConnectorTableMetadata(tableName, ImmutableList.of(), ImmutableMap.of()),
Optional.empty(),
NO_RETRIES);
NO_RETRIES,
false);
metadata.finishCreateTable(SESSION, table, ImmutableList.of(), ImmutableList.of());

// rename table to schema which does not exist
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,14 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
return phoenixClient.beginCreateTable(session, tableMetadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
Optional<ConnectorTableLayout> layout = getNewTableLayout(session, tableMetadata);
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES), ImmutableList.of(), ImmutableList.of());
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES, false), ImmutableList.of(), ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -556,11 +556,14 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode, boolean replace)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
if (tableMetadata.getComment().isPresent()) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void testCreateBucketedTableAsSelect()
RaptorPartitioningHandle partitioning = (RaptorPartitioningHandle) layout.getPartitioning().get();
assertThat(partitioning.getDistributionId()).isEqualTo(1);

ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, ordersTable, Optional.of(layout), NO_RETRIES);
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, ordersTable, Optional.of(layout), NO_RETRIES, false);
metadata.finishCreateTable(SESSION, outputHandle, ImmutableList.of(), ImmutableList.of());

ConnectorTableHandle tableHandle = metadata.getTableHandle(SESSION, DEFAULT_TEST_ORDERS, Optional.empty(), Optional.empty());
Expand Down Expand Up @@ -656,7 +656,7 @@ public void testTransactionTableWrite()
{
// start table creation
long transactionId = 1;
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES);
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES, false);

// transaction is in progress
assertThat(transactionExists(transactionId)).isTrue();
Expand Down Expand Up @@ -696,7 +696,7 @@ public void testTransactionAbort()
{
// start table creation
long transactionId = 1;
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES);
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES, false);

// transaction is in progress
assertThat(transactionExists(transactionId)).isTrue();
Expand Down