From 2ff00afac6cef4805695d33a9bff7ddce9ba5ff4 Mon Sep 17 00:00:00 2001 From: Wenning Ding Date: Tue, 10 Dec 2019 15:08:04 -0800 Subject: [PATCH 1/2] [HUDI-238] Make Hudi support Scala 2.12 --- docker/demo/config/kafka-source.properties | 4 +- hudi-cli/pom.xml | 10 +- hudi-client/pom.xml | 4 +- hudi-spark/pom.xml | 17 ++- hudi-utilities/pom.xml | 35 +++-- .../utilities/sources/AvroKafkaSource.java | 17 +-- .../utilities/sources/JsonKafkaSource.java | 13 +- .../sources/helpers/KafkaOffsetGen.java | 120 +++++++----------- .../utilities/sources/TestKafkaSource.java | 22 ++-- .../kafka-source.properties | 4 +- packaging/hudi-spark-bundle/pom.xml | 4 +- packaging/hudi-utilities-bundle/pom.xml | 8 +- pom.xml | 13 +- 13 files changed, 137 insertions(+), 134 deletions(-) diff --git a/docker/demo/config/kafka-source.properties b/docker/demo/config/kafka-source.properties index dbeba47fee7d1..4d7f77cef9963 100644 --- a/docker/demo/config/kafka-source.properties +++ b/docker/demo/config/kafka-source.properties @@ -26,5 +26,5 @@ hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.a # Kafka Source hoodie.deltastreamer.source.kafka.topic=stock_ticks #Kafka props -metadata.broker.list=kafkabroker:9092 -auto.offset.reset=smallest +bootstrap.servers=kafkabroker:9092 +auto.offset.reset=earliest diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index 62b55de4ddb86..e1359cb96ea43 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -153,16 +153,22 @@ org.apache.parquet parquet-avro + + + com.thoughtworks.paranamer + paranamer + + org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index 02842e9244376..63a62182f7165 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -95,11 +95,11 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml index d69954a1812c7..fc0aa6916c84c 100644 --- a/hudi-spark/pom.xml +++ b/hudi-spark/pom.xml @@ -186,13 +186,20 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_${scala.binary.version} org.apache.avro avro + + + + com.thoughtworks.paranamer + paranamer + + @@ -204,17 +211,17 @@ org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} org.apache.spark - spark-avro_2.11 + spark-avro_${scala.binary.version} provided @@ -289,7 +296,7 @@ org.scalatest - scalatest_2.11 + scalatest_${scala.binary.version} ${scalatest.version} test diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 53a3a1a3acec0..5f6419c913554 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -128,19 +128,25 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 + jackson-module-scala_${scala.binary.version} org.apache.parquet parquet-avro + + + com.thoughtworks.paranamer + paranamer + + org.apache.spark - spark-core_2.11 + spark-core_${scala.binary.version} javax.servlet @@ -151,7 +157,7 @@ org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} javax.servlet @@ -162,20 +168,26 @@ org.apache.spark - spark-avro_2.11 + spark-avro_${scala.binary.version} provided org.apache.spark - spark-streaming_2.11 + spark-streaming_${scala.binary.version} ${spark.version} org.apache.spark - spark-streaming-kafka-0-8_2.11 + 
spark-streaming-kafka-0-10_${scala.binary.version} ${spark.version} + + org.apache.spark + spark-streaming-kafka-0-10_${scala.binary.version} + ${spark.version} + tests + @@ -197,8 +209,8 @@ com.twitter - bijection-avro_2.11 - 0.9.2 + bijection-avro_${scala.binary.version} + 0.9.3 @@ -223,6 +235,13 @@ 3.0.0 + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka.version} + test + + org.apache.httpcomponents diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index 18ebff419717a..b2139882c4fa6 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -24,16 +24,17 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; -import io.confluent.kafka.serializers.KafkaAvroDecoder; -import kafka.serializer.StringDecoder; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.apache.spark.streaming.kafka.KafkaUtils; -import org.apache.spark.streaming.kafka.OffsetRange; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; +import org.apache.spark.streaming.kafka010.OffsetRange; /** * Reads avro serialized Kafka data, based on the confluent schema-registry. 
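The move to spark-streaming-kafka-0-10 drops the old kafka.serializer decoders in favor of the standard Kafka consumer configuration, which is why both sources now inject key.deserializer and value.deserializer into their properties. A minimal sketch of the consumer params the rewritten sources assume (broker address and group id below are placeholders, not values from this patch):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "kafkabroker:9092");            // replaces metadata.broker.list
    kafkaParams.put("auto.offset.reset", "earliest");                    // "smallest"/"largest" are no longer valid
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class);  // StringDeserializer for the JSON source
    kafkaParams.put("group.id", "hudi-deltastreamer");                   // placeholder; the tests use a random UUID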
@@ -47,6 +48,8 @@ public class AvroKafkaSource extends AvroSource { public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); + props.put("key.deserializer", StringDeserializer.class); + props.put("value.deserializer", KafkaAvroDeserializer.class); offsetGen = new KafkaOffsetGen(props); } @@ -64,9 +67,7 @@ protected InputBatch> fetchNewData(Option lastChe } private JavaRDD toRDD(OffsetRange[] offsetRanges) { - JavaRDD recordRDD = - KafkaUtils.createRDD(sparkContext, String.class, Object.class, StringDecoder.class, KafkaAvroDecoder.class, - offsetGen.getKafkaParams(), offsetRanges).values().map(obj -> (GenericRecord) obj); - return recordRDD; + return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index bd922ac6bd0c3..51a1ae1de2e48 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -24,14 +24,15 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; -import kafka.serializer.StringDecoder; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.apache.spark.streaming.kafka.KafkaUtils; -import org.apache.spark.streaming.kafka.OffsetRange; +import org.apache.spark.streaming.kafka010.KafkaUtils; +import org.apache.spark.streaming.kafka010.LocationStrategies; +import org.apache.spark.streaming.kafka010.OffsetRange; /** * Read json kafka data. 
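In the 0-10 API, KafkaUtils.createRDD takes the consumer params, the offset ranges and a location strategy, and returns an RDD of ConsumerRecord rather than key/value pairs, so values are pulled out with record.value(). A sketch of that read path, assuming the offsetGen and offsetRanges names used in the sources above:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    // Batch-read exactly the given ranges; PreferConsistent spreads partitions
    // evenly over executors. The value type depends on the configured deserializer.
    JavaRDD<ConsumerRecord<Object, Object>> records = KafkaUtils.createRDD(
        sparkContext, offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent());
    JavaRDD<GenericRecord> avroRecords = records.map(r -> (GenericRecord) r.value());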
@@ -45,6 +46,8 @@ public class JsonKafkaSource extends JsonSource { public JsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(properties, sparkContext, sparkSession, schemaProvider); + properties.put("key.deserializer", StringDeserializer.class); + properties.put("value.deserializer", StringDeserializer.class); offsetGen = new KafkaOffsetGen(properties); } @@ -61,7 +64,7 @@ protected InputBatch> fetchNewData(Option lastCheckpoint } private JavaRDD toRDD(OffsetRange[] offsetRanges) { - return KafkaUtils.createRDD(sparkContext, String.class, String.class, StringDecoder.class, StringDecoder.class, - offsetGen.getKafkaParams(), offsetRanges).values(); + return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()).map(x -> (String) x.value()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index c17a5cff7c6ee..ed5e4e944b1a1 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -22,30 +22,24 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.exception.HoodieNotSupportedException; -import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; -import kafka.common.TopicAndPartition; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.streaming.kafka.KafkaCluster; -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset; -import org.apache.spark.streaming.kafka.OffsetRange; +import org.apache.spark.streaming.kafka010.OffsetRange; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; -import scala.Predef; -import scala.collection.JavaConverters; -import scala.collection.immutable.Map; -import scala.collection.immutable.Set; -import scala.collection.mutable.ArrayBuffer; -import scala.collection.mutable.StringBuilder; -import scala.util.Either; - /** * Source to read data from Kafka, incrementally. */ @@ -58,8 +52,8 @@ public static class CheckpointUtils { /** * Reconstruct checkpoint from string. 
*/ - public static HashMap strToOffsets(String checkpointStr) { - HashMap offsetMap = new HashMap<>(); + public static HashMap strToOffsets(String checkpointStr) { + HashMap offsetMap = new HashMap<>(); if (checkpointStr.length() == 0) { return offsetMap; } @@ -67,8 +61,7 @@ public static HashMap strToOffsets String topic = splits[0]; for (int i = 1; i < splits.length; i++) { String[] subSplits = splits[i].split(":"); - offsetMap.put(new TopicAndPartition(topic, Integer.parseInt(subSplits[0])), - new KafkaCluster.LeaderOffset("", -1, Long.parseLong(subSplits[1]))); + offsetMap.put(new TopicPartition(topic, Integer.parseInt(subSplits[0])), Long.parseLong(subSplits[1])); } return offsetMap; } @@ -83,7 +76,7 @@ public static String offsetsToStr(OffsetRange[] ranges) { // at least 1 partition will be present. sb.append(ranges[0].topic() + ","); sb.append(Arrays.stream(ranges).map(r -> String.format("%s:%d", r.partition(), r.untilOffset())) - .collect(Collectors.joining(","))); + .collect(Collectors.joining(","))); return sb.toString(); } @@ -94,32 +87,32 @@ public static String offsetsToStr(OffsetRange[] ranges) { * @param toOffsetMap offsets of where each partitions is currently at * @param numEvents maximum number of events to read. */ - public static OffsetRange[] computeOffsetRanges(HashMap fromOffsetMap, - HashMap toOffsetMap, long numEvents) { + public static OffsetRange[] computeOffsetRanges(Map fromOffsetMap, + Map toOffsetMap, long numEvents) { Comparator byPartition = Comparator.comparing(OffsetRange::partition); // Create initial offset ranges for each 'to' partition, with from = to offsets. OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; toOffsetMap.entrySet().stream().map(e -> { - TopicAndPartition tp = e.getKey(); - long fromOffset = fromOffsetMap.getOrDefault(tp, new LeaderOffset("", -1, 0)).offset(); + TopicPartition tp = e.getKey(); + long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); return OffsetRange.create(tp, fromOffset, fromOffset); }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); long allocedEvents = 0; - java.util.Set exhaustedPartitions = new HashSet<>(); + Set exhaustedPartitions = new HashSet<>(); // keep going until we have events to allocate and partitions still not exhausted. while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) { long remainingEvents = numEvents - allocedEvents; long eventsPerPartition = - (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size())); + (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size())); // Allocate the remaining events to non-exhausted partitions, in round robin fashion for (int i = 0; i < ranges.length; i++) { OffsetRange range = ranges[i]; if (!exhaustedPartitions.contains(range.partition())) { - long toOffsetMax = toOffsetMap.get(range.topicAndPartition()).offset(); + long toOffsetMax = toOffsetMap.get(range.topicPartition()); long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition); if (toOffset == toOffsetMax) { exhaustedPartitions.add(range.partition()); @@ -130,7 +123,7 @@ public static OffsetRange[] computeOffsetRanges(HashMap java conversions. (oh my!) 
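The checkpoint string format itself is untouched — the topic name followed by comma-separated partition:untilOffset pairs — only the map values change from LeaderOffset to a plain Long. An illustrative round trip through the helpers above (topic name and offsets are made up):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    // "impressions,0:100,1:250" -> { impressions-0 -> 100, impressions-1 -> 250 }
    Map<TopicPartition, Long> from = CheckpointUtils.strToOffsets("impressions,0:100,1:250");
    Map<TopicPartition, Long> to = new HashMap<>();
    to.put(new TopicPartition("impressions", 0), 400L);
    to.put(new TopicPartition("impressions", 1), 500L);
    // Allocates at most 300 events across the two partitions, round robin.
    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(from, to, 300);
    String nextCheckpoint = CheckpointUtils.offsetsToStr(ranges);   // "impressions,0:250,1:400"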
- */ - static class ScalaHelpers { - - public static Map toScalaMap(HashMap m) { - return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.conforms()); - } - - public static Set toScalaSet(HashSet s) { - return JavaConverters.asScalaSetConverter(s).asScala().toSet(); - } - - public static java.util.Map toJavaMap(Map m) { - return JavaConverters.mapAsJavaMapConverter(m).asJava(); - } - } - /** * Kafka reset offset strategies. */ enum KafkaResetOffsetStrategies { - LARGEST, SMALLEST + LATEST, EARLIEST } /** @@ -175,20 +150,20 @@ public static class Config { private static final String KAFKA_TOPIC_NAME = "hoodie.deltastreamer.source.kafka.topic"; private static final String MAX_EVENTS_FROM_KAFKA_SOURCE_PROP = "hoodie.deltastreamer.kafka.source.maxEvents"; - private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LARGEST; + private static final KafkaResetOffsetStrategies DEFAULT_AUTO_RESET_OFFSET = KafkaResetOffsetStrategies.LATEST; public static final long DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE = 5000000; public static long maxEventsFromKafkaSource = DEFAULT_MAX_EVENTS_FROM_KAFKA_SOURCE; } - private final HashMap kafkaParams; + private final HashMap kafkaParams; private final TypedProperties props; protected final String topicName; public KafkaOffsetGen(TypedProperties props) { this.props = props; - kafkaParams = new HashMap(); + kafkaParams = new HashMap<>(); for (Object prop : props.keySet()) { - kafkaParams.put(prop.toString(), props.getString(prop.toString())); + kafkaParams.put(prop.toString(), props.get(prop.toString())); } DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.KAFKA_TOPIC_NAME)); topicName = props.getString(Config.KAFKA_TOPIC_NAME); @@ -197,31 +172,25 @@ public KafkaOffsetGen(TypedProperties props) { public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long sourceLimit) { // Obtain current metadata for the topic - KafkaCluster cluster = new KafkaCluster(ScalaHelpers.toScalaMap(kafkaParams)); - Either, Set> either = - cluster.getPartitions(ScalaHelpers.toScalaSet(new HashSet<>(Collections.singletonList(topicName)))); - if (either.isLeft()) { - // log errors. and bail out. 
- throw new HoodieDeltaStreamerException("Error obtaining partition metadata", either.left().get().head()); - } - Set topicPartitions = either.right().get(); + KafkaConsumer consumer = new KafkaConsumer(kafkaParams); + List partitionInfoList; + partitionInfoList = consumer.partitionsFor(topicName); + Set topicPartitions = partitionInfoList.stream() + .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); // Determine the offset ranges to read from - HashMap fromOffsets; - HashMap checkpointOffsets; + Map fromOffsets; if (lastCheckpointStr.isPresent()) { - fromOffsets = checkupValidOffsets(cluster, lastCheckpointStr, topicPartitions); + fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); } else { KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies - .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); + .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); switch (autoResetValue) { - case SMALLEST: - fromOffsets = - new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get())); + case EARLIEST: + fromOffsets = consumer.beginningOffsets(topicPartitions); break; - case LARGEST: - fromOffsets = - new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); + case LATEST: + fromOffsets = consumer.endOffsets(topicPartitions); break; default: throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' "); @@ -229,8 +198,7 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long } // Obtain the latest offsets. - HashMap toOffsets = - new HashMap(ScalaHelpers.toJavaMap(cluster.getLatestLeaderOffsets(topicPartitions).right().get())); + Map toOffsets = consumer.endOffsets(topicPartitions); // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP, @@ -245,15 +213,13 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long // check up checkpoint offsets is valid or not, if true, return checkpoint offsets, // else return earliest offsets - private HashMap checkupValidOffsets(KafkaCluster cluster, - Option lastCheckpointStr, Set topicPartitions) { - HashMap checkpointOffsets = - CheckpointUtils.strToOffsets(lastCheckpointStr.get()); - HashMap earliestOffsets = - new HashMap(ScalaHelpers.toJavaMap(cluster.getEarliestLeaderOffsets(topicPartitions).right().get())); + private Map checkupValidOffsets(KafkaConsumer consumer, + Option lastCheckpointStr, Set topicPartitions) { + Map checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get()); + Map earliestOffsets = consumer.beginningOffsets(topicPartitions); boolean checkpointOffsetReseter = checkpointOffsets.entrySet().stream() - .anyMatch(offset -> offset.getValue().offset() < earliestOffsets.get(offset.getKey()).offset()); + .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); return checkpointOffsetReseter ? 
earliestOffsets : checkpointOffsets; } @@ -261,7 +227,7 @@ public String getTopicName() { return topicName; } - public HashMap getKafkaParams() { + public HashMap getKafkaParams() { return kafkaParams; } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index d5eb6c3ed745b..b58247f14c620 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -28,14 +28,14 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config; -import kafka.common.TopicAndPartition; import org.apache.avro.generic.GenericRecord; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset; -import org.apache.spark.streaming.kafka.KafkaTestUtils; -import org.apache.spark.streaming.kafka.OffsetRange; +import org.apache.spark.streaming.kafka010.KafkaTestUtils; +import org.apache.spark.streaming.kafka010.OffsetRange; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -44,6 +44,7 @@ import java.io.IOException; import java.util.HashMap; +import java.util.UUID; import static org.junit.Assert.assertEquals; @@ -84,13 +85,12 @@ public void teardown() throws Exception { private TypedProperties createPropsForJsonSource(Long maxEventsToReadFromKafkaSource) { TypedProperties props = new TypedProperties(); props.setProperty("hoodie.deltastreamer.source.kafka.topic", TEST_TOPIC_NAME); - props.setProperty("metadata.broker.list", testUtils.brokerAddress()); - props.setProperty("auto.offset.reset", "smallest"); - props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.setProperty("bootstrap.servers", testUtils.brokerAddress()); + props.setProperty("auto.offset.reset", "earliest"); props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", maxEventsToReadFromKafkaSource != null ? 
String.valueOf(maxEventsToReadFromKafkaSource) : String.valueOf(Config.maxEventsFromKafkaSource)); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); return props; } @@ -214,10 +214,10 @@ public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException { assertEquals(Option.empty(), fetch6.getBatch()); } - private static HashMap makeOffsetMap(int[] partitions, long[] offsets) { - HashMap map = new HashMap<>(); + private static HashMap makeOffsetMap(int[] partitions, long[] offsets) { + HashMap map = new HashMap<>(); for (int i = 0; i < partitions.length; i++) { - map.put(new TopicAndPartition(TEST_TOPIC_NAME, partitions[i]), new LeaderOffset("", -1, offsets[i])); + map.put(new TopicPartition(TEST_TOPIC_NAME, partitions[i]), offsets[i]); } return map; } diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties index 392720f463a0c..e256b8c77fbbc 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties @@ -25,6 +25,6 @@ hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/ #hoodie.deltastreamer.source.kafka.topic=uber_trips hoodie.deltastreamer.source.kafka.topic=impressions #Kafka props -metadata.broker.list=localhost:9092 -auto.offset.reset=smallest +bootstrap.servers=localhost:9092 +auto.offset.reset=earliest schema.registry.url=http://localhost:8081 diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index c56c789295b49..29b7ca792144c 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -83,8 +83,8 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro - com.twitter:bijection-avro_2.11 - com.twitter:bijection-core_2.11 + com.twitter:bijection-avro_${scala.binary.version} + com.twitter:bijection-core_${scala.binary.version} io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite com.yammer.metrics:metrics-core diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 110ff0fde1c4f..102aec79529c2 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -85,8 +85,8 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro - com.twitter:bijection-avro_2.11 - com.twitter:bijection-core_2.11 + com.twitter:bijection-avro_${scala.binary.version} + com.twitter:bijection-core_${scala.binary.version} io.confluent:kafka-avro-serializer io.confluent:common-config io.confluent:common-utils @@ -94,8 +94,8 @@ io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite com.yammer.metrics:metrics-core - org.apache.spark:spark-streaming-kafka-0-8_2.11 - org.apache.kafka:kafka_2.11 + org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} + org.apache.kafka:kafka_${scala.binary.version} com.101tec:zkclient org.apache.kafka:kafka-clients diff --git a/pom.xml b/pom.xml index d4d47c0bb6e9a..3a5e20461a04b 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ 1.8 2.6.7 + 2.0.0 2.17 1.10.1 4.11 @@ -91,7 +92,7 @@ 2.4.4 1.8.2 2.11.8 - 2.11 + 2.11 0.12 3.3.1 3.0.1 @@ -423,8 +424,8 @@ com.fasterxml.jackson.module - jackson-module-scala_2.11 - ${fasterxml.version} + jackson-module-scala_${scala.binary.version} + ${fasterxml.version}.1 @@ -470,13 +471,13 @@ org.apache.spark - spark-core_2.11 + 
spark-core_${scala.binary.version} ${spark.version} provided org.apache.spark - spark-sql_2.11 + spark-sql_${scala.binary.version} ${spark.version} provided @@ -484,7 +485,7 @@ org.apache.spark - spark-avro_2.11 + spark-avro_${scala.binary.version} ${spark.version} provided From 6787496712f2ba16e65ddd94c702a71135c356e2 Mon Sep 17 00:00:00 2001 From: Wenning Ding Date: Tue, 14 Jan 2020 16:07:51 -0800 Subject: [PATCH 2/2] Rename scala related artifactId & add maven profile to support Scala 2.12 --- LICENSE | 2 + dev/change-scala-version.sh | 66 +++++++++++++++++++++++++ docker/hoodie/hadoop/hive_base/pom.xml | 4 +- docker/hoodie/hadoop/pom.xml | 2 +- hudi-cli/pom.xml | 2 +- hudi-integ-test/pom.xml | 4 +- hudi-spark/pom.xml | 2 +- hudi-spark/run_hoodie_app.sh | 2 +- hudi-utilities/pom.xml | 4 +- packaging/hudi-spark-bundle/pom.xml | 8 +-- packaging/hudi-utilities-bundle/pom.xml | 10 ++-- pom.xml | 37 ++++++++++++++ 12 files changed, 124 insertions(+), 19 deletions(-) create mode 100755 dev/change-scala-version.sh diff --git a/LICENSE b/LICENSE index 341988ee73550..6b2997fe5375b 100644 --- a/LICENSE +++ b/LICENSE @@ -245,6 +245,8 @@ This product includes code from Apache Spark * org.apache.hudi.AvroConversionHelper copied from classes in org/apache/spark/sql/avro package +* dev/change-scala-version.sh copied from https://github.com/apache/spark/blob/branch-2.4/dev/change-scala-version.sh + Copyright: 2014 and onwards The Apache Software Foundation Home page: http://spark.apache.org/ License: http://www.apache.org/licenses/LICENSE-2.0 diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh new file mode 100755 index 0000000000000..151581def60e9 --- /dev/null +++ b/dev/change-scala-version.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e + +VALID_VERSIONS=( 2.11 2.12 ) + +usage() { + echo "Usage: $(basename $0) [-h|--help] +where : + -h| --help Display this help text + valid version values : ${VALID_VERSIONS[*]} +" 1>&2 + exit 1 +} + +if [[ ($# -ne 1) || ( $1 == "--help") || $1 == "-h" ]]; then + usage +fi + +TO_VERSION=$1 + +check_scala_version() { + for i in ${VALID_VERSIONS[*]}; do [ $i = "$1" ] && return 0; done + echo "Invalid Scala version: $1. Valid versions: ${VALID_VERSIONS[*]}" 1>&2 + exit 1 +} + +check_scala_version "$TO_VERSION" + +if [ $TO_VERSION = "2.11" ]; then + FROM_VERSION="2.12" +else + FROM_VERSION="2.11" +fi + +sed_i() { + sed -e "$1" "$2" > "$2.tmp" && mv "$2.tmp" "$2" +} + +export -f sed_i + +BASEDIR=$(dirname $0)/.. 
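+# The find/sed below rewrites every module artifactId suffix in the pom.xml files
+# (for example hudi-spark_2.11 -> hudi-spark_2.12), skipping build output under target/.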
+find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \ + -exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \; + +# Also update in parent POM +# Match any scala binary version to ensure idempotency +sed_i '1,/[0-9]*\.[0-9]*[0-9]*\.[0-9]*'$TO_VERSION' - + diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml index 55e5c65409709..84380f7789607 100644 --- a/docker/hoodie/hadoop/pom.xml +++ b/docker/hoodie/hadoop/pom.xml @@ -42,7 +42,7 @@ org.apache.hudi - hudi-spark-bundle + hudi-spark-bundle_${scala.binary.version} ${project.version} diff --git a/hudi-cli/pom.xml b/hudi-cli/pom.xml index e1359cb96ea43..9098cbb00a51a 100644 --- a/hudi-cli/pom.xml +++ b/hudi-cli/pom.xml @@ -140,7 +140,7 @@ org.apache.hudi - hudi-utilities + hudi-utilities_${scala.binary.version} ${project.version} diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index af82818af24e8..6c74a12242400 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -54,7 +54,7 @@ org.apache.hudi - hudi-spark + hudi-spark_${scala.binary.version} ${project.version} @@ -83,7 +83,7 @@ org.apache.hudi - hudi-spark + hudi-spark_${scala.binary.version} ${project.version} tests test-jar diff --git a/hudi-spark/pom.xml b/hudi-spark/pom.xml index fc0aa6916c84c..5fc700ce15fd3 100644 --- a/hudi-spark/pom.xml +++ b/hudi-spark/pom.xml @@ -23,7 +23,7 @@ 4.0.0 - hudi-spark + hudi-spark_2.11 jar diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh index 17ba925a86b71..88ae209008eb9 100755 --- a/hudi-spark/run_hoodie_app.sh +++ b/hudi-spark/run_hoodie_app.sh @@ -25,7 +25,7 @@ function error_exit { DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" #Ensure we pick the right jar even for hive11 builds -HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle-*.jar | grep -v source | head -1` +HUDI_JAR=`ls -c $DIR/../packaging/hudi-spark-bundle/target/hudi-spark-bundle*.jar | grep -v source | head -1` if [ -z "$HADOOP_CONF_DIR" ]; then echo "setting hadoop conf dir" diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 5f6419c913554..59aae72ba350a 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -23,7 +23,7 @@ 4.0.0 - hudi-utilities + hudi-utilities_2.11 jar @@ -109,7 +109,7 @@ org.apache.hudi - hudi-spark + hudi-spark_${scala.binary.version} ${project.version} diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 29b7ca792144c..a7e7664506cfc 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -23,7 +23,7 @@ ../../pom.xml 4.0.0 - hudi-spark-bundle + hudi-spark-bundle_2.11 jar @@ -32,7 +32,7 @@ - + org.apache.rat apache-rat-plugin @@ -66,7 +66,7 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-client - org.apache.hudi:hudi-spark + org.apache.hudi:hudi-spark_${scala.binary.version} org.apache.hudi:hudi-hive org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service @@ -190,7 +190,7 @@ org.apache.hudi - hudi-spark + hudi-spark_${scala.binary.version} ${project.version} diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 102aec79529c2..3346144ef51ce 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -24,7 +24,7 @@ ../../pom.xml 4.0.0 - hudi-utilities-bundle + hudi-utilities-bundle_2.11 jar @@ -67,8 +67,8 @@ org.apache.hudi:hudi-common org.apache.hudi:hudi-client - org.apache.hudi:hudi-utilities - 
org.apache.hudi:hudi-spark + org.apache.hudi:hudi-utilities_${scala.binary.version} + org.apache.hudi:hudi-spark_${scala.binary.version} org.apache.hudi:hudi-hive org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service @@ -200,12 +200,12 @@ org.apache.hudi - hudi-spark + hudi-spark_${scala.binary.version} ${project.version} org.apache.hudi - hudi-utilities + hudi-utilities_${scala.binary.version} ${project.version} diff --git a/pom.xml b/pom.xml index 3a5e20461a04b..19f44b41e80f5 100644 --- a/pom.xml +++ b/pom.xml @@ -1041,6 +1041,43 @@ + + + scala-2.11 + + + + scala-2.12 + + 2.12.10 + 2.12 + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + enforce-versions + + enforce + + + + + + *:*_2.11 + + + + + + + + + +
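Taken together, the two patches suggest the following Scala 2.12 workflow: run dev/change-scala-version.sh 2.12 to flip the _2.11 artifactId suffixes and the scala binary version property in the parent POM, then build with the scala-2.12 profile enabled (for example mvn clean install -Pscala-2.12), which switches scala.version to 2.12.10 and lets the enforcer rule above fail the build if any *_2.11 artifact is still pulled in.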