ich erfolgreich kafka und saprk integriert. Ich möchte streamen von kafka zu spark.and jetzt bin ich in der Lage, Strom zu spark.Ich möchte diesen Stream in RDD, so dass ich createRDD() -Funktion, um rdds zu erstellen. Aber ich habe nur einige Massege aus Kafka in RDD. weil es vom Versatzbereich abhängt. So kann mir jeder sagen, wie man offsetRange() in der Funktion createRDD() von kafka-funke einstellt.set offsetRange() funktion in pyspark rdd kafka
0
A
Antwort
0
Verwenden Sie einfach in Ihrem Code-Snippet
// Import dependencies and create kafka params as in Create Direct Stream
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
Laut Dokumentation: pyspark kafka streaming
erster Satz offsetranges für kafka Thema partion mit
pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset)
Initiali sation wie folgt aussieht:
fromOffset = 0
untilOffset = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, fromOffset, untilOffset)
offsets = [offset]
Dann werden Sie Ihre RDD
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
Verwandte Themen
- 1. Pyspark Kafka Offset-Bereich Einheiten
- 2. Trans RDD in PySpark
- 3. RDD in iterable konvertieren: PySpark?
- 4. Pyspark: shuffle RDD
- 5. pyspark RDD zu DataFrame
- 6. Filterung zwei RDD in pyspark
- 7. pyspark: convert RDD [DenseVector]
- 8. Pyspark Kafka Streaming
- 9. pyspark - mit MatrixFactorizationModel in RDD der Map-Funktion
- 10. Sendung ein Wörterbuch RDD in PySpark
- 11. So klonen RDD-Objekt [Pyspark]
- 12. Konvertieren Row in Liste RDD in pyspark
- 13. Split RDD in n Teile in pySpark
- 14. Pyspark: Get Indizes eines RDD Elemente aus einem anderen RDD
- 15. sammeln RDD mit Puffer in pyspark
- 16. Extrahieren von Sequenzen von RDD in Pyspark
- 17. konvertieren numpy matrix in pyspark rdd
- 18. Partition eine Matrix RDD in pyspark
- 19. Schreiben von Pyspark Rdd in CSV-Datei
- 20. Passing-Funktion in pyspark
- 21. Konvertierung komplexer RDD zu einem Flatten RDD mit PySpark
- 22. PySpark: Iterations über dict Typ RDD
- 23. Pyspark Konvertieren RDD von Tupeln zu Datenrahmen
- 24. Gibt es irgendeine Paginierung für pyspark rdd?
- 25. foreachRDD und foreach mit einem rdd in pyspark iterieren
- 26. ValueError: RDD ist leer-- Pyspark (Windows Standalone)
- 27. Pyspark Filter leere Zeilen aus RDD nicht
- 28. ein rdd in ein lokales Wörterbuch in PySpark Drehen
- 29. PySpark RDD Liste Split von Delimeter
- 30. Pyspark Extrahieren von vier Tupeln von RDD
yas schaffen können, aber wie es –
in pyspark zu verwenden, welche Version von Funken verwenden Sie? – FaigB
Ich benutze Funken 1.5.1 –