Skip to content

Commit

Permalink
[fix][broker] Fix ProducerBusy issue due to incorrect userCreatedProd…
Browse files Browse the repository at this point in the history
…ucerCount on non-persistent topic (#22685)

Co-authored-by: ruihongzhou <[email protected]>
(cherry picked from commit 253e650)
  • Loading branch information
hrzzzz authored and lhotari committed May 14, 2024
1 parent 20483f5 commit 89b545e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
Expand Down Expand Up @@ -55,7 +54,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
Expand Down Expand Up @@ -244,14 +242,6 @@ public boolean isReplicationBacklogExist() {
return false;
}

@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
}
}

@Override
public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import java.lang.reflect.Field;
import java.util.UUID;
import lombok.Cleanup;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -119,4 +122,23 @@ public void testCreateNonExistentPartitions() throws PulsarAdminException, Pulsa
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4);
}

@Test
public void testRemoveProducerOnNonPersistentTopic() throws Exception {
final String topicName = "non-persistent://prop/ns-abc/topic_" + UUID.randomUUID();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
Field field = AbstractTopic.class.getDeclaredField("userCreatedProducerCount");
field.setAccessible(true);
int userCreatedProducerCount = (int) field.get(topic);
assertEquals(userCreatedProducerCount, 1);

producer.close();
userCreatedProducerCount = (int) field.get(topic);
assertEquals(userCreatedProducerCount, 0);
}
}

0 comments on commit 89b545e

Please sign in to comment.