Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up subscription #622

Merged
merged 7 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ function ci::verify_source() {
ci::verify_mongodb_source 30
}

function ci::verify_batch_source() {
sleep 30
kubectl logs --all-containers=true --tail=-1 -l compute.functionmesh.io/name=batch-source-sample | grep "Setting up instance consumer for BatchSource intermediate topic"
while [[ $? -ne 0 ]]; do
sleep 5
kubectl logs --all-containers=true --tail=-1 -l compute.functionmesh.io/name=batch-source-sample | grep "Setting up instance consumer for BatchSource intermediate topic"
done
}

function ci::verify_crypto_function() {
ci::verify_function_with_encryption "persistent://public/default/java-function-crypto-input-topic" "persistent://public/default/java-function-crypto-output-topic" "test-message" "test-message!" 10
}
Expand Down Expand Up @@ -501,3 +510,57 @@ function ci::verify_function_with_multi_pvs() {
num=$(kubectl get pvc -n ${NAMESPACE} -l compute.functionmesh.io/name="${function_name}" --no-headers|wc -l)
done
}

function ci::verify_cleanup_subscription() {
topic=$1
sub=$2
num=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics subscriptions ${topic} | grep ${sub} | wc -l)
retry=0
while [[ ${num} -ne 0 && $retry -lt 10 ]]; do
sleep 5
num=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics subscriptions ${topic} | grep ${sub} | wc -l)
retry=$((retry+1))
done

if [ $retry -eq 10 ]; then
exit 1
fi
}

function ci::verify_cleanup_subscription_with_auth() {
topic=$1
sub=$2
command="kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-admin --auth-plugin \$brokerClientAuthenticationPlugin --auth-params \$brokerClientAuthenticationParameters topics subscriptions $topic' | grep $sub | wc -l"
num=$(sh -c "$command")
retry=0
while [[ ${num} -ne 0 && $retry -lt 10 ]]; do
sleep 5
num=$(sh -c "$command")
retry=$((retry+1))
done

if [ $retry -eq 10 ]; then
exit 1
fi
}

function ci::verify_cleanup_batch_source_with_auth() {
topic=$1
sub=$2
num=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-admin --auth-plugin $brokerClientAuthenticationPlugin --auth-params $brokerClientAuthenticationParameters topics list public/default' | grep ${topic} | wc -l)
ci::verify_cleanup_subscription_with_auth ${topic} ${sub}
retry=0
while [[ ${num} -ne 0 && $retry -lt 10 ]]; do
sleep 5
num=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- sh -c 'bin/pulsar-admin --auth-plugin $brokerClientAuthenticationPlugin --auth-params $brokerClientAuthenticationParameters topics list public/default' | grep ${topic} | wc -l)
retry=$((retry+1))
done
if [ $retry -eq 10 ]; then
exit 1
fi
}

function ci::create_topic() {
topic=$1
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics create ${topic}
}
64 changes: 64 additions & 0 deletions .ci/tests/integration-oauth2/cases/batch-source/manifests.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Source
metadata:
name: batch-source-sample
spec:
className: org.apache.pulsar.io.batchdatagenerator.BatchDataGeneratorSource
replicas: 1
maxReplicas: 1
output:
producerConf:
maxPendingMessages: 1000
maxPendingMessagesAcrossPartitions: 50000
useThreadLocalProducers: true
topic: persistent://public/default/output-source-topic
typeClassName: org.apache.pulsar.io.batchdatagenerator.Person
forwardSourceMessageProperty: true
resources:
limits:
cpu: "0.2"
memory: 1.1G
requests:
cpu: "0.1"
memory: 1G
batchSourceConfig:
discoveryTriggererClassName: "org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer"
sourceConfig:
test: test
pulsar:
pulsarConfig: "test-source"
tlsConfig:
enabled: false
allowInsecure: false
hostnameVerification: true
certSecretName: sn-platform-tls-broker
certSecretKey: ""
authConfig:
oauth2Config:
audience: api://56c1bd14-3ba7-4804-b47b-d46de6dce33e/.default
issuerUrl: https://sts.windows.net/06a8a086-ae6e-45b5-a22e-ad90de23013e/v2.0
scope: api://56c1bd14-3ba7-4804-b47b-d46de6dce33e/.default
keySecretName: sn-platform-oauth2-private-key
keySecretKey: auth.json
cleanupAuthConfig:
oauth2Config:
audience: api://56c1bd14-3ba7-4804-b47b-d46de6dce33e/.default
issuerUrl: https://sts.windows.net/06a8a086-ae6e-45b5-a22e-ad90de23013e/v2.0
scope: api://56c1bd14-3ba7-4804-b47b-d46de6dce33e/.default
keySecretName: sn-platform-oauth2-private-key
keySecretKey: auth.json
image: streamnative/pulsar-io-batch-data-generator:2.9.2.23
java:
jar: connectors/pulsar-io-batch-data-generator-2.9.2.23.nar
jarLocation: "" # use pulsar provided connectors
# use package name:
# jarLocation: function://public/default/nul-test-java-source@v1
clusterName: test-pulsar
---
apiVersion: v1
kind: ConfigMap
metadata:
name: test-source
data:
webServiceURL: http://sn-platform-pulsar-broker.default.svc.cluster.local:8080
brokerServiceURL: pulsar://sn-platform-pulsar-broker.default.svc.cluster.local:6650
59 changes: 59 additions & 0 deletions .ci/tests/integration-oauth2/cases/batch-source/verify.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

set -e

E2E_DIR=$(dirname "$0")
BASE_DIR=$(cd "${E2E_DIR}"/../../../../..;pwd)
PULSAR_NAMESPACE=${PULSAR_NAMESPACE:-"default"}
PULSAR_RELEASE_NAME=${PULSAR_RELEASE_NAME:-"sn-platform"}
E2E_KUBECONFIG=${E2E_KUBECONFIG:-"/tmp/e2e-k8s.config"}

source "${BASE_DIR}"/.ci/helm.sh

if [ ! "$KUBECONFIG" ]; then
export KUBECONFIG=${E2E_KUBECONFIG}
fi

manifests_file="${BASE_DIR}"/.ci/tests/integration-oauth2/cases/batch-source/manifests.yaml

kubectl apply -f "${manifests_file}" > /dev/null 2>&1

verify_fm_result=$(ci::verify_function_mesh batch-source-sample 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_fm_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

verify_source_result=$(ci::verify_batch_source 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_source_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true

verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_batch_source_with_auth persistent://public/default/batch-source-sample-intermediate BatchSourceExecutor-public/default/batch-source-sample 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
echo "$verify_cleanup_result"
fi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ metadata:
namespace: default
spec:
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
cleanupSubscription: true
subscriptionName: java-download-subscription
forwardSourceMessageProperty: true
maxPendingAsyncRequests: 1000
replicas: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,16 @@ if [ $? -ne 0 ]; then
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_download_java_function true 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
exit 1
fi

kubectl delete -f "${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_subscription_with_auth persistent://public/default/input-download-java-topic java-download-subscription 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
echo "$verify_java_result"
echo "$verify_cleanup_result"
fi
kubectl delete -f "${BASE_DIR}"/.ci/tests/integration-oauth2/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
2 changes: 2 additions & 0 deletions .ci/tests/integration-oauth2/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,5 @@ verify:
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration-oauth2/cases/py-download-function-legacy/verify.sh
expected: expected.data.yaml
- query: timeout 5m bash .ci/tests/integration-oauth2/cases/batch-source/verify.sh
expected: expected.data.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ metadata:
name: sink-sample
spec:
className: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
cleanupSubscription: true
replicas: 1
maxReplicas: 1
input:
topics:
- persistent://public/default/input-sink-topic
- persistent://public/default/input-sink-topid
- persistent://public/default/input-sink-topie
typeClassName: "org.apache.pulsar.client.api.schema.GenericObject"
sinkConfig:
elasticSearchUrl: "http://quickstart-es-http.default.svc.cluster.local:9200"
Expand All @@ -29,9 +32,9 @@ spec:
requests:
cpu: "0.1"
memory: 1G
image: streamnative/pulsar-io-elastic-search:2.9.2.23
image: streamnative/pulsar-io-elastic-search:2.10.0.0-rc10
java:
jar: connectors/pulsar-io-elastic-search-2.9.2.23.nar
jar: connectors/pulsar-io-elastic-search-2.10.0.0-rc10.nar
jarLocation: "" # use pulsar provided connectors
clusterName: test-pulsar
autoAck: true
Expand Down
25 changes: 22 additions & 3 deletions .ci/tests/integration/cases/elasticsearch-sink/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,29 @@ if [ $? -ne 0 ]; then
fi

verify_sink_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_sink 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
if [ $? -ne 0 ]; then
echo "$verify_sink_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
uninstall_elasticsearch_cluster > /dev/null 2>&1 || true

verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_subscription persistent://public/default/input-sink-topic public/default/sink-sample 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_cleanup_result"
exit 1
fi

verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_subscription persistent://public/default/input-sink-topid public/default/sink-sample 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_cleanup_result"
exit 1
fi

verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_subscription persistent://public/default/input-sink-topie public/default/sink-sample 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
echo "$verify_cleanup_result"
fi
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ metadata:
namespace: default
spec:
className: org.apache.pulsar.functions.api.examples.ExclamationFunction
cleanupSubscription: true
subscriptionName: java-download-subscription
forwardSourceMessageProperty: true
maxPendingAsyncRequests: 1000
replicas: 1
maxReplicas: 5
logTopic: persistent://public/default/logging-function-logs
input:
topics:
- persistent://public/default/input-download-java-topic
topicPattern: "persistent://public/default/input-download-java-topi.*"
typeClassName: java.lang.String
output:
topic: persistent://public/default/output-download-java-topic
Expand Down
15 changes: 13 additions & 2 deletions .ci/tests/integration/cases/java-download-function/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ if [ ! "$KUBECONFIG" ]; then
export KUBECONFIG=${E2E_KUBECONFIG}
fi

# TODO: for consumer using topic pattern, at least one topic should be created in advance, else consumer cannot
# subscribe to new created topic, error is: "Topic does not have schema to check"
ci::create_topic persistent://public/default/input-download-java-topic

kubectl apply -f "${BASE_DIR}"/.ci/tests/integration/cases/java-download-function/manifests.yaml > /dev/null 2>&1

verify_fm_result=$(ci::verify_function_mesh function-download-sample 2>&1)
Expand All @@ -42,9 +46,16 @@ if [ $? -ne 0 ]; then
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_download_java_function 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${BASE_DIR}"/.ci/tests/integration/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
exit 1
fi

kubectl delete -f "${BASE_DIR}"/.ci/tests/integration/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
verify_cleanup_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_cleanup_subscription persistent://public/default/input-download-java-topic java-download-subscription 2>&1)
if [ $? -eq 0 ]; then
echo "e2e-test: ok" | yq eval -
else
echo "$verify_java_result"
echo "$verify_cleanup_result"
fi
kubectl delete -f "${BASE_DIR}"/.ci/tests/integration/cases/java-download-function/manifests.yaml > /dev/null 2>&1 || true
2 changes: 2 additions & 0 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type PulsarMessaging struct {

// To replace the AuthSecret
AuthConfig *AuthConfig `json:"authConfig,omitempty"`

CleanupAuthConfig *AuthConfig `json:"cleanupAuthConfig,omitempty"`
}

type TLSConfig struct {
Expand Down
2 changes: 2 additions & 0 deletions api/compute/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type FunctionSpec struct {
MinReplicas *int32 `json:"minReplicas,omitempty"`

DownloaderImage string `json:"downloaderImage,omitempty"`
// the image used to clean up subscription, if empty, the runner image will be used
CleanupImage string `json:"cleanupImage,omitempty"`

// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
Expand Down
2 changes: 2 additions & 0 deletions api/compute/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type SinkSpec struct {
MinReplicas *int32 `json:"minReplicas,omitempty"`

DownloaderImage string `json:"downloaderImage,omitempty"`
// the image used to clean up subscription, if empty, the runner image will be used
CleanupImage string `json:"cleanupImage,omitempty"`

// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
Expand Down
2 changes: 2 additions & 0 deletions api/compute/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type SourceSpec struct {
MinReplicas *int32 `json:"minReplicas,omitempty"`

DownloaderImage string `json:"downloaderImage,omitempty"`
// the image used to clean up subscription, if empty, the runner image will be used
CleanupImage string `json:"cleanupImage,omitempty"`

// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
Expand Down
5 changes: 5 additions & 0 deletions api/compute/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading