Skip to content

Commit

Permalink
introduce broker entry metadta interceptor for adding broker side met…
Browse files Browse the repository at this point in the history
…adata
  • Loading branch information
aloyszhang committed Dec 10, 2020
1 parent a91c771 commit 397939b
Show file tree
Hide file tree
Showing 23 changed files with 361 additions and 204 deletions.
3 changes: 0 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,4 @@ packagesReplicas=1
# The bookkeeper ledger root path
packagesManagementLedgerRootPath=/ledger

# Enable raw metadata
brokerTimestampForMessageEnable=true

### --- Packages management service configuration variables (end) --- ###
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.common.base.Charsets;
import java.time.Clock;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
Expand All @@ -33,6 +35,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;

/**
Expand Down Expand Up @@ -75,7 +78,7 @@ public class ManagedLedgerConfig {
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private boolean brokerTimestampForMessageEnable = false;
private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors = new HashSet<>();

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down Expand Up @@ -638,15 +641,15 @@ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}

public boolean isBrokerTimestampForMessageEnable() {
return brokerTimestampForMessageEnable;
public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
return brokerEntryMetadataInterceptors;
}

public void setBrokerTimestampForMessageEnable(boolean brokerTimestampForMessageEnable) {
this.brokerTimestampForMessageEnable = brokerTimestampForMessageEnable;
public void setBrokerEntryMetadataInterceptors(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {
this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
}

public boolean isRawMetadataEnable() {
return this.brokerTimestampForMessageEnable;
public boolean isBrokerEntryMetaEnabled() {
return brokerEntryMetadataInterceptors.size() > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1357,8 +1357,8 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
if (config.isRawMetadataEnable()) {
existsOp = OpAddEntry.create(existsOp.ml, existsOp.dataWithRawMetadata, existsOp.callback, existsOp.ctx);
if (config.isBrokerEntryMetaEnabled()) {
existsOp = OpAddEntry.create(existsOp.ml, existsOp.dataWithBrokerEntryMetadata, existsOp.callback, existsOp.ctx);
} else {
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
volatile long lastInitTime;
@SuppressWarnings("unused")
ByteBuf data;
ByteBuf dataWithRawMetadata;
ByteBuf dataWithBrokerEntryMetadata;
private int dataLength;

private static final AtomicReferenceFieldUpdater<OpAddEntry, OpAddEntry.State> STATE_UPDATER = AtomicReferenceFieldUpdater
Expand Down Expand Up @@ -109,9 +109,10 @@ public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {

ByteBuf duplicateBuffer = data.retainedDuplicate();
if (ml.getConfig().isRawMetadataEnable()) {
duplicateBuffer = Commands.addRawMessageMetadata(duplicateBuffer);
dataWithRawMetadata = duplicateBuffer.retainedDuplicate();
if (ml.getConfig().isBrokerEntryMetaEnabled()) {
duplicateBuffer = Commands.addBrokerEntryMetadata(duplicateBuffer,
ml.getConfig().getBrokerEntryMetadataInterceptors());
dataWithBrokerEntryMetadata = duplicateBuffer.retainedDuplicate();
}
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
Expand All @@ -129,8 +130,8 @@ public void failed(ManagedLedgerException e) {
cb.addFailed(e, ctx);
ml.mbean.recordAddEntryError();
}
if (dataWithRawMetadata != null) {
ReferenceCountUtil.release(dataWithRawMetadata);
if (dataWithBrokerEntryMetadata != null) {
ReferenceCountUtil.release(dataWithBrokerEntryMetadata);
}
}

Expand Down Expand Up @@ -180,8 +181,8 @@ public void safeRun() {
if (ml.hasActiveCursors()) {
// Avoid caching entries if no cursor has been created
EntryImpl entry = null;
if (ml.getConfig().isRawMetadataEnable()) {
entry = EntryImpl.create(ledger.getId(), entryId, dataWithRawMetadata);
if (ml.getConfig().isBrokerEntryMetaEnabled()) {
entry = EntryImpl.create(ledger.getId(), entryId, dataWithBrokerEntryMetadata);
} else {
entry = EntryImpl.create(ledger.getId(), entryId, data);
}
Expand All @@ -193,8 +194,8 @@ public void safeRun() {

// We are done using the byte buffer
ReferenceCountUtil.release(data);
if (dataWithRawMetadata != null) {
ReferenceCountUtil.release(dataWithRawMetadata);
if (dataWithBrokerEntryMetadata != null) {
ReferenceCountUtil.release(dataWithBrokerEntryMetadata);
}

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
Expand Down Expand Up @@ -306,7 +307,7 @@ public void recycle() {
ml = null;
ledger = null;
data = null;
dataWithRawMetadata = null;
dataWithBrokerEntryMetadata = null;
dataLength = -1;
callback = null;
ctx = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.internal.PlatformDependent;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -865,6 +866,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
"please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for broker metadata.")
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down Expand Up @@ -1428,11 +1434,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Of course, this may degrade consumption throughput. Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Enable broker side timestamp for message. Default is false.")
private boolean brokerTimestampForMessageEnable = false;


/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -1248,7 +1249,9 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setBrokerTimestampForMessageEnable(serviceConfig.isBrokerTimestampForMessageEnable());
managedLedgerConfig.setBrokerEntryMetadataInterceptors(BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(serviceConfig.getBrokerEntryMetadataInterceptors(),
BrokerService.class.getClassLoader()));

OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
// so, we can get chance to call entry.release
metadataAndPayload.retain();
// skip raw message metadata since broker timestamp only used in broker side
Commands.skipRawMessageMetadataIfExist(metadataAndPayload);
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2186,8 +2186,8 @@ boolean supportsAuthenticationRefresh() {
}


boolean supportRawMessageMetadata() {
return features != null && features.getSupportsRawMessageMetadata();
boolean supportBrokerMetadata() {
return features != null && features.getSupportsBrokerEntryMetadata();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void sendMessage(Entry entry) {
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserializeSkipRawMetaData(headersAndPayload);
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ public void expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
Pair<MessageImpl<byte[]>, PulsarApi.RawMessageMetadata> pair = null;
Pair<MessageImpl<byte[]>, PulsarApi.BrokerEntryMetadata> pair = null;
try {
pair = MessageImpl.deserializeWithRawMetaData(entry.getDataBuffer());
pair = MessageImpl.deserializeWithBrokerEntryMetaData(entry.getDataBuffer());
MessageImpl msg = pair.getLeft();
PulsarApi.RawMessageMetadata rawMessageMetadata = pair.getRight();
if (rawMessageMetadata != null) {
return isExpired(messageTTLInSeconds, rawMessageMetadata.getBrokerTimestamp());
PulsarApi.BrokerEntryMetadata brokerMetadata = pair.getRight();
if (brokerMetadata != null) {
return isExpired(messageTTLInSeconds, brokerMetadata.getBrokerTimestamp());
} else {
return msg.isExpired(messageTTLInSeconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
Pair<MessageImpl<byte[]>, PulsarApi.RawMessageMetadata> pair = null;
Pair<MessageImpl<byte[]>, PulsarApi.BrokerEntryMetadata> pair = null;
try {
pair = MessageImpl.deserializeWithRawMetaData(entry.getDataBuffer());
pair = MessageImpl.deserializeWithBrokerEntryMetaData(entry.getDataBuffer());
MessageImpl msg = pair.getLeft();
PulsarApi.RawMessageMetadata rawMessageMetadata = pair.getRight();
if (rawMessageMetadata != null) {
return rawMessageMetadata.getBrokerTimestamp() < timestamp;
PulsarApi.BrokerEntryMetadata brokerMetadata = pair.getRight();
if (brokerMetadata != null) {
return brokerMetadata.getBrokerTimestamp() < timestamp;
} else {
return msg.getPublishTime() < timestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserializeSkipRawMetaData(headersAndPayload);
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2146,18 +2146,18 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
}

public boolean isOldestMessageExpired(ManagedCursor cursor, long messageTTLInSeconds) {
Pair<MessageImpl<byte[]>, PulsarApi.RawMessageMetadata> pair = null;
Pair<MessageImpl<byte[]>, PulsarApi.BrokerEntryMetadata> pair = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
pair = MessageImpl.deserializeWithRawMetaData(entry.getDataBuffer());
pair = MessageImpl.deserializeWithBrokerEntryMetaData(entry.getDataBuffer());
MessageImpl msg = pair.getLeft();
PulsarApi.RawMessageMetadata rawMessageMetadata = pair.getRight();
PulsarApi.BrokerEntryMetadata brokerMetadata = pair.getRight();
if (messageTTLInSeconds != 0) {
if (rawMessageMetadata != null) {
isOldestMessageExpired = System.currentTimeMillis() > (rawMessageMetadata.getBrokerTimestamp()
if (brokerMetadata != null) {
isOldestMessageExpired = System.currentTimeMillis() > (brokerMetadata.getBrokerTimestamp()
+ TimeUnit.SECONDS.toMillis((long) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD)));
} else {
isOldestMessageExpired = System.currentTimeMillis() > (msg.getPublishTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import io.netty.buffer.UnpooledByteBufAllocator;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -51,6 +53,8 @@
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
Expand Down Expand Up @@ -228,17 +232,15 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception
List<Entry> entryList = cursor.readEntries(3);
for (Entry entry : entryList) {
// entry has no raw metadata if BrokerTimestampForMessage is disable
assertNull(Commands.parseRawMetadataIfExist(entry.getDataBuffer()));
assertNull(Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()));
}

result.reset();
cursor.close();
ledger.close();



ManagedLedgerConfig configNew = new ManagedLedgerConfig();
configNew.setBrokerTimestampForMessageEnable(true);
configNew.setBrokerEntryMetadataInterceptors(getBrokerEntryMetadataInterceptors());
ManagedLedger ledgerNew = factory.open(ledgerAndCursorNameForBrokerTimestampMessage, configNew);
ManagedCursorImpl cursorNew = (ManagedCursorImpl) ledgerNew.openCursor(ledgerAndCursorNameForBrokerTimestampMessage);
// build message which has publish time first
Expand Down Expand Up @@ -276,9 +278,9 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception
List<Entry> entryListNew = cursorNew.readEntries(4);
for (Entry entry : entryListNew) {
// entry should have raw metadata since BrokerTimestampForMessage is enable
PulsarApi.RawMessageMetadata rawMessageMetadata = Commands.parseRawMetadataIfExist(entry.getDataBuffer());
assertNotNull(rawMessageMetadata);
assertTrue(rawMessageMetadata.getBrokerTimestamp() > timeAfterPublishTime);
PulsarApi.BrokerEntryMetadata brokerMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer());
assertNotNull(brokerMetadata);
assertTrue(brokerMetadata.getBrokerTimestamp() > timeAfterPublishTime);
}

result.reset();
Expand All @@ -287,6 +289,12 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception
factory.shutdown();
}

public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames,
Thread.currentThread().getContextClassLoader());
}
/**
* It tests that message expiry doesn't get stuck if it can't read deleted ledger's entry.
*
Expand Down
Loading

0 comments on commit 397939b

Please sign in to comment.