Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ public void commitTransaction(HiveIdentity identity, long transactionId)
delegate.commitTransaction(identity, transactionId);
}

public void abortTransaction(HiveIdentity identity, long transactionId)
{
delegate.abortTransaction(identity, transactionId);
}

public void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
delegate.sendTransactionHeartbeat(identity, transactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ default void commitTransaction(HiveIdentity identity, long transactionId)
throw new UnsupportedOperationException();
}

default void abortTransaction(HiveIdentity identity, long transactionId)
{
throw new UnsupportedOperationException();
}

default void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1375,15 +1375,37 @@ public synchronized void cleanupQuery(ConnectorSession session)
return;
}

commit();
try {
commit();
}
catch (Throwable commitFailure) {
try {
postCommitCleanup(identity, transaction, false);
}
catch (Throwable cleanupFailure) {
if (cleanupFailure != commitFailure) {
commitFailure.addSuppressed(cleanupFailure);
}
}
throw commitFailure;
}
postCommitCleanup(identity, transaction, true);
}

private void postCommitCleanup(HiveIdentity identity, Optional<HiveTransaction> transaction, boolean commit)
{
clearCurrentTransaction();
long transactionId = transaction.get().getTransactionId();
ScheduledFuture<?> heartbeatTask = transaction.get().getHeartbeatTask();
heartbeatTask.cancel(true);

// Any failure around aborted transactions, etc would be handled by Hive Metastore commit and TrinoException will be thrown
delegate.commitTransaction(identity, transactionId);
if (commit) {
// Any failure around aborted transactions, etc would be handled by Hive Metastore commit and TrinoException will be thrown
delegate.commitTransaction(identity, transactionId);
}
else {
delegate.abortTransaction(identity, transactionId);
}
}

@GuardedBy("this")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,12 @@ public void commitTransaction(HiveIdentity identity, long transactionId)
delegate.commitTransaction(identity, transactionId);
}

@Override
public void abortTransaction(HiveIdentity identity, long transactionId)
{
delegate.abortTransaction(identity, transactionId);
}

@Override
public void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,12 @@ public void commitTransaction(HiveIdentity identity, long transactionId)
delegate.commitTransaction(identity, transactionId);
}

@Override
public void abortTransaction(HiveIdentity identity, long transactionId)
{
delegate.abortTransaction(identity, transactionId);
}

@Override
public void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,27 @@ public void commitTransaction(HiveIdentity identity, long transactionId)
}
}

@Override
public void abortTransaction(HiveIdentity identity, long transactionId)
{
try {
retry()
.stopOnIllegalExceptions()
.run("abortTransaction", stats.getAbortTransaction().wrap(() -> {
try (ThriftMetastoreClient metastoreClient = createMetastoreClient(identity)) {
metastoreClient.abortTransaction(transactionId);
}
return null;
}));
}
catch (TException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, e);
}
catch (Exception e) {
throw propagate(e);
}
}

@Override
public void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ default void commitTransaction(HiveIdentity identity, long transactionId)
throw new UnsupportedOperationException();
}

default void abortTransaction(HiveIdentity identity, long transactionId)
{
throw new UnsupportedOperationException();
}

default void sendTransactionHeartbeat(HiveIdentity identity, long transactionId)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ThriftMetastoreStats
private final ThriftMetastoreApiStats dropRole = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats openTransaction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats commitTransaction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats rollbackTransaction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats abortTransaction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats acquireLock = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats checkLock = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats unlock = new ThriftMetastoreApiStats();
Expand Down Expand Up @@ -335,9 +335,9 @@ public ThriftMetastoreApiStats getCommitTransaction()

@Managed
@Nested
public ThriftMetastoreApiStats getRollbackTransaction()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was unused, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep

public ThriftMetastoreApiStats getAbortTransaction()
{
return rollbackTransaction;
return abortTransaction;
}

@Managed
Expand Down