Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
long duration = 0;
boolean timeout = false;

if (state.get().equals(LockState.WAITING)) {
try {
try {
if (state.get().equals(LockState.WAITING)) {
// Retry count is the typical "upper bound of retries" for Tasks.run() function. In fact, the maximum number of
// attempts the Tasks.run() would try is `retries + 1`. Here, for checking locks, we use timeout as the
// upper bound of retries. So it is just reasonable to set a large retry count. However, if we set
Expand Down Expand Up @@ -348,9 +348,13 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
LOG.warn("Interrupted while waiting for lock.", e);
}
}, TException.class);
} catch (WaitingForLockException waitingForLockException) {
timeout = true;
duration = System.currentTimeMillis() - start;
}
} catch (WaitingForLockException waitingForLockException) {
timeout = true;
duration = System.currentTimeMillis() - start;
} finally {
if (!state.get().equals(LockState.ACQUIRED)) {
unlock(Optional.of(lockId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? If the lock wasn't acquired, why would it be correct to unlock it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While client calling lock, hive metastore creates an exclusive lock with WAITING state on the hive table first. If there is no other lock on the table, then metastore changes the state to ACQUIRED. Otherwise, updates hl_blockedby_ext_id to the latest lockId. No matter whether the lock state is acquired, the lock information will stored in HIVE_LOCKS as below.

hl_lock_ext_id hl_table hl_lock_state hl_lock_type hl_last_heartbeat hl_blockedby_ext_id
1651514 test_table a e 1614062112285  
1651516 test_table w e 1614062132342 1651514
1651518 test_table w e 1614062152426 1651516

In this case, a new lock will remain in the waiting state until the latest lock is deleted by client or txn cleanup thread.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, HiveMetaStoreClient.lock() will enqueue a new lock request into the HMS HIVE_LOCKS table and if we throw an exception here upon timeout, our lock request will be stuck there in WAITING state (potentially blocking other subsequent requests) unless we call unlock() which makes sure to remove the lock from the HMS table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup process will remove these locks when the timeout is reached, but until that time this can block other lock requests. So it is good to clean these locks when possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, and passing the lock ID into unlock ensures that this is only for the lock request that was created when trying to acquire. Sounds good to me. Thanks for the explanation, @ZorTsou!

}
Copy link
Contributor

@pvary pvary Feb 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little bit hard to see the final state of the code based on the PR, but I think it is this:

    } catch (WaitingForLockException waitingForLockException) {
      // timeout and do not have lock acquired
      throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s",
          System.currentTimeMillis() - start, database, tableName);
    } finally {
      if (!state.get().equals(LockState.ACQUIRED)) {
        unlock(Optional.of(lockId));
      }
    }

    if (!state.get().equals(LockState.ACQUIRED)) {
      throw new CommitFailedException("Could not acquire the lock on %s.%s, " +
          "lock request ended in state %s", database, tableName, state);
    }
    return lockId;
  }

I have preferred the original wording where the exceptions are handled in one place. Maybe something like this:

    } catch (WaitingForLockException waitingForLockException) {
      timeout = true;
      duration = System.currentTimeMillis() - start;
    } finally {
      if (!state.get().equals(LockState.ACQUIRED)) {
        unlock(Optional.of(lockId));
      }
    }

    // timeout and do not have lock acquired
    if (timeout && !state.get().equals(LockState.ACQUIRED)) {
      throw new CommitFailedException("Timed out after %s ms waiting for lock on %s.%s",
          duration, database, tableName);
    }

    if (!state.get().equals(LockState.ACQUIRED)) {
      throw new CommitFailedException("Could not acquire the lock on %s.%s, " +
          "lock request ended in state %s", database, tableName, state);
    }

    return lockId;
  }

What do you think @ZorTsou?

Oh, and nice catch BTW! 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. It looks more clear. I updated the code, thanks.

}

Expand Down