Skip to content

Commit

Permalink
Add cassandra v4 support (#3225)
Browse files Browse the repository at this point in the history
* Add schema for cassandra v4.0.

We don't need to add v003tov004.sh because the change (removal of the dclocal_read_repair_chance config) is in table metadata, which is handled seamlessly by Cassandra itself when users upgrade their cluster from v3.11 to v4.0. Refactor the Cassandra integration tests to run on both Cassandra v3.11 and v4.0.

Signed-off-by: Ashmita Bohara <[email protected]>

* Aligning nomenclature of cassandra integration test with elasticsearch one

Signed-off-by: Ashmita Bohara <[email protected]>

* Feedbacks

Signed-off-by: Ashmita Bohara <[email protected]>

* Feedbacks

Signed-off-by: Ashmita Bohara <[email protected]>

* Feedbacks

Signed-off-by: Ashmita Bohara <[email protected]>

* Apply v3 and v4 based on cassandra version

Signed-off-by: Ashmita Bohara <[email protected]>

* Add comment

Signed-off-by: Ashmita Bohara <[email protected]>
  • Loading branch information
Ashmita152 authored Aug 29, 2021
1 parent 0055bd8 commit fb4be5d
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 50 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/ci-cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ on:
jobs:
cassandra:
runs-on: ubuntu-latest
strategy:
matrix:
version:
- distribution: cassandra
major: 3.x
image: 3.11
schema: v003
- distribution: cassandra
major: 4.x
image: 4.0
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }}
steps:
- uses: actions/[email protected]

Expand All @@ -17,4 +29,4 @@ jobs:
go-version: ^1.17

- name: Run cassandra integration tests
run: bash scripts/cassandra-integration-test.sh
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.image }} ${{ matrix.version.schema }}
2 changes: 1 addition & 1 deletion docker-compose/jaeger-docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ services:
- jaeger-collector

cassandra:
image: cassandra:3.9
image: cassandra:4.0

cassandra-schema:
image: jaegertracing/jaeger-cassandra-schema
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM cassandra:3.11
FROM cassandra:4.0

COPY schema/* /cassandra-schema/

Expand Down
23 changes: 22 additions & 1 deletion plugin/storage/cassandra/schema/create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,30 @@ function usage {
trace_ttl=${TRACE_TTL:-172800}
dependencies_ttl=${DEPENDENCIES_TTL:-0}

# Extract cassandra version
#
# $ cqlsh -e "show version"
# [cqlsh 5.0.1 | Cassandra 3.11.11 | CQL spec 3.4.4 | Native protocol v4]
#
# Reduce the "show version" banner to the Cassandra major version:
#   field 2 of the "|"-separated banner -> " Cassandra 3.11.11 "
#   second whitespace-separated word    -> "3.11.11"
#   first "."-separated component       -> "3"
cas_version=$(cqlsh -e "show version" \
  | awk -F "|" '{ split($2, words, " "); split(words[2], parts, "."); print parts[1] }' \
)

template=$1
if [[ "$template" == "" ]]; then
template=$(ls $(dirname $0)/*cql.tmpl | sort | tail -1)
case "$cas_version" in
3)
template=$(dirname $0)/v003.cql.tmpl
;;
4)
template=$(dirname $0)/v004.cql.tmpl
;;
*)
template=$(ls $(dirname $0)/*cql.tmpl | sort | tail -1)
;;
esac
fi

if [[ "$MODE" == "" ]]; then
Expand Down
197 changes: 197 additions & 0 deletions plugin/storage/cassandra/schema/v004.cql.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
--
-- Creates Cassandra keyspace with tables for traces and dependencies.
--
-- Required parameters:
--
-- keyspace
-- name of the keyspace
-- replication
-- replication strategy for the keyspace, such as
-- for prod environments
-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' }
-- for test environments
-- {'class': 'SimpleStrategy', 'replication_factor': '1'}
-- trace_ttl
-- default time to live for trace data, in seconds
-- dependencies_ttl
-- default time to live for dependencies data, in seconds (0 for no TTL)
--
-- Non-configurable settings:
-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/
-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html

CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};

CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue (
key text,
value_type text,
value_string text,
value_bool boolean,
value_long bigint,
value_double double,
value_binary blob,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.log (
ts bigint, // microseconds since epoch
fields list<frozen<keyvalue>>,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref (
ref_type text,
trace_id blob,
span_id bigint,
);

CREATE TYPE IF NOT EXISTS ${keyspace}.process (
service_name text,
tags list<frozen<keyvalue>>,
);

-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID.
-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table".
-- start_time is bigint instead of timestamp as we require microsecond precision
CREATE TABLE IF NOT EXISTS ${keyspace}.traces (
trace_id blob,
span_id bigint,
span_hash bigint,
parent_id bigint,
operation_name text,
flags int,
start_time bigint, // microseconds since epoch
duration bigint, // microseconds
tags list<frozen<keyvalue>>,
logs list<frozen<log>>,
refs list<frozen<span_ref>>,
process frozen<process>,
PRIMARY KEY (trace_id, span_id, span_hash)
)
WITH compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_names (
service_name text,
PRIMARY KEY (service_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 (
service_name text,
span_kind text,
operation_name text,
PRIMARY KEY ((service_name), span_kind, operation_name)
)
WITH compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- index of trace IDs by service + operation names, sorted by span start_time.
CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index (
service_name text,
operation_name text,
start_time bigint, // microseconds since epoch
trace_id blob,
PRIMARY KEY ((service_name, operation_name), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index (
service_name text,
bucket int,
start_time bigint, // microseconds since epoch
trace_id blob,
PRIMARY KEY ((service_name, bucket), start_time)
) WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index (
service_name text, // service name
operation_name text, // operation name, or blank for queries without span name
bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour
duration bigint, // span duration, in microseconds
start_time bigint, // microseconds since epoch
trace_id blob,
PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id)
) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

-- a bucketing strategy may have to be added for tag queries
-- we can make this table even better by adding a timestamp to it
CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index (
service_name text,
tag_key text,
tag_value text,
start_time bigint, // microseconds since epoch
trace_id blob,
span_id bigint,
PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id)
)
WITH CLUSTERING ORDER BY (start_time DESC)
AND compaction = {
'compaction_window_size': '1',
'compaction_window_unit': 'HOURS',
'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy'
}
AND default_time_to_live = ${trace_ttl}
AND speculative_retry = 'NONE'
AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes

CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
parent text,
child text,
call_count bigint,
source text,
);

-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 (
ts_bucket timestamp,
ts timestamp,
dependencies list<frozen<dependency>>,
PRIMARY KEY (ts_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${dependencies_ttl};
17 changes: 0 additions & 17 deletions plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,6 @@ func (s *CassandraStorageIntegration) initializeCassandra() error {
return nil
}

func (s *CassandraStorageIntegration) initializeCassandraDependenciesV2() error {
f, err := s.initializeCassandraFactory([]string{
"--cassandra.keyspace=jaeger_v1_dc1",
"--cassandra.port=9043",
})
if err != nil {
return err
}
if err = s.initializeDependencyReaderAndWriter(f); err != nil {
return err
}
return nil
}

func (s *CassandraStorageIntegration) initializeDependencyReaderAndWriter(f *cassandra.Factory) error {
var (
err error
Expand Down Expand Up @@ -144,10 +130,7 @@ func TestCassandraStorage(t *testing.T) {
t.Skip("Integration test against Cassandra skipped; set STORAGE env var to cassandra to run this")
}
s1 := newCassandraStorageIntegration()
s2 := newCassandraStorageIntegration()
require.NoError(t, s1.initializeCassandra())
require.NoError(t, s2.initializeCassandraDependenciesV2())
// TODO: Support all other tests.
t.Run("GetDependencies", s1.testCassandraGetDependencies)
t.Run("GetDependenciesV2", s2.testCassandraGetDependencies)
}
94 changes: 66 additions & 28 deletions scripts/cassandra-integration-test.sh
Original file line number Diff line number Diff line change
@@ -1,30 +1,68 @@
#!/bin/bash

set -ex

# Clean up before starting.
docker rm -f cassandra || true
docker rm -f cassandra2 || true
docker network rm integration_test || true

# Create a network so that the schema container can communicate with the cassandra containers.
docker network create integration_test

# Start cassandra containers whose ports are exposed to localhost to facilitate testing.
docker run -d --name cassandra --network integration_test -p 9042:9042 -p 9160:9160 cassandra:3.9
docker run -d --name cassandra2 --network integration_test -p 9043:9042 -p 9161:9160 cassandra:3.9

# Build the schema container and run it rather than using the existing container in Docker Hub since that
# requires this current build to succeed before this test can use it; chicken and egg problem.
docker build -t jaeger-cassandra-schema-integration-test plugin/storage/cassandra/
docker run --network integration_test -e CQLSH_HOST=cassandra -e TEMPLATE=/cassandra-schema/v001.cql.tmpl jaeger-cassandra-schema-integration-test
docker run --network integration_test -e CQLSH_HOST=cassandra2 -e TEMPLATE=/cassandra-schema/v002.cql.tmpl jaeger-cassandra-schema-integration-test

# Run the test.
export STORAGE=cassandra
make storage-integration-test

# Tear down after.
docker rm -f cassandra
docker rm -f cassandra2
docker network rm integration_test
set -uxf -o pipefail

#######################################
# Print the command-line synopsis and terminate the script.
# Arguments: none
# Outputs:   one usage line on stdout
# Returns:   never returns; exits with status 1
#######################################
usage() {
  # Plain quoting on purpose: $"..." would ask bash for a locale translation
  # (gettext lookup) of the message, which is not wanted here.
  printf 'Usage: %s <cassandra_version> <schema_version>\n' "$0"
  exit 1
}

#######################################
# Validate the number of command-line arguments.
# Arguments: the script's positional parameters ("$@")
# Outputs:   an error line on stdout when the count is not exactly two
# Returns:   0 when exactly two arguments were given; otherwise hands over
#            to usage, which prints the synopsis and exits
#######################################
check_arg() {
  # Happy path first: nothing to report when the count is right.
  (( $# == 2 )) && return 0

  echo "ERROR: need exactly two arguments, <cassandra_version> <schema_version>"
  usage
}

#######################################
# Start a detached Cassandra container and report its container id.
# Arguments: $1 - tag of the "cassandra" image to run (e.g. 3.11 or 4.0)
# Outputs:   the id printed by "docker run" on stdout
# Returns:   0 on success; the exit status of "docker run" if it fails
#            (nothing is printed in that case)
#######################################
setup_cassandra() {
  local tag=$1
  local image=cassandra
  local params=(
    --rm
    --detach
    --publish 9042:9042
    --publish 9160:9160
  )
  # Declare and assign separately: "local cid=$(...)" would report the status
  # of "local" (always 0) and hide a failed container start from the caller.
  local cid
  cid=$(docker run "${params[@]}" "${image}:${tag}") || return
  printf '%s\n' "${cid}"
}

#######################################
# Kill the Cassandra container and terminate the script with the recorded
# test result. Used as the EXIT trap handler.
# Globals:   exit_status (read) - status to exit with; when it has not been
#            set, the script exits with 1 so an unknown outcome is not
#            reported as success
# Arguments: $1 - id of the container to kill; an empty or missing id skips
#            the kill instead of invoking "docker kill" without a target
# Returns:   never returns; exits with ${exit_status}
#######################################
teardown_cassandra() {
  local cid=${1:-}
  if [[ -n "${cid}" ]]; then
    docker kill "${cid}"
  fi
  exit "${exit_status:-1}"
}

#######################################
# Build the schema image from the local sources and run it to apply the
# requested schema template.
# Arguments: $1 - schema version, i.e. the template name without the
#            ".cql.tmpl" suffix (e.g. v003, v004)
# Returns:   the exit status of "docker build" if the build fails (the image
#            is then not run); otherwise the exit status of "docker run"
#######################################
apply_schema() {
  local image=cassandra-schema
  local schema_dir=plugin/storage/cassandra/
  local schema_version=$1
  # The schema container is pointed at localhost:9042 and shares the host
  # network.
  local params=(
    --rm
    --env CQLSH_HOST=localhost
    --env CQLSH_PORT=9042
    --env "TEMPLATE=/cassandra-schema/${schema_version}.cql.tmpl"
    --network host
  )
  # Stop here on a failed build; running afterwards would use a missing or
  # stale image.
  docker build -t "${image}" "${schema_dir}" || return
  docker run "${params[@]}" "${image}"
}

#######################################
# Start Cassandra, apply the schema and run the storage integration tests.
# Globals:   exit_status (written) - result that teardown_cassandra exits
#            with; holds 1 until the tests (or the schema step) have
#            produced a real status
# Arguments: $1 - cassandra image tag
#            $2 - schema version (template name without ".cql.tmpl")
# Returns:   non-zero without registering a trap when no container could be
#            started. Otherwise an EXIT trap is installed that kills the
#            container and exits with exit_status.
#######################################
run_integration_test() {
  local version=$1
  local schema_version=$2
  local cid
  local teardown_cmd

  # Pessimistic default so an early exit is never reported as success.
  exit_status=1

  cid=$(setup_cassandra "${version}") || return
  if [[ -z "${cid}" ]]; then
    echo "ERROR: failed to start cassandra:${version}" >&2
    return 1
  fi

  # Register the cleanup as soon as the container exists, and bake the
  # container id into the trap text now: "cid" is local to this function and
  # is no longer set by the time the EXIT trap fires.
  printf -v teardown_cmd 'teardown_cassandra %q' "${cid}"
  trap "${teardown_cmd}" EXIT

  if apply_schema "${schema_version}"; then
    STORAGE=cassandra make storage-integration-test
    exit_status=$?
  else
    exit_status=$?
    echo "ERROR: failed to apply schema ${schema_version}" >&2
  fi
}

#######################################
# Script entry point: validate the arguments, announce the run and hand
# over to run_integration_test.
# Arguments: $1 - cassandra image tag
#            $2 - schema version (template name without ".cql.tmpl")
# Outputs:   one progress line on stdout
#######################################
main() {
  check_arg "$@"

  local main_cassandra_tag=$1
  local main_schema_name=$2

  printf 'Executing integration test for %s with schema %s.cql.tmpl\n' \
    "${main_cassandra_tag}" "${main_schema_name}"
  run_integration_test "${main_cassandra_tag}" "${main_schema_name}"
}

main "$@"
Loading

0 comments on commit fb4be5d

Please sign in to comment.