Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client]Fixed getting an incorrect maxMessageSize value when accessing multiple clusters in the same process #22306

Merged
merged 5 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
Expand Down Expand Up @@ -3906,11 +3907,11 @@ public void testReleaseSemaphoreOnFailMessages() throws Exception {
.topic("persistent://my-property/my-ns/my-topic2");

@Cleanup
Producer<byte[]> producer = producerBuilder.create();
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)producerBuilder.create();
List<Future<MessageId>> futures = new ArrayList<>();

// Asynchronously produce messages
byte[] message = new byte[ClientCnx.getMaxMessageSize() + 1];
byte[] message = new byte[producer.getConnectionHandler().getMaxMessageSize() + 1];
for (int i = 0; i < maxPendingMessages + 10; i++) {
Future<MessageId> future = producer.sendAsync(message);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand All @@ -33,7 +34,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -69,10 +69,12 @@ public void testProducerInvalidMessageMemoryRelease() throws Exception {
.create();
this.stopBroker();
try {
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(8);
producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
}
ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler());
Field field = producer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
field.set(producer, connectionHandler);
when(connectionHandler.getMaxMessageSize()).thenReturn(8);
producer.send("memory-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand All @@ -33,7 +34,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -72,24 +72,22 @@ public void testProducerSemaphoreInvalidMessage() throws Exception {
.maxPendingMessages(pendingQueueSize)
.enableBatching(true)
.create();

this.stopBroker();
try {
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
}
ConnectionHandler connectionHandler = Mockito.spy(producer.getConnectionHandler());
Field field = producer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
field.set(producer, connectionHandler);
when(connectionHandler.getMaxMessageSize()).thenReturn(2);
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
}

producer.conf.setBatchingEnabled(false);
try {
try (MockedStatic<ClientCnx> mockedStatic = Mockito.mockStatic(ClientCnx.class)) {
mockedStatic.when(ClientCnx::getMaxMessageSize).thenReturn(2);
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
}
producer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
throw new IllegalStateException("can not reach here");
} catch (PulsarClientException.InvalidMessageException ex) {
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
public boolean haveEnoughSpace(MessageImpl<?> msg) {
int messageSize = msg.getDataBuffer().readableBytes();
return (
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= getMaxMessageSize())
|| (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)
) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);
}
protected int getMaxMessageSize() {
return (producer.getConnectionHandler().getMaxMessageSize());
}

protected boolean isBatchFull() {
return (maxBytesInBatch > 0 && currentBatchSizeBytes >= maxBytesInBatch)
|| (maxBytesInBatch <= 0 && currentBatchSizeBytes >= ClientCnx.getMaxMessageSize())
|| (maxBytesInBatch <= 0 && currentBatchSizeBytes >= getMaxMessageSize())
|| (maxNumMessagesInBatch > 0 && numMessagesInBatch >= maxNumMessagesInBatch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = allocator.buffer(
Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
Math.min(maxBatchSize, getMaxMessageSize()));
updateAndReserveBatchAllocatedSize(batchedMessageMetadataAndPayload.capacity());
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
Expand Down Expand Up @@ -272,26 +272,26 @@ public OpSendMsg createOpSendMsg() throws IOException {
op.setBatchSizeByte(encryptedPayload.readableBytes());

// handle mgs size check as non-batched in `ProducerImpl.isMessageSizeExceeded`
if (op.getMessageHeaderAndPayloadSize() > ClientCnx.getMaxMessageSize()) {
if (op.getMessageHeaderAndPayloadSize() > getMaxMessageSize()) {
producer.semaphoreRelease(1);
producer.client.getMemoryLimitController().releaseMemory(
messages.get(0).getUncompressedSize() + batchAllocatedSizeBytes);
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
"Message size is bigger than " + getMaxMessageSize() + " bytes"));
return null;
}
lowestSequenceId = -1L;
return op;
}
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
messages.forEach(msg -> producer.client.getMemoryLimitController()
.releaseMemory(msg.getUncompressedSize()));
producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes);
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
"Message size is bigger than " + getMaxMessageSize() + " bytes"));
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ public class ClientCnx extends PulsarHandler {
private volatile int numberOfRejectRequests = 0;

@Getter
private static int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private final int maxNumberOfRejectedRequestPerConnection;
private final int rejectedRequestResetTimeSec = 60;
protected final int protocolVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +39,10 @@ public class ConnectionHandler {
AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx");
@SuppressWarnings("unused")
private volatile ClientCnx clientCnx = null;
@Getter
@Setter
// Since the `clientCnx` variable will be set to null at some times, it is necessary to save this value here.
private volatile int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;

protected final HandlerState state;
protected final Backoff backoff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ public void negativeAcknowledge(Message<?> message) {
@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());

final State state = getState();
if (state == State.Closing || state == State.Closed) {
Expand Down Expand Up @@ -1896,7 +1897,7 @@ private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetada
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) {
if (checkMaxMessageSize && payloadSize > getConnectionHandler().getMaxMessageSize()) {
// payload size is itself corrupted since it cannot be bigger than the MaxMessageSize
log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
this.partitionIndex = partitionIndex;
this.pendingMessages = createPendingMessagesQueue();
this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
? Math.min(conf.getChunkMaxMessageSize(), ClientCnx.getMaxMessageSize())
: ClientCnx.getMaxMessageSize();
if (conf.getMaxPendingMessages() > 0) {
this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true));
} else {
Expand Down Expand Up @@ -275,6 +272,9 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
.setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
.create(),
this);
this.chunkMaxMessageSize = conf.getChunkMaxMessageSize() > 0
? Math.min(conf.getChunkMaxMessageSize(), getMaxMessageSize())
: getMaxMessageSize();

grabCnx();
}
Expand Down Expand Up @@ -455,14 +455,14 @@ public void sendAsync(Message<?> message, SendCallback callback) {

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
+ " %d bytes",
producerName, topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
return;
}
Expand Down Expand Up @@ -492,19 +492,19 @@ public void sendAsync(Message<?> message, SendCallback callback) {
int payloadChunkSize;
if (canAddToBatch(msg) || !conf.isChunkingEnabled()) {
totalChunks = 1;
payloadChunkSize = ClientCnx.getMaxMessageSize();
payloadChunkSize = getMaxMessageSize();
} else {
// Reserve current metadata size for chunk size to avoid message size overflow.
// NOTE: this is not strictly bounded, as metadata will be updated after chunking.
// So there is a small chance that the final message size is larger than ClientCnx.getMaxMessageSize().
// But it won't cause produce failure as broker have 10 KB padding space for these cases.
payloadChunkSize = ClientCnx.getMaxMessageSize() - msgMetadata.getSerializedSize();
payloadChunkSize = getMaxMessageSize() - msgMetadata.getSerializedSize();
if (payloadChunkSize <= 0) {
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes metadata that "
+ "exceeds %d bytes", producerName, topic,
msgMetadata.getSerializedSize(), ClientCnx.getMaxMessageSize()));
msgMetadata.getSerializedSize(), getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
compressedPayload.release();
return;
Expand Down Expand Up @@ -1663,7 +1663,8 @@ public Iterator<OpSendMsg> iterator() {
@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
chunkMaxMessageSize = Math.min(chunkMaxMessageSize, ClientCnx.getMaxMessageSize());
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
chunkMaxMessageSize = Math.min(chunkMaxMessageSize, getMaxMessageSize());

final long epoch;
synchronized (this) {
Expand Down Expand Up @@ -2323,18 +2324,22 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
private boolean isMessageSizeExceeded(OpSendMsg op) {
if (op.msg != null && !conf.isChunkingEnabled()) {
int messageSize = op.getMessageHeaderAndPayloadSize();
if (messageSize > ClientCnx.getMaxMessageSize()) {
if (messageSize > getMaxMessageSize()) {
releaseSemaphoreForSendOp(op);
op.sendComplete(new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a message with %d bytes that exceeds %d bytes",
producerName, topic, messageSize, ClientCnx.getMaxMessageSize()),
producerName, topic, messageSize, getMaxMessageSize()),
op.sequenceId));
return true;
}
}
return false;
}

private int getMaxMessageSize() {
return getConnectionHandler().getMaxMessageSize();
}

public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {
Expand Down
Loading