Merged
6 changes: 5 additions & 1 deletion ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -1856,7 +1856,9 @@ public void testDropTableWithSuffix() throws Exception {
}
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);


Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'");
@@ -1945,6 +1947,8 @@ public void testDropMaterializedViewWithSuffix() throws Exception {
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);

Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'");
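Note on the added waits: the housekeeper presumably treats transactions younger than TXN_OPENTXN_TIMEOUT as potentially open, so both tests now sleep out that window before running it; otherwise the freshly created txn records would not yet be eligible for cleanup. A minimal sketch of the pattern, assuming a hiveConf configured as in these tests:

// Wait out the open-txn timeout window, then run the ACID housekeeper
// so that recent txn metadata becomes eligible for cleanup.
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);
Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
    MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();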
4 changes: 4 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2373,6 +2373,7 @@ public void testCleanerForTxnToWriteId() throws Exception {
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();

// After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be cleaned up as all txns are committed.
@@ -2416,6 +2417,7 @@
// aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
// As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained.
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
@@ -2428,6 +2430,7 @@
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
@@ -2439,6 +2442,7 @@
// The txn opened after the compaction commit should not affect the Cleaner
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();

Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
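Why performWriteSetGC() is now called before cleanTxnToWriteIdTable(): with min-history-write-id enabled, the TXN_TO_WRITE_ID low-water-mark is derived in part from MIN(WS_TXNID) in WRITE_SET (see the union query being replaced in CleanTxnToWriteIdTableFunction at the bottom of this diff), so stale WRITE_SET rows can pin the mark and leave entries behind. A sketch of the ordering the test now exercises:

// GC obsolete WRITE_SET rows first so MIN(WS_TXNID) cannot pin the
// TXN_TO_WRITE_ID low-water-mark, then clean below that mark.
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();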
4 changes: 2 additions & 2 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -110,7 +110,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='s'"));
Assert.assertEquals(3, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
@@ -125,7 +125,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='bar'"));
Assert.assertEquals(4, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'"));
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -78,7 +79,9 @@ public void setUp() throws Exception {
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);

MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, true);
TxnHandler.ConfVars.setUseMinHistoryWriteId(true);

driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
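The setup flips the flag in two places: the MetastoreConf value and the static TxnHandler.ConfVars cache. The second call is presumably needed because an already-initialized TxnHandler reads the cached flag rather than re-reading the configuration, so tests that toggle the behavior set both, as sketched here with the same calls used above:

// Keep the configuration and the cached TxnHandler flag in sync when
// toggling min-history-write-id in tests.
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, true);
TxnHandler.ConfVars.setUseMinHistoryWriteId(true);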
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.ql.Context;
@@ -1235,6 +1236,9 @@ public void testWriteSetTracking4() throws Exception {
*/
@Test
public void testWriteSetTracking5() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"TAB_PART"});
Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
driver.run("create table if not exists TAB_PART (a int, b int) " +
@@ -2109,6 +2113,9 @@ public void testMergeUnpartitionedConflictSharedWrite() throws Exception {
* @param causeConflict true to make 2 operations such that they update the same entity
*/
private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite) throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);

@@ -2873,6 +2880,9 @@ public void testMergePartitionedConflictSharedWrite() throws Exception {
* @param causeConflict - true to make the operations cause a Write conflict
*/
private void testMergePartitioned(boolean causeConflict, boolean sharedWrite) throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);

@@ -3537,7 +3547,8 @@ public void testSkipAcquireLocksForExplain() throws Exception {

@Test
public void testInsertSnapshotIsolationMinHistoryDisabled() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testInsertSnapshotIsolation();
}

@@ -3566,7 +3577,8 @@ public void testInsertSnapshotIsolation() throws Exception {

@Test
public void testUpdateSnapshotIsolationMinHistoryDisabled() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testUpdateSnapshotIsolation();
}

@@ -86,6 +86,7 @@ public class TestCompactionMetrics extends CompactorTest {
public void setUp() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
TxnHandler.ConfVars.setUseMinHistoryLevel(true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
// re-initialize metrics
@@ -1675,10 +1675,11 @@ public enum ConfVars {
"time after which transactions are declared aborted if the client has not sent a heartbeat."),
TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS,
"Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"),
@Deprecated
TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel", "hive.txn.use.minhistorylevel", true,
"Set this to false, for the TxnHandler and Cleaner to not use MIN_HISTORY_LEVEL table and take advantage of openTxn optimisation.\n"
+ "If the table is dropped HMS will switch this flag to false, any other value changes need a restart to take effect."),
TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", false,
TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", true,
"Set this to true, to avoid global minOpenTxn check in Cleaner.\n"
+ "If the table is dropped HMS will switch this flag to false."),
LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100,
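Net effect of this hunk: TXN_USE_MIN_HISTORY_LEVEL is deprecated and TXN_USE_MIN_HISTORY_WRITE_ID now defaults to true, so the Cleaner skips the global minOpenTxn check out of the box. A deployment wanting the old behavior back would disable the new flag; a hedged example using only the config names from this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

// Opt out of the new default (sketch): disable min-history-write-id so
// the Cleaner falls back to the global minOpenTxn check.
Configuration conf = MetastoreConf.newMetastoreConf();
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);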
@@ -374,7 +374,7 @@ private void updateStatus(CompactionInfo ci) throws MetaException {
String strState = CompactionState.fromSqlConst(ci.state).toString();

LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci);
CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false));
CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false));

long endTime = getDbTime().getTime();
if (ciActual != null) {
@@ -505,7 +505,7 @@ public long findMinOpenTxnIdForCleaner() throws MetaException {
@RetrySemantics.Idempotent
@Deprecated
public long findMinTxnIdSeenOpen() {
if (!ConfVars.useMinHistoryLevel() || ConfVars.useMinHistoryWriteId()) {
if (!ConfVars.useMinHistoryLevel()) {
return Long.MAX_VALUE;
}
try {
@@ -205,7 +205,7 @@ private ConfVars() {}
private boolean useMinHistoryWriteId;

public boolean useMinHistoryLevel() {
return useMinHistoryLevel;
return useMinHistoryLevel && !useMinHistoryWriteId;
}

public void setUseMinHistoryLevel(boolean useMinHistoryLevel) {
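The getter change makes the write-id flag take precedence: useMinHistoryLevel() now reports false whenever useMinHistoryWriteId is set, which is what lets findMinTxnIdSeenOpen() above drop its explicit useMinHistoryWriteId() check. The new behavior, spelled out:

// Effective value returned by useMinHistoryLevel() after this change:
// level=true,  writeId=false -> true   (legacy MIN_HISTORY_LEVEL path)
// level=true,  writeId=true  -> false  (write-id optimisation wins)
// level=false, writeId=any   -> false
boolean effective = useMinHistoryLevel && !useMinHistoryWriteId;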
@@ -18,36 +18,18 @@
package org.apache.hadoop.hive.metastore.txn.jdbc.functions;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.MinUncommittedTxnIdHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import java.sql.Types;

public class CleanTxnToWriteIdTableFunction implements TransactionalFunction<Void> {

private static final Logger LOG = LoggerFactory.getLogger(CleanTxnToWriteIdTableFunction.class);

//language=SQL
private static String minHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
" SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
" UNION" +
" SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :abortedState) \"RES\"";
//language=SQL
private static String noMinHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
" SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
" UNION" +
" SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"" +
" UNION" +
" SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
" OR \"TXN_STATE\" = " + TxnStatus.OPEN +
" ) \"RES\"";

private final long minTxnIdSeenOpen;

public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) {
@@ -56,32 +38,17 @@ public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) {

@Override
public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException {
NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
String sql = TxnHandler.ConfVars.useMinHistoryLevel() ? minHistoryLevelSql : noMinHistoryLevelSql;
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR);
if (!TxnHandler.ConfVars.useMinHistoryLevel()) {
params.addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR);
}

// First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
// If there are no txns which are currently open or aborted in the system, then current value of
// max(TXNS.txn_id) could be min_uncommitted_txnid.
Long minTxnId = jdbcTemplate.query(sql, params, rs -> {
if (rs.next()) {
return rs.getLong(1);
} else {
return null;
}
});

Long minTxnId = jdbcResource.execute(new MinUncommittedTxnIdHandler());
Review comment (PR author): existing MinUncommittedTxnIdHandler wasn't used for unknown reason
if (minTxnId == null) {
throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
}
long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen);

// As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
// to clean up the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
int rc = jdbcTemplate.update("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < :txnId",
new MapSqlParameterSource("txnId", minUncommitedTxnid));
LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: {}", rc, minUncommitedTxnid);
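For reference, the two inline queries deleted above computed the minimum uncommitted txn id as a UNION over MAX(TXN_ID)+1, the minimum aborted (and, without min-history-level, open) TXN_ID, and MIN(WS_TXNID) from WRITE_SET; per the review comment, the refactor assumes MinUncommittedTxnIdHandler encapsulates equivalent SQL. The retained delete step then prunes below the combined low-water-mark:

// Low-water-mark delete (sketch mirroring the code above): every txn below
// min(minTxnId, minTxnIdSeenOpen) is committed or empty-aborted, so its
// TXN_TO_WRITE_ID rows are safe to remove.
long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen);
int rc = jdbcTemplate.update("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < :txnId",
    new MapSqlParameterSource("txnId", minUncommitedTxnid));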