#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from py4j.java_collections import MapConverter
from py4j.java_gateway import java_import, Py4JError

from pyspark.storagelevel import StorageLevel
from pyspark.serializers import PairDeserializer, UTF8Deserializer
from pyspark.streaming import DStream

__all__ = ['KafkaUtils']


class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
                     keyDecoder=None, valueDecoder=None):
35+ """
36+ Create an input stream that pulls messages from a Kafka Broker.
37+
38+ :param ssc: StreamingContext object
39+ :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
40+ :param groupId: The group id for this consumer.
41+ :param topics: Dict of (topic_name -> numPartitions) to consume.
42+ Each partition is consumed in its own thread.
43+ :param storageLevel: RDD storage level.
44+ :param keyDecoder: A function used to decode key
45+ :param valueDecoder: A function used to decode value
46+ :return: A DStream object
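
        Example (illustrative only; the Zookeeper address, group id, and
        topic name below are placeholder values, not defaults)::

            kafkaStream = KafkaUtils.createStream(
                ssc, "localhost:2181", "test-consumer-group", {"test": 1})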
47+ """
        java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")

        if not isinstance(topics, dict):
            raise TypeError("topics should be a dict")
        # Convert the Python dict into a java.util.Map for the JVM-side API.
        jtopics = MapConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        try:
            jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, zkQuorum, groupId,
                                                       jtopics, jlevel)
        except Py4JError as e:
            # The Kafka receiver lives in the external/kafka module, which is not
            # bundled by default; print a hint on how to build and link it.
            if 'call a package' in str(e):
                print("No kafka package, please build it and add it into classpath:")
                print("  $ sbt/sbt streaming-kafka/package")
                print("  $ bin/spark-submit --driver-class-path external/kafka/target/scala-2.10/"
                      "spark-streaming-kafka_2.10-1.3.0-SNAPSHOT.jar")
                raise Exception("No kafka package")
            raise e
        # Messages come back as (key, value) pairs of UTF-8 strings; user-supplied
        # decoders, if any, are applied on top of that.
        ser = PairDeserializer(UTF8Deserializer(), UTF8Deserializer())
        stream = DStream(jstream, ssc, ser)

        if keyDecoder is not None:
            stream = stream.map(lambda kv: (keyDecoder(kv[0]), kv[1]))
        if valueDecoder is not None:
            stream = stream.mapValues(valueDecoder)
        return stream
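
# Minimal usage sketch (assumptions not taken from this file: a local Zookeeper
# at localhost:2181 and a Kafka topic named "test" with one partition):
#
#     from pyspark import SparkContext
#     from pyspark.streaming import StreamingContext
#
#     sc = SparkContext(appName="KafkaWordCount")
#     ssc = StreamingContext(sc, 2)
#     lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", {"test": 1})
#     lines.map(lambda kv: kv[1]).pprint()
#     ssc.start()
#     ssc.awaitTermination()
#
# Since values arrive as UTF-8 strings, a custom decoder can parse them further,
# e.g. valueDecoder=json.loads to receive dicts instead of raw JSON strings.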