-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feat]: Display info and warning in enhanced console when weak TLS versions are used and optionally block connecting via insecure TLS versions #553
Comments
Actually - I think we can extend this to also check the certificates themselves and warn users about not just the TLS version, but also the certificate size, expiry etc.. Again - the goal is not to change the behaviour but just let users be aware that their setup is not secure but they can choose to ignore it if they want. We need to have settings in the security to allow users to disable any of these checks individually if they want. Not just a global setting - each part of this should be individually configurable wth a select all/de-select all options also. We need to incorporate standard best practices for SSL/TLS security checks. This includes checking for:
Code Examplefrom datetime import datetime
from OpenSSL import crypto
import ssl
class CassandraConnectionSecurityChecker:
def __init__(self, session):
"""
Initialize with an existing Cassandra session
:param session: Existing Cassandra session object
"""
self.session = session
self.security_issues = []
def extract_ssl_socket(self):
"""
Extract SSL socket from an existing Cassandra session
"""
try:
# Extract connection object from the session's cluster
connection = self.session.cluster.connection_class.get_connection()
# Check if the connection has an SSL socket
if hasattr(connection, 'ssl_socket'):
return connection.ssl_socket
else:
self.security_issues.append("No SSL socket found in the connection.")
return None
except Exception as e:
self.security_issues.append(f"Error extracting SSL socket: {str(e)}")
return None
def check_certificate_details(self, ssl_socket):
"""
Extract and analyze certificate details from the SSL socket
:param ssl_socket: The SSL socket extracted from the Cassandra connection
"""
try:
# Get peer certificate in binary form (DER format)
der_cert = ssl_socket.getpeercert(binary_form=True)
# Convert DER to OpenSSL X509 object for detailed inspection
x509_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, der_cert)
# Extract certificate details
cert_info = {
'subject': x509_cert.get_subject().CN,
'issuer': x509_cert.get_issuer().CN,
'not_before': datetime.strptime(x509_cert.get_notBefore().decode('ascii'), '%Y%m%d%H%M%SZ'),
'not_after': datetime.strptime(x509_cert.get_notAfter().decode('ascii'), '%Y%m%d%H%M%SZ'),
'is_self_signed': self._is_self_signed(x509_cert),
'key_size': x509_cert.get_pubkey().bits(),
'signature_algorithm': x509_cert.get_signature_algorithm().decode('utf-8')
}
# Check certificate validity period
now = datetime.now()
if now < cert_info['not_before'] or now > cert_info['not_after']:
self.security_issues.append("Certificate is not currently valid.")
# Check key size (minimum 2048 bits for RSA or 256 bits for ECDSA)
if cert_info['key_size'] < 2048:
self.security_issues.append(f"Weak key size detected: {cert_info['key_size']} bits.")
# Check signature algorithm (avoid MD5 and SHA-1)
if "md5" in cert_info['signature_algorithm'].lower() or "sha1" in cert_info['signature_algorithm'].lower():
self.security_issues.append(f"Weak signature algorithm detected: {cert_info['signature_algorithm']}.")
return cert_info
except Exception as e:
self.security_issues.append(f"Error checking certificate details: {str(e)}")
return None
def _is_self_signed(self, x509_cert):
"""
Check if certificate is self-signed by comparing issuer and subject
:param x509_cert: OpenSSL X509 certificate object
:return: True if self-signed, False otherwise
"""
try:
return x509_cert.get_issuer() == x509_cert.get_subject()
except Exception as e:
self.security_issues.append(f"Error checking if certificate is self-signed: {str(e)}")
return False
def check_tls_version(self, ssl_socket):
"""
Check the TLS version of the SSL connection
:param ssl_socket: The SSL socket extracted from the Cassandra connection
"""
try:
tls_version = ssl_socket.version()
# Only TLS 1.2 and TLS 1.3 are considered secure
if tls_version not in ['TLSv1.2', 'TLSv1.3']:
self.security_issues.append(f"Insecure TLS version detected: {tls_version}")
return tls_version
except Exception as e:
self.security_issues.append(f"Error checking TLS version: {str(e)}")
return None
def check_cipher_suite(self, ssl_socket):
"""
Check the cipher suite used in the SSL connection
:param ssl_socket: The SSL socket extracted from the Cassandra connection
"""
try:
cipher_suite = ssl_socket.cipher()
# Extract cipher name and strength details
cipher_name, protocol_version, key_strength = cipher_suite
# Avoid weak ciphers (e.g., NULL, ADH, CBC without AEAD)
weak_ciphers = ['NULL', 'ADH', 'CBC']
if any(weak_cipher in cipher_name for weak_cipher in weak_ciphers):
self.security_issues.append(f"Weak cipher suite detected: {cipher_name}")
return cipher_suite
except Exception as e:
self.security_issues.append(f"Error checking cipher suite: {str(e)}")
return None
def perform_security_check(self):
"""
Perform a comprehensive security check on the Cassandra connection
:return: Dictionary containing results of security checks
"""
ssl_socket = self.extract_ssl_socket()
if ssl_socket:
cert_details = self.check_certificate_details(ssl_socket)
tls_version = self.check_tls_version(ssl_socket)
cipher_suite = self.check_cipher_suite(ssl_socket)
return {
'ssl_socket_found': True,
'certificate_details': cert_details,
'tls_version': tls_version,
'cipher_suite': cipher_suite,
'security_issues': self.security_issues
}
return {
'ssl_socket_found': False,
'security_issues': self.security_issues
}
# Example usage with an existing Cassandra session object
def main():
# Assume you already have a connected Cassandra session object (session)
# For example:
# cluster = Cluster(['your-cassandra-host'], ssl_context=ssl_context)
# session = cluster.connect()
checker = CassandraConnectionSecurityChecker(session) # Pass your existing session here
security_report = checker.perform_security_check()
print("Security Check Report:")
if security_report['ssl_socket_found']:
print("\nCertificate Details:")
cert_details = security_report['certificate_details']
if cert_details:
for key, value in cert_details.items():
print(f"{key}: {value}")
print(f"\nTLS Version Used: {security_report['tls_version']}")
print(f"\nCipher Suite Used: {security_report['cipher_suite']}")
if security_report['security_issues']:
print("\nDetected Security Issues:")
for issue in security_report['security_issues']:
print(f"- {issue}")
if __name__ == "__main__":
main() 1. TLS Version Check:
2. Key Size Validation:
3. Signature Algorithm Validation:
4. Cipher Suite Validation:
5. Certificate Expiration & Self-Signed Detection:
Best Practices Followed:
Citations: |
In CQLSH the TLS versions are user determined. However, there are a lot of clusters out there where the TLS version is not secure and has plenty of vulnerabilities.
We need to let users know if the TLS versions they are using is insecure and warn them like we do when they connect as super user, non-SSL etc.. or block insecure connections.
For example, this code in CQLSH will allow for insecure versions of TLS to be used as it lets the Cassandra Python driver auto-negotiate the highest supported TLS based on what the DB accepts:
https://github.com/apache/cassandra/blob/fd4113d5fef09f6361bea88a35655db2ecb46427/pylib/cqlshlib/sslhandling.py#L51
This is not behaviour we want to change, however we want to display a warning to users if they are using TLS version less than v1.2 when connecting to a cluster.
eg
Info message in enhanced console
Update the info message we display when connected using SSL/TLS to log out the TLS version being used.
Warning message in enhanced console:
WARNING: Your connection to Cassandra is using TLS version %s which is considered insecure. TLS versions prior to 1.2 contain known cryptographic weaknesses that could allow attackers to decrypt or modify traffic between Workbench and Cassandra. To protect your data in transit, configure Cassandra to use TLS 1.2 or higher.
Automatically Disconnect
Additionally, add a system setting to allow users to optionally prevent them connecting using insecure TLS versions. In this scenario we would connect, check the TLS version and then disconnect with an error dialog.
Have this enabled by default.
Message to use:
WARNING: Cassandra connection
<connection name>
in workspace<workspace name>
is using TLS version %s which is considered insecure and has been automatically disconnected.TLS versions prior to 1.2 contain known cryptographic weaknesses that could allow attackers to decrypt or modify traffic between Workbench and Cassandra. To protect your data in transit, configure Cassandra to use TLS 1.2 or higher.
To disable this behaviour in Workbench and allow connections using TLS version %s please update the Workspace security settings here
<insert instructions on how to disable this or ideally deep link to the actual setting dialog for them to change this check>
Just as an FYI:
Use TLS v1.2 or TLS v1.3
Even though TLS V1.3 is available, using TLS v1.2 is still considered good and secure practice by the cryptography community.
The use of TLS v1.2 ensures compatibility with a wide range of platforms and enables seamless communication between different systems that do not yet have TLS v1.3 support.
The only drawback depends on whether the framework used is outdated: its TLS v1.2 settings may enable older and insecure cipher suites that are deprecated as insecure.
On the other hand, TLS v1.3 removes support for older and weaker cryptographic algorithms, eliminates known vulnerabilities from previous TLS versions, and improves performance.
The text was updated successfully, but these errors were encountered: