2016-10-26 4 views
0

ich erfolgreich kafka und saprk integriert. Ich möchte streamen von kafka zu spark.and jetzt bin ich in der Lage, Strom zu spark.Ich möchte diesen Stream in RDD, so dass ich createRDD() -Funktion, um rdds zu erstellen. Aber ich habe nur einige Massege aus Kafka in RDD. weil es vom Versatzbereich abhängt. So kann mir jeder sagen, wie man offsetRange() in der Funktion createRDD() von kafka-funke einstellt.set offsetRange() funktion in pyspark rdd kafka

Antwort

0

Verwenden Sie einfach in Ihrem Code-Snippet

// Import dependencies and create kafka params as in Create Direct Stream 

    val offsetRanges = Array(
     // topic, partition, inclusive starting offset, exclusive ending offset 
     OffsetRange("test", 0, 0, 100), 
     OffsetRange("test", 1, 0, 100) 
    ) 

    val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent) 

Spark Kafka Integration guide

Laut Dokumentation: pyspark kafka streaming

erster Satz offsetranges für kafka Thema partion mit

pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset) 

Initiali sation wie folgt aussieht:

fromOffset = 0 
untilOffset = 10 
partition = 0 
topic = 'topic' 
offset = OffsetRange(topic, partition, fromOffset, untilOffset) 
offsets = [offset] 

Dann werden Sie Ihre RDD

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets) 
+0

yas schaffen können, aber wie es –

+0

in pyspark zu verwenden, welche Version von Funken verwenden Sie? – FaigB

+0

Ich benutze Funken 1.5.1 –