Skip to content

Commit

Permalink
Cassandra: allow extracting keyspace from statement result (#8239)
Browse files Browse the repository at this point in the history
* Cassandra: allow extracting keyspace from statement result

* testing

* review
  • Loading branch information
amarziali authored Jan 21, 2025
1 parent 693c605 commit 2b72cb5
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ public String getDbType() {
public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection) {
if (connection != null) {
span.setTag(Tags.DB_USER, dbUser(connection));
final String instanceName = dbInstance(connection);
span.setTag(Tags.DB_INSTANCE, instanceName);

String serviceName = dbClientService(instanceName);
if (null != serviceName) {
span.setServiceName(serviceName);
}

onInstance(span, dbInstance(connection));
CharSequence hostName = dbHostname(connection);
if (hostName != null) {
span.setTag(Tags.PEER_HOSTNAME, hostName);
Expand All @@ -90,6 +83,17 @@ public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection)
return span;
}

protected AgentSpan onInstance(final AgentSpan span, final String dbInstance) {
if (dbInstance != null) {
span.setTag(Tags.DB_INSTANCE, dbInstance);
String serviceName = dbClientService(dbInstance);
if (null != serviceName) {
span.setServiceName(serviceName);
}
}
return span;
}

public String dbService(final String dbType, final String instanceName) {
if (instanceName != null && Config.get().isDbClientSplitByInstance()) {
return dbClientService(instanceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class DatabaseClientDecoratorTest extends ClientDecoratorTest {
then:
if (session) {
1 * span.setTag(Tags.DB_USER, session.user)
1 * span.setTag(Tags.DB_INSTANCE, session.instance)
if (session.instance != null) {
1 * span.setTag(Tags.DB_INSTANCE, session.instance)
}
if (session.hostname != null) {
1 * span.setTag(Tags.PEER_HOSTNAME, session.hostname)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package datadog.trace.instrumentation.datastax.cassandra;

import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_INSTANCE;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import datadog.trace.api.Config;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
Expand All @@ -11,6 +15,7 @@
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import datadog.trace.util.Strings;
import java.util.function.ToIntFunction;

public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator<Session> {
Expand Down Expand Up @@ -82,6 +87,18 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) {
if (result != null) {
final Host host = result.getExecutionInfo().getQueriedHost();
onPeerConnection(span, host.getSocketAddress());
try {
if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) {
final ColumnDefinitions defs = result.getColumnDefinitions();
if (defs != null && defs.size() > 0) {
final String keySpace = defs.getKeyspace(0);
if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) {
onInstance(span, keySpace);
}
}
}
} catch (final Throwable ignored) {
}
}
return span;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Loading

0 comments on commit 2b72cb5

Please sign in to comment.