Skip to content

Commit

Permalink
Fixes apache#4011.
Browse files Browse the repository at this point in the history
add default config for kafka sink and source
  • Loading branch information
Jett committed Jul 13, 2023
1 parent da7ae82 commit 9b2a96d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,19 @@

@Data
public class SinkConnectorConfig {
private String connectorName;
private String tasksMax;
private String topic;
private String bootstrapServers;
private String groupID;
private String keyConverter;
private String valueConverter;
private String offsetFlushIntervalMS;
private String offsetStorageTopic;
private String offsetStorageReplicationFactor;
private String configStorageTopic;
private String configStorageReplicationFactor;
private String statusStorageTopic;
private String statusStorageReplicationFactor;
private String offsetCommitTimeoutMS;
private String offsetCommitIntervalMS;
private String heartbeatIntervalMS;
private String sessionTimeoutMS;

private String connectorName = "kafkaSink";
private String topic = "TopicTest";
private String ack = "all";
private String bootstrapServers = "127.0.0.1:9092";
private String keyConverter = "org.apache.kafka.common.serialization.StringSerializer";
private String valueConverter = "org.apache.kafka.common.serialization.StringSerializer";
private String maxRequestSize = "1048576";
private String bufferMemory = "33554432";
private String batchSize = "16384";
private String lingerMs = "0";
private String requestTimeoutMs = "30000";
private String maxInFightRequestsPerConnection = "5";
private String retries = "0";
private String compressionType = "none";
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,20 @@ public Class<? extends Config> configClass() {
}

@Override
public void init(Config config) throws Exception {
public void init(Config config) {
this.sinkConfig = (KafkaSinkConfig) config;
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sinkConfig.getConnectorConfig().getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getKeyConverter());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, sinkConfig.getConnectorConfig().getValueConverter());
props.put(ProducerConfig.ACKS_CONFIG, sinkConfig.getConnectorConfig().getAck());
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, sinkConfig.getConnectorConfig().getMaxRequestSize());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, sinkConfig.getConnectorConfig().getBufferMemory());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, sinkConfig.getConnectorConfig().getBatchSize());
props.put(ProducerConfig.LINGER_MS_CONFIG, sinkConfig.getConnectorConfig().getLingerMs());
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, sinkConfig.getConnectorConfig().getRequestTimeoutMs());
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, sinkConfig.getConnectorConfig().getMaxInFightRequestsPerConnection());
props.put(ProducerConfig.RETRIES_CONFIG, sinkConfig.getConnectorConfig().getRetries());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, sinkConfig.getConnectorConfig().getCompressionType());
producer = new KafkaProducer<>(props);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,16 @@

@Data
public class SourceConnectorConfig {
private String connectorName;
private String tasksMax;
private String topic;
private String bootstrapServers;
private String groupID;
private String keyConverter;
private String valueConverter;
private String offsetFlushIntervalMS;
private String offsetStorageTopic;
private String offsetStorageReplicationFactor;
private String configStorageTopic;
private String configStorageReplicationFactor;
private String statusStorageTopic;
private String statusStorageReplicationFactor;
private String offsetCommitTimeoutMS;
private String offsetCommitIntervalMS;
private String heartbeatIntervalMS;
private String sessionTimeoutMS;

private String connectorName = "kafkaSource";
private String topic = "TopicTest";
private String bootstrapServers = "127.0.0.1:9092";
private String groupID = "kafkaSource";
private String keyConverter = "org.apache.kafka.common.serialization.StringSerializer";
private String valueConverter = "org.apache.kafka.common.serialization.StringSerializer";
private String autoCommitIntervalMS = "1000";
private String enableAutoCommit = "false";
private String sessionTimeoutMS = "3000";
private String maxPollRecords = "1000";
private int pollTimeOut = 100;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -44,6 +43,8 @@ public class KafkaSourceConnector implements Source {

private KafkaConsumer<String, String> kafkaConsumer;

private int pollTimeOut = 100;

@Override
public Class<? extends Config> configClass() {
return KafkaSourceConfig.class;
Expand All @@ -54,11 +55,14 @@ public void init(Config config) throws Exception {
this.sourceConfig = (KafkaSourceConfig) config;
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceConfig.getConnectorConfig().getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, sourceConfig.getConnectorConfig().getKeyConverter());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, sourceConfig.getConnectorConfig().getValueConverter());
props.put(ConsumerConfig.GROUP_ID_CONFIG, sourceConfig.getConnectorConfig().getGroupID());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, sourceConfig.getConnectorConfig().getEnableAutoCommit());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, sourceConfig.getConnectorConfig().getMaxPollRecords());
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, sourceConfig.getConnectorConfig().getAutoCommitIntervalMS());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sourceConfig.getConnectorConfig().getSessionTimeoutMS());
this.pollTimeOut = sourceConfig.getConnectorConfig().getPollTimeOut();
this.kafkaConsumer = new KafkaConsumer<String, String>(props);
}

Expand All @@ -84,7 +88,7 @@ public void stop() {

@Override
public List<ConnectRecord> poll() {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(pollTimeOut));
List<ConnectRecord> connectRecords = new ArrayList<>(records.count());
for (ConsumerRecord<String, String> record : records) {
Long timestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ connectorConfig:
bootstrapServers: 127.0.0.1:9090
topic: TopicTest
groupID: kafkaSource
maxPollRecords: 1000

0 comments on commit 9b2a96d

Please sign in to comment.