Skip to content

Commit

Permalink
Close producer without state check while the topic does not exists. (a…
Browse files Browse the repository at this point in the history
…pache#7120)

Fixes apache#6838 related to apache#6879

### Motivation

Close producer without state check while the topic does not exists. The problem is the `State` of the producer failed to pass the check condition so that the producer can't be closed. If TopicDoesNotExsitsException happens on a producer, we can directly close this producer without state checking for the producer.
  • Loading branch information
codelipenghui authored and huangdx0726 committed Aug 24, 2020
1 parent 910ba2e commit 18ac4a8
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 62 deletions.
137 changes: 137 additions & 0 deletions .github/workflows/ci-unit-broker-others.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: CI - Unit - Brokers - Others
on:
pull_request:
branches:
- master

jobs:

unit-tests:
name:
runs-on: ubuntu-latest
timeout-minutes: 120

steps:
- name: checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}

- name: Check if this pull request only changes documentation
id: docs
uses: apache/pulsar-test-infra/diff-only@master
with:
args: site2 .github deployment .asf.yaml .ci ct.yaml

- name: Set up JDK 1.8
uses: actions/setup-java@v1
if: steps.docs.outputs.changed_only == 'no'
with:
java-version: 1.8

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: run unit tests pulsar broker reader test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest' -pl pulsar-broker

- name: run unit tests pulsar broker rack aware test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=RackAwareTest' -pl pulsar-broker

- name: run unit tests pulsar broker simple producer consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=SimpleProducerConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker V1 producer consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=V1_ProducerConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker persistent failover end to end test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PersistentFailoverE2ETest' -pl pulsar-broker

- name: run unit tests pulsar broker client integration test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=BrokerClientIntegrationTest' -pl pulsar-broker

- name: run unit tests pulsar broker replicatior rate limiter test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker

- name: run unit tests pulsar broker persistent dispatcher failover consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker v1 admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker compaction test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker

- name: run unit tests pulsar broker batch message test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker

- name: run unit tests pulsar broker partitioned topics schema test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker

- name: package surefire artifacts
if: failure()
run: |
df -h
free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
52 changes: 0 additions & 52 deletions .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,58 +67,6 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests

- name: run unit tests pulsar broker reader test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest' -pl pulsar-broker

- name: run unit tests pulsar broker rack aware test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=RackAwareTest' -pl pulsar-broker

- name: run unit tests pulsar broker simple producer consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=SimpleProducerConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker V1 producer consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=V1_ProducerConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker persistent failover end to end test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PersistentFailoverE2ETest' -pl pulsar-broker

- name: run unit tests pulsar broker client integration test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=BrokerClientIntegrationTest' -pl pulsar-broker

- name: run unit tests pulsar broker replicatior rate limiter test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker

- name: run unit tests pulsar broker persistent dispatcher failover consumer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker

- name: run unit tests pulsar broker admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker v1 admin api test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker

- name: run unit tests pulsar broker compaction test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker

- name: run unit tests pulsar broker batch message test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker

- name: run unit tests pulsar broker partitioned topics schema test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker

- name: run unit test pulsar-broker
if: steps.docs.outputs.changed_only == 'no'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.netty.util.HashedWheelTimer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -49,15 +50,22 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
@Test
public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, InterruptedException {
pulsarClient.newProducer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.sendTimeout(1, TimeUnit.SECONDS)
.create();
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
try {
pulsarClient.newProducer()
.topic("persistent://public/default/" + UUID.randomUUID().toString())
.sendTimeout(100, TimeUnit.MILLISECONDS)
.create();
Assert.fail("Create producer should failed while topic does not exists.");
} catch (PulsarClientException ignore) {
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
pulsarClient.close();
}

@Test(expectedExceptions = PulsarClientException.TopicDoesNotExistException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,16 +1244,12 @@ public void connectionOpened(final ClientCnx cnx) {
}
log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
// Close the producer since topic does not exists.
if (getState() == State.Failed
&& cause instanceof PulsarClientException.TopicDoesNotExistException) {
if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
log.error("Failed to close producer on TopicDoesNotExistException.", ex);
}
producerCreatedFuture.completeExceptionally(cause);
if (getState() == State.Closing || getState() == State.Closed) {
cnx.channel().close();
}
});
return null;
}
Expand Down

0 comments on commit 18ac4a8

Please sign in to comment.