2017-02-09 2 views
0

Anonyme Funktion funktioniert gut.Spark 2.0: Eine benannte Funktion in mapGroups für sql.KeyValueGroupedDataset Ursache java.io.NotSerializableException

Für folgende Code das Problem ein:

import sparkSession.implicits._ 
val sparkSession = SparkSession.builder.appName("demo").getOrCreate() 
val sc = sparkSession.sparkContext 

case class DemoRow(keyId: Int, evenOddId: Int) 
case class EvenOddCountRow(keyId: Int, oddCnt: Int, evenCnt: Int) 

val demoDS = sc.parallelize(Seq(DemoRow(1, 1), 
           DemoRow(1, 2), 
           DemoRow(1, 3), 
           DemoRow(2, 1), 
           DemoRow(2, 2))).toDS() 

Zeige die demoDS.show():

+-----+---------+ 
|keyId|evenOddId| 
+-----+---------+ 
| 1|  1| 
| 1|  2| 
| 1|  3| 
| 2|  1| 
| 2|  2| 
+-----+---------+ 

Mit der Anonymous-Funktion id => id % 2 == 1 innerhalb mapGroups() funktioniert:

val demoGroup = demoDS.groupByKey(_.keyId).mapGroups((key, iter) => { 
    val evenOddIds = iter.map(_.evenOddId).toList 
    val (oddIds, evenIds) = evenOddIds.partition(id => id % 2 == 1) 
    EvenOddCountRow(key, oddIds.size, evenIds.size) 
}) 

Das Ergebnis demoGroup.show() ist was wir erwartet hatten:

def isOdd(id: Int) = id % 2 == 1 

val demoGroup = demoDS.groupByKey(_.keyId).mapGroups((key, iter) => { 
    val evenOddIds = iter.map(_.evenOddId).toList 
    val (oddIds, evenIds) = evenOddIds.partition(isOdd) 
    EvenOddCountRow(key, oddIds.size, evenIds.size) 
}) 

Verursacht durch: java.io

+-----+------+-------+ 
|keyId|oddCnt|evenCnt| 
+-----+------+-------+ 
| 1|  2|  1| 
| 2|  1|  1| 
+-----+------+-------+ 

Nun, wenn ich die isOdd Funktion definieren, und im Inneren mapGroups() wie unten in die Funktion setzen, wird es Exception erhöhen .NotSerializableException: scala.collection.LinearSeqLike $$ anon 1 $

ich habe versucht, verschiedene Arten definieren die isOdd Funktion Versuchen Sie, es serialisierbar zu machen:

val isOdd = (id: Int) => id % 2 == 1 // does not work 

case object isOdd extends Function[Int, Boolean] with Serializable { 
    def apply(id: Int) = id % 2 == 1 
} // still does not work 

Habe ich etwas oder etwas falsch vermisst? Danke im Voraus!

Antwort

0

Die folgenden Werke für mich:

object Utils { 
    def isOdd(id: Int) = id % 2 == 1 
} 

Und dann verwenden:

evenOddIds.partition(Utils.isOdd) 
+0

Seltsam. Nach dem Neustart der Spark-Shell, funktioniert nicht nur Ihre Lösung, meine bisherigen Lösungen funktionieren auch. ... –

Verwandte Themen