Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public void configure(Binder binder)
{
binder.bind(CassandraConnectorId.class).toInstance(new CassandraConnectorId(connectorId));
binder.bind(CassandraConnector.class).in(Scopes.SINGLETON);
binder.bind(CassandraMetadata.class).in(Scopes.SINGLETON);
binder.bind(CassandraSplitManager.class).in(Scopes.SINGLETON);
binder.bind(CassandraTokenSplitManager.class).in(Scopes.SINGLETON);
binder.bind(CassandraRecordSetProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package com.facebook.presto.cassandra;

import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
Expand All @@ -26,45 +28,80 @@
import jakarta.inject.Inject;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.facebook.presto.spi.connector.EmptyConnectorCommitHandle.INSTANCE;
import static com.facebook.presto.spi.transaction.IsolationLevel.READ_UNCOMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class CassandraConnector
implements Connector
{
private static final Logger log = Logger.get(CassandraConnector.class);

private final CassandraConnectorId connectorId;
private final LifeCycleManager lifeCycleManager;
private final CassandraMetadata metadata;
private final CassandraPartitionManager partitionManager;
private final CassandraClientConfig config;
private final CassandraSession cassandraSession;
private final CassandraSplitManager splitManager;
private final ConnectorRecordSetProvider recordSetProvider;
private final ConnectorPageSinkProvider pageSinkProvider;
private final List<PropertyMetadata<?>> sessionProperties;
private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;
private final ConcurrentMap<ConnectorTransactionHandle, CassandraMetadata> transactions = new ConcurrentHashMap<>();

@Inject
public CassandraConnector(
CassandraConnectorId connectorId,
LifeCycleManager lifeCycleManager,
CassandraMetadata metadata,
CassandraSplitManager splitManager,
CassandraRecordSetProvider recordSetProvider,
CassandraPageSinkProvider pageSinkProvider,
CassandraSessionProperties sessionProperties)
CassandraSessionProperties sessionProperties,
CassandraSession cassandraSession,
CassandraPartitionManager partitionManager,
JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec,
CassandraClientConfig config)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.sessionProperties = requireNonNull(sessionProperties.getSessionProperties(), "sessionProperties is null");
this.partitionManager = requireNonNull(partitionManager, "partitionManager is null");
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.config = requireNonNull(config, "config is null");
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
}

@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
{
checkConnectorSupports(READ_UNCOMMITTED, isolationLevel);
return CassandraTransactionHandle.INSTANCE;
CassandraTransactionHandle transaction = new CassandraTransactionHandle();
transactions.put(transaction,
new CassandraMetadata(connectorId, cassandraSession, partitionManager, extraColumnMetadataCodec, config));
return transaction;
}

@Override
public ConnectorCommitHandle commit(ConnectorTransactionHandle transaction)
{
checkArgument(transactions.remove(transaction) != null, "no such transaction: %s", transaction);
return INSTANCE;
}

@Override
public void rollback(ConnectorTransactionHandle transaction)
{
CassandraMetadata metadata = transactions.remove(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
metadata.rollback();
}

@Override
Expand All @@ -74,8 +111,10 @@ public boolean isSingleStatementWritesOnly()
}

@Override
public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
public ConnectorMetadata getMetadata(ConnectorTransactionHandle transaction)
{
CassandraMetadata metadata = transactions.get(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
return metadata;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import jakarta.inject.Inject;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.facebook.presto.cassandra.CassandraType.toCassandraType;
Expand All @@ -57,6 +57,7 @@
import static com.facebook.presto.spi.StandardErrorCode.PERMISSION_DENIED;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ROOT;
import static java.util.Objects.requireNonNull;
Expand All @@ -72,8 +73,8 @@ public class CassandraMetadata
private boolean caseSensitiveNameMatchingEnabled;

private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;
private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

@Inject
public CassandraMetadata(
CassandraConnectorId connectorId,
CassandraSession cassandraSession,
Expand Down Expand Up @@ -319,6 +320,9 @@ private CassandraOutputTableHandle createTable(ConnectorSession session, Connect

// We need to create the Cassandra table before commit because the record needs to be written to the table.
cassandraSession.execute(queryBuilder.toString());

// set a rollback to delete the created table in case of an abort / failure.
setRollback(schemaName, tableName);
return new CassandraOutputTableHandle(
connectorId,
schemaName,
Expand All @@ -330,6 +334,7 @@ private CassandraOutputTableHandle createTable(ConnectorSession session, Connect
@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
clearRollback();
return Optional.empty();
}

Expand Down Expand Up @@ -365,4 +370,30 @@ public String normalizeIdentifier(ConnectorSession session, String identifier)
{
return caseSensitiveNameMatchingEnabled ? identifier : identifier.toLowerCase(ROOT);
}

public void rollback()
{
Runnable action = rollbackAction.getAndSet(null);
if (action == null) {
return; // nothing to roll back
}

if (!allowDropTable) {
throw new PrestoException(
PERMISSION_DENIED,
"Table creation was aborted and requires rollback, but cleanup failed because DROP TABLE is disabled in this Cassandra catalog.");
}

action.run();
}

private void setRollback(String schemaName, String tableName)
{
checkState(rollbackAction.compareAndSet(null, () -> cassandraSession.execute(String.format("DROP TABLE \"%s\".\"%s\"", schemaName, tableName))), "rollback action is already set");
}

private void clearRollback()
{
rollbackAction.set(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,61 @@
package com.facebook.presto.cassandra;

import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public enum CassandraTransactionHandle
import java.util.Objects;
import java.util.UUID;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class CassandraTransactionHandle
implements ConnectorTransactionHandle
{
INSTANCE
private final UUID uuid;

public CassandraTransactionHandle()
{
this(UUID.randomUUID());
}

@JsonCreator
public CassandraTransactionHandle(@JsonProperty("uuid") UUID uuid)
{
this.uuid = requireNonNull(uuid, "uuid is null");
}

@JsonProperty
public UUID getUuid()
{
return uuid;
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if ((obj == null) || (getClass() != obj.getClass())) {
return false;
}
CassandraTransactionHandle other = (CassandraTransactionHandle) obj;
return Objects.equals(uuid, other.uuid);
}

@Override
public int hashCode()
{
return Objects.hash(uuid);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("uuid", uuid)
.toString();
}
}
Loading
Loading