diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBStoreHAManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBStoreHAManager.java
index 48ffce3d1ed7..8bdb27f1569a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBStoreHAManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBStoreHAManager.java
@@ -28,4 +28,8 @@ public interface DBStoreHAManager {
   default Table<String, TransactionInfo> getTransactionInfoTable() {
     return null;
   }
+
+  default Table<Long, FlushedTransactionInfo> getFlushedTransactionsTable() {
+    return null;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FlushedTransactionInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FlushedTransactionInfo.java
new file mode 100644
index 000000000000..89948f42de52
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FlushedTransactionInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import java.util.Objects;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import org.apache.ratis.server.protocol.TermIndex;
+
+/**
+ * Represents information about a flushed transaction, including its term and transaction index.
+ * This class is a lightweight container used to track specific transaction metadata and provides
+ * methods for serialization and deserialization using a codec.
+ */
+public class FlushedTransactionInfo {
+
+  private static final Codec<FlushedTransactionInfo> CODEC = new DelegatedCodec<>(
+      Proto2Codec.get(HddsProtos.FlushedTransactionInfo.getDefaultInstance()),
+      FlushedTransactionInfo::getFromProtobuf,
+      FlushedTransactionInfo::getProtobuf,
+      FlushedTransactionInfo.class);
+
+  private final long term;
+  private final long transactionIndex;
+
+  FlushedTransactionInfo(TermIndex termIndex) {
+    this.transactionIndex = termIndex.getIndex();
+    this.term = termIndex.getTerm();
+  }
+
+  public static FlushedTransactionInfo valueOf(long currentTerm, long transactionIndex) {
+    return valueOf(TermIndex.valueOf(currentTerm, transactionIndex));
+  }
+
+  public static FlushedTransactionInfo valueOf(TermIndex termIndex) {
+    return new FlushedTransactionInfo(termIndex);
+  }
+
+  public static Codec<FlushedTransactionInfo> getCodec() {
+    return CODEC;
+  }
+
+  public long getTerm() {
+    return term;
+  }
+
+  public long getTransactionIndex() {
+    return transactionIndex;
+  }
+
+  public static FlushedTransactionInfo getFromProtobuf(HddsProtos.FlushedTransactionInfo transactionInfo) {
+    return new FlushedTransactionInfo(TermIndex.valueOf(transactionInfo.getTermIndex(),
+        transactionInfo.getTransactionId()));
+  }
+
+  private HddsProtos.FlushedTransactionInfo getProtobuf() {
+    return HddsProtos.FlushedTransactionInfo.newBuilder().setTermIndex(this.getTerm())
+        .setTransactionId(this.getTransactionIndex()).build();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FlushedTransactionInfo that = (FlushedTransactionInfo) o;
+    return this.getTerm() == that.getTerm() && this.getTransactionIndex() == that.getTransactionIndex();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getTerm(), getTransactionIndex());
+  }
+
+  @Override
+  public String toString() {
+    return "FlushedTransactionInfo{" +
+        "term=" + term +
+        ", transactionIndex=" + transactionIndex +
+        '}';
+  }
+}
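[Reviewer note, not part of the patch] A minimal sketch of the codec round trip the new
column family relies on, assuming the toPersistedFormat/fromPersistedFormat methods of
Ozone's Codec interface:

    FlushedTransactionInfo info = FlushedTransactionInfo.valueOf(7L, 1234L);
    Codec<FlushedTransactionInfo> codec = FlushedTransactionInfo.getCodec();
    byte[] persisted = codec.toPersistedFormat(info);           // protobuf bytes via Proto2Codec
    FlushedTransactionInfo decoded = codec.fromPersistedFormat(persisted);
    assert info.equals(decoded);  // equality is value-based on (term, transactionIndex)
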
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
index be0f81ac8b0c..86dda85c6f17 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
@@ -37,7 +37,7 @@
  *
  * This class is immutable.
  */
-public final class TransactionInfo implements Comparable<TransactionInfo> {
+public class TransactionInfo implements Comparable<TransactionInfo> {
   private static final Codec<TransactionInfo> CODEC = new DelegatedCodec<>(
       StringCodec.get(),
       TransactionInfo::valueOf,
@@ -99,7 +99,7 @@ public static TermIndex getTermIndex(long transactionIndex) {
     return TermIndex.valueOf(NON_RATIS_TERM, transactionIndex);
   }
 
-  private TransactionInfo(TermIndex termIndex) {
+  TransactionInfo(TermIndex termIndex) {
     this.transactionInfoString = termIndex.getTerm() + TRANSACTION_INFO_SPLIT_KEY + termIndex.getIndex();
     this.snapshotInfo = new SnapshotInfo() {
       @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
index d5c61827258d..0e6159f2128b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import org.apache.hadoop.hdds.utils.CollectionUtils;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
@@ -80,6 +81,11 @@ public TypedTable<KEY, VALUE> getTable(DBStore db, CacheType cacheType)
     return db.getTable(tableName, keyCodec, valueCodec, cacheType);
   }
 
+  public TypedTable<KEY, VALUE> getTable(DBStore db, CacheType cacheType, Function<KEY, Boolean> keyValidator)
+      throws RocksDatabaseException, CodecException {
+    return db.getTable(tableName, keyCodec, valueCodec, cacheType, keyValidator);
+  }
+
   public String getName() {
     return tableName;
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 0fb91f42d90a..500a21c2aaac 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
@@ -64,6 +65,21 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
       String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType)
       throws RocksDatabaseException, CodecException;
 
+  /**
+   * Gets table store with implicit key/value conversion.
+   *
+   * @param name - table name
+   * @param keyCodec - key codec
+   * @param valueCodec - value codec
+   * @param cacheType - cache type
+   * @param keyValidatorFunction - function to validate key before put/delete
+   * @return - Table Store
+   */
+  <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
+      String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType,
+      Function<KEY, Boolean> keyValidatorFunction) throws RocksDatabaseException, CodecException;
+
+
   /**
    * Lists the Known list of Tables in a DB.
    *
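[Reviewer note, not part of the patch] The new overload attaches a key validator when a
column family is materialized. A sketch mirroring how SCMMetadataStoreImpl uses it later
in this patch, assuming an already-opened DBStore named store:

    Table<Long, FlushedTransactionInfo> table =
        SCMDBDefinition.FLUSHEDTRANSACTIONS.getTable(store, TableCache.CacheType.PARTIAL_CACHE,
            (tid) -> tid >= 0);   // reject negative transaction indexes on put/delete
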
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index e3853a84211c..a76c0bbf9cee 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.RocksDBStoreMetrics;
@@ -280,6 +281,13 @@ public <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
     return new TypedTable<>(getTable(name), keyCodec, valueCodec, cacheType);
   }
 
+  @Override
+  public <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
+      String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType,
+      Function<KEY, Boolean> keyValidator) throws RocksDatabaseException, CodecException {
+    return new TypedTable<>(getTable(name), keyCodec, valueCodec, cacheType, keyValidator);
+  }
+
   @Override
   public List<Table<byte[], byte[]>> listTables() {
     final List<Table<byte[], byte[]>> returnList = new ArrayList<>();
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index cd02c91ecb30..60727ded6586 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.TableCacheMetrics;
@@ -66,6 +67,12 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
   private final CodecBuffer.Capacity bufferCapacity = new CodecBuffer.Capacity(this, BUFFER_SIZE_DEFAULT);
   private final TableCache<KEY, VALUE> cache;
+  private final Function<KEY, Boolean> keyValidatorFunction;
+
+  TypedTable(RDBTable rawTable, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, CacheType cacheType)
+      throws RocksDatabaseException, CodecException {
+    this(rawTable, keyCodec, valueCodec, cacheType, (k) -> true);
+  }
 
   /**
    * Create an TypedTable from the raw table with specified cache type.
@@ -74,13 +81,14 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
    * @param keyCodec The key codec.
    * @param valueCodec The value codec.
    * @param cacheType How to cache the entries?
+   * @param keyValidatorFunction A function to validate the key before performing a write operation.
    */
-  TypedTable(RDBTable rawTable, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, CacheType cacheType)
-      throws RocksDatabaseException, CodecException {
+  TypedTable(RDBTable rawTable, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, CacheType cacheType,
+      Function<KEY, Boolean> keyValidatorFunction) throws RocksDatabaseException, CodecException {
     this.rawTable = Objects.requireNonNull(rawTable, "rawTable==null");
     this.keyCodec = Objects.requireNonNull(keyCodec, "keyCodec == null");
     this.valueCodec = Objects.requireNonNull(valueCodec, "valueCodec == null");
-
+    this.keyValidatorFunction = keyValidatorFunction;
     this.info = getClassSimpleName(getClass()) + "-" + getName()
         + "(" + getClassSimpleName(keyCodec.getTypeClass())
         + "->" + getClassSimpleName(valueCodec.getTypeClass()) + ")";
@@ -132,6 +140,7 @@ private VALUE decodeValue(byte[] value) throws CodecException {
 
   @Override
   public void put(KEY key, VALUE value) throws RocksDatabaseException, CodecException {
+    assert this.keyValidatorFunction.apply(key);
     if (supportCodecBuffer) {
       try (CodecBuffer k = keyCodec.toDirectCodecBuffer(key);
           CodecBuffer v = valueCodec.toDirectCodecBuffer(value)) {
@@ -144,6 +153,7 @@
   @Override
   public void putWithBatch(BatchOperation batch, KEY key, VALUE value)
       throws RocksDatabaseException, CodecException {
+    assert this.keyValidatorFunction.apply(key);
     if (supportCodecBuffer) {
       CodecBuffer keyBuffer = null;
       CodecBuffer valueBuffer = null;
@@ -366,6 +376,7 @@ private VALUE getFromTableIfExist(KEY key) throws RocksDatabaseException, CodecE
 
   @Override
   public void delete(KEY key) throws RocksDatabaseException, CodecException {
+    assert this.keyValidatorFunction.apply(key);
     if (keyCodec.supportCodecBuffer()) {
       try (CodecBuffer buffer = keyCodec.toDirectCodecBuffer(key)) {
         rawTable.delete(buffer.asReadOnlyByteBuffer());
@@ -377,16 +388,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {
 
   @Override
   public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
+    assert this.keyValidatorFunction.apply(key);
     rawTable.deleteWithBatch(batch, encodeKey(key));
   }
 
   @Override
   public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
+    assert this.keyValidatorFunction.apply(beginKey) && this.keyValidatorFunction.apply(endKey);
     rawTable.deleteRangeWithBatch(batch, encodeKey(beginKey), encodeKey(endKey));
   }
 
   @Override
   public void deleteRange(KEY beginKey, KEY endKey) throws RocksDatabaseException, CodecException {
+    assert this.keyValidatorFunction.apply(beginKey) && this.keyValidatorFunction.apply(endKey);
     rawTable.deleteRange(encodeKey(beginKey), encodeKey(endKey));
   }
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index ef76205d91f7..a7f8934923ab 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -550,3 +550,8 @@ message InnerNode {
   optional uint32 numOfLeaves = 2;
   repeated ChildrenMap childrenMap = 3;
 }
+
+message FlushedTransactionInfo {
+  required uint64 termIndex = 1;
+  required uint64 transactionId = 2;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index 4aae413c0c2b..154490325b07 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.ByteStringCodec;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
@@ -75,6 +76,13 @@ public class SCMDBDefinition extends DBDefinition.WithMap {
           ContainerID.getCodec(),
           ContainerInfo.getCodec());
 
+  public static final DBColumnFamilyDefinition<Long, FlushedTransactionInfo>
+      FLUSHEDTRANSACTIONS =
+      new DBColumnFamilyDefinition<>(
+          "scmFlushedTransactions",
+          LongCodec.get(),
+          FlushedTransactionInfo.getCodec());
+
   public static final DBColumnFamilyDefinition<String, TransactionInfo>
       TRANSACTIONINFO =
       new DBColumnFamilyDefinition<>(
@@ -123,6 +131,7 @@ public class SCMDBDefinition extends DBDefinition.WithMap {
           PIPELINES,
           SEQUENCE_ID,
           STATEFUL_SERVICE_CONFIG,
+          FLUSHEDTRANSACTIONS,
           TRANSACTIONINFO,
           VALID_CERTS,
           VALID_SCM_CERTS);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 254ce10edbe4..4997a54ca0c3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -19,6 +19,7 @@
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.FLUSHEDTRANSACTIONS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.META;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
@@ -43,12 +44,14 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +72,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
 
   private Table<PipelineID, Pipeline> pipelineTable;
 
+  private Table<Long, FlushedTransactionInfo> flushedTransactionsTable;
+
   private Table<String, TransactionInfo> transactionInfoTable;
 
   private Table<String, Long> sequenceIdTable;
@@ -142,6 +147,10 @@ public void start(OzoneConfiguration config)
 
     checkAndPopulateTable(containerTable, CONTAINERS.getName());
 
+    flushedTransactionsTable = FLUSHEDTRANSACTIONS.getTable(store, TableCache.CacheType.PARTIAL_CACHE,
+        (tid) -> tid >= 0);
+    checkAndPopulateTable(flushedTransactionsTable, FLUSHEDTRANSACTIONS.getName());
+
     transactionInfoTable = TRANSACTIONINFO.getTable(store);
     checkAndPopulateTable(transactionInfoTable, TRANSACTIONINFO.getName());
 
@@ -203,6 +212,11 @@ public Table<String, TransactionInfo> getTransactionInfoTable() {
     return transactionInfoTable;
   }
 
+  @Override
+  public Table<Long, FlushedTransactionInfo> getFlushedTransactionsTable() {
+    return flushedTransactionsTable;
+  }
+
   @Override
   public BatchOperationHandler getBatchHandler() {
     return this.store;
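[Reviewer note, not part of the patch] With the wiring above, SCM code can record and look
up the flushed term/index pair keyed by transaction index, for example:

    Table<Long, FlushedTransactionInfo> flushed = scmMetadataStore.getFlushedTransactionsTable();
    flushed.put(42L, FlushedTransactionInfo.valueOf(3L, 42L));   // term 3, transaction index 42
    FlushedTransactionInfo last = flushed.get(42L);
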
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMMetadataStore.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMMetadataStore.java
new file mode 100644
index 000000000000..ef991339e448
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestSCMMetadataStore.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
+import org.apache.hadoop.hdds.utils.db.CodecException;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests SCM MetadataManager.
+ */
+public class TestSCMMetadataStore {
+  private SCMMetadataStore scmMetadataStore;
+  private OzoneConfiguration ozoneConfiguration;
+  @TempDir
+  private File folder;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.getPath());
+    scmMetadataStore = new SCMMetadataStoreImpl(ozoneConfiguration);
+  }
+
+  @Test
+  public void testFlushedTransactionTable() throws RocksDatabaseException, CodecException {
+    List<FlushedTransactionInfo> flushedTransactionInfos = new ArrayList<>();
+    Assertions.assertThrows(AssertionError.class, () -> {
+      scmMetadataStore.getFlushedTransactionsTable().put(-100L, FlushedTransactionInfo.valueOf(-100L, -100L));
+    });
+    for (long i = 10; i >= 0; i--) {
+      FlushedTransactionInfo flushedTransactionInfo = FlushedTransactionInfo.valueOf(i, i);
+      scmMetadataStore.getFlushedTransactionsTable().put(i, flushedTransactionInfo);
+      flushedTransactionInfos.add(flushedTransactionInfo);
+    }
+    flushedTransactionInfos.sort(Comparator.comparingLong(FlushedTransactionInfo::getTransactionIndex));
+    List<FlushedTransactionInfo> dbList = new ArrayList<>();
+    try (Table.KeyValueIterator<Long, FlushedTransactionInfo> itr =
+             scmMetadataStore.getFlushedTransactionsTable().iterator()) {
+      while (itr.hasNext()) {
+        dbList.add(itr.next().getValue());
+      }
+    }
+    assertEquals(flushedTransactionInfos, dbList);
+  }
+
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index a8e5b2dab418..d19e7e73a44c 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.DBStoreHAManager;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -455,6 +456,9 @@ String getMultipartKeyFSO(String volume, String bucket, String key, String
   @Override
   Table<String, TransactionInfo> getTransactionInfoTable();
 
+  @Override
+  Table<Long, FlushedTransactionInfo> getFlushedTransactionsTable();
+
   Table<String, OmDBAccessIdInfo> getTenantAccessIdTable();
 
   Table<String, OmDBUserPrincipalInfo> getPrincipalToAccessIdsTable();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 417241c9e4fe..36f069dd3cc9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -57,11 +57,13 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.TableCacheMetrics;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -152,6 +154,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
   private TypedTable<OzoneTokenIdentifier, Long> dTokenTable;
   private TypedTable<String, OmPrefixInfo> prefixTable;
   private TypedTable<String, TransactionInfo> transactionInfoTable;
+  private TypedTable<Long, FlushedTransactionInfo> flushedTransactionsTable;
   private TypedTable<String, String> metaTable;
 
   // Tables required for multi-tenancy
@@ -442,6 +445,8 @@ protected void initializeOmTables(CacheType cacheType,
     prefixTable = initializer.get(OMDBDefinition.PREFIX_TABLE_DEF);
 
     transactionInfoTable = initializer.get(OMDBDefinition.TRANSACTION_INFO_TABLE_DEF);
+    flushedTransactionsTable = initializer.get(OMDBDefinition.FLUSHED_TRANSACTIONS_DEF, CacheType.PARTIAL_CACHE,
+        (tid) -> tid >= 0);
 
     metaTable = initializer.get(OMDBDefinition.META_TABLE_DEF);
@@ -1598,6 +1603,11 @@ public Table<String, TransactionInfo> getTransactionInfoTable() {
     return transactionInfoTable;
   }
 
+  @Override
+  public TypedTable<Long, FlushedTransactionInfo> getFlushedTransactionsTable() {
+    return flushedTransactionsTable;
+  }
+
   @Override
   public Table<String, String> getMetaTable() {
     return metaTable;
@@ -1851,6 +1861,12 @@ <KEY, VALUE> TypedTable<KEY, VALUE> get(DBColumnFamilyDefinition<KEY, VALUE> def
       return get(definition.getTable(store, cacheType));
     }
 
+    <KEY, VALUE> TypedTable<KEY, VALUE> get(DBColumnFamilyDefinition<KEY, VALUE> definition, CacheType cacheType,
+        Function<KEY, Boolean> keyValidator)
+        throws IOException {
+      return get(definition.getTable(store, cacheType, keyValidator));
+    }
+
     private <KEY, VALUE> TypedTable<KEY, VALUE> get(TypedTable<KEY, VALUE> table) {
       Objects.requireNonNull(table, "table == null");
       final String name = table.getName();
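[Reviewer note, not part of the patch] Both new tests compare the iterator output against a
list sorted by transaction index. This assumes LongCodec serializes keys as fixed-width
big-endian bytes, so for the non-negative keys this table permits, iteration returns entries
in ascending key order:

    try (Table.KeyValueIterator<Long, FlushedTransactionInfo> itr =
             omMetadataManager.getFlushedTransactionsTable().iterator()) {
      while (itr.hasNext()) {
        FlushedTransactionInfo next = itr.next().getValue();  // ascending by transaction index
      }
    }
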
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 9894e8f5d6bf..7a111c15efcc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
 import org.apache.hadoop.hdds.utils.db.DBDefinition;
@@ -191,6 +192,11 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
           StringCodec.get(),
           TransactionInfo.getCodec());
 
+  /** flushedTransactionsTable: transactionIndex :- FlushedTransactionInfo. */
+  public static final String FLUSHED_TRANSACTIONS = "flushedTransactionsTable";
+  public static final DBColumnFamilyDefinition<Long, FlushedTransactionInfo> FLUSHED_TRANSACTIONS_DEF =
+      new DBColumnFamilyDefinition<>(FLUSHED_TRANSACTIONS, LongCodec.get(), FlushedTransactionInfo.getCodec());
+
   public static final String META_TABLE = "metaTable";
 
   /** metaTable: metaDataKey :- metaDataValue. */
   public static final DBColumnFamilyDefinition<String, String> META_TABLE_DEF
@@ -339,7 +345,8 @@ public final class OMDBDefinition extends DBDefinition.WithMap {
           TENANT_STATE_TABLE_DEF,
           TRANSACTION_INFO_TABLE_DEF,
           USER_TABLE_DEF,
-          VOLUME_TABLE_DEF);
+          VOLUME_TABLE_DEF,
+          FLUSHED_TRANSACTIONS_DEF);
 
   private static final OMDBDefinition INSTANCE = new OMDBDefinition();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
index bebc58807888..dbecfa09a362 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
@@ -31,6 +31,7 @@
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FLUSHED_TRANSACTIONS;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.META_TABLE;
 import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE;
@@ -63,6 +64,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -76,7 +78,11 @@
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.FlushedTransactionInfo;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.CodecException;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
@@ -102,6 +108,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PartKeyInfo;
 import org.apache.hadoop.ozone.snapshot.ListSnapshotResponse;
 import org.apache.hadoop.util.Time;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -137,7 +144,8 @@ public class TestOmMetadataManager {
       TENANT_STATE_TABLE,
       SNAPSHOT_INFO_TABLE,
       SNAPSHOT_RENAMED_TABLE,
-      COMPACTION_LOG_TABLE
+      COMPACTION_LOG_TABLE,
+      FLUSHED_TRANSACTIONS
   };
 
   private OMMetadataManager omMetadataManager;
@@ -171,6 +179,29 @@ public void testTransactionTable() throws Exception {
     assertEquals(250, transactionInfo.getTransactionIndex());
   }
 
+  @Test
+  public void testFlushedTransactionTable() throws RocksDatabaseException, CodecException {
+
+    List<FlushedTransactionInfo> flushedTransactionInfos = new ArrayList<>();
+    Assertions.assertThrows(AssertionError.class, () -> {
+      omMetadataManager.getFlushedTransactionsTable().put(-100L, FlushedTransactionInfo.valueOf(-100L, -100L));
+    });
+    for (long i = 10; i >= 0; i--) {
+      FlushedTransactionInfo flushedTransactionInfo = FlushedTransactionInfo.valueOf(i, i);
+      omMetadataManager.getFlushedTransactionsTable().put(i, flushedTransactionInfo);
+      flushedTransactionInfos.add(flushedTransactionInfo);
+    }
+
+    flushedTransactionInfos.sort(Comparator.comparingLong(FlushedTransactionInfo::getTransactionIndex));
+    List<FlushedTransactionInfo> dbList = new ArrayList<>();
+    try (Table.KeyValueIterator<Long, FlushedTransactionInfo> itr =
+             omMetadataManager.getFlushedTransactionsTable().iterator()) {
+      while (itr.hasNext()) {
+        dbList.add(itr.next().getValue());
+      }
+    }
+    assertEquals(flushedTransactionInfos, dbList);
+  }
+
   @Test
   public void testListVolumes() throws Exception {
     String ownerName = "owner";