diff --git a/.github/workflows/ci-cassandra.yml b/.github/workflows/ci-cassandra.yml
index deb80c06591..3f4f21e8ebb 100644
--- a/.github/workflows/ci-cassandra.yml
+++ b/.github/workflows/ci-cassandra.yml
@@ -9,6 +9,18 @@ on:
 jobs:
   cassandra:
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        version:
+        - distribution: cassandra
+          major: 3.x
+          image: 3.11
+          schema: v003
+        - distribution: cassandra
+          major: 4.x
+          image: 4.0
+          schema: v004
+    name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
     steps:
     - uses: actions/checkout@v2.3.4
 
@@ -17,4 +29,4 @@ jobs:
         go-version: ^1.17
 
     - name: Run cassandra integration tests
-      run: bash scripts/cassandra-integration-test.sh
+      run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }}
diff --git a/docker-compose/jaeger-docker-compose.yml b/docker-compose/jaeger-docker-compose.yml
index 8cf98d03e54..64277fc08a2 100644
--- a/docker-compose/jaeger-docker-compose.yml
+++ b/docker-compose/jaeger-docker-compose.yml
@@ -36,7 +36,7 @@ services:
       - jaeger-collector
 
   cassandra:
-    image: cassandra:3.9
+    image: cassandra:4.0
 
   cassandra-schema:
     image: jaegertracing/jaeger-cassandra-schema
diff --git a/plugin/storage/cassandra/Dockerfile b/plugin/storage/cassandra/Dockerfile
index c8e5de3fea6..7dade5d841f 100644
--- a/plugin/storage/cassandra/Dockerfile
+++ b/plugin/storage/cassandra/Dockerfile
@@ -1,4 +1,4 @@
-FROM cassandra:3.11
+FROM cassandra:4.0
 
 COPY schema/* /cassandra-schema/
 
diff --git a/plugin/storage/cassandra/schema/create.sh b/plugin/storage/cassandra/schema/create.sh
index eb0d6c5b89e..76fef2ab68e 100755
--- a/plugin/storage/cassandra/schema/create.sh
+++ b/plugin/storage/cassandra/schema/create.sh
@@ -21,9 +21,30 @@ function usage {
 trace_ttl=${TRACE_TTL:-172800}
 dependencies_ttl=${DEPENDENCIES_TTL:-0}
 
+# Extract cassandra version
+#
+# $ cqlsh -e "show version"
+# [cqlsh 5.0.1 | Cassandra 3.11.11 | CQL spec 3.4.4 | Native protocol v4]
+#
+cas_version=$(cqlsh -e "show version" \
+    | awk -F "|" '{print $2}' \
+    | awk -F " " '{print $2}' \
+    | awk -F "." '{print $1}' \
+)
+
 template=$1
 if [[ "$template" == "" ]]; then
-    template=$(ls $(dirname $0)/*cql.tmpl | sort | tail -1)
+    case "$cas_version" in
+        3)
+            template=$(dirname $0)/v003.cql.tmpl
+            ;;
+        4)
+            template=$(dirname $0)/v004.cql.tmpl
+            ;;
+        *)
+            template=$(ls $(dirname $0)/*cql.tmpl | sort | tail -1)
+            ;;
+    esac
 fi
 
 if [[ "$MODE" == "" ]]; then
diff --git a/plugin/storage/cassandra/schema/v004.cql.tmpl b/plugin/storage/cassandra/schema/v004.cql.tmpl
new file mode 100644
index 00000000000..69a7aa8340b
--- /dev/null
+++ b/plugin/storage/cassandra/schema/v004.cql.tmpl
@@ -0,0 +1,197 @@
+--
+-- Creates Cassandra keyspace with tables for traces and dependencies.
+--
+-- Required parameters:
+--
+--   keyspace
+--     name of the keyspace
+--   replication
+--     replication strategy for the keyspace, such as
+--       for prod environments
+--         {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
+--       for test environments
+--         {'class': 'SimpleStrategy', 'replication_factor': '1'}
+--   trace_ttl
+--     default time to live for trace data, in seconds
+--   dependencies_ttl
+--     default time to live for dependencies data, in seconds (0 for no TTL)
+--
+-- Non-configurable settings:
+--   gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
+--   For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html
+
+CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
+    key          text,
+    value_type   text,
+    value_string text,
+    value_bool   boolean,
+    value_long   bigint,
+    value_double double,
+    value_binary blob,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.log (
+    ts     bigint, // microseconds since epoch
+    fields list<frozen<keyvalue>>,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
+    ref_type text,
+    trace_id blob,
+    span_id  bigint,
+);
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.process (
+    service_name text,
+    tags         list<frozen<keyvalue>>,
+);
+
+-- Notice we have span_hash. This exists only for Zipkin backwards compatibility. Zipkin allows spans with the same ID.
+-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks different in CQLSH "describe table".
+-- start_time is bigint instead of timestamp as we require microsecond precision
+CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
+    trace_id       blob,
+    span_id        bigint,
+    span_hash      bigint,
+    parent_id      bigint,
+    operation_name text,
+    flags          int,
+    start_time     bigint, // microseconds since epoch
+    duration       bigint, // microseconds
+    tags           list<frozen<keyvalue>>,
+    logs           list<frozen<log>>,
+    refs           list<frozen<span_ref>>,
+    process        frozen<process>,
+    PRIMARY KEY (trace_id, span_id, span_hash)
+)
+    WITH compaction = {
+        'compaction_window_size': '1',
+        'compaction_window_unit': 'HOURS',
+        'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
+    service_name text,
+    PRIMARY KEY (service_name)
+)
+    WITH compaction = {
+        'min_threshold': '4',
+        'max_threshold': '32',
+        'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 (
+    service_name   text,
+    span_kind      text,
+    operation_name text,
+    PRIMARY KEY ((service_name), span_kind, operation_name)
+)
+    WITH compaction = {
+        'min_threshold': '4',
+        'max_threshold': '32',
+        'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+-- index of trace IDs by service + operation names, sorted by span start_time.
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
+    service_name   text,
+    operation_name text,
+    start_time     bigint, // microseconds since epoch
+    trace_id       blob,
+    PRIMARY KEY ((service_name, operation_name), start_time)
+) WITH CLUSTERING ORDER BY (start_time DESC)
+    AND compaction = {
+        'compaction_window_size': '1',
+        'compaction_window_unit': 'HOURS',
+        'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
+    service_name text,
+    bucket       int,
+    start_time   bigint, // microseconds since epoch
+    trace_id     blob,
+    PRIMARY KEY ((service_name, bucket), start_time)
+) WITH CLUSTERING ORDER BY (start_time DESC)
+    AND compaction = {
+        'compaction_window_size': '1',
+        'compaction_window_unit': 'HOURS',
+        'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
+    service_name   text,      // service name
+    operation_name text,      // operation name, or blank for queries without span name
+    bucket         timestamp, // time bucket - the start_time of the given span rounded to an hour
+    duration       bigint,    // span duration, in microseconds
+    start_time     bigint,    // microseconds since epoch
+    trace_id       blob,
+    PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
+) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
+    AND compaction = {
+        'compaction_window_size': '1',
+        'compaction_window_unit': 'HOURS',
+        'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+-- a bucketing strategy may have to be added for tag queries
+-- we can make this table even better by adding a timestamp to it
+CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
+    service_name text,
+    tag_key      text,
+    tag_value    text,
+    start_time   bigint, // microseconds since epoch
+    trace_id     blob,
+    span_id      bigint,
+    PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
+)
+    WITH CLUSTERING ORDER BY (start_time DESC)
+    AND compaction = {
+        'compaction_window_size': '1',
+        'compaction_window_unit': 'HOURS',
+        'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
+    }
+    AND default_time_to_live = ${trace_ttl}
+    AND speculative_retry = 'NONE'
+    AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes
+
+CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
+    parent     text,
+    child      text,
+    call_count bigint,
+    source     text,
+);
+
+-- compaction strategy is intentionally different from the other tables due to the size of dependencies data
+CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 (
+    ts_bucket    timestamp,
+    ts           timestamp,
+    dependencies list<frozen<dependency>>,
+    PRIMARY KEY (ts_bucket, ts)
+) WITH CLUSTERING ORDER BY (ts DESC)
+    AND compaction = {
+        'min_threshold': '4',
+        'max_threshold': '32',
+        'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
+    }
+    AND default_time_to_live = ${dependencies_ttl};
diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go
index cfba419941d..b3c1f955287 100644
--- a/plugin/storage/integration/cassandra_test.go
+++ b/plugin/storage/integration/cassandra_test.go
@@ -84,20 +84,6 @@ func (s *CassandraStorageIntegration) initializeCassandra() error {
 	return nil
 }
 
-func (s *CassandraStorageIntegration) initializeCassandraDependenciesV2() error {
-	f, err := s.initializeCassandraFactory([]string{
-		"--cassandra.keyspace=jaeger_v1_dc1",
-		"--cassandra.port=9043",
-	})
-	if err != nil {
-		return err
-	}
-	if err = s.initializeDependencyReaderAndWriter(f); err != nil {
-		return err
-	}
-	return nil
-}
-
 func (s *CassandraStorageIntegration) initializeDependencyReaderAndWriter(f *cassandra.Factory) error {
 	var (
 		err error
@@ -144,10 +130,7 @@ func TestCassandraStorage(t *testing.T) {
 		t.Skip("Integration test against Cassandra skipped; set STORAGE env var to cassandra to run this")
 	}
 	s1 := newCassandraStorageIntegration()
-	s2 := newCassandraStorageIntegration()
 	require.NoError(t, s1.initializeCassandra())
-	require.NoError(t, s2.initializeCassandraDependenciesV2())
 	// TODO: Support all other tests.
 	t.Run("GetDependencies", s1.testCassandraGetDependencies)
-	t.Run("GetDependenciesV2", s2.testCassandraGetDependencies)
 }
diff --git a/scripts/cassandra-integration-test.sh b/scripts/cassandra-integration-test.sh
index 4f516bf8cfc..35970a12b8a 100755
--- a/scripts/cassandra-integration-test.sh
+++ b/scripts/cassandra-integration-test.sh
@@ -1,30 +1,68 @@
 #!/bin/bash
 
-set -ex
-
-# Clean up before starting.
-docker rm -f cassandra || true
-docker rm -f cassandra2 || true
-docker network rm integration_test || true
-
-# Create a network so that the schema container can communicate with the cassandra containers.
-docker network create integration_test
-
-# Start cassandra containers whose ports are exposed to localhost to facilitate testing.
-docker run -d --name cassandra --network integration_test -p 9042:9042 -p 9160:9160 cassandra:3.9
-docker run -d --name cassandra2 --network integration_test -p 9043:9042 -p 9161:9160 cassandra:3.9
-
-# Build the schema container and run it rather than using the existing container in Docker Hub since that
-# requires this current build to succeed before this test can use it; chicken and egg problem.
-docker build -t jaeger-cassandra-schema-integration-test plugin/storage/cassandra/
-docker run --network integration_test -e CQLSH_HOST=cassandra -e TEMPLATE=/cassandra-schema/v001.cql.tmpl jaeger-cassandra-schema-integration-test
-docker run --network integration_test -e CQLSH_HOST=cassandra2 -e TEMPLATE=/cassandra-schema/v002.cql.tmpl jaeger-cassandra-schema-integration-test
-
-# Run the test.
-export STORAGE=cassandra
-make storage-integration-test
-
-# Tear down after.
-docker rm -f cassandra
-docker rm -f cassandra2
-docker network rm integration_test
+set -uxf -o pipefail
+
+usage() {
+  echo $"Usage: $0 <cassandra_version> <schema_version>"
+  exit 1
+}
+
+check_arg() {
+  if [ ! $# -eq 2 ]; then
+    echo "ERROR: need exactly two arguments, <cassandra_version> <schema_version>"
+    usage
+  fi
+}
+
+setup_cassandra() {
+  local tag=$1
+  local image=cassandra
+  local params=(
+    --rm
+    --detach
+    --publish 9042:9042
+    --publish 9160:9160
+  )
+  local cid=$(docker run ${params[@]} ${image}:${tag})
+  echo ${cid}
+}
+
+teardown_cassandra() {
+  local cid=$1
+  docker kill ${cid}
+  exit ${exit_status}
+}
+
+apply_schema() {
+  local image=cassandra-schema
+  local schema_dir=plugin/storage/cassandra/
+  local schema_version=$1
+  local params=(
+    --rm
+    --env CQLSH_HOST=localhost
+    --env CQLSH_PORT=9042
+    --env TEMPLATE=/cassandra-schema/${schema_version}.cql.tmpl
+    --network host
+  )
+  docker build -t ${image} ${schema_dir}
+  docker run ${params[@]} ${image}
+}
+
+run_integration_test() {
+  local version=$1
+  local schema_version=$2
+  local cid=$(setup_cassandra ${version})
+  apply_schema "$2"
+  STORAGE=cassandra make storage-integration-test
+  exit_status=$?
+  trap 'teardown_cassandra ${cid}' EXIT
+}
+
+main() {
+  check_arg "$@"
+
+  echo "Executing integration test for $1 with schema $2.cql.tmpl"
+  run_integration_test "$1" "$2"
+}
+
+main "$@"
diff --git a/scripts/es-integration-test.sh b/scripts/es-integration-test.sh
index 8d17891c2fd..613070e37f2 100755
--- a/scripts/es-integration-test.sh
+++ b/scripts/es-integration-test.sh
@@ -78,7 +78,7 @@ run_integration_test() {
   if [ ${distro} = "elasticsearch" ]; then
     cid=$(setup_es ${version})
   elif [ ${distro} == "opensearch" ]; then
-      cid=$(setup_opensearch ${version})
+    cid=$(setup_opensearch ${version})
   else
     echo "Unknown distribution $distro. Valid options are opensearch or elasticsearch"
     usage