Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanitize cql in Apache Camel instrumentation #3717

Merged
merged 2 commits into from
Jul 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ import spock.lang.Unroll

class SanitizationTest extends Specification {

@Unroll
def "sanitize cql #originalCql"() {

setup:
def decorator = new DbSpanDecorator("cql", "")
def exchange = Mock(Exchange) {
getIn() >> Mock(Message) {
getHeader("CamelCqlQuery") >> originalCql
}
}
def actualSanitized = decorator.getStatement(exchange, null)

expect:
actualSanitized == sanitizedCql

where:
originalCql | sanitizedCql
"FROM TABLE WHERE FIELD>=-1234" | "FROM TABLE WHERE FIELD>=?"
"SELECT Name, Phone.Number FROM Contact WHERE Address.State = 'NY'" | "SELECT Name, Phone.Number FROM Contact WHERE Address.State = ?"
"FROM col WHERE @Tag='Something'" | "FROM col WHERE @Tag=?"
}

@Unroll
def "sanitize jdbc #originalSql"() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
testLibrary("org.apache.camel:camel-jaxb-starter:$camelversion")
testLibrary("org.apache.camel:camel-undertow:$camelversion")
testLibrary("org.apache.camel:camel-aws:$camelversion")
testLibrary("org.apache.camel:camel-cassandraql:$camelversion")

testImplementation("org.springframework.boot:spring-boot-starter-test:1.5.17.RELEASE")
testImplementation("org.springframework.boot:spring-boot-starter:1.5.17.RELEASE")
Expand All @@ -43,6 +44,7 @@ dependencies {
testImplementation("org.elasticmq:elasticmq-rest-sqs_2.12:1.0.0")

testImplementation("org.testcontainers:localstack:${versions["org.testcontainers"]}")
testImplementation("org.testcontainers:cassandra:${versions["org.testcontainers"]}")

latestDepTestLibrary("org.apache.camel:camel-core:2.+")
latestDepTestLibrary("org.apache.camel:camel-spring-boot-starter:2.+")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,11 @@ public String getOperationName(

// visible for testing
String getStatement(Exchange exchange, Endpoint endpoint) {
// TODO: sanitize cql
switch (component) {
case "cql":
Object cqlObj = exchange.getIn().getHeader("CamelCqlQuery");
if (cqlObj != null) {
return cqlObj.toString();
}
Map<String, String> cqlParameters = toQueryParameters(endpoint.getEndpointUri());
if (cqlParameters.containsKey("cql")) {
return cqlParameters.get("cql");
return SqlStatementSanitizer.sanitize(cqlObj.toString()).getFullStatement();
}
return null;
case "jdbc":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel.decorators

import org.apache.camel.builder.RouteBuilder
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean

@SpringBootConfiguration
@EnableAutoConfiguration
class CassandraConfig {

@Bean
RouteBuilder serviceRoute() {
return new RouteBuilder() {

@Override
void configure() throws Exception {
from("direct:input")
.setHeader("CamelCqlQuery", simple("select * from test.users where id=1 ALLOW FILTERING"))
.toD("cql://{{cassandra.host}}:{{cassandra.port}}/test")
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.apachecamel.decorators

import com.datastax.driver.core.Cluster
import com.datastax.driver.core.Session
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.testcontainers.containers.CassandraContainer
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.RetryOnAddressAlreadyInUseTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.springframework.boot.SpringApplication
import org.springframework.context.ConfigurableApplicationContext
import org.testcontainers.containers.GenericContainer
import spock.lang.Shared

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL

class CassandraTest extends AgentInstrumentationSpecification implements RetryOnAddressAlreadyInUseTrait {
trask marked this conversation as resolved.
Show resolved Hide resolved

@Shared
ConfigurableApplicationContext server
@Shared
GenericContainer cassandra
@Shared
Cluster cluster
@Shared
String host
@Shared
int port

Session session

def setupSpec() {
withRetryOnAddressAlreadyInUse({
setupSpecUnderRetry()
})
}

def setupSpecUnderRetry() {
cassandra = new CassandraContainer()
cassandra.withExposedPorts(9042)
cassandra.start()

port = cassandra.getFirstMappedPort()
host = cassandra.getHost()

cluster = cassandra.getCluster()

def app = new SpringApplication(CassandraConfig)
app.setDefaultProperties(["cassandra.host": host, "cassandra.port": port])
server = app.run()
}

def cleanupSpec() {
server?.close()
cluster?.close()
cassandra.stop()
}

def setup() {
session = cluster.connect()

session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':1};")
session.execute("CREATE TABLE IF NOT EXISTS test.users ( id int primary key, name text );")
session.execute("INSERT INTO test.users (id,name) VALUES (1, 'user1') IF NOT EXISTS;")
session.execute("INSERT INTO test.users (id,name) VALUES (2, 'user2') IF NOT EXISTS;")
}

def cleanup() {
session?.close()
}

def "test cassandra "() {

setup:
def camelContext = server.getBean(CamelContext)
ProducerTemplate template = camelContext.createProducerTemplate()

when:
def response = template.requestBody("direct:input", null)

then:
response.first().getString("name") == "user1"

assertTraces(1) {
trace(0, 2) {
span(0) {
kind INTERNAL
hasNoParent()
attributes {
"apache-camel.uri" "direct://input"
}
}
span(1){
kind CLIENT
attributes {
"apache-camel.uri" "cql://$host:$port/test"
"$SemanticAttributes.DB_NAME.key" "test"
"$SemanticAttributes.DB_STATEMENT.key" "select * from test.users where id=? ALLOW FILTERING"
"$SemanticAttributes.DB_SYSTEM.key" "cassandra"
}
}
}
}

}

}