2016-05-27 11 views
0

Fragen für Spark 1.6.1, pysparkforeachRDD und foreach mit einem rdd in pyspark iterieren

ich Streaming-Daten wie wie

{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555} 

Ich habe so genannte Funktion kommen in haben, die HBase schreibt writeToHBase (RDD), eine rdd erwartet, die Tupel in der folgenden Struktur hat:

(rowkey, [rowkey, column-family, key, value]) 

Wie Sie aus dem Eingabeformat sehen können, habe ich meine ursprünglichen Daten-Set zu übernehmen und alle Schlüssel iterieren, senden jedes Schlüssel/Wert-Paar mit einem Sendefunktionsaufruf.

Von den Funken zu lesen Programmieranleitung Streaming im Abschnitt „Design Patterns für foreachRDD mit“ http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13

Es scheint, dass die empfohlene foreachRDD zu verwenden, wenn etwas außerhalb des Datensatzes zu tun. In meinem Fall möchte ich Daten zu HBase über das Netzwerk schreiben, so dass ich foreachRDD auf meinem Streaming-Daten und die Funktion aufrufen, die die Daten verarbeiten wird gesendet:

stream.foreachRDD(lambda k: process(k)) 

Mein Verständnis von Funken Funktionen ist ziemlich beschränktes Recht Jetzt bin ich nicht in der Lage, eine Möglichkeit zu finden, mein ursprüngliches Dataset zu durchlaufen, um meine Schreibfunktion zu verwenden. wenn es ein Python iterable wäre, würde ich in der Lage sein, dies zu tun:

def process(rdd): 
    for key, value in my_rdd.iteritems(): 
     writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value])) 

wo RowKey, indem du es in dem rdd selbst

rdd.map(lambda x: x['rowkey']) 

Wie erreichen ich, was Prozess erhalten haben würde () soll in Pyspark tun? Ich sehe einige Beispiele, die foreach benutzen, aber ich bin nicht ganz in der Lage, das zu tun, was ich will.

Antwort

2

warum möchten Sie über RDD iterieren, während Ihre WriteToHBase-Funktion eine RDD als Argument erwartet. Rufen Sie einfach writeToHBase(rdd) in Ihrer Prozessfunktion an, das war's.

Wenn Sie jeden Datensatz aus der RDD holen möchten, können Sie

def processRecord(record): 
     print(record) 
rdd.foreach(processRecord) 

In processRecord Funktion aufrufen, werden Sie einzelnen Datensatz Prozess.

Verwandte Themen