Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/src/main/sphinx/connector/cassandra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ Property Name Description
This is a global setting used for all connections, regardless
of the user connected to Trino.

``cassandra.protocol-version`` It is possible to override the protocol version for older Cassandra clusters.
By default, the values from the highest protocol version the driver can use.
Possible values include ``V2``, ``V3`` and ``V4``.
``cassandra.protocol-version`` It is possible to override the protocol version for older Cassandra
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mosabua FYI

clusters.
By default, the value corresponds to the default protocol version
used in the underlying Cassandra java driver.
Possible values include ``V3``, ``V4``, ``V5``, ``V6``.
================================================== ======================================================================

.. note::
Expand Down
43 changes: 30 additions & 13 deletions plugin/trino-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.casandra.version>4.14.0</dep.casandra.version>
<dep.native-protocol.version>1.5.1</dep.native-protocol.version>
</properties>

<dependencies>
Expand All @@ -22,11 +24,6 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>io.trino.cassandra</groupId>
<artifactId>cassandra-driver</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand All @@ -49,12 +46,37 @@

<dependency>
<groupId>io.airlift</groupId>
<artifactId>security</artifactId>
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${dep.casandra.version}</version>
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-analysis</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>${dep.casandra.version}</version>
<exclusions>
<exclusion>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>native-protocol</artifactId>
<version>${dep.native-protocol.version}</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -93,11 +115,6 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,105 @@
*/
package io.trino.plugin.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.retry.RetryDecision;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.servererrors.CoordinatorException;
import com.datastax.oss.driver.api.core.servererrors.DefaultWriteType;
import com.datastax.oss.driver.api.core.servererrors.WriteType;
import com.datastax.oss.driver.api.core.session.Request;
import io.airlift.log.Logger;

import java.util.concurrent.ThreadLocalRandom;

public class BackoffRetryPolicy
implements RetryPolicy
{
public static final BackoffRetryPolicy INSTANCE = new BackoffRetryPolicy();
private static final Logger log = Logger.get(BackoffRetryPolicy.class);

private BackoffRetryPolicy() {}
private final String logPrefix;

public BackoffRetryPolicy(DriverContext context, String profileName)
{
this.logPrefix = (context != null ? context.getSessionName() : null) + "|" + profileName;
}

@Override
public RetryDecision onReadTimeout(Request request, ConsistencyLevel consistencyLevel, int blockFor, int received, boolean dataPresent, int retryCount)
{
RetryDecision decision =
(retryCount == 0 && received >= blockFor && !dataPresent)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;

if (decision == RetryDecision.RETRY_SAME) {
log.debug(
"[%s] Retrying on read timeout on same host (consistency: %s, required responses: %s, received responses: %s, data retrieved: %s, retries: %s)",
logPrefix,
consistencyLevel,
blockFor,
received,
false,
retryCount);
}

return decision;
}

@Override
public RetryDecision onWriteTimeout(Request request, ConsistencyLevel consistencyLevel, WriteType writeType, int blockFor, int received, int retryCount)
{
RetryDecision decision =
(retryCount == 0 && writeType == DefaultWriteType.BATCH_LOG)
? RetryDecision.RETRY_SAME
: RetryDecision.RETHROW;

if (decision == RetryDecision.RETRY_SAME && log.isDebugEnabled()) {
log.debug(
"[%s] Retrying on write timeout on same host (consistency: %s, write type: %s, required acknowledgments: %s, received acknowledgments: %s, retries: %s)",
logPrefix,
consistencyLevel,
writeType,
blockFor,
received,
retryCount);
}
return decision;
}

@Override
public RetryDecision onUnavailable(Statement statement, ConsistencyLevel consistencyLevel, int requiredReplica, int aliveReplica, int retries)
public RetryDecision onUnavailable(Request request, ConsistencyLevel consistencyLevel, int required, int alive, int retries)
{
if (retries >= 10) {
return RetryDecision.rethrow();
return RetryDecision.RETHROW;
}

try {
int jitter = ThreadLocalRandom.current().nextInt(100);
int delay = (100 * (retries + 1)) + jitter;
Thread.sleep(delay);
return RetryDecision.retry(consistencyLevel);
return RetryDecision.RETRY_SAME;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return RetryDecision.rethrow();
return RetryDecision.RETHROW;
}
}

@Override
public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl, int requiredResponses, int receivedResponses, boolean dataRetrieved, int nbRetry)
public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount)
{
return DefaultRetryPolicy.INSTANCE.onReadTimeout(statement, cl, requiredResponses, receivedResponses, dataRetrieved, nbRetry);
return RetryDecision.RETHROW;
}

@Override
public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType, int requiredAcks, int receivedAcks, int nbRetry)
public RetryDecision onErrorResponse(Request request, CoordinatorException error, int retryCount)
{
return DefaultRetryPolicy.INSTANCE.onWriteTimeout(statement, cl, writeType, requiredAcks, receivedAcks, nbRetry);
log.debug(error, "[%s] Retrying on node error on next host (retries: %s)", logPrefix, retryCount);
return RetryDecision.RETRY_NEXT;
}

@Override
public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e, int nbRetry)
{
return RetryDecision.tryNextHost(cl);
}

@Override
public void init(Cluster cluster) {}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
*/
package io.trino.plugin.cassandra;

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.SocketOptions;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
Expand Down Expand Up @@ -58,8 +59,8 @@ public class CassandraClientConfig
private boolean allowDropTable;
private String username;
private String password;
private Duration clientReadTimeout = new Duration(SocketOptions.DEFAULT_READ_TIMEOUT_MILLIS, MILLISECONDS);
private Duration clientConnectTimeout = new Duration(SocketOptions.DEFAULT_CONNECT_TIMEOUT_MILLIS, MILLISECONDS);
private Duration clientReadTimeout = new Duration(12_000, MILLISECONDS);
private Duration clientConnectTimeout = new Duration(5_000, MILLISECONDS);
private Integer clientSoLinger;
private RetryPolicyType retryPolicy = RetryPolicyType.DEFAULT;
Copy link
Copy Markdown
Member

@ebyhr ebyhr Jan 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use RetryPolicyType.BACKOFF as default.

private boolean useDCAware;
Expand Down Expand Up @@ -119,7 +120,7 @@ public ConsistencyLevel getConsistencyLevel()
}

@Config("cassandra.consistency-level")
public CassandraClientConfig setConsistencyLevel(ConsistencyLevel level)
public CassandraClientConfig setConsistencyLevel(DefaultConsistencyLevel level)
{
this.consistencyLevel = level;
return this;
Expand Down Expand Up @@ -411,7 +412,7 @@ public ProtocolVersion getProtocolVersion()
}

@Config("cassandra.protocol-version")
public CassandraClientConfig setProtocolVersion(ProtocolVersion version)
public CassandraClientConfig setProtocolVersion(DefaultProtocolVersion version)
{
this.protocolVersion = version;
return this;
Expand Down
Loading