diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 464c4fcb65f6..3d6b8e0c8402 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -1856,7 +1856,9 @@ public void testDropTableWithSuffix() throws Exception { } MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); houseKeeperService.setConf(hiveConf); - + + Thread.sleep(MetastoreConf.getTimeVar(hiveConf, + MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); houseKeeperService.run(); count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'"); @@ -1945,6 +1947,8 @@ public void testDropMaterializedViewWithSuffix() throws Exception { MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService(); houseKeeperService.setConf(hiveConf); + Thread.sleep(MetastoreConf.getTimeVar(hiveConf, + MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS)); houseKeeperService.run(); count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 77d18fdffcd8..c992c5970e13 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -2373,6 +2373,7 @@ public void testCleanerForTxnToWriteId() throws Exception { txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); + txnHandler.performWriteSetGC(); txnHandler.cleanTxnToWriteIdTable(); // After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be cleaned up as all txns are committed. @@ -2416,6 +2417,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained. txnHandler.cleanEmptyAbortedAndCommittedTxns(); + txnHandler.performWriteSetGC(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); @@ -2428,6 +2430,7 @@ public void testCleanerForTxnToWriteId() throws Exception { runWorker(hiveConf); runCleaner(hiveConf); txnHandler.cleanEmptyAbortedAndCommittedTxns(); + txnHandler.performWriteSetGC(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); @@ -2439,6 +2442,7 @@ public void testCleanerForTxnToWriteId() throws Exception { // The txn opened after the compaction commit should not effect the Cleaner runCleaner(hiveConf); txnHandler.cleanEmptyAbortedAndCommittedTxns(); + txnHandler.performWriteSetGC(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java index a2446d63ad15..6301b6d02900 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java @@ -110,7 +110,7 @@ public void testRenameTable() throws Exception { "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'")); Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'")); - Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf, + Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from WRITE_SET where WS_TABLE='s'")); Assert.assertEquals(3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'")); @@ -125,7 +125,7 @@ public void testRenameTable() throws Exception { "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'")); Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'")); - Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf, + Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from WRITE_SET where WS_TABLE='bar'")); Assert.assertEquals(4, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'")); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java index 5787951a3bf0..628e51f2a387 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConfForTest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -78,7 +79,9 @@ public void setUp() throws Exception { HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false); HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false); - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true); + + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, true); + TxnHandler.ConfVars.setUseMinHistoryWriteId(true); driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build()); driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build()); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 89b1b2a13ec6..4ab68b1efb25 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService; import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService; import org.apache.hadoop.hive.ql.Context; @@ -1235,6 +1236,9 @@ public void testWriteSetTracking4() throws Exception { */ @Test public void testWriteSetTracking5() throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false); + TxnHandler.ConfVars.setUseMinHistoryWriteId(false); + dropTable(new String[] {"TAB_PART"}); Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\"")); driver.run("create table if not exists TAB_PART (a int, b int) " + @@ -2109,6 +2113,9 @@ public void testMergeUnpartitionedConflictSharedWrite() throws Exception { * @param causeConflict true to make 2 operations such that they update the same entity */ private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite) throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false); + TxnHandler.ConfVars.setUseMinHistoryWriteId(false); + dropTable(new String[] {"target","source"}); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); @@ -2873,6 +2880,9 @@ public void testMergePartitionedConflictSharedWrite() throws Exception { * @param causeConflict - true to make the operations cause a Write conflict */ private void testMergePartitioned(boolean causeConflict, boolean sharedWrite) throws Exception { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false); + TxnHandler.ConfVars.setUseMinHistoryWriteId(false); + dropTable(new String[] {"target","source"}); conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite); @@ -3537,7 +3547,8 @@ public void testSkipAcquireLocksForExplain() throws Exception { @Test public void testInsertSnapshotIsolationMinHistoryDisabled() throws Exception { - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false); + TxnHandler.ConfVars.setUseMinHistoryWriteId(false); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false); testInsertSnapshotIsolation(); } @@ -3555,6 +3566,7 @@ public void testInsertSnapshotIsolation() throws Exception { swapTxnManager(txnMgr); driver.run(); + txnHandler.performWriteSetGC(); txnHandler.cleanTxnToWriteIdTable(); swapTxnManager(txnMgr2); @@ -3566,7 +3578,8 @@ public void testInsertSnapshotIsolation() throws Exception { @Test public void testUpdateSnapshotIsolationMinHistoryDisabled() throws Exception { - MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false); + TxnHandler.ConfVars.setUseMinHistoryWriteId(false); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false); testUpdateSnapshotIsolation(); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java index b33f8917c74f..42b0d6cc6dfb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java @@ -86,6 +86,7 @@ public class TestCompactionMetrics extends CompactorTest { public void setUp() throws Exception { MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true); + TxnHandler.ConfVars.setUseMinHistoryLevel(true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true); // re-initialize metrics diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 63a0d106d27c..2292770c8e50 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -1675,10 +1675,11 @@ public enum ConfVars { "time after which transactions are declared aborted if the client has not sent a heartbeat."), TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS, "Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"), + @Deprecated TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel", "hive.txn.use.minhistorylevel", true, "Set this to false, for the TxnHandler and Cleaner to not use MIN_HISTORY_LEVEL table and take advantage of openTxn optimisation.\n" + "If the table is dropped HMS will switch this flag to false, any other value changes need a restart to take effect."), - TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", false, + TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", true, "Set this to true, to avoid global minOpenTxn check in Cleaner.\n" + "If the table is dropped HMS will switch this flag to false."), LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100, diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 393cb9443e5e..4d7e9349003a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -212,7 +212,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by - * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)). + * min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)). */ @Override @RetrySemantics.SafeToRetry @@ -374,7 +374,7 @@ private void updateStatus(CompactionInfo ci) throws MetaException { String strState = CompactionState.fromSqlConst(ci.state).toString(); LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci); - CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false)); + CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false)); long endTime = getDbTime().getTime(); if (ciActual != null) { @@ -505,7 +505,7 @@ public long findMinOpenTxnIdForCleaner() throws MetaException { @RetrySemantics.Idempotent @Deprecated public long findMinTxnIdSeenOpen() { - if (!ConfVars.useMinHistoryLevel() || ConfVars.useMinHistoryWriteId()) { + if (!ConfVars.useMinHistoryLevel()) { return Long.MAX_VALUE; } try { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index e1e060c12564..be7917f4ce7d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -205,7 +205,7 @@ private ConfVars() {} private boolean useMinHistoryWriteId; public boolean useMinHistoryLevel() { - return useMinHistoryLevel; + return useMinHistoryLevel && !useMinHistoryWriteId; } public void setUseMinHistoryLevel(boolean useMinHistoryLevel) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 13f32f646302..bd208f4f1a9d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -715,7 +715,7 @@ Set findPotentialCompactions(int abortedThreshold, long abortedT /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by - * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)). + * min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)). */ @SqlRetry @Transactional(POOL_COMPACTOR) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java index 6457cd27f04a..946ecb393474 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java @@ -18,36 +18,18 @@ package org.apache.hadoop.hive.metastore.txn.jdbc.functions; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; -import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction; +import org.apache.hadoop.hive.metastore.txn.jdbc.queries.MinUncommittedTxnIdHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; -import java.sql.Types; - public class CleanTxnToWriteIdTableFunction implements TransactionalFunction { private static final Logger LOG = LoggerFactory.getLogger(CleanTxnToWriteIdTableFunction.class); - //language=SQL - private static String minHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + - " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" + - " UNION" + - " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :abortedState) \"RES\""; - //language=SQL - private static String noMinHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + - " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" + - " UNION" + - " SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"" + - " UNION" + - " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + - " OR \"TXN_STATE\" = " + TxnStatus.OPEN + - " ) \"RES\""; - private final long minTxnIdSeenOpen; public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) { @@ -56,32 +38,17 @@ public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) { @Override public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException { - NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate(); - String sql = TxnHandler.ConfVars.useMinHistoryLevel() ? minHistoryLevelSql : noMinHistoryLevelSql; - MapSqlParameterSource params = new MapSqlParameterSource() - .addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR); - if (!TxnHandler.ConfVars.useMinHistoryLevel()) { - params.addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR); - } - // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. // If there are no txns which are currently open or aborted in the system, then current value of // max(TXNS.txn_id) could be min_uncommitted_txnid. - Long minTxnId = jdbcTemplate.query(sql, params, rs -> { - if (rs.next()) { - return rs.getLong(1); - } else { - return null; - } - }); - + Long minTxnId = jdbcResource.execute(new MinUncommittedTxnIdHandler()); if (minTxnId == null) { throw new MetaException("Transaction tables not properly initialized, no record found in TXNS"); } long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen); - // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to clean up the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. + NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate(); int rc = jdbcTemplate.update("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < :txnId", new MapSqlParameterSource("txnId", minUncommitedTxnid)); LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: {}", rc, minUncommitedTxnid); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java index bcd5226e0148..661f7b37e6eb 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java @@ -36,11 +36,11 @@ import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; import org.apache.hadoop.hive.metastore.txn.entities.OperationType; import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg; -import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails; +import org.apache.hadoop.hive.metastore.txn.jdbc.InClauseBatchCommand; import org.apache.hadoop.hive.metastore.txn.jdbc.commands.DeleteReplTxnMapEntryCommand; import org.apache.hadoop.hive.metastore.txn.jdbc.commands.InsertCompletedTxnComponentsCommand; import org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveTxnsFromMinHistoryLevelCommand; @@ -71,9 +71,11 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars; import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent; import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; @@ -126,7 +128,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce throw new RollbackException(null); } assert targetTxnIds.size() == 1; - txnid = targetTxnIds.get(0); + txnid = targetTxnIds.getFirst(); } /** @@ -154,21 +156,32 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid); } - String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN (" + - OperationType.UPDATE + "," + OperationType.DELETE + ")"; + String conflictSQLSuffix = String.format(""" + FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND "TC_OPERATION_TYPE" IN (%s, %s) + """, OperationType.UPDATE, OperationType.DELETE); long tempCommitId = TxnUtils.generateTemporaryId(); if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) { - new AcquireTxnLockFunction(false).execute(jdbcResource); + if (!ConfVars.useMinHistoryWriteId()) { + new AcquireTxnLockFunction(false).execute(jdbcResource); + } commitId = jdbcResource.execute(new GetHighWaterMarkHandler()); } else if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) { - String writeSetInsertSql = "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\", \"WS_TABLE\", \"WS_PARTITION\"," + - " \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" + - " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" "; + String writeSetInsertSql = """ + INSERT INTO "WRITE_SET" + ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID", "WS_COMMIT_ID", "WS_OPERATION_TYPE") + SELECT DISTINCT + "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID", + :commitId, + "TC_OPERATION_TYPE" + """; boolean isUpdateOrDelete = Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query( - jdbcResource.getSqlGenerator().addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix), + jdbcResource.getSqlGenerator() + .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix), + new MapSqlParameterSource() + .addValue("txnId", txnid), ResultSet::next)); if (isUpdateOrDelete) { @@ -188,11 +201,13 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce */ Object undoWriteSetForCurrentTxn = context.createSavepoint(); jdbcResource.getJdbcTemplate().update( - writeSetInsertSql + (TxnHandler.ConfVars.useMinHistoryLevel() ? conflictSQLSuffix : - "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId AND \"TC_OPERATION_TYPE\" <> :type"), + writeSetInsertSql + (ConfVars.useMinHistoryLevel() ? conflictSQLSuffix : + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + ( + (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND \"TC_OPERATION_TYPE\" <> :type")), new MapSqlParameterSource() .addValue("txnId", txnid) - .addValue("type", OperationType.COMPACT.getSqlConst())); + .addValue("type", OperationType.COMPACT.getSqlConst()) + .addValue("commitId", tempCommitId)); /** * This S4U will mutex with other commitTxn() and openTxns(). @@ -235,12 +250,11 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce throw new TxnAbortedException(msg); } } - } else if (!TxnHandler.ConfVars.useMinHistoryLevel()) { - jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId AND \"TC_OPERATION_TYPE\" <> :type", + } else if (!ConfVars.useMinHistoryLevel()) { + jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId", new MapSqlParameterSource() .addValue("txnId", txnid) - .addValue("type", OperationType.COMPACT.getSqlConst())); - commitId = jdbcResource.execute(new GetHighWaterMarkHandler()); + .addValue("commitId", jdbcResource.execute(new GetHighWaterMarkHandler()))); } } else { /* @@ -256,7 +270,6 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce assert true; } - if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && !MetaStoreServerUtils.isCompactionTxn(txnType)) { moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete); } else if (isReplayedReplTxn) { @@ -266,8 +279,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId, rqst.getReplPolicy())); } updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId, tempCommitId); - jdbcResource.execute(new RemoveTxnsFromMinHistoryLevelCommand(ImmutableList.of(txnid))); - jdbcResource.execute(new RemoveWriteIdsFromMinHistoryCommand(ImmutableList.of(txnid))); + if (rqst.isSetKeyValue()) { updateKeyValueAssociatedWithTxn(jdbcResource, rqst); } @@ -554,11 +566,8 @@ private void updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes } } - /** - * See overridden method in CompactionTxnHandler also. - */ - private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbcResource, long txnid, TxnType txnType, - Long commitId, long tempId) throws MetaException { + private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbcResource, + long txnid, TxnType txnType, Long commitId, long tempId) throws MetaException { List queryBatch = new ArrayList<>(6); // update write_set with real commitId if (commitId != null) { @@ -575,13 +584,21 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc if (txnType == TxnType.MATER_VIEW_REBUILD) { queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); } - if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) { + if (txnType == TxnType.SOFT_DELETE || MetaStoreServerUtils.isCompactionTxn(txnType)) { queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " + getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE \"CQ_TXN_ID\" = " + txnid); } - + // execute all in one batch jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(queryBatch.toArray(new String[0])); + + List, InClauseBatchCommand>> commands = List.of( + RemoveTxnsFromMinHistoryLevelCommand::new, + RemoveWriteIdsFromMinHistoryCommand::new + ); + for (var cmd : commands) { + jdbcResource.execute(cmd.apply(ImmutableList.of(txnid))); + } } /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java deleted file mode 100644 index cf7f50956d49..000000000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore.txn.jdbc.queries; - -import org.apache.hadoop.hive.metastore.DatabaseProduct; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; -import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; -import org.apache.hadoop.hive.metastore.txn.TxnStore; -import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler; -import org.springframework.dao.DataAccessException; -import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; -import org.springframework.jdbc.core.namedparam.SqlParameterSource; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Types; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.hive.metastore.txn.TxnStore.READY_FOR_CLEANING; -import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn; - -public class AbortTxnInfoHandler implements QueryHandler> { - - // Three inner sub-queries which are under left-join to fetch the required data for aborted txns. - //language=SQL - private static final String SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY = - " \"res1\".\"TC_DATABASE\" AS \"DB\", \"res1\".\"TC_TABLE\" AS \"TBL\", \"res1\".\"TC_PARTITION\" AS \"PART\", " + - " \"res1\".\"MIN_TXN_START_TIME\" AS \"MIN_TXN_START_TIME\", \"res1\".\"ABORTED_TXN_COUNT\" AS \"ABORTED_TXN_COUNT\", " + - " \"res2\".\"MIN_OPEN_WRITE_TXNID\" AS \"MIN_OPEN_WRITE_TXNID\", \"res3\".\"RETRY_RETENTION\" AS \"RETRY_RETENTION\", " + - " \"res3\".\"ID\" AS \"RETRY_CQ_ID\" " + - " FROM " + - // First sub-query - Gets the aborted txns with min txn start time, number of aborted txns - // for corresponding db, table, partition. - " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " + - " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " + - " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState" + - " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s ) \"res1\" " + - " LEFT JOIN" + - // Second sub-query - Gets the min open txn id for corresponding db, table, partition. - "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " + - " FROM \"TXNS\", \"TXN_COMPONENTS\" " + - " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :openState" + - " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"res2\"" + - " ON \"res1\".\"TC_DATABASE\" = \"res2\".\"TC_DATABASE\"" + - " AND \"res1\".\"TC_TABLE\" = \"res2\".\"TC_TABLE\"" + - " AND (\"res1\".\"TC_PARTITION\" = \"res2\".\"TC_PARTITION\" " + - " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res2\".\"TC_PARTITION\" IS NULL)) " + - " LEFT JOIN " + - // Third sub-query - Gets the retry entries for corresponding db, table, partition. - "( SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", MAX(\"CQ_ID\") AS \"ID\", " + - " MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RETENTION\", " + - " MIN(\"CQ_COMMIT_TIME\") - %s + MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RECORD_CHECK\" FROM \"COMPACTION_QUEUE\" " + - " WHERE \"CQ_TYPE\" = :type" + - " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"res3\" " + - " ON \"res1\".\"TC_DATABASE\" = \"res3\".\"CQ_DATABASE\" " + - " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " + - " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " + - " OR (\"res1\".\"TC_PARTITION\" IS NULL AND \"res3\".\"CQ_PARTITION\" IS NULL))" + - " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR \"res3\".\"RETRY_RECORD_CHECK\" IS NULL"; - - private final long abortedTimeThreshold; - private final int abortedThreshold; - private final int fetchSize; - - public String getParameterizedQueryString(DatabaseProduct dbProduct) throws MetaException { - return dbProduct.addLimitClause( - fetchSize, - String.format(AbortTxnInfoHandler.SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY, - abortedTimeThreshold >= 0 ? "" : " HAVING COUNT(*) > " + abortedThreshold, getEpochFn(dbProduct))); - } - - @Override - public SqlParameterSource getQueryParameters() { - return new MapSqlParameterSource() - .addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR) - .addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR) - .addValue("type", Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE), Types.CHAR); - } - - @Override - public List extractData(ResultSet rs) throws DataAccessException, SQLException { - List readyToCleanAborts = new ArrayList<>(); - long systemTime = System.currentTimeMillis(); - boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; - while (rs.next()) { - boolean pastTimeThreshold = - checkAbortedTimeThreshold && rs.getLong("MIN_TXN_START_TIME") + abortedTimeThreshold < systemTime; - int numAbortedTxns = rs.getInt("ABORTED_TXN_COUNT"); - if (numAbortedTxns > abortedThreshold || pastTimeThreshold) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString("DB"); - info.tableName = rs.getString("TBL"); - info.partName = rs.getString("PART"); - // In this case, this field contains min open write txn ID. - long value = rs.getLong("MIN_OPEN_WRITE_TXNID"); - info.minOpenWriteTxnId = value > 0 ? value : Long.MAX_VALUE; - // The specific type, state assigned to abort cleanup. - info.type = CompactionType.ABORT_TXN_CLEANUP; - info.state = READY_FOR_CLEANING; - info.retryRetention = rs.getLong("RETRY_RETENTION"); - info.id = rs.getLong("RETRY_CQ_ID"); - readyToCleanAborts.add(info); - } - } - return readyToCleanAborts; - } - - public AbortTxnInfoHandler(long abortedTimeThreshold, int abortedThreshold, int fetchSize) { - this.abortedTimeThreshold = abortedTimeThreshold; - this.abortedThreshold = abortedThreshold; - this.fetchSize = fetchSize; - } -} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java index f63748ca9662..ccbe0c512da4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java @@ -58,15 +58,15 @@ public String getParameterizedQueryString(DatabaseProduct databaseProduct) throw " ) \"CUR\"" + " ON \"COMMITTED\".\"WS_DATABASE\" = \"CUR\".\"TC_DATABASE\"" + " AND \"COMMITTED\".\"WS_TABLE\" = \"CUR\".\"TC_TABLE\"" + - (TxnHandler.ConfVars.useMinHistoryLevel() ? "" : - " AND \"COMMITTED\".\"WS_OPERATION_TYPE\" != :wsType") + // For partitioned table we always track writes at partition level (never at table) // and for non partitioned - always at table level, thus the same table should never // have entries with partition key and w/o " AND (\"COMMITTED\".\"WS_PARTITION\" = \"CUR\".\"TC_PARTITION\" OR" + " \"CUR\".\"TC_PARTITION\" IS NULL) " + // txns overlap - " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\""; + " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" + + (TxnHandler.ConfVars.useMinHistoryLevel() ? "" : + " AND \"COMMITTED\".\"WS_OPERATION_TYPE\" != :wsType"); } @Override diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java index 327963a5a8d5..716fdd19e3aa 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.txn.TxnHandler; import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus; import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler; import org.springframework.dao.DataAccessException; @@ -48,8 +49,8 @@ public class MinUncommittedTxnIdHandler implements QueryHandler { private final boolean useMinHistoryLevel; - public MinUncommittedTxnIdHandler(boolean useMinHistoryLevel) { - this.useMinHistoryLevel = useMinHistoryLevel; + public MinUncommittedTxnIdHandler() { + this.useMinHistoryLevel = TxnHandler.ConfVars.useMinHistoryLevel(); } @Override