2017-12-07 1 views
0

DataSet#foreach(f) wendet die Funktion f auf jede Zeile im Dataset an. In einer Clusterumgebung werden die Daten über den Cluster verteilt. Wie können die Ergebnisse jeder dieser Funktionen gesammelt werden?Wie können Sie die Ergebnisse von einem forEach in Spark speichern

Angenommen, die Funktion würde die Anzahl der in jeder Zeile gespeicherten Zeichen zählen. Wie können Sie ein DataSet oder RDD erstellen, das die Ergebnisse jeder dieser Funktionen für jede Zeile enthält?

+0

Verwenden Sie 'map' anstelle von' foreach' – nabongs

Antwort

1

Die Definition für foreach sieht ungefähr so ​​aus:

final def foreach(f: (A) ⇒ Unit): Unit 

f: Die Funktion, die für ihre Nebenwirkung auf jedes Element angewendet wird. Das Ergebnis der Funktion wird f

verworfen

foreach in Scala wird im allgemeinen verwendet, um die Nutzung einer Funktion zu bezeichnen, die eine Nebenwirkung beinhaltet, z.B. Drucken auf STDOUT.

Wenn Sie etwas zurückgeben möchten, indem eine bestimmte Funktion anwenden, werden Sie map

final def map[B](f: (A) ⇒ B): List[B] 

ich die Syntax aus der Dokumentation für List kopiert verwenden, aber es wird etwas ähnliches sein für RDDs auch.

Wie Sie sehen, es funktioniert die Funktion f auf Datentyp A und gibt eine Auflistung von Datentyp B wo A und B können auch die gleichen Datentyp sein.

val rdd = sc.parallelize(Array(
     "String1", 
     "String2", 
     "String3")) 

scala> rdd.foreach(x => (x, x.length)) 

// Nothing happens 

rdd.map(x => (x, x.length)).collect 

// Array[(String, Int)] = Array((String1,7), (String2,7), (String3,7)) 
Verwandte Themen