diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
index 9704c67e..477cddd8 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.kafka;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Properties;
@@ -36,13 +37,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
-
 /**
  * connects to kafka and reads from the passed in topics. Parses each message into an avro object
  * and dumps it to the console.
@@ -52,7 +52,6 @@ public final class DumpToStringListener {
   private static final Logger LOG = LoggerFactory.getLogger(DumpToStringListener.class);
 
   private DumpToStringListener(){
-
   }
 
   public static void main(String[] args) {
@@ -65,12 +64,14 @@ public static void main(String[] args) {
     options.addRequiredOption("t", "kafkatopics", true,"Kafka Topics "
         + "to subscribe to (comma delimited)");
     CommandLine commandLine = null;
+
     try {
-      commandLine = new BasicParser().parse(options, args);
+      commandLine = new DefaultParser().parse(options, args);
     } catch (ParseException e) {
       LOG.error("Could not parse: ", e);
       printUsageAndExit(options, -1);
     }
+
     SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
@@ -81,11 +82,11 @@ public static void main(String[] args) {
     props.put("key.deserializer", ByteArrayDeserializer.class.getName());
     props.put("value.deserializer", ByteArrayDeserializer.class.getName());
 
-    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);){
+    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
       consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList()));
 
       while (true) {
-        ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
         Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
         while (it.hasNext()) {
           ConsumerRecord<byte[], byte[]> record = it.next();
@@ -108,5 +109,4 @@ private static void printUsageAndExit(Options options, int exitCode) {
         "[-k ] \n", true);
     System.exit(exitCode);
   }
-
 }
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
index 16bf544c..2c1aca12 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
@@ -49,8 +49,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
@@ -162,7 +162,7 @@ public static void main(String[] args) throws Exception {
     String[] restArgs = parser.getRemainingArgs();
 
     try {
-      commandLine = new BasicParser().parse(options, restArgs);
+      commandLine = new DefaultParser().parse(options, restArgs);
     } catch (ParseException e) {
       LOG.error("Could not parse: ", e);
       printUsageAndExit(options, -1);
diff --git a/spark/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/spark/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index e5a8ddd0..802938ef 100644
--- a/spark/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/spark/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -28,7 +28,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import org.apache.commons.lang3.RandomStringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -224,8 +224,7 @@ public Iterator<List<byte[]>> call(Integer v1, Iterator<String> v2) throws Exception {
           // Insert record into a list
           List<byte[]> tmp1 = Arrays.asList(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
           List<byte[]> tmp2 = Arrays.asList(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
-          List<byte[]> tmp3 = Arrays.asList(rk, DATA_FAM, chainIdArray, Bytes.toBytes(
-              RandomStringUtils.randomAlphabetic(50)));
+          List<byte[]> tmp3 = Arrays.asList(rk, DATA_FAM, chainIdArray, Bytes.toBytes("random" + i));
          res.add(tmp1);
          res.add(tmp2);
          res.add(tmp3);
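
Note on the two API migrations in the Kafka proxy files above: BasicParser is deprecated in commons-cli 1.3+ in favor of DefaultParser, and KafkaConsumer.poll(long) is deprecated in kafka-clients 2.x in favor of poll(Duration), which also bounds the time spent waiting for metadata. The following is a minimal standalone sketch of the updated pattern, not the patched class itself: the class name PollSketch is illustrative, it uses the unshaded org.apache.commons.cli package rather than the hbase-thirdparty relocation, and the group.id value is arbitrary.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class PollSketch {
  public static void main(String[] args) throws ParseException {
    // DefaultParser is the commons-cli 1.3+ replacement for the deprecated BasicParser.
    Options options = new Options();
    options.addRequiredOption("b", "kafkabrokers", true, "Kafka brokers (comma delimited)");
    options.addRequiredOption("t", "kafkatopics", true, "Kafka topics (comma delimited)");
    CommandLine commandLine = new DefaultParser().parse(options, args);

    Properties props = new Properties();
    props.put("bootstrap.servers", commandLine.getOptionValue("b"));
    props.put("group.id", "poll-sketch"); // arbitrary group id for this sketch
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.asList(commandLine.getOptionValue("t").split(",")));
      while (true) {
        // poll(Duration) bounds the whole call; the deprecated poll(long) only bounded the
        // fetch wait and could block indefinitely while fetching metadata.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset()
              + " (" + record.serializedValueSize() + " bytes)");
        }
      }
    }
  }
}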