Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,13 @@ public LockResponse checkLock(long lockId)
return runWithHandle(() -> delegate.checkLock(lockId));
}

/**
 * Releases the metastore lock identified by {@code lockId}.
 *
 * <p>Delegates to the wrapped client via {@code runWithHandle}, matching the
 * pattern used by the sibling {@code checkLock} override above.
 *
 * @throws TException if the delegate call fails
 */
@Override
public void unlock(long lockId)
        throws TException
{
    runWithHandle(() -> delegate.unlock(lockId));
}

@Override
public String getValidWriteIds(List<String> tableList, long currentTransactionId)
throws TException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,13 +1676,12 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
}
}));

long lockId = response.getLockid();
long waitStart = nanoTime();
while (response.getState() == LockState.WAITING) {
long lockId = response.getLockid();

if (Duration.nanosSince(waitStart).compareTo(maxWaitForLock) > 0) {
// timed out
throw new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId));
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, format("Timed out waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId)));
}

log.debug("Waiting for lock %d in hive transaction %s for query %s", lockId, transactionId, queryId);
Expand All @@ -1697,7 +1696,7 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
}

if (response.getState() != LockState.ACQUIRED) {
throw new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, "Could not acquire lock. Lock in state " + response.getState());
throw unlockSuppressing(identity, lockId, new TrinoException(HIVE_TABLE_LOCK_NOT_ACQUIRED, "Could not acquire lock. Lock in state " + response.getState()));
}
}
catch (TException e) {
Expand All @@ -1708,6 +1707,37 @@ private void acquireSharedLock(DataOperationType operation, boolean isDynamicPar
}
}

/**
 * Best-effort release of {@code lockId} before propagating {@code primary}.
 *
 * <p>If the unlock attempt itself fails, that failure is attached to
 * {@code primary} as a suppressed exception so it cannot mask the original
 * error. The same exception instance is returned so callers can write
 * {@code throw unlockSuppressing(...)}.
 */
private <T extends Exception> T unlockSuppressing(HiveIdentity identity, long lockId, T primary)
{
    try {
        unlockTableLock(identity, lockId);
    }
    catch (RuntimeException unlockFailure) {
        // Record but do not replace the primary failure
        primary.addSuppressed(unlockFailure);
    }
    return primary;
}

/**
 * Releases the metastore table lock {@code lockId} on behalf of {@code identity}.
 *
 * <p>Runs the Thrift {@code unlock} call inside the shared retry policy,
 * recording timing in the {@code unlock} API stats. Retrying stops on
 * terminal transaction/lock errors (missing transaction, missing lock,
 * aborted transaction, generic metastore error) since those cannot succeed
 * on a subsequent attempt.
 *
 * @throws TrinoException wrapping any Thrift failure as HIVE_METASTORE_ERROR;
 *         other failures are rethrown via {@code propagate}
 */
private void unlockTableLock(HiveIdentity identity, long lockId)
{
    try {
        retry()
                .stopOn(NoSuchTxnException.class, NoSuchLockException.class, TxnAbortedException.class, MetaException.class)
                .run("unlock", stats.getUnlock().wrap(() -> {
                    // Client is created per attempt and closed promptly via try-with-resources
                    try (ThriftMetastoreClient metastoreClient = createMetastoreClient(identity)) {
                        metastoreClient.unlock(lockId);
                    }
                    return null;
                }));
    }
    catch (TException e) {
        throw new TrinoException(HIVE_METASTORE_ERROR, e);
    }
    catch (Exception e) {
        throw propagate(e);
    }
}

private static LockComponent createLockComponentForOperation(SchemaTableName table, DataOperationType operation, boolean isDynamicPartitionWrite, Optional<String> partitionName)
{
requireNonNull(table, "table is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down Expand Up @@ -503,6 +504,13 @@ public LockResponse checkLock(long lockId)
return client.check_lock(new CheckLockRequest(lockId));
}

/**
 * Releases the metastore lock identified by {@code lockId} by issuing a
 * Thrift {@code unlock} request to the underlying client.
 *
 * @throws TException if the Thrift call fails
 */
@Override
public void unlock(long lockId)
        throws TException
{
    UnlockRequest request = new UnlockRequest(lockId);
    client.unlock(request);
}

@Override
public String getValidWriteIds(List<String> tableList, long currentTransactionId)
throws TException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ LockResponse acquireLock(LockRequest lockRequest)
LockResponse checkLock(long lockId)
throws TException;

/**
 * Releases the metastore lock identified by {@code lockId}.
 *
 * @throws TException if the metastore call fails
 */
void unlock(long lockId)
        throws TException;

String getValidWriteIds(List<String> tableList, long currentTransactionId)
throws TException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class ThriftMetastoreStats
private final ThriftMetastoreApiStats rollbackTransaction = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats acquireLock = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats checkLock = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats unlock = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats validWriteIds = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats allocateWriteId = new ThriftMetastoreApiStats();
private final ThriftMetastoreApiStats updateTableWriteId = new ThriftMetastoreApiStats();
Expand Down Expand Up @@ -353,6 +354,13 @@ public ThriftMetastoreApiStats getCheckLock()
return checkLock;
}

// JMX-exported per-call statistics for the metastore unlock() operation,
// parallel to the getCheckLock() accessor above.
@Managed
@Nested
public ThriftMetastoreApiStats getUnlock()
{
    return unlock;
}

@Managed
@Nested
public ThriftMetastoreApiStats getValidWriteIds()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,13 @@ public LockResponse checkLock(long lockId)
throw new UnsupportedOperationException();
}

// Locking is not supported by this implementation; throw like the
// sibling stubs (e.g. checkLock above) rather than silently no-op.
@Override
public void unlock(long lockId)
        throws TException
{
    throw new UnsupportedOperationException();
}

@Override
public String getValidWriteIds(List<String> tableList, long currentTransactionId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.mapred.FileInputFormat;
Expand Down Expand Up @@ -221,13 +220,11 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
}

PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
ConnectorIdentity identity = ConnectorIdentity.ofUser(table.getOwner());
HiveIdentity context = new HiveIdentity(identity);
if (base == null) {
metastore.createTable(context, table, privileges);
metastore.createTable(identity, table, privileges);
}
else {
metastore.replaceTable(context, database, tableName, table, privileges);
metastore.replaceTable(identity, database, tableName, table, privileges);
}

shouldRefresh = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class TestIcebergV2
{
private HiveMetastore metastore;
private HdfsEnvironment hdfsEnvironment;
private java.nio.file.Path tempDir;
private File metastoreDir;

@Override
Expand All @@ -75,8 +76,8 @@ protected QueryRunner createQueryRunner()
HdfsConfiguration configuration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(config), ImmutableSet.of());
hdfsEnvironment = new HdfsEnvironment(configuration, config, new NoHdfsAuthentication());

File tempDir = Files.createTempDirectory("test_iceberg_v2").toFile();
metastoreDir = new File(tempDir, "iceberg_data");
tempDir = Files.createTempDirectory("test_iceberg_v2");
metastoreDir = tempDir.resolve("iceberg_data").toFile();
metastore = createTestingFileHiveMetastore(metastoreDir);

return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of(), ImmutableList.of(NATION), Optional.of(metastoreDir));
Expand All @@ -86,7 +87,7 @@ protected QueryRunner createQueryRunner()
/**
 * Removes the temporary directory created in {@code createQueryRunner}.
 *
 * <p>Guards against {@code tempDir} being null: if setup failed before the
 * directory was created, an unconditional delete would throw a
 * NullPointerException here and mask the original setup failure.
 *
 * @throws IOException if the recursive delete fails
 */
public void tearDown()
        throws IOException
{
    if (tempDir != null) {
        deleteRecursively(tempDir, ALLOW_INSECURE);
    }
}

@Test
Expand Down