From f95aad540d13132c7e248100536e236a2473608a Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 3 Jan 2022 15:30:47 +0800 Subject: [PATCH 1/5] HDDS-6147. Add ability in OM to get limited delta updates --- .../apache/hadoop/hdds/utils/db/DBStore.java | 10 ++++++++ .../apache/hadoop/hdds/utils/db/RDBStore.java | 11 +++++++++ .../hadoop/hdds/utils/db/TestRDBStore.java | 24 +++++++++++++++++++ .../src/main/proto/OmClientProtocol.proto | 1 + .../apache/hadoop/ozone/om/OzoneManager.java | 6 ++++- 5 files changed, 51 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index f0096ed9d83a..2ac2bdc73062 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -187,4 +187,14 @@ void move(KEY sourceKey, KEY destKey, VALUE value, */ DBUpdatesWrapper getUpdatesSince(long sequenceNumber) throws SequenceNumberNotFoundException; + + /** + * Get limited data written to DB since a specific sequence number. + * @param sequenceNumber + * @param limitCount + * @return + * @throws SequenceNumberNotFoundException + */ + DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) + throws SequenceNumberNotFoundException; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index b50b46225e09..eb71ec178388 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -382,7 +382,15 @@ public CodecRegistry getCodecRegistry() { @Override public DBUpdatesWrapper getUpdatesSince(long sequenceNumber) throws SequenceNumberNotFoundException { + return getUpdatesSince(sequenceNumber, Long.MAX_VALUE); + } + @Override + public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) + throws SequenceNumberNotFoundException { + if (limitCount <= 0) { + throw new IllegalArgumentException("Illegal count for getUpdatesSince."); + } DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper(); try { TransactionLogIterator transactionLogIterator = @@ -415,6 +423,9 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber) } dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(), result.sequenceNumber()); + if (currSequenceNumber - sequenceNumber >= limitCount) { + break; + } transactionLogIterator.next(); } } catch (RocksDBException e) { diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index 34d348f416d1..f95a8ff0520b 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -348,6 +348,30 @@ public void testGetDBUpdatesSince() throws Exception { } } + @Test + public void testGetDBUpdatesSinceWithLimitCount() throws Exception { + + try (RDBStore newStore = + new RDBStore(folder.newFolder(), options, configSet)) { + + try (Table firstTable = newStore.getTable(families.get(1))) { + firstTable.put( + org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key1"), + org.apache.commons.codec.binary.StringUtils + .getBytesUtf16("Value1")); + firstTable.put( + org.apache.commons.codec.binary.StringUtils.getBytesUtf16("Key2"), + org.apache.commons.codec.binary.StringUtils + .getBytesUtf16("Value2")); + } + Assert.assertTrue( + newStore.getDb().getLatestSequenceNumber() == 2); + + DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0, 1); + Assert.assertEquals(1, dbUpdatesSince.getData().size()); + } + } + @Test public void testDowngrade() throws Exception { diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 1dd922c8655d..e69f08dab29c 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1098,6 +1098,7 @@ message ServiceListRequest { message DBUpdatesRequest { required uint64 sequenceNumber = 1; + optional uint64 limitCount = 2; } message ServiceListResponse { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 1640808d8e84..51e00bfb7289 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -3489,8 +3489,12 @@ public boolean isRatisEnabled() { public DBUpdates getDBUpdates( DBUpdatesRequest dbUpdatesRequest) throws SequenceNumberNotFoundException { + long limitCount = Long.MAX_VALUE; + if (dbUpdatesRequest.hasSequenceNumber()) { + limitCount = dbUpdatesRequest.getLimitCount(); + } DBUpdatesWrapper updatesSince = metadataManager.getStore() - .getUpdatesSince(dbUpdatesRequest.getSequenceNumber()); + .getUpdatesSince(dbUpdatesRequest.getSequenceNumber(), limitCount); DBUpdates dbUpdates = new DBUpdates(updatesSince.getData()); dbUpdates.setCurrentSequenceNumber(updatesSince.getCurrentSequenceNumber()); return dbUpdates; From 2b6b60a31d1266efd7bae859476640a600ea22d8 Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 3 Jan 2022 17:35:11 +0800 Subject: [PATCH 2/5] trigger new CI check From 7815f1a8941260dcc9731d640d521f4b1daa83f5 Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 3 Jan 2022 21:04:41 +0800 Subject: [PATCH 3/5] HDDS-6147. Fix typo --- .../src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 51e00bfb7289..900babde032a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -3490,7 +3490,7 @@ public DBUpdates getDBUpdates( DBUpdatesRequest dbUpdatesRequest) throws SequenceNumberNotFoundException { long limitCount = Long.MAX_VALUE; - if (dbUpdatesRequest.hasSequenceNumber()) { + if (dbUpdatesRequest.hasLimitCount()) { limitCount = dbUpdatesRequest.getLimitCount(); } DBUpdatesWrapper updatesSince = metadataManager.getStore() From 4a16385cc881336423ac6b0c7b0b3bef1d70b963 Mon Sep 17 00:00:00 2001 From: Symious Date: Mon, 3 Jan 2022 22:28:07 +0800 Subject: [PATCH 4/5] trigger new CI check From 61f44c1d3da495cce565c3b72ba77063fa6950a4 Mon Sep 17 00:00:00 2001 From: Symious Date: Tue, 4 Jan 2022 06:36:39 +0800 Subject: [PATCH 5/5] trigger new CI check