diff --git a/kafka/hbase-kafka-proxy/pom.xml b/kafka/hbase-kafka-proxy/pom.xml index a4372d48..356838d7 100755 --- a/kafka/hbase-kafka-proxy/pom.xml +++ b/kafka/hbase-kafka-proxy/pom.xml @@ -161,6 +161,13 @@ kafka-clients ${kafka-clients.version} + + org.apache.kafka + kafka-clients + test + test + ${kafka-clients.version} + org.apache.commons diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java index 7b767ca9..e800501b 100644 --- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java +++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java @@ -19,35 +19,28 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.test.MockSerializer; /** * Mocks Kafka producer for testing */ -public class ProducerForTesting implements Producer { +public class ProducerForTesting extends MockProducer { Map> messages = new HashMap<>(); SpecificDatumReader dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$); - public Map> getMessages() { - return messages; + public ProducerForTesting() { + super(true, new MockSerializer(), new MockSerializer()); } - @Override - public void abortTransaction() throws ProducerFencedException { + public Map> getMessages() { + return messages; } @Override @@ -59,79 +52,9 @@ public Future send(ProducerRecord producerRecord messages.put(producerRecord.topic(), new ArrayList<>()); } messages.get(producerRecord.topic()).add(event); - return new Future() { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return false; - } - - @Override - public RecordMetadata get() { - return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1); - } - - @Override - public RecordMetadata get(long timeout, TimeUnit unit) { - return null; - } - }; + return super.send(producerRecord); } catch (Exception e) { throw new RuntimeException(e); } } - - @Override - public Future send(ProducerRecord producerRecord, - Callback callback) { - return null; - } - - @Override - public void flush() { - } - - @Override - public List partitionsFor(String s) { - return null; - } - - @Override - public Map metrics() { - return null; - } - - @Override - public void close() { - } - - @Override - public void close(long l, TimeUnit timeUnit) { - } - - @Override - public void initTransactions() { - } - - @Override - public void beginTransaction() throws ProducerFencedException { - } - - @Override - public void sendOffsetsToTransaction(Map offsets, - String consumerGroupId) throws ProducerFencedException { - } - - @Override - public void commitTransaction() throws ProducerFencedException { - } }