2017-02-06 5 views
0

Ich erstelle eine Funktion für Mappartitions, um das Maximum und Minimum pro Partition zu berechnen. Ich habe die Funktion in pyspark erstellt, aber ich kann sie nicht erfolgreich in scala übersetzen. Ich wende diese Funktion zweimal an und möchte im Ergebnis eine Zip-Funktion ausführen. Dies ist die errot, die ich bekomme:Spark (Scala) Problem Iteratoren

result.zip (res)

Typenkonflikt;

[error] found : org.apache.spark.rdd.RDD[(Int, Int)] 
[error] required: scala.collection.GenIterable[?] 

Hier können Sie meine Funktion in Python haben:

def minmaxInt(iterator): 
    firsttime = 0 
    min = 0 
    max = 0 
    for x in iterator: 
    if(x!= '' and x!='NULL' and x is not None): 
     y=int(x)  
      if (firsttime == 0): 
        min = y; 
        max = y; 
        firsttime = 1 
      else: 
        if y > max: 
         max = y 
        if y < min: 
         min = y 
    return (min, max) 

Und hier mein Code in Scala

def minmaxInt(iterator: Iterator[String]) : Iterator[(Int,Int)]={ 

    var firsttime = 0 
    var min = 0 
    var max = 0 
    var res=List[(Int,Int)]() 
    for(x <- iterator){ 
     if(x!= "" && x!= null){ 
    var y=x.toInt 

     if(firsttime == 0){ 
      min = y 
      max = y 
      firsttime = 1} 
     else{ 
      if (y > max){ 
       max = y} 
      if (y < min){ 
       min = y} 
     } 
     } 
    } 

    res.::=(min,max) 
    return res.iterator 

} 

Vielen Dank im Voraus

Update:

Dank für dein schnelle Antwort! Der Code ist großartig, aber ich habe immer noch Probleme mit dem Zip. Ich habe zweimal die letzten Code für rdd.mapPartitions und dann ausführen ich die Zip:

[error] found : org.apache.spark.rdd.RDD[(Int, Int)] 
[error] required: scala.collection.GenIterable[?] 
[error]    result.zip(res) 
+0

einigen Kontext fehlt - der Fehler im Code zu sein scheint, die hier nicht dargestellt ist (es gibt keine Hinweise auf 'RDD's im Code, den Sie eingefügt) –

+0

Dies ist, was ich mache, nur eine Spalte von einem CSV nehmen und die Minmax-Funktion anwenden. ' val file = sc.textFile (Pfad) val split = Datei.map (x => x.split (", ")) val col = split.map (x => x (0)) var Ergebnis = col.mapPartitions (minmaxInt) ... Später, die gleichen ... var res = col.mapPartitions (minmaxInt) result.zip (res) ' – Javier

+0

keinen Code in Kommentare hinzufügen - [bearbeiten] (http://stackoverflow.com/posts/42067415/edit) die Post, die fehlenden Informationen –

Antwort

0

Hier ist eine einfacher (und mehr idiomatische) Umsetzung von minMaxInt:

def minMaxInt(iterator: Iterator[String]) : Iterator[(Int,Int)]= { 
    val tuple = iterator 
    .filter(_ != null).filter(!_.isEmpty) 
    .map(_.toInt).map(i => (i, i)) 
    .reduce[(Int, Int)] { case ((min, max), (i1, i2)) => (Math.min(min, i1), Math.max(max, i2)) } 

    Seq(tuple).iterator 
} 

, die zu einem RDD[String] angewandt werden können, wie folgt:

// some sample data 
def col = sc.parallelize(Seq("1", "4", "12", "3", "", null, "2")) 

// "use twice" and zip 
var result: RDD[(Int, Int)] = col.mapPartitions(minmaxInt) 
var res: RDD[(Int, Int)] = col.mapPartitions(minmaxInt) 

result.zip(res).foreach(println) 
// prints: 
// ((1,1),(1,1)) 
// ((2,2),(2,2)) 
// ((3,3),(3,3)) 
// ((4,12),(4,12)) 
+0

Dank für Ihre Eingabe hinzuzufügen! Wie Sie in der aktualisierten Frage sehen können, habe ich immer noch das Problem. Irgendeine Idee? – Javier

+0

Erstens - es ist schwer, mit einigen des Code im Kommentar und einig in der Post Körper folgen - bitte den gesamten Code in der Post zeigen, so kann ich es selbst versuchen, das Problem zu finden. Zweitens - das klingt wie eine andere Frage, die nicht mit dieser Methode, so dass nicht sicher, es gehört auch in diesem Beitrag. –

+0

siehe aktualisierte Antwort - der Code, den Sie Werke eingefügt, das Problem irgendwo im Code, die Sie nicht teilen, so kann ich nicht weiter helfen .... –