diff --git a/plugin/trino-cassandra/pom.xml b/plugin/trino-cassandra/pom.xml index 1975daaec625..86e38593f3a2 100644 --- a/plugin/trino-cassandra/pom.xml +++ b/plugin/trino-cassandra/pom.xml @@ -87,6 +87,11 @@ units + + io.opentelemetry.instrumentation + opentelemetry-cassandra-4.4 + + io.trino trino-plugin-toolkit diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java index ef9877615165..d4a877fec9c2 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraClientModule.java @@ -28,6 +28,8 @@ import com.google.inject.Scopes; import com.google.inject.Singleton; import io.airlift.json.JsonCodec; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.cassandra.v4_4.CassandraTelemetry; import io.trino.plugin.cassandra.ptf.Query; import io.trino.spi.TrinoException; import io.trino.spi.function.table.ConnectorTableFunction; @@ -110,7 +112,11 @@ protected Type _deserialize(String value, DeserializationContext context) @Singleton @Provides - public static CassandraSession createCassandraSession(CassandraTypeManager cassandraTypeManager, CassandraClientConfig config, JsonCodec> extraColumnMetadataCodec) + public static CassandraSession createCassandraSession( + CassandraTypeManager cassandraTypeManager, + CassandraClientConfig config, + JsonCodec> extraColumnMetadataCodec, + OpenTelemetry openTelemetry) { requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null"); @@ -177,7 +183,8 @@ public static CassandraSession createCassandraSession(CassandraTypeManager cassa () -> { contactPoints.forEach(contactPoint -> cqlSessionBuilder.addContactPoint( createInetSocketAddress(contactPoint, config.getNativeProtocolPort()))); - return cqlSessionBuilder.build(); + CassandraTelemetry cassandraTelemetry = CassandraTelemetry.create(openTelemetry); + return cassandraTelemetry.wrap(cqlSessionBuilder.build()); }, config.getNoHostAvailableRetryTimeout()); } diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnectorFactory.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnectorFactory.java index 409ca30405b3..1b1a0c43bb5e 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnectorFactory.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/CassandraConnectorFactory.java @@ -16,6 +16,7 @@ import com.google.inject.Injector; import io.airlift.bootstrap.Bootstrap; import io.airlift.json.JsonModule; +import io.opentelemetry.api.OpenTelemetry; import io.trino.plugin.base.jmx.MBeanServerModule; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; @@ -43,6 +44,7 @@ public Connector create(String catalogName, Map config, Connecto checkStrictSpiVersionMatch(context, this); Bootstrap app = new Bootstrap( + binder -> binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()), new MBeanModule(), new JsonModule(), new CassandraClientModule(context.getTypeManager()), diff --git a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java index 0711c7915a92..5fa121920ef5 100644 --- a/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java +++ b/plugin/trino-cassandra/src/main/java/io/trino/plugin/cassandra/ptf/Query.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.Provider; @@ -24,6 +25,7 @@ import io.trino.plugin.cassandra.CassandraMetadata; import io.trino.plugin.cassandra.CassandraQueryRelationHandle; import io.trino.plugin.cassandra.CassandraTableHandle; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorSession; @@ -38,6 +40,7 @@ import io.trino.spi.function.table.ScalarArgumentSpecification; import io.trino.spi.function.table.TableFunctionAnalysis; +import java.lang.reflect.UndeclaredThrowableException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,6 +48,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; @@ -98,7 +102,13 @@ public TableFunctionAnalysis analyze( String query = ((Slice) argument.getValue()).toStringUtf8(); CassandraQueryRelationHandle queryRelationHandle = new CassandraQueryRelationHandle(query); - List columnHandles = cassandraMetadata.getColumnHandles(query); + List columnHandles; + try { + columnHandles = cassandraMetadata.getColumnHandles(query); + } + catch (UndeclaredThrowableException e) { + throw new TrinoException(INVALID_FUNCTION_ARGUMENT, "Cannot get column definition", Throwables.getRootCause(e)); + } checkState(!columnHandles.isEmpty(), "Handle doesn't have columns info"); Descriptor returnedType = new Descriptor(columnHandles.stream() .map(CassandraColumnHandle.class::cast) diff --git a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java index 9d6e7a7086c1..4c0111c801a6 100644 --- a/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java +++ b/plugin/trino-cassandra/src/test/java/io/trino/plugin/cassandra/TestCassandraConnectorTest.java @@ -1463,7 +1463,8 @@ public void testNativeQueryPreparingStatementFailure() String tableName = "test_insert" + randomNameSuffix(); assertFalse(getQueryRunner().tableExists(getSession(), tableName)); assertThatThrownBy(() -> query("SELECT * FROM TABLE(cassandra.system.query(query => 'INSERT INTO tpch." + tableName + "(col) VALUES (1)'))")) - .hasMessageContaining("unconfigured table"); + .hasMessage("Cannot get column definition") + .hasStackTraceContaining("unconfigured table"); } @Test @@ -1490,7 +1491,8 @@ public void testNativeQueryUnsupportedStatement() public void testNativeQueryIncorrectSyntax() { assertThatThrownBy(() -> query("SELECT * FROM TABLE(system.query(query => 'some wrong syntax'))")) - .hasMessageContaining("no viable alternative at input 'some'"); + .hasMessage("Cannot get column definition") + .hasStackTraceContaining("no viable alternative at input 'some'"); } @Override