2017-08-22 2 views
1

Ich bin neu in der BigData Eco-System und Art von Einstieg.Lesen Sie von Kafka und schreiben Sie an hdfs in Parkett

Ich habe mehrere Artikel über das Lesen eines Kafka-Themas mit Spark-Streaming gelesen, möchte aber wissen, ob es möglich ist, mit einem Spark-Job von Kafka zu lesen statt zu streamen? Wenn ja, könntest du mir helfen, auf Artikel oder Code-Snippets hinzuweisen, mit denen ich anfangen kann.

Mein zweiter Teil der Frage ist das Schreiben zu hdfs im Parkettformat. Sobald ich von Kafka gelesen habe, nehme ich an, dass ich eine rdd haben werde. Konvertieren Sie diese RDD in einen Datenrahmen und schreiben Sie dann den Datenrahmen als Parkettdatei. Ist das der richtige Ansatz?

Jede Hilfe wird geschätzt.

Dank

Antwort

1

Für Daten von Kafka zu lesen und sie zu HDFS, in Parkett-Format zu schreiben, Spark-Batch-Job statt Streaming verwenden, können Sie Spark Structured Streaming verwenden.

Structured Streaming ist eine skalierbare und fehlertolerante Stream-Verarbeitungs-Engine, die auf der Spark SQL-Engine basiert. Sie können Ihre Streaming-Berechnung genauso ausdrücken, wie Sie eine Stapelberechnung für statische Daten ausführen würden. Die Spark SQL-Engine sorgt dafür, dass sie inkrementell und kontinuierlich ausgeführt wird und das Endergebnis aktualisiert wird, sobald Streamingdaten ankommen. Sie können die Dataset/DataFrame-API in Scala, Java, Python oder R verwenden, um Streaming-Aggregationen, Ereigniszeitfenster, Stream-zu-Batch-Joins usw. auszudrücken. Die Berechnung wird auf derselben optimierten Spark SQL-Engine ausgeführt. Schließlich garantiert das System Ende-zu-Ende-Fehler-Toleranz-Garantien durch Checkpointing und Write Ahead Logs. Kurz gesagt, Structured Streaming bietet eine schnelle, skalierbare, fehlertolerante End-to-End-Streaming-Verarbeitung, die genau einmal ausgeführt wird, ohne dass der Benutzer über das Streaming nachdenken muss.

Es kommt mit Kafka als eine eingebaute Quelle, d. H., Wir können Daten von Kafka abfragen. Es ist kompatibel mit Kafka Broker Versionen 0.10.0 oder höher.

Um die Daten aus Kafka im Batch-Modus zu ziehen, können Sie einen Datensatz/Datenrahmen für einen definierten Bereich von Offsets erstellen.

// Subscribe to 1 topic defaults to the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to multiple topics, specifying explicit Kafka offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1,topic2") 
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") 
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to a pattern, at the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribePattern", "topic.*") 
    .option("startingOffsets", "earliest") 
    .option("endingOffsets", "latest") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

Jede Zeile in der Quelle hat folgendes Schema: Weitere

df.write.parquet("hdfs://data.parquet") 

:

| Column   | Type   | 
|:-----------------|--------------:| 
| key    |  binary | 
| value   |  binary | 
| topic   |  string | 
| partition  |   int | 
| offset   |   long | 
| timestamp  |   long | 
| timestampType |   int | 

Jetzt kann Daten auf HDFS in Parkett-Format, folgender Code zu schreiben geschrieben Informationen zu Spark Structured Streaming + Kafka finden Sie in der folgenden Anleitung - Kafka Integration Guide

Ich hoffe es hilft!

+0

Ist diese Antwort hilfreich? – himanshuIIITian

+1

Danke Himanshu, das war hilfreich. Scheint, dass Spark 2.2 benötigt wird, gibt es eine andere Möglichkeit, dies in den niedrigeren Versionen von Spark wie 2.0 zu tun. – Henosis

Verwandte Themen