From 79b144900c23b4be3243c8180f3f3b26bd640e90 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 17 Jan 2025 12:16:06 +0100 Subject: [PATCH 1/3] Cassandra: allow extracting keyspace from statement result --- .../decorator/DatabaseClientDecorator.java | 20 +++++--- .../test/groovy/CassandraClientTest.groovy | 49 +++++++++++++------ .../cassandra/CassandraClientDecorator.java | 14 ++++++ .../test/groovy/CassandraClientTest.groovy | 49 +++++++++++++------ .../cassandra4/CassandraClientDecorator.java | 27 ++++++++-- .../test/groovy/CassandraClientTest.groovy | 42 +++++++++++----- .../datadog/trace/api/ConfigDefaults.java | 1 + .../config/TraceInstrumentationConfig.java | 2 + .../main/java/datadog/trace/api/Config.java | 11 +++++ 9 files changed, 160 insertions(+), 55 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecorator.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecorator.java index bef0194b1b4..9f88059611e 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecorator.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecorator.java @@ -70,14 +70,7 @@ public String getDbType() { public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection) { if (connection != null) { span.setTag(Tags.DB_USER, dbUser(connection)); - final String instanceName = dbInstance(connection); - span.setTag(Tags.DB_INSTANCE, instanceName); - - String serviceName = dbClientService(instanceName); - if (null != serviceName) { - span.setServiceName(serviceName); - } - + onInstance(span, dbInstance(connection)); CharSequence hostName = dbHostname(connection); if (hostName != null) { span.setTag(Tags.PEER_HOSTNAME, hostName); @@ -90,6 +83,17 @@ public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection) return span; } + protected AgentSpan onInstance(final AgentSpan span, final String dbInstance) { + if (dbInstance != null) { + span.setTag(Tags.DB_INSTANCE, dbInstance); + String serviceName = dbClientService(dbInstance); + if (null != serviceName) { + span.setServiceName(serviceName); + } + } + return span; + } + public String dbService(final String dbType, final String instanceName) { if (instanceName != null && Config.get().isDbClientSplitByInstance()) { return dbClientService(instanceName); diff --git a/dd-java-agent/instrumentation/datastax-cassandra-3.8/src/test/groovy/CassandraClientTest.groovy b/dd-java-agent/instrumentation/datastax-cassandra-3.8/src/test/groovy/CassandraClientTest.groovy index 946e1b762a5..f97bed3a7b2 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-3.8/src/test/groovy/CassandraClientTest.groovy +++ b/dd-java-agent/instrumentation/datastax-cassandra-3.8/src/test/groovy/CassandraClientTest.groovy @@ -1,5 +1,6 @@ import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE import com.datastax.driver.core.Cluster @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { def "test sync"() { setup: - Session session = cluster.connect(keyspace) + Session session = keyspace ? cluster.connect(keyspace) : cluster.connect() injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } when: session.execute(statement) @@ -71,7 +75,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { } } trace(1) { - cassandraSpan(it, statement, keyspace, renameService) + cassandraSpan(it, statement, expectedKeySpace, renameService) } } @@ -79,12 +83,19 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS sync_test" | null | false - "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true - "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false - "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true + "DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true + "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true + "CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true + "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true + "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false } def "test async"() { @@ -92,6 +103,9 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { def callbackExecuted = new CountDownLatch(1) injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } when: Session session = cluster.connect(keyspace) @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { trace(3) { sortSpansByStart() basicSpan(it, "parent") - cassandraSpan(it, statement, keyspace, renameService, span(0)) + cassandraSpan(it, statement, expectedKeySpace, renameService, span(0)) basicSpan(it, "callbackListener", span(0)) } } @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS async_test" | null | false - "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true - "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false - "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS async_test" | null | null | false | false + "DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false + "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false + "CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false + "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false + "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true } String normalize(String statement){ diff --git a/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java b/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java index ffe7c81fb25..fcc2f087c8a 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java +++ b/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java @@ -1,8 +1,12 @@ package datadog.trace.instrumentation.datastax.cassandra; +import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_INSTANCE; + +import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.Host; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import datadog.trace.api.Config; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; import datadog.trace.api.naming.SpanNaming; @@ -11,6 +15,7 @@ import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator; +import datadog.trace.util.Strings; import java.util.function.ToIntFunction; public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator { @@ -82,6 +87,15 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) { if (result != null) { final Host host = result.getExecutionInfo().getQueriedHost(); onPeerConnection(span, host.getSocketAddress()); + if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) { + final ColumnDefinitions defs = result.getColumnDefinitions(); + if (defs != null && defs.size() > 0) { + final String keySpace = defs.getKeyspace(0); + if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) { + onInstance(span, keySpace); + } + } + } } return span; } diff --git a/dd-java-agent/instrumentation/datastax-cassandra-3/src/test/groovy/CassandraClientTest.groovy b/dd-java-agent/instrumentation/datastax-cassandra-3/src/test/groovy/CassandraClientTest.groovy index 946e1b762a5..f97bed3a7b2 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-3/src/test/groovy/CassandraClientTest.groovy +++ b/dd-java-agent/instrumentation/datastax-cassandra-3/src/test/groovy/CassandraClientTest.groovy @@ -1,5 +1,6 @@ import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE import com.datastax.driver.core.Cluster @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { def "test sync"() { setup: - Session session = cluster.connect(keyspace) + Session session = keyspace ? cluster.connect(keyspace) : cluster.connect() injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } when: session.execute(statement) @@ -71,7 +75,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { } } trace(1) { - cassandraSpan(it, statement, keyspace, renameService) + cassandraSpan(it, statement, expectedKeySpace, renameService) } } @@ -79,12 +83,19 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS sync_test" | null | false - "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true - "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false - "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true + "DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true + "CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true + "CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true + "CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true + "INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true + "SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false } def "test async"() { @@ -92,6 +103,9 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { def callbackExecuted = new CountDownLatch(1) injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } when: Session session = cluster.connect(keyspace) @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { trace(3) { sortSpansByStart() basicSpan(it, "parent") - cassandraSpan(it, statement, keyspace, renameService, span(0)) + cassandraSpan(it, statement, expectedKeySpace, renameService, span(0)) basicSpan(it, "callbackListener", span(0)) } } @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS async_test" | null | false - "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true - "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false - "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS async_test" | null | null | false | false + "DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false + "CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false + "CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false + "CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false + "INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false + "SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true } String normalize(String statement){ diff --git a/dd-java-agent/instrumentation/datastax-cassandra-4/src/main/java/datadog/trace/instrumentation/datastax/cassandra4/CassandraClientDecorator.java b/dd-java-agent/instrumentation/datastax-cassandra-4/src/main/java/datadog/trace/instrumentation/datastax/cassandra4/CassandraClientDecorator.java index dc1cb7d5b31..73badd7fbd2 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-4/src/main/java/datadog/trace/instrumentation/datastax/cassandra4/CassandraClientDecorator.java +++ b/dd-java-agent/instrumentation/datastax-cassandra-4/src/main/java/datadog/trace/instrumentation/datastax/cassandra4/CassandraClientDecorator.java @@ -1,10 +1,14 @@ package datadog.trace.instrumentation.datastax.cassandra4; +import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_INSTANCE; + import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.servererrors.CoordinatorException; import com.datastax.oss.driver.api.core.session.Session; +import datadog.trace.api.Config; import datadog.trace.api.cache.DDCache; import datadog.trace.api.cache.DDCaches; import datadog.trace.api.naming.SpanNaming; @@ -13,6 +17,7 @@ import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator; +import datadog.trace.util.Strings; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Objects; @@ -84,7 +89,8 @@ public AgentSpan onStatement(final AgentSpan span, final CharSequence statement) public AgentSpan onResponse(final AgentSpan span, final ResultSet result) { if (result != null) { - return onResponse(span, result.getExecutionInfo().getCoordinator()); + return onResponse( + span, result.getExecutionInfo().getCoordinator(), result.getColumnDefinitions()); } return span; @@ -92,7 +98,8 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) { public AgentSpan onResponse(final AgentSpan span, final AsyncResultSet result) { if (result != null) { - return onResponse(span, result.getExecutionInfo().getCoordinator()); + return onResponse( + span, result.getExecutionInfo().getCoordinator(), result.getColumnDefinitions()); } return span; @@ -103,20 +110,30 @@ public AgentSpan onError(final AgentSpan span, final Throwable throwable) { super.onError(span, throwable); if (throwable instanceof CoordinatorException) { - onResponse(span, ((CoordinatorException) throwable).getCoordinator()); + onResponse(span, ((CoordinatorException) throwable).getCoordinator(), null); } return span; } - private AgentSpan onResponse(AgentSpan span, Node coordinator) { + private AgentSpan onResponse(AgentSpan span, Node coordinator, ColumnDefinitions columns) { if (coordinator != null) { SocketAddress address = coordinator.getEndPoint().resolve(); if (address instanceof InetSocketAddress) { onPeerConnection(span, (InetSocketAddress) address); } } - + try { + if (Config.get().isCassandraKeyspaceStatementExtractionEnabled() + && columns != null + && columns.size() > 0) { + final String keySpace = columns.get(0).getKeyspace().toString(); + if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) { + onInstance(span, keySpace); + } + } + } catch (final Throwable ignored) { + } return span; } } diff --git a/dd-java-agent/instrumentation/datastax-cassandra-4/src/test/groovy/CassandraClientTest.groovy b/dd-java-agent/instrumentation/datastax-cassandra-4/src/test/groovy/CassandraClientTest.groovy index 21bb4bf3ff3..35eabe01d9e 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-4/src/test/groovy/CassandraClientTest.groovy +++ b/dd-java-agent/instrumentation/datastax-cassandra-4/src/test/groovy/CassandraClientTest.groovy @@ -1,3 +1,5 @@ +import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED + import com.datastax.oss.driver.api.core.CqlSession import com.datastax.oss.driver.api.core.config.DefaultDriverOption import com.datastax.oss.driver.api.core.config.DriverConfigLoader @@ -50,7 +52,9 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { runUnderTrace("setup") { Session session = sessionBuilder().build() session.execute("DROP KEYSPACE IF EXISTS test_keyspace") + session.execute("DROP KEYSPACE IF EXISTS a_ks") session.execute("CREATE KEYSPACE test_keyspace WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}") + session.execute("CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}") session.execute("CREATE TABLE test_keyspace.users ( id UUID PRIMARY KEY, name text )") } @@ -66,6 +70,9 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { setup: Session session = sessionBuilder().withKeyspace((String) keyspace).build() injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } when: session.execute(statement) @@ -73,7 +80,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { then: assertTraces(1) { trace(1) { - cassandraSpan(it, statement, keyspace, renameService) + cassandraSpan(it, statement, expectedKeySpace, renameService) } } @@ -81,11 +88,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session?.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS does_not_exist" | null | false - "DROP KEYSPACE IF EXISTS does_not_exist" | null | true - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | true + + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS does_not_exist" | null | null | false | true + "DROP KEYSPACE IF EXISTS does_not_exist" | null | null | false | true + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | "test_keyspace" | false | true + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | "test_keyspace" | true | true + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "test_keyspace" | false | true + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "test_keyspace" | true | true + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | null | "test_keyspace" | false | true + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | null | "test_keyspace" | true | true + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | null | null | false | false } def "test sync with error"() { @@ -120,6 +133,9 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { setup: CqlSession session = sessionBuilder().withKeyspace((String) keyspace).build() injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService") + if (extractFromStatement) { + injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true") + } def callbackExecuted = new CountDownLatch(1) @@ -141,7 +157,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { trace(3) { sortSpansByStart() basicSpan(it, "parent") - cassandraSpan(it, statement, keyspace, renameService, span(0)) + cassandraSpan(it, statement, expectedKeySpace, renameService, span(0)) basicSpan(it, "callbackListener", span(0)) } } @@ -150,11 +166,13 @@ abstract class CassandraClientTest extends VersionedNamingTestBase { session?.close() where: - statement | keyspace | renameService - "DROP KEYSPACE IF EXISTS does_not_exist" | null | false - "DROP KEYSPACE IF EXISTS does_not_exist" | null | true - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | false - "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | true + statement | keyspace | expectedKeySpace | renameService | extractFromStatement + "DROP KEYSPACE IF EXISTS does_not_exist" | null | null | false | false + "DROP KEYSPACE IF EXISTS does_not_exist" | null | null | false | false + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | "test_keyspace" | false | false + "SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "test_keyspace" | "test_keyspace" | true | false + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | null | null | false | false + "SELECT * FROM test_keyspace.users where name = 'alice' ALLOW FILTERING" | null | "test_keyspace" | true | true } def "test async with error"() { diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 816a9c0145b..8bbdc38f272 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -243,6 +243,7 @@ public final class ConfigDefaults { static final long DEFAULT_TRACE_POST_PROCESSING_TIMEOUT = 1000; // 1 second + static final boolean DEFAULT_CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED = false; static final boolean DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED = true; static final boolean DEFAULT_ELASTICSEARCH_BODY_ENABLED = false; static final boolean DEFAULT_ELASTICSEARCH_PARAMS_ENABLED = true; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java index f15f2899d79..6535e1c464a 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/TraceInstrumentationConfig.java @@ -148,6 +148,8 @@ public final class TraceInstrumentationConfig { public static final String RESOLVER_USE_URL_CACHES = "resolver.use.url.caches"; public static final String RESOLVER_RESET_INTERVAL = "resolver.reset.interval"; public static final String RESOLVER_NAMES_ARE_UNIQUE = "resolver.names.are.unique"; + public static final String CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED = + "trace.cassandra.keyspace.statement.extraction.enabled"; public static final String COUCHBASE_INTERNAL_SPANS_ENABLED = "trace.couchbase.internal-spans.enabled"; public static final String ELASTICSEARCH_BODY_ENABLED = "trace.elasticsearch.body.enabled"; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 83813153077..c3d4b9e0071 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -519,6 +519,7 @@ public static String getHostName() { private final boolean longRunningTraceEnabled; private final long longRunningTraceInitialFlushInterval; private final long longRunningTraceFlushInterval; + private final boolean cassandraKeyspaceStatementExtractionEnabled; private final boolean couchbaseInternalSpansEnabled; private final boolean elasticsearchBodyEnabled; private final boolean elasticsearchParamsEnabled; @@ -622,6 +623,10 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins } else { secureRandom = configProvider.getBoolean(SECURE_RANDOM, DEFAULT_SECURE_RANDOM); } + cassandraKeyspaceStatementExtractionEnabled = + configProvider.getBoolean( + CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, + DEFAULT_CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED); couchbaseInternalSpansEnabled = configProvider.getBoolean( COUCHBASE_INTERNAL_SPANS_ENABLED, DEFAULT_COUCHBASE_INTERNAL_SPANS_ENABLED); @@ -3333,6 +3338,10 @@ public BitSet getGrpcClientErrorStatuses() { return grpcClientErrorStatuses; } + public boolean isCassandraKeyspaceStatementExtractionEnabled() { + return cassandraKeyspaceStatementExtractionEnabled; + } + public boolean isCouchbaseInternalSpansEnabled() { return couchbaseInternalSpansEnabled; } @@ -4557,6 +4566,8 @@ public String toString() { + longRunningTraceInitialFlushInterval + ", longRunningTraceFlushInterval=" + longRunningTraceFlushInterval + + ", cassandraKeyspaceStatementExtractionEnabled=" + + cassandraKeyspaceStatementExtractionEnabled + ", couchbaseInternalSpansEnabled=" + couchbaseInternalSpansEnabled + ", elasticsearchBodyEnabled=" From f4a7d1d72846a4976e5ba5085fd350a5247360c5 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Fri, 17 Jan 2025 14:13:41 +0100 Subject: [PATCH 2/3] testing --- .../decorator/DatabaseClientDecoratorTest.groovy | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecoratorTest.groovy b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecoratorTest.groovy index 0269f51d5a8..30d6ff6fe94 100644 --- a/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecoratorTest.groovy +++ b/dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/decorator/DatabaseClientDecoratorTest.groovy @@ -47,7 +47,9 @@ class DatabaseClientDecoratorTest extends ClientDecoratorTest { then: if (session) { 1 * span.setTag(Tags.DB_USER, session.user) - 1 * span.setTag(Tags.DB_INSTANCE, session.instance) + if (session.instance != null) { + 1 * span.setTag(Tags.DB_INSTANCE, session.instance) + } if (session.hostname != null) { 1 * span.setTag(Tags.PEER_HOSTNAME, session.hostname) } From 94da60d25503153a1a337ab735471b88c3bcd46c Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 21 Jan 2025 13:06:50 +0100 Subject: [PATCH 3/3] review --- .../cassandra/CassandraClientDecorator.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java b/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java index fcc2f087c8a..09f83cd84fa 100644 --- a/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java +++ b/dd-java-agent/instrumentation/datastax-cassandra-3/src/main/java/datadog/trace/instrumentation/datastax/cassandra/CassandraClientDecorator.java @@ -87,14 +87,17 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) { if (result != null) { final Host host = result.getExecutionInfo().getQueriedHost(); onPeerConnection(span, host.getSocketAddress()); - if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) { - final ColumnDefinitions defs = result.getColumnDefinitions(); - if (defs != null && defs.size() > 0) { - final String keySpace = defs.getKeyspace(0); - if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) { - onInstance(span, keySpace); + try { + if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) { + final ColumnDefinitions defs = result.getColumnDefinitions(); + if (defs != null && defs.size() > 0) { + final String keySpace = defs.getKeyspace(0); + if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) { + onInstance(span, keySpace); + } } } + } catch (final Throwable ignored) { } } return span;