2017-05-25 3 views
1

Was wäre der "empfohlene" Weg, um jede Nachricht zu verarbeiten, wie sie durch die strukturierte Streaming-Pipeline kommt (ich bin auf Spark 2.1.1 mit der Quelle kafka 0.10.2.1)?Structured Streaming - Jede Nachricht konsumieren

Bisher bin ich auf dataframe.mapPartitions (da ich mit hbase verbinden müssen, deren Client-Verbindungsklassen nicht serizierbar sind, daher mapPartitions).

Ideen?

Antwort

1

sollten Sie in der Lage sein, eine foreach Ausgang Spüle zu verwenden: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks und https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

Auch wenn der Kunde nicht serialisierbar ist, Sie müssen nicht öffnen Sie es in Ihrem ForeachWriter Konstruktor. Lassen Sie es einfach None/null und initialisieren Sie es in der open Methode, die nach Serialisierung genannt wird, aber nur einmal pro Aufgabe.

In sort-of-Pseudocode:

class HBaseForeachWriter extends ForeachWriter[MyType] { 
    var client: Option[HBaseClient] = None 
    def open(partitionId: Long, version: Long): Boolean = { 
    client = Some(... open a client ...) 
    } 
    def process(record: MyType) = { 
    client match { 
     case None => throw Exception("shouldn't happen") 
     case Some(cl) => { 
     ... use cl to write record ... 
     } 
    } 
    } 
    def close(errorOrNull: Throwable): Unit = { 
    client.foreach(cl => cl.close()) 
    } 
} 
Verwandte Themen