Skip to content

Commit

Permalink
Configurable data source for offloaded messages (#8717)
Browse files Browse the repository at this point in the history
Fix issue: #8591

This PR includes:
* API change in command tools
* Related implementation with tests
* Related docs in cookbook

As a side change, the log4j dependency is removed from the `managed-ledger` module because the whole project now uses log4j2 as the default logging framework.

(cherry picked from commit 7c09f5c)
  • Loading branch information
Renkai authored and codelipenghui committed Jan 31, 2021
1 parent cb55dae commit efff0ee
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1554,7 +1555,18 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {

LedgerInfo info = ledgers.get(ledgerId);
CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {

if (config.getLedgerOffloader() != null
&& config.getLedgerOffloader().getOffloadPolicies() != null
&& config.getLedgerOffloader().getOffloadPolicies()
.getManagedLedgerOffloadedReadPriority() == OffloadedReadPriority.BOOKKEEPER_FIRST
&& info != null && info.hasOffloadContext()
&& !info.getOffloadContext().getBookkeeperDeleted()) {
openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
.withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();

} else if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {

UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
// TODO: improve this to load ledger offloader by driver name recorded in metadata
Map<String, String> offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info);
Expand Down
4 changes: 2 additions & 2 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ message ManagedLedgerInfo {
optional int64 timestamp = 4;
optional OffloadContext offloadContext = 5;
}
repeated LedgerInfo ledgerInfo = 1;

repeated LedgerInfo ledgerInfo = 1;

// If present, it signals the managed ledger has been
// terminated and this was the position of the last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -39,7 +40,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
Expand All @@ -52,10 +52,12 @@
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.MockClock;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -69,53 +71,140 @@ public void testOffloadRead() throws Exception {
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setRetentionSizeInMB(10);
config.setLedgerOffloader(offloader);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);

for (int i = 0; i < 25; i++) {
String content = "entry-" + i;
ledger.addEntry(content.getBytes());
}
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
assertEquals(ledger.getLedgersInfoAsList().size(), 3);

ledger.offloadPrefix(ledger.getLastConfirmedEntry());

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);
Assert.assertEquals(ledger.getLedgersInfoAsList().stream()
.filter(e -> e.getOffloadContext().getComplete()).count(), 2);
assertEquals(ledger.getLedgersInfoAsList().size(), 3);
Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete());
Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete());
Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete());

UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(),
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb());
UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(),
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());
ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb());

ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
int i = 0;
for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(1))
.readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap());

for (Entry e : cursor.readEntries(10)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), any(), anyMap());
verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());

for (Entry e : cursor.readEntries(5)) {
Assert.assertEquals(new String(e.getData()), "entry-" + i++);
assertEquals(new String(e.getData()), "entry-" + i++);
}
verify(offloader, times(2))
.readOffloaded(anyLong(), any(), anyMap());
.readOffloaded(anyLong(), any(), anyMap());
}

/**
 * Verifies the BOOKKEEPER_FIRST offloaded-read priority: while the offloaded
 * ledgers still exist in BookKeeper, reads are served from BookKeeper and the
 * offloader is never consulted; once the BookKeeper copies are deleted (after
 * the configured deletion lag), reads fall back to the offloader.
 */
@Test
public void testBookkeeperFirstOffloadRead() throws Exception {
    MockLedgerOffloader offloader = spy(new MockLedgerOffloader());
    MockClock clock = new MockClock();
    offloader.getOffloadPolicies()
            .setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST);
    // Delete the BookKeeper copy 5 minutes after a successful offload.
    offloader.getOffloadPolicies()
            .setManagedLedgerOffloadDeletionLagInMillis(300000L);
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setMaxEntriesPerLedger(10);
    config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
    config.setRetentionTime(10, TimeUnit.MINUTES);
    config.setRetentionSizeInMB(10);
    config.setLedgerOffloader(offloader);
    config.setClock(clock);

    ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_bookkeeper_first_test_ledger", config);

    // 25 entries with 10 entries per ledger -> 3 ledgers (10, 10, 5).
    for (int i = 0; i < 25; i++) {
        String content = "entry-" + i;
        ledger.addEntry(content.getBytes());
    }
    assertEquals(ledger.getLedgersInfoAsList().size(), 3);

    ledger.offloadPrefix(ledger.getLastConfirmedEntry());

    // Only the two closed ledgers are offloaded; the current one is not.
    assertEquals(ledger.getLedgersInfoAsList().size(), 3);
    assertEquals(ledger.getLedgersInfoAsList().stream()
            .filter(e -> e.getOffloadContext().getComplete()).count(), 2);

    LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0);
    Assert.assertTrue(firstLedger.getOffloadContext().getComplete());
    LedgerInfo secondLedger = ledger.getLedgersInfoAsList().get(1);
    Assert.assertTrue(secondLedger.getOffloadContext().getComplete());

    UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(),
            secondLedger.getOffloadContext().getUidLsb());

    ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
    int i = 0;
    for (Entry e : cursor.readEntries(10)) {
        assertEquals(new String(e.getData()), "entry-" + i++);
    }
    // Offloaded ledgers that are still present in BookKeeper must be read
    // from BookKeeper, so the offloader is never touched.
    verify(offloader, never())
            .readOffloaded(anyLong(), any(), anyMap());

    // Both BookKeeper copies still exist; advance the clock past the deletion
    // lag and trim so the BookKeeper copies are deleted.
    assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId()));
    assertEventuallyTrue(() -> bkc.getLedgers().contains(secondLedger.getLedgerId()));
    clock.advance(6, TimeUnit.MINUTES);
    CompletableFuture<Void> promise = new CompletableFuture<>();
    ledger.internalTrimConsumedLedgers(promise);
    promise.join();

    // Assert the BookKeeper ledgers are deleted and the metadata reflects it.
    assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedger.getLedgerId()));
    assertEventuallyTrue(() -> !bkc.getLedgers().contains(secondLedger.getLedgerId()));
    Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted());
    Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted());

    for (Entry e : cursor.readEntries(10)) {
        assertEquals(new String(e.getData()), "entry-" + i++);
    }

    // Ledgers deleted from BookKeeper: reads must now come from the offloader.
    verify(offloader, atLeastOnce())
            .readOffloaded(anyLong(), any(), anyMap());
    verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap());
}


static class MockLedgerOffloader implements LedgerOffloader {
ConcurrentHashMap<UUID, ReadHandle> offloads = new ConcurrentHashMap<UUID, ReadHandle>();


OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "",
null, null,
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);


@Override
public String getOffloadDriverName() {
return "mock";
Expand Down Expand Up @@ -150,7 +239,7 @@ public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid,

@Override
public OffloadPolicies getOffloadPolicies() {
return null;
return offloadPolicies;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,19 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.ImmutableSet;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
Expand All @@ -51,12 +48,10 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;

import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.zookeeper.MockZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.testng.annotations.Test;

public class OffloadPrefixTest extends MockedBookKeeperTestCase {
Expand Down Expand Up @@ -1002,7 +997,8 @@ Set<Long> deletedOffloads() {
OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES,
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS);
OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS,
OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY);

@Override
public String getOffloadDriverName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.util.internal.PlatformDependent;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand All @@ -34,14 +32,15 @@
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;

/**
Expand Down Expand Up @@ -1433,17 +1432,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Of course, this may degrade consumption throughput. Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.")
private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST
.getValue();

/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Enable load balancer"
category = CATEGORY_LOAD_BALANCER,
doc = "Enable load balancer"
)
private boolean loadBalancerEnabled = true;
@Deprecated
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
deprecated = true,
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
category = CATEGORY_LOAD_BALANCER,
deprecated = true,
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
)
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.google.common.collect.Sets;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand Down Expand Up @@ -165,10 +162,11 @@ public void testOffloadPolicies() throws Exception {
String endpoint = "test-endpoint";
long offloadThresholdInBytes = 0;
long offloadDeletionLagInMillis = 100L;
OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST;

OffloadPolicies offload1 = OffloadPolicies.create(
driver, region, bucket, endpoint, null, null,
100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis);
100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority);
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);
Expand Down Expand Up @@ -214,7 +212,7 @@ public void testTopicLevelOffloadPartitioned() throws Exception {
Thread.sleep(2000);
testOffload(true);
}

@Test
public void testTopicLevelOffloadNonPartitioned() throws Exception {
//wait for cache init
Expand Down
Loading

0 comments on commit efff0ee

Please sign in to comment.