Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorT
columnNames.build(),
columnTypes.build(),
Optional.empty(),
remoteTargetTableName);
Optional.of(remoteTargetTableName));
}
}

Expand Down Expand Up @@ -582,7 +582,7 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
remoteTable);
Optional.empty());
}

String remoteTemporaryTableName = identifierMapping.toRemoteTableName(identity, connection, remoteSchema, generateTemporaryTableName());
Expand All @@ -595,7 +595,7 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, JdbcTabl
columnNames.build(),
columnTypes.build(),
Optional.of(jdbcColumnTypes.build()),
remoteTemporaryTableName);
Optional.of(remoteTemporaryTableName));
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
Expand Down Expand Up @@ -626,7 +626,7 @@ public void commitCreateTable(ConnectorSession session, JdbcOutputTableHandle ha
session,
handle.getCatalogName(),
handle.getSchemaName(),
handle.getTemporaryTableName(),
handle.getTemporaryTableName().orElseThrow(() -> new IllegalStateException("Temporary table name missing")),
new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
}

Expand Down Expand Up @@ -659,14 +659,14 @@ protected void renameTable(ConnectorSession session, String catalogName, String
public void finishInsertTable(ConnectorSession session, JdbcOutputTableHandle handle)
{
if (isNonTransactionalInsert(session)) {
checkState(handle.getTemporaryTableName().equals(handle.getTableName()), "Unexpected use of temporary table when non transactional inserts are enabled");
checkState(handle.getTemporaryTableName().isEmpty(), "Unexpected use of temporary table when non transactional inserts are enabled");
return;
}

RemoteTableName temporaryTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
handle.getTemporaryTableName());
handle.getTemporaryTableName().orElseThrow());
RemoteTableName targetTable = new RemoteTableName(
Optional.ofNullable(handle.getCatalogName()),
Optional.ofNullable(handle.getSchemaName()),
Expand Down Expand Up @@ -755,11 +755,13 @@ private void dropTable(ConnectorSession session, RemoteTableName remoteTableName
@Override
public void rollbackCreateTable(ConnectorSession session, JdbcOutputTableHandle handle)
{
dropTable(session, new JdbcTableHandle(
new SchemaTableName(handle.getSchemaName(), handle.getTemporaryTableName()),
handle.getCatalogName(),
handle.getSchemaName(),
handle.getTemporaryTableName()));
if (handle.getTemporaryTableName().isPresent()) {
dropTable(session, new JdbcTableHandle(
new SchemaTableName(handle.getSchemaName(), handle.getTemporaryTableName().get()),
handle.getCatalogName(),
handle.getSchemaName(),
handle.getTemporaryTableName().get()));
}
}

@Override
Expand All @@ -768,7 +770,7 @@ public String buildInsertSql(JdbcOutputTableHandle handle, List<WriteFunction> c
checkArgument(handle.getColumnNames().size() == columnWriters.size(), "handle and columnWriters mismatch: %s, %s", handle, columnWriters);
return format(
"INSERT INTO %s (%s) VALUES (%s)",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()),
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName().orElseGet(handle::getTableName)),
handle.getColumnNames().stream()
.map(this::quoted)
.collect(joining(", ")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class JdbcOutputTableHandle
private final List<String> columnNames;
private final List<Type> columnTypes;
private final Optional<List<JdbcTypeHandle>> jdbcColumnTypes;
private final String temporaryTableName;
private final Optional<String> temporaryTableName;

@JsonCreator
public JdbcOutputTableHandle(
Expand All @@ -49,7 +49,7 @@ public JdbcOutputTableHandle(
@JsonProperty("columnNames") List<String> columnNames,
@JsonProperty("columnTypes") List<Type> columnTypes,
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("temporaryTableName") String temporaryTableName)
@JsonProperty("temporaryTableName") Optional<String> temporaryTableName)
{
this.catalogName = catalogName;
this.schemaName = schemaName;
Expand Down Expand Up @@ -105,7 +105,7 @@ public Optional<List<JdbcTypeHandle>> getJdbcColumnTypes()
}

@JsonProperty
public String getTemporaryTableName()
public Optional<String> getTemporaryTableName()
{
return temporaryTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testJsonRoundTrip()
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.empty(),
"tmp_table");
Optional.of("tmp_table"));

assertJsonRoundTrip(OUTPUT_TABLE_CODEC, handleForCreate);

Expand All @@ -46,7 +46,7 @@ public void testJsonRoundTrip()
ImmutableList.of("abc", "xyz"),
ImmutableList.of(VARCHAR, VARCHAR),
Optional.of(ImmutableList.of(JDBC_VARCHAR, JDBC_VARCHAR)),
"tmp_table");
Optional.of("tmp_table"));

assertJsonRoundTrip(OUTPUT_TABLE_CODEC, handleForInsert);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public PhoenixOutputTableHandle(
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("rowkeyColumn") Optional<String> rowkeyColumn)
{
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, "");
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty());
this.rowkeyColumn = requireNonNull(rowkeyColumn, "rowkeyColumn is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public PhoenixOutputTableHandle(
@JsonProperty("jdbcColumnTypes") Optional<List<JdbcTypeHandle>> jdbcColumnTypes,
@JsonProperty("rowkeyColumn") Optional<String> rowkeyColumn)
{
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, "");
super("", schemaName, tableName, columnNames, columnTypes, jdbcColumnTypes, Optional.empty());
this.rowkeyColumn = requireNonNull(rowkeyColumn, "rowkeyColumn is null");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ public void testForeignTable()
onRemoteDatabase().execute("DROP SERVER devnull");
}

@Test
public void testErrorDuringInsert()
{
onRemoteDatabase().execute("CREATE TABLE test_with_constraint (x bigint primary key)");
assertTrue(getQueryRunner().tableExists(getSession(), "test_with_constraint"));
Session nonTransactional = Session.builder(getSession())
.setCatalogSessionProperty("postgresql", "non_transactional_insert", "true")
.build();
assertUpdate(nonTransactional, "INSERT INTO test_with_constraint VALUES (1)", 1);
assertQueryFails(nonTransactional, "INSERT INTO test_with_constraint VALUES (1)", "[\\s\\S]*ERROR: duplicate key value[\\s\\S]*");
assertTrue(getQueryRunner().tableExists(getSession(), "test_with_constraint"));
onRemoteDatabase().execute("DROP TABLE test_with_constraint");
}

@Test
public void testSystemTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ protected void enableTableLockOnBulkLoadTableOption(ConnectorSession session, Jd
// 'table lock on bulk load' table option causes the bulk load processes on user-defined tables to obtain a bulk update lock
// note: this is not a request to lock a table immediately
String sql = format("EXEC sp_tableoption '%s', 'table lock on bulk load', '1'",
quoted(table.getCatalogName(), table.getSchemaName(), table.getTemporaryTableName()));
quoted(table.getCatalogName(), table.getSchemaName(), table.getTemporaryTableName().orElseGet(table::getTableName)));
execute(connection, sql);
}
catch (SQLException e) {
Expand Down