Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra: allow extracting keyspace from statement result #8239

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ public String getDbType() {
public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection) {
if (connection != null) {
span.setTag(Tags.DB_USER, dbUser(connection));
final String instanceName = dbInstance(connection);
span.setTag(Tags.DB_INSTANCE, instanceName);

String serviceName = dbClientService(instanceName);
if (null != serviceName) {
span.setServiceName(serviceName);
}

onInstance(span, dbInstance(connection));
CharSequence hostName = dbHostname(connection);
if (hostName != null) {
span.setTag(Tags.PEER_HOSTNAME, hostName);
Expand All @@ -90,6 +83,17 @@ public AgentSpan onConnection(final AgentSpan span, final CONNECTION connection)
return span;
}

protected AgentSpan onInstance(final AgentSpan span, final String dbInstance) {
if (dbInstance != null) {
span.setTag(Tags.DB_INSTANCE, dbInstance);
String serviceName = dbClientService(dbInstance);
if (null != serviceName) {
span.setServiceName(serviceName);
}
}
return span;
}

public String dbService(final String dbType, final String instanceName) {
if (instanceName != null && Config.get().isDbClientSplitByInstance()) {
return dbClientService(instanceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class DatabaseClientDecoratorTest extends ClientDecoratorTest {
then:
if (session) {
1 * span.setTag(Tags.DB_USER, session.user)
1 * span.setTag(Tags.DB_INSTANCE, session.instance)
if (session.instance != null) {
1 * span.setTag(Tags.DB_INSTANCE, session.instance)
}
if (session.hostname != null) {
1 * span.setTag(Tags.PEER_HOSTNAME, session.hostname)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package datadog.trace.instrumentation.datastax.cassandra;

import static datadog.trace.bootstrap.instrumentation.api.Tags.DB_INSTANCE;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import datadog.trace.api.Config;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.naming.SpanNaming;
Expand All @@ -11,6 +15,7 @@
import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.DBTypeProcessingDatabaseClientDecorator;
import datadog.trace.util.Strings;
import java.util.function.ToIntFunction;

public class CassandraClientDecorator extends DBTypeProcessingDatabaseClientDecorator<Session> {
Expand Down Expand Up @@ -82,6 +87,18 @@ public AgentSpan onResponse(final AgentSpan span, final ResultSet result) {
if (result != null) {
final Host host = result.getExecutionInfo().getQueriedHost();
onPeerConnection(span, host.getSocketAddress());
try {
if (Config.get().isCassandraKeyspaceStatementExtractionEnabled()) {
final ColumnDefinitions defs = result.getColumnDefinitions();
if (defs != null && defs.size() > 0) {
final String keySpace = defs.getKeyspace(0);
if (Strings.isNotBlank(keySpace) && !keySpace.equals(span.getTag(DB_INSTANCE))) {
onInstance(span, keySpace);
}
}
}
} catch (final Throwable ignored) {
}
}
return span;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.api.config.TraceInstrumentationConfig.CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED
import static datadog.trace.api.config.TraceInstrumentationConfig.DB_CLIENT_HOST_SPLIT_BY_INSTANCE

import com.datastax.driver.core.Cluster
Expand Down Expand Up @@ -57,8 +58,11 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
def "test sync"() {
setup:

Session session = cluster.connect(keyspace)
Session session = keyspace ? cluster.connect(keyspace) : cluster.connect()
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
session.execute(statement)
Expand All @@ -71,27 +75,37 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
}
}
trace(1) {
cassandraSpan(it, statement, keyspace, renameService)
cassandraSpan(it, statement, expectedKeySpace, renameService)
}
}

cleanup:
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS sync_test" | null | false
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | false
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS sync_test" | null | null | false | true
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | true
"CREATE KEYSPACE sync_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | true
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | false | true
"CREATE TABLE sync_test.users ( id UUID PRIMARY KEY, name text )" | "sync_test" | "sync_test" | false | true
"INSERT INTO sync_test.users (id, name) values (uuid(), 'alice')" | "sync_test" | "sync_test" | false | true
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "sync_test" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | false | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | "sync_test" | true | true
"SELECT * FROM sync_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
}

def "test async"() {
setup:

def callbackExecuted = new CountDownLatch(1)
injectSysConfig(DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "$renameService")
if (extractFromStatement) {
injectSysConfig(CASSANDRA_KEYSPACE_STATEMENT_EXTRACTION_ENABLED, "true")
}

when:
Session session = cluster.connect(keyspace)
Expand All @@ -117,7 +131,7 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
trace(3) {
sortSpansByStart()
basicSpan(it, "parent")
cassandraSpan(it, statement, keyspace, renameService, span(0))
cassandraSpan(it, statement, expectedKeySpace, renameService, span(0))
basicSpan(it, "callbackListener", span(0))
}
}
Expand All @@ -126,12 +140,17 @@ abstract class CassandraClientTest extends VersionedNamingTestBase {
session.close()

where:
statement | keyspace | renameService
"DROP KEYSPACE IF EXISTS async_test" | null | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | true
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | true
statement | keyspace | expectedKeySpace | renameService | extractFromStatement
"DROP KEYSPACE IF EXISTS async_test" | null | null | false | false
"DROP KEYSPACE IF EXISTS a_ks" | null | null | false | false
"CREATE KEYSPACE async_test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE KEYSPACE a_ks WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3}" | null | null | true | false
"CREATE TABLE async_test.users ( id UUID PRIMARY KEY, name text )" | "async_test" | "async_test" | false | false
"INSERT INTO async_test.users (id, name) values (uuid(), 'alice')" | "async_test" | "async_test" | false | false
"SELECT * FROM users where name = 'alice' ALLOW FILTERING" | "async_test" | "async_test" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | null | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | "a_ks" | "a_ks" | false | false
"SELECT * FROM async_test.users where name = 'alice' ALLOW FILTERING" | null | "async_test" | true | true
}

String normalize(String statement){
Expand Down
Loading
Loading