2017-05-08 2 views
1

Ich versuche, Datenrahmen zu cassandra einzufügen:Leere Werte in Funkendatenrahmen

result.rdd.saveToCassandra(keyspaceName, tableName) 

jedoch einige der Spaltenwerte leer sind und damit komme ich Ausnahmen:

java.lang.NumberFormatException: empty String 
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) 
at sun.misc.FloatingDecimal.parseFloat(FloatingDecimal.java:122) 
at java.lang.Float.parseFloat(Float.java:451) 
at scala.collection.immutable.StringLike$class.toFloat(StringLike.scala:231) 
at scala.collection.immutable.StringOps.toFloat(StringOps.scala:31) 
at com.datastax.spark.connector.types.TypeConverter$FloatConverter$$anonfun$convertPF$4.applyOrElse(TypeConverter.scala:216) 

Gibt es eine Möglichkeit zu Ersetzen Sie alle leeren Werte durch Null im Datenrahmen und würde dies das Problem lösen? Bei dieser Frage läßt vermuten, das der Datenrahmen df:

col1 | col2 | col3 
"A" | "B" | 1 
"E" | "F" | 
"S" | "K" | 5 

Wie kann ich die leere Wert in col3 mit null ersetzen?

Antwort

0

Wenn Sie die Datenrahmen Spalte auf Ihre numerischen Typ gegossen dann alle Werte, die in nulls gedreht wird, um den entsprechenden Typ verglichen werden darf.

import org.apache.spark.sql.types.IntegerType 
df.select(
    $"col1", 
    $"col2", 
    $"col3" cast IntegerType 
) 

oder wenn Sie nicht über eine select-Anweisung

df.withColumn("col3", df("col3") cast IntegerType) 

Wenn Sie viele Spalten, die Sie wollen, dass diese bewerben und das Gefühl, es zu tun würde, zu unbequem dies in einer select-Anweisung zu tun oder wenn Casting nicht für Ihren Fall funktioniert, können Sie in rdd konvertieren, um die Transformation anzuwenden, und dann zu einem Datenrahmen zurückkehren. Vielleicht möchten Sie eine Methode dafür definieren.

def emptyToNull(df: DataFrame): DataFrame = { 
    val sqlCtx = df.sqlContext 
    val schema = df.schema 

    val rdd = df.rdd.map(
     row => 
     row.toSeq.map { 
      case "" => null 
      case otherwise => otherwise 
     }) 
     .map(Row.fromSeq) 

    sqlCtx.createDataFrame(rdd, schema) 
    } 
+0

Danke, diese Transformation war wonach ich gesucht habe – Ahmed

0

Sie können eine udf für dieses schreiben:

val df = Seq(("A", "B", "1"), ("E", "F", ""), ("S", "K", "1")).toDF("col1", "col2", "col3") 
// make a udf that converts String to option[String] 
val nullif = udf((s: String) => if(s == "") None else Some(s)) 

df.withColumn("col3", nullif($"col3")).show 

+----+----+----+ 
|col1|col2|col3| 
+----+----+----+ 
| A| B| 1| 
| E| F|null| 
| S| K| 1| 
+----+----+----+ 

Sie auch when.otherwise verwenden können, wenn Sie die Verwendung von UDF vermeiden wollen:

df.withColumn("col3", when($"col3" === "", null).otherwise($"col3")).show 

+----+----+----+ 
|col1|col2|col3| 
+----+----+----+ 
| A| B| 1| 
| E| F|null| 
| S| K| 1| 
+----+----+----+ 

Oder Sie können SQL nullif Funktion zu Conv ert leere Zeichenfolge null:

df.selectExpr("col1", "col2", "nullif(col3, \"\") as col3").show 
+----+----+----+ 
|col1|col2|col3| 
+----+----+----+ 
| A| B| 1| 
| E| F|null| 
| S| K| 1| 
+----+----+----+ 
+0

Ich hatte gehofft, für eine andere Lösung als UDF, wie eine irgendwie Datenrahmen Transformation wie .map (...) – Ahmed

+0

Aktualisiert zwei andere Methoden für leere Zeichenfolge Umwandlung auf null. – Psidom

+1

Ihr 'when' Beispiel könnte vereinfacht werden zu' when ($ "col3"! == "" $ col3 ")'. 'When' wird standardmäßig auf null gesetzt, wenn es keine' sonst' Klausel gibt. – puhlen