Merged
3 changes: 1 addition & 2 deletions .github/workflows/ci.yml
@@ -397,8 +397,7 @@ jobs:
- { modules: plugin/trino-sqlserver }
- { modules: plugin/trino-memsql }
- { modules: plugin/trino-oracle }
-          # TODO: re-enable kudu tests after this issue is resolved: https://github.com/trinodb/trino/issues/11203
-          # - { modules: plugin/trino-kudu }
+          - { modules: plugin/trino-kudu }
- { modules: plugin/trino-druid }
- { modules: plugin/trino-iceberg }
- { modules: plugin/trino-iceberg, profile: test-failure-recovery }
@@ -234,9 +234,9 @@ public KuduTable openTable(SchemaTableName schemaTableName)
catch (KuduException e) {
log.debug(e, "Error on doOpenTable");
if (!listSchemaNames().contains(schemaTableName.getSchemaName())) {
-                throw new SchemaNotFoundException(schemaTableName.getSchemaName());
+                throw new SchemaNotFoundException(schemaTableName.getSchemaName(), e);
}
-            throw new TableNotFoundException(schemaTableName);
+            throw new TableNotFoundException(schemaTableName, e);
}
}

@@ -27,7 +27,7 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduOperationApplier;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Upsert;
@@ -58,13 +58,12 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;

public class KuduPageSink
implements ConnectorPageSink
{
private final ConnectorSession connectorSession;
-    private final KuduSession session;
+    private final KuduClientSession session;
private final KuduTable table;
private final List<Type> columnTypes;
private final List<Type> originalColumnTypes;
@@ -102,35 +101,35 @@ private KuduPageSink(
this.generateUUID = mapping.isGenerateUUID();

this.table = table;
-        this.session = clientSession.newSession();
+        this.session = clientSession;
uuid = UUID.randomUUID().toString();
}

@Override
public CompletableFuture<?> appendPage(Page page)
{
-        for (int position = 0; position < page.getPositionCount(); position++) {
-            Upsert upsert = table.newUpsert();
-            PartialRow row = upsert.getRow();
-            int start = 0;
-            if (generateUUID) {
-                String id = format("%s-%08x", uuid, nextSubId++);
-                row.addString(0, id);
-                start = 1;
-            }
-
-            for (int channel = 0; channel < page.getChannelCount(); channel++) {
-                appendColumn(row, page, position, channel, channel + start);
-            }
-
-            try {
-                applyOperationAndVerifySucceeded(session, upsert);
-            }
-            catch (KuduException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        return NOT_BLOCKED;
+        try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientSession(session)) {
+            for (int position = 0; position < page.getPositionCount(); position++) {
+                Upsert upsert = table.newUpsert();
+                PartialRow row = upsert.getRow();
+                int start = 0;
+                if (generateUUID) {
+                    String id = format("%s-%08x", uuid, nextSubId++);
Member: Q: Should nextSubId be marked volatile (it's a member being updated in a CompletableFuture)?

Contributor Author: The CompletableFuture NOT_BLOCKED is just a placeholder to signal that the page sink is not blocked. NOT_BLOCKED does not hold a reference to nextSubId, so I do not think there are any issues here.

Member: Just to be clear, non-volatile means that another thread may never see the new value of nextSubId++ (there is no guarantee a separate thread will ever see the updated value, if it had read the value before). If that's not important, then feel free to ignore! But as an aside, I think it's generally better to be explicit about these scenarios, and I would mark it volatile to be correct, rather than have code which could be "incorrect" under certain circumstances and just depends on those situations being "impossible" - someone may always reuse this code and fall foul of the invisible update!

Member: Concrete example:

    executor.submit(() -> kuduPageSink.appendPage(page));
    executor.submit(() -> kuduPageSink.appendPage(anotherPage));

If that were a multi-threaded Executor, this line could yield the same value in both threads:

    String id = format("%s-%08x", uuid, nextSubId++);

Both threads read kuduPageSink.nextSubId and both increment it, but neither sees the value of the other, so it's both a race and a lost update. Even if only one thread did updates, because the field is not volatile there is no guarantee that anyone holding a reference to the kuduPageSink instance will see the updated nextSubId. On reflection, I'd use AtomicInteger.getAndIncrement() for simplicity's sake (and to avoid the race I mentioned - volatile wouldn't protect against that). But anyway, just wanted to throw that out there for your consideration!

Contributor Author (@grantatspothero, Mar 21, 2022): Correct me if I'm wrong, but page sinks are not expected to be thread safe. Nowhere in the javadoc for ConnectorPageSink does it list thread safety as a requirement. The closest thing I could find was:

    /**
     * Returns a future that will be completed when the page sink can accept
     * more pages.  If the page sink can accept more pages immediately,
     * this method should return {@code NOT_BLOCKED}.
     */
    CompletableFuture<?> appendPage(Page page);

Here we only return NOT_BLOCKED after the critical section, so there is no risk of concurrent access to nextSubId.

+                    row.addString(0, id);
+                    start = 1;
+                }
+
+                for (int channel = 0; channel < page.getChannelCount(); channel++) {
+                    appendColumn(row, page, position, channel, channel + start);
+                }
+
+                operationApplier.applyOperationAsync(upsert);
+            }
+            return NOT_BLOCKED;
+        }
+        catch (KuduException e) {
+            throw new RuntimeException(e);
+        }
     }
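As a footnote to the nextSubId review thread above: if concurrent appendPage calls ever had to be supported, the reviewer's AtomicInteger suggestion would avoid the lost update. A minimal, self-contained sketch (not part of the PR; NextSubIdDemo and countAtomically are invented for illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NextSubIdDemo
{
    // Runs `increments` concurrent increments across `threads` worker threads
    // and returns the final count; with AtomicInteger no update is lost.
    static int countAtomically(int increments, int threads)
            throws InterruptedException
    {
        AtomicInteger counter = new AtomicInteger(); // hypothetical replacement for the plain int nextSubId
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < increments; i++) {
            executor.submit(() -> {
                counter.getAndIncrement(); // atomic read-modify-write, visible to all threads
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        return counter.get();
    }

    public static void main(String[] args)
            throws InterruptedException
    {
        System.out.println(countAtomically(100_000, 4));
    }
}
```

With a plain non-volatile int field, the same workload can both lose increments and leave other threads reading a stale value, which is the scenario the reviewer describes.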

private void appendColumn(PartialRow row, Page page, int position, int channel, int destChannel)
@@ -191,23 +190,11 @@ else if (type instanceof DecimalType) {
     @Override
     public CompletableFuture<Collection<Slice>> finish()
     {
-        closeSession();
         return completedFuture(ImmutableList.of());
     }

     @Override
     public void abort()
     {
-        closeSession();
-    }
-
-    private void closeSession()
-    {
-        try {
-            session.close();
-        }
-        catch (KuduException e) {
-            throw new RuntimeException(e);
-        }
     }
 }
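The KuduOperationApplier class this PR introduces (placed under org.apache.kudu.client; its implementation is not shown in this excerpt) is what lets the diff drop the explicit session.close() plumbing above. As a rough illustration of the pattern only - an AutoCloseable that queues operations and verifies them when closed - every type and method name below is invented for the sketch, not the real Kudu API:

```java
import java.util.List;

// Illustrative sketch only: all interfaces below are stand-ins; the real
// KuduOperationApplier wraps an actual Kudu session.
public class OperationApplierSketch
{
    interface Operation {}

    // Stand-in for a session that flushes asynchronously and reports row errors on flush.
    interface Session
    {
        void apply(Operation op);

        List<String> flushAndGetRowErrors();
    }

    static final class OperationApplier
            implements AutoCloseable
    {
        private final Session session;

        private OperationApplier(Session session)
        {
            this.session = session;
        }

        static OperationApplier fromSession(Session session)
        {
            return new OperationApplier(session);
        }

        // Queue the operation; any failure surfaces later, in close()
        void applyOperationAsync(Operation op)
        {
            session.apply(op);
        }

        // try-with-resources guarantees this runs, replacing the old
        // explicit try/finally { session.close(); } blocks in the diff
        @Override
        public void close()
        {
            List<String> errors = session.flushAndGetRowErrors();
            if (!errors.isEmpty()) {
                throw new RuntimeException("row errors: " + errors);
            }
        }
    }
}
```

The design choice mirrored here is that per-operation verification becomes a single verification at close time, which is what allows the page sink to batch upserts instead of flushing one row at a time.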
@@ -22,16 +22,14 @@
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.KeyEncoderAccessor;
import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduOperationApplier;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

-import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;
-
public class KuduUpdatablePageSource
implements UpdatablePageSource
{
@@ -50,20 +48,14 @@ public KuduUpdatablePageSource(KuduRecordSet recordSet)
public void deleteRows(Block rowIds)
{
Schema schema = table.getSchema();
-        KuduSession session = clientSession.newSession();
-        try {
-            try {
-                for (int i = 0; i < rowIds.getPositionCount(); i++) {
-                    int len = rowIds.getSliceLength(i);
-                    Slice slice = rowIds.getSlice(i, 0, len);
-                    PartialRow row = KeyEncoderAccessor.decodePrimaryKey(schema, slice.getBytes());
-                    Delete delete = table.newDelete();
-                    RowHelper.copyPrimaryKey(schema, row, delete.getRow());
-                    applyOperationAndVerifySucceeded(session, delete);
-                }
-            }
-            finally {
-                session.close();
-            }
+        try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientSession(clientSession)) {
+            for (int i = 0; i < rowIds.getPositionCount(); i++) {
+                int len = rowIds.getSliceLength(i);
+                Slice slice = rowIds.getSlice(i, 0, len);
+                PartialRow row = KeyEncoderAccessor.decodePrimaryKey(schema, slice.getBytes());
+                Delete delete = table.newDelete();
+                RowHelper.copyPrimaryKey(schema, row, delete.getRow());
+                operationApplier.applyOperationAsync(delete);
+            }
         }
         catch (KuduException e) {
@@ -23,10 +23,9 @@
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduOperationApplier;
 import org.apache.kudu.client.KuduScanner;
-import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
@@ -40,7 +39,6 @@
import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
-import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded;

public class SchemaEmulationByTableNameConvention
implements SchemaEmulation
@@ -62,17 +60,11 @@ public void createSchema(KuduClientWrapper client, String schemaName)
throw new SchemaAlreadyExistsException(schemaName);
}
         else {
-            try {
+            try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) {
                 KuduTable schemasTable = getSchemasTable(client);
-                KuduSession session = client.newSession();
-                try {
-                    Upsert upsert = schemasTable.newUpsert();
-                    upsert.getRow().addString(0, schemaName);
-                    applyOperationAndVerifySucceeded(session, upsert);
-                }
-                finally {
-                    session.close();
-                }
+                Upsert upsert = schemasTable.newUpsert();
+                upsert.getRow().addString(0, schemaName);
+                operationApplier.applyOperationAsync(upsert);
             }
catch (KuduException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
@@ -99,22 +91,16 @@ public void dropSchema(KuduClientWrapper client, String schemaName)
throw new TrinoException(GENERIC_USER_ERROR, "Deleting default schema not allowed.");
}
         else {
-            try {
+            try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) {
                 String prefix = getPrefixForTablesOfSchema(schemaName);
                 for (String name : client.getTablesList(prefix).getTablesList()) {
                     client.deleteTable(name);
                 }

                 KuduTable schemasTable = getSchemasTable(client);
-                KuduSession session = client.newSession();
-                try {
-                    Delete delete = schemasTable.newDelete();
-                    delete.getRow().addString(0, schemaName);
-                    applyOperationAndVerifySucceeded(session, delete);
-                }
-                finally {
-                    session.close();
-                }
+                Delete delete = schemasTable.newDelete();
+                delete.getRow().addString(0, schemaName);
+                operationApplier.applyOperationAsync(delete);
             }
catch (KuduException e) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, e);
@@ -127,9 +113,7 @@ public List<String> listSchemaNames(KuduClientWrapper client)
{
try {
if (rawSchemasTable == null) {
-                if (!client.tableExists(rawSchemasTableName)) {
-                    createAndFillSchemasTable(client);
-                }
+                createAndFillSchemasTable(client);
rawSchemasTable = getSchemasTable(client);
}

@@ -167,17 +151,29 @@ private void createAndFillSchemasTable(KuduClientWrapper client)
Schema schema = new Schema(ImmutableList.of(schemaColumnSchema));
CreateTableOptions options = new CreateTableOptions();
options.addHashPartitions(ImmutableList.of(schemaColumnSchema.getName()), 2);
-        KuduTable schemasTable = client.createTable(rawSchemasTableName, schema, options);
-        KuduSession session = client.newSession();
-        try {
+
+        KuduTable schemasTable = createTableIfNotExists(client, schema, rawSchemasTableName, options);
+        try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) {
             for (String schemaName : existingSchemaNames) {
-                Insert insert = schemasTable.newInsert();
-                insert.getRow().addString(0, schemaName);
-                applyOperationAndVerifySucceeded(session, insert);
+                Upsert upsert = schemasTable.newUpsert();
+                upsert.getRow().addString(0, schemaName);
+                operationApplier.applyOperationAsync(upsert);
             }
         }
-        finally {
-            session.close();
-        }
     }

+    private static KuduTable createTableIfNotExists(KuduClientWrapper client, Schema schema, String name, CreateTableOptions options)
+            throws KuduException
+    {
+        try {
+            return client.createTable(name, schema, options);
+        }
+        catch (KuduException e) {
+            if (e.getStatus().isAlreadyPresent()) {
+                // Table already exists
+                return client.openTable(name);
Member: Q: (For complete correctness) should we do a comparison on the existing table's options to be sure they're effectively "the same"? (I know this is fixing a race in test code, but in prod code, if someone attempted to create a table with incompatible options at the same time as someone else, they may unexpectedly get back a KuduTable which doesn't match the create options they were trying to apply.)

Member: (Looking a little more closely, it seems the only usage is from createAndFillSchemasTable, which means the case I suggested can't manifest - so ignore!)

Contributor Author: The whole approach of emulating schemas in Kudu using a Kudu table is not safe given Kudu's consistency semantics, and Trino does not propagate client timestamps in Kudu. I talked with some Kudu maintainers; Impala uses the Hive metastore to emulate schemas in Kudu, but that introduces a big dependency.

+            }
+            throw e;
+        }
+    }
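The createTableIfNotExists helper in this hunk makes concurrent creation of the schemas table idempotent: the loser of the create race catches the "already present" error and opens the winner's table. The same shape in a self-contained toy (Catalog and all its methods are invented for illustration, not the Kudu client API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a generic "create, and on 'already present' fall back
// to open" helper, mirroring createTableIfNotExists in the diff.
public class Catalog
{
    private final Map<String, String> tables = new ConcurrentHashMap<>();

    // Throws if the table already exists, like KuduClient.createTable
    String createTable(String name)
    {
        String prior = tables.putIfAbsent(name, "table:" + name);
        if (prior != null) {
            throw new IllegalStateException("already present");
        }
        return tables.get(name);
    }

    String openTable(String name)
    {
        return tables.get(name);
    }

    // Safe under concurrent callers: the loser of the race opens the winner's table.
    String createTableIfNotExists(String name)
    {
        try {
            return createTable(name);
        }
        catch (IllegalStateException e) {
            return openTable(name);
        }
    }
}
```

As the thread notes, this only fully resolves the race when all callers use the same create options, which holds here because createAndFillSchemasTable is the sole caller.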
