diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a4e2dd8ba0b1..58b0015db2e9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -397,8 +397,7 @@ jobs: - { modules: plugin/trino-sqlserver } - { modules: plugin/trino-memsql } - { modules: plugin/trino-oracle } - # TODO: re-enable kudu tests after this issue is resolved: https://github.com/trinodb/trino/issues/11203 - # - { modules: plugin/trino-kudu } + - { modules: plugin/trino-kudu } - { modules: plugin/trino-druid } - { modules: plugin/trino-iceberg } - { modules: plugin/trino-iceberg, profile: test-failure-recovery } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java index 0762a7b2f6aa..6bd4351020d7 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduClientSession.java @@ -234,9 +234,9 @@ public KuduTable openTable(SchemaTableName schemaTableName) catch (KuduException e) { log.debug(e, "Error on doOpenTable"); if (!listSchemaNames().contains(schemaTableName.getSchemaName())) { - throw new SchemaNotFoundException(schemaTableName.getSchemaName()); + throw new SchemaNotFoundException(schemaTableName.getSchemaName(), e); } - throw new TableNotFoundException(schemaTableName); + throw new TableNotFoundException(schemaTableName, e); } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java index 7f27c5d16827..961f61dfde93 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduPageSink.java @@ -27,7 +27,7 @@ import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduOperationApplier; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.Upsert; @@ -58,13 +58,12 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded; public class KuduPageSink implements ConnectorPageSink { private final ConnectorSession connectorSession; - private final KuduSession session; + private final KuduClientSession session; private final KuduTable table; private final List columnTypes; private final List originalColumnTypes; @@ -102,35 +101,35 @@ private KuduPageSink( this.generateUUID = mapping.isGenerateUUID(); this.table = table; - this.session = clientSession.newSession(); + this.session = clientSession; uuid = UUID.randomUUID().toString(); } @Override public CompletableFuture appendPage(Page page) { - for (int position = 0; position < page.getPositionCount(); position++) { - Upsert upsert = table.newUpsert(); - PartialRow row = upsert.getRow(); - int start = 0; - if (generateUUID) { - String id = format("%s-%08x", uuid, nextSubId++); - row.addString(0, id); - start = 1; - } - - for (int channel = 0; channel < page.getChannelCount(); channel++) { - appendColumn(row, page, position, channel, channel + start); - } - - try { - applyOperationAndVerifySucceeded(session, upsert); - } - catch (KuduException e) { - throw new RuntimeException(e); + try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientSession(session)) { + for (int position = 0; position < page.getPositionCount(); position++) { + Upsert upsert = table.newUpsert(); + PartialRow row = upsert.getRow(); + int start = 0; + if (generateUUID) { + String id = format("%s-%08x", uuid, nextSubId++); + row.addString(0, id); + start = 1; + } + + for (int channel = 0; channel < page.getChannelCount(); channel++) { + appendColumn(row, page, position, channel, channel + start); + } + + operationApplier.applyOperationAsync(upsert); } + return NOT_BLOCKED; + } + catch (KuduException e) { + throw new RuntimeException(e); } - return NOT_BLOCKED; } private void appendColumn(PartialRow row, Page page, int position, int channel, int destChannel) @@ -191,23 +190,11 @@ else if (type instanceof DecimalType) { @Override public CompletableFuture> finish() { - closeSession(); return completedFuture(ImmutableList.of()); } @Override public void abort() { - closeSession(); - } - - private void closeSession() - { - try { - session.close(); - } - catch (KuduException e) { - throw new RuntimeException(e); - } } } diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduUpdatablePageSource.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduUpdatablePageSource.java index 9f503d5f9113..e76c67159da6 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduUpdatablePageSource.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/KuduUpdatablePageSource.java @@ -22,7 +22,7 @@ import org.apache.kudu.client.Delete; import org.apache.kudu.client.KeyEncoderAccessor; import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduOperationApplier; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.PartialRow; @@ -30,8 +30,6 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; -import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded; - public class KuduUpdatablePageSource implements UpdatablePageSource { @@ -50,20 +48,14 @@ public KuduUpdatablePageSource(KuduRecordSet recordSet) public void deleteRows(Block rowIds) { Schema schema = table.getSchema(); - KuduSession session = clientSession.newSession(); - try { - try { - for (int i = 0; i < rowIds.getPositionCount(); i++) { - int len = rowIds.getSliceLength(i); - Slice slice = rowIds.getSlice(i, 0, len); - PartialRow row = KeyEncoderAccessor.decodePrimaryKey(schema, slice.getBytes()); - Delete delete = table.newDelete(); - RowHelper.copyPrimaryKey(schema, row, delete.getRow()); - applyOperationAndVerifySucceeded(session, delete); - } - } - finally { - session.close(); + try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientSession(clientSession)) { + for (int i = 0; i < rowIds.getPositionCount(); i++) { + int len = rowIds.getSliceLength(i); + Slice slice = rowIds.getSlice(i, 0, len); + PartialRow row = KeyEncoderAccessor.decodePrimaryKey(schema, slice.getBytes()); + Delete delete = table.newDelete(); + RowHelper.copyPrimaryKey(schema, row, delete.getRow()); + operationApplier.applyOperationAsync(delete); } } catch (KuduException e) { diff --git a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java index e5009f52b325..72d558adab2d 100644 --- a/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java +++ b/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java @@ -23,10 +23,9 @@ import org.apache.kudu.Type; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.Delete; -import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduOperationApplier; import org.apache.kudu.client.KuduScanner; -import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.RowResult; import org.apache.kudu.client.RowResultIterator; @@ -40,7 +39,6 @@ import static io.trino.plugin.kudu.KuduClientSession.DEFAULT_SCHEMA; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; -import static org.apache.kudu.client.KuduOperationApplier.applyOperationAndVerifySucceeded; public class SchemaEmulationByTableNameConvention implements SchemaEmulation @@ -62,17 +60,11 @@ public void createSchema(KuduClientWrapper client, String schemaName) throw new SchemaAlreadyExistsException(schemaName); } else { - try { + try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) { KuduTable schemasTable = getSchemasTable(client); - KuduSession session = client.newSession(); - try { - Upsert upsert = schemasTable.newUpsert(); - upsert.getRow().addString(0, schemaName); - applyOperationAndVerifySucceeded(session, upsert); - } - finally { - session.close(); - } + Upsert upsert = schemasTable.newUpsert(); + upsert.getRow().addString(0, schemaName); + operationApplier.applyOperationAsync(upsert); } catch (KuduException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, e); @@ -99,22 +91,16 @@ public void dropSchema(KuduClientWrapper client, String schemaName) throw new TrinoException(GENERIC_USER_ERROR, "Deleting default schema not allowed."); } else { - try { + try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) { String prefix = getPrefixForTablesOfSchema(schemaName); for (String name : client.getTablesList(prefix).getTablesList()) { client.deleteTable(name); } KuduTable schemasTable = getSchemasTable(client); - KuduSession session = client.newSession(); - try { - Delete delete = schemasTable.newDelete(); - delete.getRow().addString(0, schemaName); - applyOperationAndVerifySucceeded(session, delete); - } - finally { - session.close(); - } + Delete delete = schemasTable.newDelete(); + delete.getRow().addString(0, schemaName); + operationApplier.applyOperationAsync(delete); } catch (KuduException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, e); @@ -127,9 +113,7 @@ public List listSchemaNames(KuduClientWrapper client) { try { if (rawSchemasTable == null) { - if (!client.tableExists(rawSchemasTableName)) { - createAndFillSchemasTable(client); - } + createAndFillSchemasTable(client); rawSchemasTable = getSchemasTable(client); } @@ -167,17 +151,29 @@ private void createAndFillSchemasTable(KuduClientWrapper client) Schema schema = new Schema(ImmutableList.of(schemaColumnSchema)); CreateTableOptions options = new CreateTableOptions(); options.addHashPartitions(ImmutableList.of(schemaColumnSchema.getName()), 2); - KuduTable schemasTable = client.createTable(rawSchemasTableName, schema, options); - KuduSession session = client.newSession(); - try { + + KuduTable schemasTable = createTableIfNotExists(client, schema, rawSchemasTableName, options); + try (KuduOperationApplier operationApplier = KuduOperationApplier.fromKuduClientWrapper(client)) { for (String schemaName : existingSchemaNames) { - Insert insert = schemasTable.newInsert(); - insert.getRow().addString(0, schemaName); - applyOperationAndVerifySucceeded(session, insert); + Upsert upsert = schemasTable.newUpsert(); + upsert.getRow().addString(0, schemaName); + operationApplier.applyOperationAsync(upsert); } } - finally { - session.close(); + } + + private static KuduTable createTableIfNotExists(KuduClientWrapper client, Schema schema, String name, CreateTableOptions options) + throws KuduException + { + try { + return client.createTable(name, schema, options); + } + catch (KuduException e) { + if (e.getStatus().isAlreadyPresent()) { + // Table already exists + return client.openTable(name); + } + throw e; } } diff --git a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KuduOperationApplier.java b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KuduOperationApplier.java index 7891c0e15b33..f57941ff38ab 100644 --- a/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KuduOperationApplier.java +++ b/plugin/trino-kudu/src/main/java/org/apache/kudu/client/KuduOperationApplier.java @@ -13,28 +13,118 @@ */ package org.apache.kudu.client; +import io.trino.plugin.kudu.KuduClientSession; +import io.trino.plugin.kudu.KuduClientWrapper; import io.trino.spi.TrinoException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkState; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.lang.String.format; +import static org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH; /** - * Operation.getChangeType() is package private + * Not thread safe + * This class is used to buffer operations and apply them in a batch and verify the batch was successfully applied. + * This gives: + * - performance of not flushing after every operation + * - correctness of not flushing in the background and hoping operations succeed + * The buffer is flushed when: + * - The KuduOperationApplier is closed + * - Or if the KuduOperationApplier reaches the bufferMaxOperations + * Note: Operation.getChangeType() is package private */ public final class KuduOperationApplier + implements AutoCloseable { - private KuduOperationApplier() + private static final int bufferMaxOperations = 1000; + + private int currentOperationsInBuffer; + private final KuduSession kuduSession; + + private KuduOperationApplier(KuduSession kuduSession) { + kuduSession.setFlushMode(MANUAL_FLUSH); + kuduSession.setMutationBufferSpace(bufferMaxOperations); + this.kuduSession = kuduSession; + currentOperationsInBuffer = 0; } - public static OperationResponse applyOperationAndVerifySucceeded(KuduSession kuduSession, Operation operation) + public static KuduOperationApplier fromKuduClientWrapper(KuduClientWrapper kuduClientWrapper) + { + KuduSession session = kuduClientWrapper.newSession(); + return new KuduOperationApplier(session); + } + + public static KuduOperationApplier fromKuduClientSession(KuduClientSession kuduClientSession) + { + KuduSession session = kuduClientSession.newSession(); + return new KuduOperationApplier(session); + } + + /** + * Not thread safe + * Applies an operation without waiting for it to be flushed, operations are flushed in the background + * @param operation kudu operation + * @throws KuduException + */ + public void applyOperationAsync(Operation operation) throws KuduException { + if (currentOperationsInBuffer >= bufferMaxOperations) { + List operationResponses = kuduSession.flush(); + verifyNoErrors(operationResponses); + } OperationResponse operationResponse = kuduSession.apply(operation); - if (operationResponse != null && operationResponse.hasRowError()) { - throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error while applying kudu operation %s: %s", - operation.getChangeType().toString(), operationResponse.getRowError())); + checkState(operationResponse == null, "KuduSession must be configured with MANUAL_FLUSH mode"); + currentOperationsInBuffer += 1; + } + + private void verifyNoErrors(List operationResponses) + { + List failedOperations = operationResponses.stream() + .map(FailedOperation::fromOperationResponse) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + + if (!failedOperations.isEmpty()) { + FailedOperation firstError = failedOperations.get(0); + throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error while applying %s kudu operation(s); First error: %s: %s", + failedOperations.size(), + firstError.operationResponse.getOperation().getChangeType().toString(), + firstError.rowError)); + } + currentOperationsInBuffer = 0; + } + + @Override + public void close() + throws KuduException + { + List operationResponses = kuduSession.close(); + verifyNoErrors(operationResponses); + } + + private static class FailedOperation + { + public RowError rowError; + public OperationResponse operationResponse; + + public static Optional fromOperationResponse(OperationResponse operationResponse) + { + return Optional.ofNullable(operationResponse) + .flatMap(response -> Optional.ofNullable(response.getRowError())) + .map(rowError -> new FailedOperation(operationResponse, rowError)); + } + + private FailedOperation(OperationResponse operationResponse, RowError rowError) + { + this.operationResponse = operationResponse; + this.rowError = rowError; } - return operationResponse; } } diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithDisabledInferSchemaConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithDisabledInferSchemaConnectorTest.java index d1d927867da1..548e8177656b 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithDisabledInferSchemaConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithDisabledInferSchemaConnectorTest.java @@ -13,10 +13,16 @@ */ package io.trino.plugin.kudu; +import io.trino.tpch.TpchTable; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class AbstractKuduWithDisabledInferSchemaConnectorTest @@ -31,7 +37,16 @@ protected Optional getKuduSchemaEmulationPrefix() @Test public void testListingOfTableForDefaultSchema() { - assertQuery("SHOW TABLES FROM default", "VALUES ('customer'), ('nation'), ('orders'), ('region')"); + // Test methods may run in parallel and create tables in the default schema + // Assert at least the TPCH tables exist but there may be more + List rows = new ArrayList<>(computeActual("SHOW TABLES FROM default").getMaterializedRows()) + .stream() + .map(row -> ((String) row.getField(0))) + .collect(toUnmodifiableList()); + assertThat(rows).containsAll( + REQUIRED_TPCH_TABLES.stream() + .map(TpchTable::getTableName) + .collect(Collectors.toList())); } @Test diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithStandardInferSchemaConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithStandardInferSchemaConnectorTest.java index 6e44ec4fdad8..28689d44c8a3 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithStandardInferSchemaConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduWithStandardInferSchemaConnectorTest.java @@ -32,7 +32,9 @@ protected Optional getKuduSchemaEmulationPrefix() @Test public void testListingOfTableForDefaultSchema() { - assertEquals(computeActual("SHOW TABLES FROM default").getRowCount(), 0); + // The special $schemas table is created when listing schema names with schema emulation enabled + // Depending on test ordering, this table may or may not be created when this test runs, so filter it out + assertEquals(computeActual("SHOW TABLES FROM default LIKE '%$schemas'").getRowCount(), 0); } @Test diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java index cb9d0679e9f1..2aaa204bc2ea 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/TestingKuduServer.java @@ -102,7 +102,10 @@ public TestingKuduServer(String kuduVersion) public HostAndPort getMasterAddress() { - return HostAndPort.fromParts(master.getContainerIpAddress(), master.getMappedPort(KUDU_MASTER_PORT)); + // Do not use master.getContainerIpAddress(), it returns "localhost" which the kudu client resolves to: + // localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1 + // Instead explicitly list only the ipv4 loopback address 127.0.0.1 + return HostAndPort.fromParts("127.0.0.1", master.getMappedPort(KUDU_MASTER_PORT)); } @Override