Skip to content

Commit

Permalink
DBZ-7904 Forced Shutdown for errored pubsub managed channel
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurg03 authored and jpechane committed Jun 3, 2024
1 parent b3c91e9 commit aad4aa4
Showing 1 changed file with 24 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,6 @@
*/
package io.debezium.server.pubsub;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;

import io.debezium.util.Threads;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
Expand All @@ -45,21 +22,36 @@
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.util.Threads;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;

/**
* Implementation of the consumer that delivers the messages into Google Pub/Sub destination.
*
* @author Jiri Pechanec
*
*/
@Named("pubsub")
@Dependent
Expand Down Expand Up @@ -199,8 +191,7 @@ void connect() {
}

return builder.build();
}
catch (IOException e) {
} catch (IOException e) {
throw new DebeziumException(e);
}
};
Expand All @@ -213,8 +204,7 @@ void close() {
publishers.values().forEach(publisher -> {
try {
publisher.shutdown();
}
catch (Exception e) {
} catch (Exception e) {
LOGGER.warn("Exception while closing publisher: {}", e.getMessage(), e);
}
});
Expand Down Expand Up @@ -265,8 +255,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
List<String> messageIds;
try {
messageIds = ApiFutures.allAsList(deliveries).get(waitMessageDeliveryTimeout, TimeUnit.MILLISECONDS);
}
catch (ExecutionException | TimeoutException e) {
} catch (ExecutionException | TimeoutException e) {
throw new DebeziumException(e);
}
LOGGER.trace("Sent messages with ids: {}", messageIds);
Expand All @@ -281,23 +270,19 @@ private PubsubMessage buildPubSubMessage(ChangeEvent<Object, Object> record) {
if (orderingKey.isEmpty()) {
if (record.key() == null) {
pubsubMessage.setOrderingKey(nullKey);
}
else if (record.key() instanceof String) {
} else if (record.key() instanceof String) {
pubsubMessage.setOrderingKey((String) record.key());
}
else if (record.key() instanceof byte[]) {
} else if (record.key() instanceof byte[]) {
pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key()));
}
}
else {
} else {
pubsubMessage.setOrderingKey(orderingKey.get());
}
}

if (record.value() instanceof String) {
pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value()));
}
else if (record.value() instanceof byte[]) {
} else if (record.value() instanceof byte[]) {
pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value()));
}

Expand Down

0 comments on commit aad4aa4

Please sign in to comment.