Ich bin neu in Spark und Scala, jetzt bin ich irgendwie mit einem Problem stecken: wie mit verschiedenen Feld jeder Zeile nach Feldnamen, dann in eine neue rdd
.Spark Rdd behandeln verschiedene Felder jeder Zeile nach Feldname
Das ist mein Pseudo-Code ist:
val newRdd = df.rdd.map(x=>{
def Random1 => random(1,10000) //pseudo
def Random2 => random(10000,20000) //pseduo
x.schema.map(y=> {
if (y.name == "XXX1")
x.getAs[y.dataType](y.name)) = Random1
else if (y.name == "XXX2")
x.getAs[y.dataType](y.name)) = Random2
else
x.getAs[y.dataType](y.name)) //pseduo,keeper the same
})
})
Es gibt 2 weniger Fehler in oben:
- die zweite Karte, "x.getAs" ist ein Fehler Syntax
- wie in zu eine neue
rdd
Ich bin auf der Suche nach einer langen Zeit im Netz. Aber keine Verwendung. Bitte helfen oder versuchen Sie, einige Ideen zu geben, wie dies zu erreichen ist.
Dank Ramesh Maharjan, es funktioniert jetzt.
def randomString(len: Int): String = {
val rand = new scala.util.Random(System.nanoTime)
val sb = new StringBuilder(len)
val ab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
for (i <- 0 until len) {
sb.append(ab(rand.nextInt(ab.length)))
}
sb.toString
}
def testUdf = udf((value: String) =>randomString(2))
val df = sqlContext.createDataFrame(Seq((1,"Android"), (2, "iPhone")))
df.withColumn("_2", testUdf(df("_2")))
+---+---+
| _1| _2|
+---+---+
| 1| F3|
| 2| Ag|
+---+---+
dank Ihre schnelle Antwort, aber ich möchte ein neues RDD erhalten, die c hange spezielle ungerade RDD eingereicht und andere Felder unverändert. – meng
ja, ich denke auch "udf". Aber in meinem Fall ist jedes Spezialfeld anders. withColumn würde für ein spezielles Feld alles gleich geben. – meng
'withColumn' würde die von der 'udf-Funktion' zurückgegebenen Ergebnisse liefern. So können Sie Fälle für die verschiedenen Felder im 'udf' abgleichen, um unterschiedliche Werte zu erhalten. –