diff --git a/src/main/java/org/vanilladb/core/storage/tx/concurrency/LockTable.java b/src/main/java/org/vanilladb/core/storage/tx/concurrency/LockTable.java index 9eab4113..76f2ae32 100644 --- a/src/main/java/org/vanilladb/core/storage/tx/concurrency/LockTable.java +++ b/src/main/java/org/vanilladb/core/storage/tx/concurrency/LockTable.java @@ -95,7 +95,7 @@ public void run() { } } - private Map lockerMap = new HashMap(); + private Map lockerMap = new ConcurrentHashMap<>(); private Map> lockByMap = new ConcurrentHashMap>(); private Set txnsToBeAborted = Collections.synchronizedSet(new HashSet()); private Map txWaitMap = new ConcurrentHashMap(); @@ -173,30 +173,33 @@ private void toBeAbortedAndNotified(long txNum) { void sLock(Object obj, long txNum) { Object anchor = getAnchor(obj); txWaitMap.put(txNum, anchor); - synchronized (anchor) { - Lockers lks = prepareLockers(obj); + try { + synchronized (anchor) { + Lockers lks = prepareLockers(obj); - if (hasSLock(lks, txNum)) - return; + if (hasSLock(lks, txNum)) + return; - try { - long timestamp = System.currentTimeMillis(); - while (!sLockable(lks, txNum) && !waitingTooLong(timestamp)) { - avoidDeadlock(lks, txNum, S_LOCK); - lks.requestSet.add(txNum); + try { + long timestamp = System.currentTimeMillis(); + while (!sLockable(lks, txNum) && !waitingTooLong(timestamp)) { + avoidDeadlock(lks, txNum, S_LOCK); + lks.requestSet.add(txNum); - anchor.wait(MAX_TIME); - lks.requestSet.remove(txNum); + anchor.wait(MAX_TIME); + lks.requestSet.remove(txNum); + } + if (!sLockable(lks, txNum)) + throw new LockAbortException(); + lks.sLockers.add(txNum); + getObjectSet(txNum).add(obj); + } catch (InterruptedException e) { + throw new LockAbortException("abort tx." + txNum + " by interrupted"); } - if (!sLockable(lks, txNum)) - throw new LockAbortException(); - lks.sLockers.add(txNum); - getObjectSet(txNum).add(obj); - } catch (InterruptedException e) { - throw new LockAbortException("abort tx." + txNum + " by interrupted"); } + } finally { + txWaitMap.remove(txNum); } - txWaitMap.remove(txNum); } /** @@ -212,30 +215,33 @@ void sLock(Object obj, long txNum) { void xLock(Object obj, long txNum) { Object anchor = getAnchor(obj); txWaitMap.put(txNum, anchor); - synchronized (anchor) { - Lockers lks = prepareLockers(obj); + try { + synchronized (anchor) { + Lockers lks = prepareLockers(obj); - if (hasXLock(lks, txNum)) - return; + if (hasXLock(lks, txNum)) + return; - try { - long timestamp = System.currentTimeMillis(); - while (!xLockable(lks, txNum) && !waitingTooLong(timestamp)) { - avoidDeadlock(lks, txNum, X_LOCK); - lks.requestSet.add(txNum); + try { + long timestamp = System.currentTimeMillis(); + while (!xLockable(lks, txNum) && !waitingTooLong(timestamp)) { + avoidDeadlock(lks, txNum, X_LOCK); + lks.requestSet.add(txNum); - anchor.wait(MAX_TIME); - lks.requestSet.remove(txNum); - } - if (!xLockable(lks, txNum)) + anchor.wait(MAX_TIME); + lks.requestSet.remove(txNum); + } + if (!xLockable(lks, txNum)) + throw new LockAbortException(); + lks.xLocker = txNum; + getObjectSet(txNum).add(obj); + } catch (InterruptedException e) { throw new LockAbortException(); - lks.xLocker = txNum; - getObjectSet(txNum).add(obj); - } catch (InterruptedException e) { - throw new LockAbortException(); + } } + } finally { + txWaitMap.remove(txNum); } - txWaitMap.remove(txNum); } /** @@ -251,30 +257,33 @@ void xLock(Object obj, long txNum) { void sixLock(Object obj, long txNum) { Object anchor = getAnchor(obj); txWaitMap.put(txNum, anchor); - synchronized (anchor) { - Lockers lks = prepareLockers(obj); + try { + synchronized (anchor) { + Lockers lks = prepareLockers(obj); - if (hasSixLock(lks, txNum)) - return; + if (hasSixLock(lks, txNum)) + return; - try { - long timestamp = System.currentTimeMillis(); - while (!sixLockable(lks, txNum) && !waitingTooLong(timestamp)) { - avoidDeadlock(lks, txNum, SIX_LOCK); - lks.requestSet.add(txNum); + try { + long timestamp = System.currentTimeMillis(); + while (!sixLockable(lks, txNum) && !waitingTooLong(timestamp)) { + avoidDeadlock(lks, txNum, SIX_LOCK); + lks.requestSet.add(txNum); - anchor.wait(MAX_TIME); - lks.requestSet.remove(txNum); - } - if (!sixLockable(lks, txNum)) + anchor.wait(MAX_TIME); + lks.requestSet.remove(txNum); + } + if (!sixLockable(lks, txNum)) + throw new LockAbortException(); + lks.sixLocker = txNum; + getObjectSet(txNum).add(obj); + } catch (InterruptedException e) { throw new LockAbortException(); - lks.sixLocker = txNum; - getObjectSet(txNum).add(obj); - } catch (InterruptedException e) { - throw new LockAbortException(); + } } + } finally { + txWaitMap.remove(txNum); } - txWaitMap.remove(txNum); } /** @@ -289,28 +298,31 @@ void sixLock(Object obj, long txNum) { void isLock(Object obj, long txNum) { Object anchor = getAnchor(obj); txWaitMap.put(txNum, anchor); - synchronized (anchor) { - Lockers lks = prepareLockers(obj); - if (hasIsLock(lks, txNum)) - return; - try { - long timestamp = System.currentTimeMillis(); - while (!isLockable(lks, txNum) && !waitingTooLong(timestamp)) { - avoidDeadlock(lks, txNum, IS_LOCK); - lks.requestSet.add(txNum); - - anchor.wait(MAX_TIME); - lks.requestSet.remove(txNum); - } - if (!isLockable(lks, txNum)) + try { + synchronized (anchor) { + Lockers lks = prepareLockers(obj); + if (hasIsLock(lks, txNum)) + return; + try { + long timestamp = System.currentTimeMillis(); + while (!isLockable(lks, txNum) && !waitingTooLong(timestamp)) { + avoidDeadlock(lks, txNum, IS_LOCK); + lks.requestSet.add(txNum); + + anchor.wait(MAX_TIME); + lks.requestSet.remove(txNum); + } + if (!isLockable(lks, txNum)) + throw new LockAbortException(); + lks.isLockers.add(txNum); + getObjectSet(txNum).add(obj); + } catch (InterruptedException e) { throw new LockAbortException(); - lks.isLockers.add(txNum); - getObjectSet(txNum).add(obj); - } catch (InterruptedException e) { - throw new LockAbortException(); + } } + } finally { + txWaitMap.remove(txNum); } - txWaitMap.remove(txNum); } /** @@ -325,30 +337,33 @@ void isLock(Object obj, long txNum) { void ixLock(Object obj, long txNum) { Object anchor = getAnchor(obj); txWaitMap.put(txNum, anchor); - synchronized (anchor) { - Lockers lks = prepareLockers(obj); + try { + synchronized (anchor) { + Lockers lks = prepareLockers(obj); - if (hasIxLock(lks, txNum)) - return; + if (hasIxLock(lks, txNum)) + return; - try { - long timestamp = System.currentTimeMillis(); - while (!ixLockable(lks, txNum) && !waitingTooLong(timestamp)) { - avoidDeadlock(lks, txNum, IX_LOCK); - lks.requestSet.add(txNum); + try { + long timestamp = System.currentTimeMillis(); + while (!ixLockable(lks, txNum) && !waitingTooLong(timestamp)) { + avoidDeadlock(lks, txNum, IX_LOCK); + lks.requestSet.add(txNum); - anchor.wait(MAX_TIME); - lks.requestSet.remove(txNum); - } - if (!ixLockable(lks, txNum)) + anchor.wait(MAX_TIME); + lks.requestSet.remove(txNum); + } + if (!ixLockable(lks, txNum)) + throw new LockAbortException(); + lks.ixLockers.add(txNum); + getObjectSet(txNum).add(obj); + } catch (InterruptedException e) { throw new LockAbortException(); - lks.ixLockers.add(txNum); - getObjectSet(txNum).add(obj); - } catch (InterruptedException e) { - throw new LockAbortException(); + } } + } finally { + txWaitMap.remove(txNum); } - txWaitMap.remove(txNum); } /**