2017-05-17 2 views
-1

Ich bin neu in Spark und Scala, jetzt bin ich irgendwie mit einem Problem stecken: wie mit verschiedenen Feld jeder Zeile nach Feldnamen, dann in eine neue rdd.Spark Rdd behandeln verschiedene Felder jeder Zeile nach Feldname

Das ist mein Pseudo-Code ist:

val newRdd = df.rdd.map(x=>{ 
     def Random1 => random(1,10000) //pseudo 
     def Random2 => random(10000,20000) //pseduo 
     x.schema.map(y=> { 
      if (y.name == "XXX1") 
      x.getAs[y.dataType](y.name)) = Random1 
      else if (y.name == "XXX2") 
      x.getAs[y.dataType](y.name)) = Random2 
      else 
      x.getAs[y.dataType](y.name)) //pseduo,keeper the same 
     }) 
     }) 

Es gibt 2 weniger Fehler in oben:

  1. die zweite Karte, "x.getAs" ist ein Fehler Syntax
  2. wie in zu eine neue rdd

Ich bin auf der Suche nach einer langen Zeit im Netz. Aber keine Verwendung. Bitte helfen oder versuchen Sie, einige Ideen zu geben, wie dies zu erreichen ist.


Dank Ramesh Maharjan, es funktioniert jetzt.

def randomString(len: Int): String = { 
    val rand = new scala.util.Random(System.nanoTime) 
    val sb = new StringBuilder(len) 
    val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" 
    for (i <- 0 until len) { 
     sb.append(ab(rand.nextInt(ab.length))) 
    } 
    sb.toString 
    } 
def testUdf = udf((value: String) =>randomString(2)) 
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone"))) 
df.withColumn("_2", testUdf(df("_2"))) 
+---+---+ 
| _1| _2| 
+---+---+ 
| 1| F3| 
| 2| Ag| 
+---+---+ 

Antwort

1

Wenn Sie beabsichtigen, bestimmte felds „XXX1“ filtern „XXX2“ dann einfach select Funktion sollte den Trick

tun und konvertieren, das zu rdd
Wenn Sie etwas beabsichtigen, sind anders dann Ihr x.getAs sollte wie folgt aussehen:

val random1 = x.getAs(y.name) 

Es scheint, dass yo u versuchen, Werte in einigen Spalten „XXX1“ zu ändern und „XXX2“
Für die einfache udf Funktion und withColumn sollte den Trick
Einfache udf Funktion tun ist, wie unten

def testUdf = udf((value: String) => { 
    //do your logics here and what you return from here would be reflected in the value you passed from the column 
    }) 

Und Sie können das Gespräch UDF-Funktion als

df.withColumn("XXX1", testUdf(df("XXX1"))) 

Ähnlich Sie für „XXX2“ tun können

+0

dank Ihre schnelle Antwort, aber ich möchte ein neues RDD erhalten, die c hange spezielle ungerade RDD eingereicht und andere Felder unverändert. – meng

+0

ja, ich denke auch "udf". Aber in meinem Fall ist jedes Spezialfeld anders. withColumn würde für ein spezielles Feld alles gleich geben. – meng

+0

'withColumn' würde die von der 'udf-Funktion' zurückgegebenen Ergebnisse liefern. So können Sie Fälle für die verschiedenen Felder im 'udf' abgleichen, um unterschiedliche Werte zu erhalten. –